783 lines
29 KiB
Racket
783 lines
29 KiB
Racket
#lang racket/base
|
|
;; DSL syntax over the API of dataspace.rkt
|
|
|
|
(provide spawn
|
|
spawn*
|
|
|
|
react
|
|
react/suspend
|
|
until
|
|
|
|
field
|
|
assert
|
|
stop-facet
|
|
stop-current-facet
|
|
stop-when
|
|
stop-when-true
|
|
on-start
|
|
on-stop
|
|
on
|
|
during
|
|
during/spawn
|
|
begin/dataflow
|
|
define/dataflow
|
|
|
|
asserted
|
|
retracted
|
|
message
|
|
|
|
let-event
|
|
|
|
;; query-value
|
|
;; query-set
|
|
;; query-hash
|
|
;; query-hash-set
|
|
;; query-count
|
|
;; query-value*
|
|
;; query-set*
|
|
;; query-hash*
|
|
;; query-hash-set*
|
|
;; query-count*
|
|
;; define/query-value
|
|
;; define/query-set
|
|
;; define/query-hash
|
|
;; define/query-hash-set
|
|
;; define/query-count
|
|
;; immediate-query
|
|
|
|
send!
|
|
flush!
|
|
assert!
|
|
retract!
|
|
current-adhoc-assertions
|
|
|
|
;;
|
|
|
|
;; current-action-transformer
|
|
)
|
|
|
|
(require (for-syntax racket/base))
|
|
(require (for-syntax syntax/parse))
|
|
(require (for-syntax syntax/srcloc))
|
|
(require "syntax-classes.rkt")
|
|
|
|
(require "dataspace.rkt")
|
|
(require (submod "dataspace.rkt" priorities))
|
|
(require "event-expander.rkt")
|
|
(require "skeleton.rkt")
|
|
(require "pattern.rkt")
|
|
|
|
(require racket/set)
|
|
(require syndicate/dataflow)
|
|
(require syndicate/protocol/instance)
|
|
|
|
(begin-for-syntax
|
|
(define-splicing-syntax-class actor-wrapper
|
|
(pattern (~seq #:spawn wrapper))
|
|
(pattern (~seq) #:attr wrapper #'spawn))
|
|
|
|
(define-splicing-syntax-class on-crash-option
|
|
(pattern (~seq #:on-crash expr))
|
|
(pattern (~seq) #:attr expr #f))
|
|
|
|
(define-splicing-syntax-class let-option
|
|
(pattern (~seq #:let clauses))
|
|
(pattern (~seq) #:attr clauses #'()))
|
|
|
|
(define-splicing-syntax-class when-pred
|
|
(pattern (~seq #:when Pred))
|
|
(pattern (~seq) #:attr Pred #'#t))
|
|
|
|
(define-splicing-syntax-class priority
|
|
(pattern (~seq #:priority level))
|
|
(pattern (~seq) #:attr level #'*normal-priority*)))
|
|
|
|
(define-syntax (spawn stx)
|
|
(syntax-parse stx
|
|
[(_ (~or (~optional (~seq #:name name-expr) #:defaults ([name-expr #'#f])
|
|
#:name "#:name")
|
|
(~optional (~seq #:assertions [assertion-exprs ...])
|
|
#:name "#:assertions")
|
|
(~optional (~seq #:linkage [linkage-expr ...]) #:defaults ([(linkage-expr 1) '()])
|
|
#:name "#:linkage"))
|
|
...
|
|
O ...)
|
|
(quasisyntax/loc stx
|
|
(spawn** #:name name-expr
|
|
#:assertions #,(cond [(attribute assertion-exprs) #'[assertion-exprs ...]]
|
|
[else #'[]])
|
|
linkage-expr ... O ...))]))
|
|
|
|
(define-syntax (spawn* stx)
|
|
(syntax-parse stx
|
|
[(_ name:name assertions:assertions script ...)
|
|
(quasisyntax/loc stx
|
|
(spawn** #:name name.N
|
|
#:assertions [assertions.exprs ...]
|
|
(on-start script ...)))]))
|
|
|
|
(define-syntax (spawn** stx)
|
|
(syntax-parse stx
|
|
[(_ name:name assertions:assertions script ...)
|
|
(quasisyntax/loc stx
|
|
(spawn!
|
|
(current-actor)
|
|
name.N
|
|
(lambda () (begin/void-default script ...))
|
|
(set assertions.exprs ...)))]))
|
|
|
|
(define-syntax (begin/void-default stx)
|
|
(syntax-parse stx
|
|
[(_) (syntax/loc stx (void))]
|
|
[(_ expr0 expr ...) (syntax/loc stx (begin expr0 expr ...))]))
|
|
|
|
(define (react* where boot-proc)
|
|
(define ds (current-dataspace))
|
|
(add-facet! ds
|
|
where
|
|
(current-actor)
|
|
(current-facet)
|
|
boot-proc))
|
|
|
|
(define-syntax (react stx)
|
|
(syntax-parse stx
|
|
[(_ O ...)
|
|
(quasisyntax/loc stx
|
|
(react* #,(source-location->string stx)
|
|
(lambda () (begin/void-default O ...))))]))
|
|
|
|
(define-syntax (react/suspend stx)
|
|
(syntax-parse stx
|
|
[(_ (resume-parent) O ...)
|
|
(quasisyntax/loc stx
|
|
(suspend-script* #,(source-location->string stx)
|
|
(lambda (resume-parent)
|
|
(react* #,(source-location->string stx)
|
|
(lambda () (begin/void-default O ...))))))]))
|
|
|
|
(define-syntax (until stx)
|
|
(syntax-parse stx
|
|
[(_ E O ...)
|
|
(syntax/loc stx
|
|
(react/suspend (continue)
|
|
(stop-when E (continue (void)))
|
|
O ...))]))
|
|
|
|
(define (make-field name init)
|
|
(field-handle name
|
|
(generate-id! (current-dataspace))
|
|
(current-actor)
|
|
init))
|
|
|
|
(define-syntax (define-field stx)
|
|
(syntax-parse stx
|
|
[(_ id init)
|
|
#'(define id (make-field 'id init))]))
|
|
|
|
(define-syntax (field stx)
|
|
(syntax-parse stx
|
|
[(_ [id:id init] ...)
|
|
(quasisyntax/loc stx
|
|
(begin (define-field id init)
|
|
...))]))
|
|
|
|
(define-syntax (assert stx)
|
|
(syntax-parse stx
|
|
[(_ w:when-pred P)
|
|
(quasisyntax/loc stx
|
|
(add-endpoint! (current-dataspace)
|
|
#,(source-location->string stx)
|
|
(lambda () (when w.Pred P))
|
|
#f))]))
|
|
|
|
(define-syntax (stop-facet stx)
|
|
(syntax-parse stx
|
|
[(_ f-expr script ...)
|
|
(quasisyntax/loc stx
|
|
(let ((f f-expr))
|
|
(when (not (equal? (facet-actor f) (current-actor)))
|
|
(error 'stop-facet "Attempt to stop unrelated facet ~a from actor ~a" f (current-actor)))
|
|
(stop-facet! (current-dataspace) f (lambda () (begin/void-default script ...)))))]))
|
|
|
|
(define-syntax-rule (stop-current-facet script ...)
|
|
(stop-facet (current-facet) script ...))
|
|
|
|
(define-syntax-rule (stop-when-true condition script ...)
|
|
(begin/dataflow
|
|
(when condition
|
|
(stop-facet (current-facet) script ...))))
|
|
|
|
(define-syntax (on-start stx)
|
|
(syntax-parse stx
|
|
[(_ script ...)
|
|
(quasisyntax/loc stx
|
|
(schedule-script! (current-dataspace)
|
|
(current-actor)
|
|
(lambda () (begin/void-default script ...))))]))
|
|
|
|
(define-syntax (on-stop stx)
|
|
(syntax-parse stx
|
|
[(_ script ...)
|
|
(quasisyntax/loc stx
|
|
(add-stop-script! (current-dataspace)
|
|
(lambda () (begin/void-default script ...))))]))
|
|
|
|
(define-syntax (stop-when stx)
|
|
(syntax-parse stx
|
|
[(_ w:when-pred E prio:priority script ...)
|
|
(analyse-event stx
|
|
#'w.Pred
|
|
#'E
|
|
(syntax/loc stx (stop-current-facet script ...))
|
|
#'prio.level)]))
|
|
|
|
(define-syntax (on stx)
|
|
(syntax-parse stx
|
|
[(_ w:when-pred E prio:priority script ...)
|
|
(analyse-event stx
|
|
#'w.Pred
|
|
#'E
|
|
(syntax/loc stx (begin/void-default script ...))
|
|
#'prio.level)]))
|
|
|
|
(define-syntax (begin/dataflow stx)
|
|
(syntax-parse stx
|
|
[(_ prio:priority expr ...)
|
|
(quasisyntax/loc stx
|
|
(let ()
|
|
(add-endpoint! (current-dataspace)
|
|
#,(source-location->string stx)
|
|
(lambda ()
|
|
(define subject-id (current-dataflow-subject-id))
|
|
(schedule-script!
|
|
#:priority prio.level
|
|
(current-dataspace)
|
|
(current-actor)
|
|
(lambda ()
|
|
(parameterize ((current-dataflow-subject-id subject-id))
|
|
expr ...)))
|
|
(void))
|
|
#f)))]))
|
|
|
|
(define-syntax (define/dataflow stx)
|
|
(syntax-parse stx
|
|
[(_ fieldname expr)
|
|
(quasisyntax/loc stx (define/dataflow fieldname expr #:default #f))]
|
|
[(_ fieldname expr #:default default-expr)
|
|
(quasisyntax/loc stx
|
|
(begin
|
|
(field [fieldname default-expr])
|
|
(begin/dataflow (fieldname expr))))]))
|
|
|
|
(define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx))
|
|
(define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx))
|
|
(define-syntax (message stx) (raise-syntax-error #f "message: Used outside event spec" stx))
|
|
|
|
(define-syntax (suspend-script stx)
|
|
(syntax-parse stx
|
|
[(_ proc)
|
|
(quasisyntax/loc stx
|
|
(suspend-script* #,(source-location->string stx) proc))]))
|
|
|
|
(define-syntax (let-event stx)
|
|
(syntax-parse stx
|
|
[(_ [e ...] body ...)
|
|
(syntax/loc stx
|
|
((react/suspend (k)
|
|
(on-start (-let-event [e ...] (stop-current-facet (k (lambda () body ...))))))))]))
|
|
|
|
(define-syntax (-let-event stx)
|
|
(syntax-parse stx
|
|
[(_ [] expr) #'expr]
|
|
[(_ [e es ...] expr) (quasisyntax/loc #'e (react (stop-when e (-let-event [es ...] expr))))]))
|
|
|
|
(define-for-syntax orig-insp
|
|
(variable-reference->module-declaration-inspector (#%variable-reference)))
|
|
|
|
(define-for-syntax (analyse-event outer-expr-stx
|
|
when-pred-stx
|
|
armed-event-stx
|
|
script-stx
|
|
priority-stx)
|
|
(define event-stx (syntax-disarm armed-event-stx orig-insp))
|
|
(syntax-parse event-stx
|
|
#:literals [message asserted retracted]
|
|
[(expander args ...) #:when (event-expander-id? #'expander)
|
|
(event-expander-transform event-stx
|
|
(lambda (result)
|
|
(analyse-event outer-expr-stx
|
|
when-pred-stx
|
|
(syntax-rearm result event-stx)
|
|
script-stx
|
|
priority-stx)))]
|
|
[(message P)
|
|
(define desc (analyse-pattern #'P))
|
|
(quasisyntax/loc outer-expr-stx
|
|
(add-endpoint! (current-dataspace)
|
|
#,(source-location->string outer-expr-stx)
|
|
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
|
|
(skeleton-interest #,(desc->skeleton-stx desc)
|
|
'#,(desc->skeleton-proj desc)
|
|
(list #,@(desc->key desc))
|
|
'#,(desc->capture-proj desc)
|
|
(capture-facet-context
|
|
(lambda (op #,@(desc->capture-names desc))
|
|
(when (eq? op '!)
|
|
(schedule-script!
|
|
#:priority #,priority-stx
|
|
(current-dataspace)
|
|
(current-actor)
|
|
#,(quasisyntax/loc script-stx
|
|
(lambda ()
|
|
#,script-stx)))))))))]
|
|
[(asserted P)
|
|
(analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #t #'P priority-stx)]
|
|
[(retracted P)
|
|
(analyse-asserted/retracted outer-expr-stx when-pred-stx script-stx #f #'P priority-stx)]))
|
|
|
|
(define-for-syntax (analyse-asserted/retracted outer-expr-stx
|
|
when-pred-stx
|
|
script-stx
|
|
asserted?
|
|
P-stx
|
|
priority-stx)
|
|
(define desc (analyse-pattern P-stx))
|
|
(quasisyntax/loc outer-expr-stx
|
|
(add-endpoint! (current-dataspace)
|
|
#,(source-location->string outer-expr-stx)
|
|
(lambda () (when #,when-pred-stx (observe #,(desc->assertion-stx desc))))
|
|
(skeleton-interest #,(desc->skeleton-stx desc)
|
|
'#,(desc->skeleton-proj desc)
|
|
(list #,@(desc->key desc))
|
|
'#,(desc->capture-proj desc)
|
|
(capture-facet-context
|
|
(lambda (op #,@(desc->capture-names desc))
|
|
(when (eq? op #,(if asserted? #''+ #''-))
|
|
(schedule-script!
|
|
#:priority #,priority-stx
|
|
(current-dataspace)
|
|
(current-actor)
|
|
#,(quasisyntax/loc script-stx
|
|
(lambda ()
|
|
#,script-stx))))))))))
|
|
|
|
(define-syntax (during stx)
|
|
(syntax-parse stx
|
|
[(_ P O ...)
|
|
(quasisyntax/loc stx
|
|
(on (asserted P)
|
|
(react (stop-when (retracted #,(instantiate-pattern->pattern #'P)))
|
|
O ...)))]))
|
|
|
|
(define-syntax (during/spawn stx)
|
|
(syntax-parse stx
|
|
[(_ P w:actor-wrapper name:name assertions:assertions parent-let:let-option
|
|
oncrash:on-crash-option
|
|
O ...)
|
|
(define Q-stx (instantiate-pattern->pattern #'P))
|
|
(quasisyntax/loc stx
|
|
(on (asserted P)
|
|
(let* ((id (gensym 'during/spawn))
|
|
(inst (instance id #,(instantiate-pattern->value #'P)))
|
|
;; ^ this is the assertion representing supply
|
|
)
|
|
(react (stop-when (asserted inst)
|
|
;; Supply (inst) appeared before demand (p) retracted.
|
|
;; Transition to a state where we monitor demand, but also
|
|
;; express interest in supply: this latter acts as a signal
|
|
;; to the supply that it should stick around. We react to
|
|
;; retraction of supply before retraction of demand by
|
|
;; invoking the on-crash expression, if supplied. Once
|
|
;; demand is retracted, this facet terminates, retracting
|
|
;; its interest in supply, thereby signalling to the supply
|
|
;; that it is no longer wanted.
|
|
(react (stop-when (retracted inst) ;; NOT OPTIONAL
|
|
#,@(if (attribute oncrash.expr)
|
|
#'(oncrash.expr)
|
|
#'()))
|
|
(stop-when (retracted #,Q-stx))))
|
|
(stop-when (retracted #,Q-stx)
|
|
;; Demand (p) retracted before supply (inst) appeared. We
|
|
;; MUST wait for the supply to fully appear so that we can
|
|
;; reliably tell it to shut down. We must maintain interest
|
|
;; in supply until we see supply, and then terminate, thus
|
|
;; signalling to supply that it is no longer wanted.
|
|
(react (stop-when (asserted inst)))))
|
|
(let parent-let.clauses
|
|
(w.wrapper #:linkage [(assert inst)
|
|
(stop-when (retracted (observe inst)))]
|
|
#:name name.N
|
|
#:assertions [assertions.exprs ...]
|
|
O ...)))))]))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; Queries
|
|
|
|
;; (begin-for-syntax
|
|
;; (define-splicing-syntax-class on-add
|
|
;; (pattern (~optional (~seq #:on-add expr) #:defaults ([expr #f]))))
|
|
;; (define-splicing-syntax-class on-remove
|
|
;; (pattern (~optional (~seq #:on-remove expr) #:defaults ([expr #f]))))
|
|
|
|
;; (define (schedule-query-handler-stxs maybe-expr-stx)
|
|
;; (if maybe-expr-stx
|
|
;; (quasisyntax/loc maybe-expr-stx
|
|
;; ((schedule-script! #:priority *query-handler-priority*
|
|
;; (lambda () #,maybe-expr-stx))))
|
|
;; #'())))
|
|
|
|
;; (define-syntax (query-value stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name absent-expr args ...)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ()
|
|
;; (field [field-name absent-expr])
|
|
;; (query-value* field-name absent-expr args ...)))]))
|
|
|
|
;; (define-syntax (query-value* stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name absent-expr P expr on-add:on-add on-remove:on-remove)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ((F field-name))
|
|
;; (on (asserted P) #:priority *query-priority*
|
|
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
|
|
;; (F expr))
|
|
;; (on (retracted P) #:priority *query-priority-high*
|
|
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
|
|
;; (F absent-expr))
|
|
;; F))]))
|
|
|
|
;; (define-syntax (query-set stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name args ...)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ()
|
|
;; (field [field-name (set)])
|
|
;; (query-set* field-name args ...)))]))
|
|
|
|
;; (define-syntax (query-set* stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name P expr on-add:on-add on-remove:on-remove)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ((F field-name))
|
|
;; (on (asserted P) #:priority *query-priority*
|
|
;; (let ((V expr))
|
|
;; (when (not (set-member? (F) V))
|
|
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
|
|
;; (F (set-add (F) V)))))
|
|
;; (on (retracted P) #:priority *query-priority-high*
|
|
;; (let ((V expr))
|
|
;; (when (set-member? (F) V)
|
|
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
|
|
;; (F (set-remove (F) V)))))
|
|
;; F))]))
|
|
|
|
;; (define-syntax (query-hash stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name args ...)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ()
|
|
;; (field [field-name (hash)])
|
|
;; (query-hash* field-name args ...)))]))
|
|
|
|
;; (define-syntax (query-hash* stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ((F field-name))
|
|
;; (on (asserted P) #:priority *query-priority*
|
|
;; (let ((key key-expr))
|
|
;; (when (hash-has-key? (F) key)
|
|
;; (log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v"
|
|
;; 'field-name
|
|
;; 'P
|
|
;; key))
|
|
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
|
|
;; (F (hash-set (F) key value-expr))))
|
|
;; (on (retracted P) #:priority *query-priority-high*
|
|
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
|
|
;; (F (hash-remove (F) key-expr)))
|
|
;; F))]))
|
|
|
|
;; (define-syntax (query-hash-set stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name args ...)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ()
|
|
;; (field [field-name (hash)])
|
|
;; (query-hash-set* field-name args ...)))]))
|
|
|
|
;; (define-syntax (query-hash-set* stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ((F field-name))
|
|
;; (on (asserted P) #:priority *query-priority*
|
|
;; (let ((K key-expr) (V value-expr))
|
|
;; (when (not (hashset-member? (F) K V))
|
|
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
|
|
;; (F (hashset-add (F) K V)))))
|
|
;; (on (retracted P) #:priority *query-priority-high*
|
|
;; (let ((K key-expr) (V value-expr))
|
|
;; (when (hashset-member? (F) K V)
|
|
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
|
|
;; (F (hashset-remove (F) K V)))))
|
|
;; F))]))
|
|
|
|
;; (define-syntax (query-count stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name args ...)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ()
|
|
;; (field [field-name (hash)])
|
|
;; (query-count* field-name args ...)))]))
|
|
|
|
;; (define-syntax (query-count* stx)
|
|
;; (syntax-parse stx
|
|
;; [(_ field-name P expr on-add:on-add on-remove:on-remove)
|
|
;; (quasisyntax/loc stx
|
|
;; (let ((F field-name))
|
|
;; (on (asserted P) #:priority *query-priority*
|
|
;; (let ((E expr))
|
|
;; #,@(schedule-query-handler-stxs (attribute on-add.expr))
|
|
;; (F (hash-set (F) E (+ 1 (hash-ref (F) E 0))))))
|
|
;; (on (retracted P) #:priority *query-priority-high*
|
|
;; (let ((E expr))
|
|
;; #,@(schedule-query-handler-stxs (attribute on-remove.expr))
|
|
;; (let ((F0 (F)))
|
|
;; (F (match (hash-ref F0 E 0)
|
|
;; [0 F0] ;; huh
|
|
;; [1 (hash-remove F0 E)]
|
|
;; [n (hash-set F0 E (- n 1))])))))
|
|
;; F))]))
|
|
|
|
;; (define-syntax-rule (define/query-value id ae P x ...) (define id (query-value id ae P x ...)))
|
|
;; (define-syntax-rule (define/query-set id P x ...) (define id (query-set id P x ...)))
|
|
;; (define-syntax-rule (define/query-hash id P x ...) (define id (query-hash id P x ...)))
|
|
;; (define-syntax-rule (define/query-hash-set id P x ...) (define id (query-hash-set id P x ...)))
|
|
;; (define-syntax-rule (define/query-count id P x ...) (define id (query-count id P x ...)))
|
|
|
|
;; (define-syntax (immediate-query stx)
|
|
;; (syntax-case stx ()
|
|
;; [(_ [op args ...] ...)
|
|
;; (with-syntax [((query-result ...) (generate-temporaries #'(op ...)))]
|
|
;; (syntax/loc stx
|
|
;; (react/suspend (k)
|
|
;; (define query-result (op query-result args ...)) ...
|
|
;; (on-start (flush!) (k (query-result) ...)))))]))
|
|
|
|
(define (send! m)
|
|
(enqueue-send! (current-actor) m))
|
|
|
|
(define (flush!)
|
|
(ensure-in-script! 'flush!)
|
|
(define ack (gensym 'flush!))
|
|
(until (message ack)
|
|
(on-start (send! ack))))
|
|
|
|
(define (assert! a)
|
|
(ensure-in-script! 'assert!)
|
|
(adhoc-assert! (current-actor) a))
|
|
|
|
(define (retract! a)
|
|
(ensure-in-script! 'retract!)
|
|
(adhoc-retract! (current-actor) a))
|
|
|
|
(define (current-adhoc-assertions)
|
|
(actor-adhoc-assertions (current-actor)))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
(module+ test
|
|
(message-struct set-box (new-value))
|
|
(assertion-struct box-state (value))
|
|
|
|
(define ds
|
|
(make-dataspace
|
|
(lambda ()
|
|
(schedule-script!
|
|
(current-dataspace)
|
|
(current-actor)
|
|
|
|
#;(lambda ()
|
|
(struct foo (x y) #:prefab)
|
|
|
|
(spawn (field [x 123])
|
|
(assert (foo (x) 999))
|
|
(during (foo (x) $v)
|
|
(log-info "x=~a v=~a" (x) v)
|
|
(when (= (x) 123) (x 124))
|
|
(on-stop
|
|
(log-info "finally for x=~a v=~a" (x) v))))
|
|
)
|
|
|
|
(lambda ()
|
|
(spawn #:name 'factory-1
|
|
(on (asserted (list 'X 1))
|
|
(spawn #:name 'service-1
|
|
#:assertions [(observe (list 'X 1))] ;; (A)
|
|
(stop-when (retracted (list 'X 1))) ;; (B)
|
|
(on (message 'dummy))) ;; exists just to keep the service alive if
|
|
;; there are no other endpoints
|
|
;; spawn executes *before* teardown of this on-asserted endpoint, and thus
|
|
;; before the patch withdrawing (observe (list 'X 1)).
|
|
(stop-current-facet)))
|
|
|
|
(spawn (on (asserted (observe (list 'X $supplier)))
|
|
(printf "Supply ~v asserted.\n" supplier)
|
|
(assert! (list 'X supplier)))
|
|
(on (retracted (observe (list 'X $supplier)))
|
|
(printf "Supply ~v retracted.\n" supplier)))
|
|
)
|
|
|
|
#;(lambda ()
|
|
(spawn #:name 'demand-watcher
|
|
(during/spawn 'demand
|
|
#:name (gensym 'intermediate-demand-asserter)
|
|
(assert 'intermediate-demand)))
|
|
|
|
(spawn #:name 'intermediate-demand-watcher
|
|
(during/spawn 'intermediate-demand
|
|
#:name (gensym 'supply-asserter)
|
|
(assert 'supply)))
|
|
|
|
(spawn* #:name 'driver
|
|
(react (on (asserted 'supply) (log-info "Supply asserted."))
|
|
(on (retracted 'supply) (log-info "Supply retracted.")))
|
|
(until (asserted (observe 'demand)))
|
|
(log-info "Asserting demand.")
|
|
(assert! 'demand)
|
|
(until (asserted 'supply))
|
|
(log-info "Glitching demand.")
|
|
(retract! 'demand)
|
|
(flush!)
|
|
(assert! 'demand)
|
|
(log-info "Demand now steady."))
|
|
)
|
|
|
|
#;(lambda ()
|
|
;; Trivial example program to demonstrate tracing
|
|
|
|
(assertion-struct one-plus (n m))
|
|
|
|
(spawn #:name 'add1-server
|
|
(during/spawn (observe (one-plus $n _))
|
|
#:name (list 'solving 'one-plus n)
|
|
(assert (one-plus n (+ n 1)))))
|
|
|
|
(spawn #:name 'client-process
|
|
(stop-when (asserted (one-plus 3 $value))
|
|
(printf "1 + 3 = ~a\n" value)))
|
|
)
|
|
|
|
#;(lambda ()
|
|
;; .../racket/syndicate/examples/actor/example-partial-retraction.rkt
|
|
;;
|
|
(struct ready (what) #:prefab)
|
|
(struct entry (key val) #:prefab)
|
|
|
|
(spawn (assert (ready 'listener))
|
|
(on (asserted (entry $key _))
|
|
(log-info "key ~v asserted" key)
|
|
(until (retracted (entry key _))
|
|
(on (asserted (entry key $value))
|
|
(log-info "add binding: ~v -> ~v" key value))
|
|
(on (retracted (entry key $value))
|
|
(log-info "del binding: ~v -> ~v" key value)))
|
|
(log-info "key ~v retracted" key)))
|
|
|
|
(spawn (assert (ready 'other-listener))
|
|
(during (entry $key _)
|
|
(log-info "(other-listener) key ~v asserted" key)
|
|
(on-stop (log-info "(other-listener) key ~v retracted" key))
|
|
(during (entry key $value)
|
|
(log-info "(other-listener) ~v ---> ~v" key value)
|
|
(on-stop (log-info "(other-listener) ~v -/-> ~v" key value)))))
|
|
|
|
(define (pause)
|
|
(log-info "pause")
|
|
(define token (gensym 'pause)) ;; FIXME:: If we use the same token every time, need epochs!
|
|
(until (asserted (ready token))
|
|
(assert (ready token))))
|
|
|
|
(spawn* (until (asserted (ready 'listener)))
|
|
(until (asserted (ready 'other-listener)))
|
|
(assert! (entry 'a 1))
|
|
(assert! (entry 'a 2))
|
|
(assert! (entry 'b 3))
|
|
(assert! (entry 'c 33))
|
|
(assert! (entry 'a 4))
|
|
(assert! (entry 'a 5))
|
|
(pause)
|
|
(retract! (entry 'a 2))
|
|
(retract! (entry 'c 33))
|
|
(assert! (entry 'a 9))
|
|
(pause)
|
|
(for [(a (current-adhoc-assertions))]
|
|
(local-require racket/match)
|
|
(match a
|
|
[(entry 'a _) (retract! a)]
|
|
[_ (void)]))
|
|
;; ^ (retract! (entry 'a ?))
|
|
(pause))
|
|
)
|
|
|
|
#;(lambda ()
|
|
(spawn (on (message $v)
|
|
(if (= v 10000000)
|
|
(stop-current-facet)
|
|
(send! (+ v 1))))
|
|
(on-start (send! 0)))
|
|
)
|
|
|
|
#;(lambda ()
|
|
|
|
(message-struct stage (n))
|
|
|
|
(spawn #:name 'actor0
|
|
(on (message (stage 0))
|
|
(send! (stage 1)))
|
|
|
|
(on (message (stage 2))
|
|
(send! (stage 3))
|
|
(/ 1 0)
|
|
(send! (stage 3))))
|
|
|
|
(spawn #:name 'main
|
|
(on (message (stage $v))
|
|
(printf "Got message ~v\n" v))
|
|
(on-start
|
|
(until (asserted (observe (stage 0))))
|
|
(send! (stage 0))
|
|
(until (message (stage 1)))
|
|
(send! (stage 2))))
|
|
)
|
|
|
|
#;(lambda ()
|
|
(spawn (field [current-value 0])
|
|
(assert (box-state (current-value)))
|
|
(stop-when-true (= (current-value) 10)
|
|
(log-info "box: terminating"))
|
|
(on (message (set-box $new-value))
|
|
(log-info "box: taking on new-value ~v" new-value)
|
|
(current-value new-value)))
|
|
|
|
(spawn (stop-when (retracted (observe (set-box _)))
|
|
(log-info "client: box has gone"))
|
|
(on (asserted (box-state $v))
|
|
(log-info "client: learned that box's value is now ~v" v)
|
|
(send! (set-box (+ v 1)))))
|
|
)
|
|
))))
|
|
|
|
(require racket/pretty)
|
|
;; (pretty-print ds)
|
|
(#;time values
|
|
(let loop ((i 0))
|
|
;; (printf "--- i = ~v\n" i)
|
|
(when (run-scripts! ds)
|
|
;; (pretty-print ds)
|
|
(loop (+ i 1)))))
|
|
;; (pretty-print ds)
|
|
)
|