syndicate-rkt/syndicate/syntax.rkt

1128 lines
42 KiB
Racket
Raw Normal View History

#lang racket/base
;; DSL syntax over the API of dataspace.rkt
(provide spawn
spawn*
react
react/suspend
until
current-facet-id
field
assert
stop-facet
stop-current-facet
stop-when
stop-when-true
on-start
on-stop
on
during
during/spawn
begin/dataflow
define/dataflow
asserted
retracted
message
let-event
query-value
query-set
query-hash
query-hash-set
query-count
query-value*
query-set*
query-hash*
query-hash-set*
query-count*
define/query-value
define/query-set
define/query-hash
define/query-hash-set
define/query-count
immediate-query
send!
assert!
retract!
patch!
perform-actions!
flush!
syndicate-effects-available?
? ;; from pattern.rkt
;;
current-action-transformer
schedule-action!
schedule-actions!
(for-syntax (rename-out [name actor-name]))
pretty-print-actor-state
)
(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
(require (for-syntax syntax/srcloc))
(require "syntax-classes.rkt")
(require "dataspace.rkt")
(require (submod "dataspace.rkt" priorities))
;; Phase-1 splicing syntax classes for the optional keyword arguments
;; accepted by the forms below. Each class has two alternatives: the
;; keyword followed by one term, or the empty sequence, in which case
;; the attribute is bound to the default given by `#:attr`.
(begin-for-syntax
;; Optional `#:spawn wrapper`; when absent, `wrapper` is the identifier `spawn`.
(define-splicing-syntax-class actor-wrapper
(pattern (~seq #:spawn wrapper))
(pattern (~seq) #:attr wrapper #'spawn))
;; Optional `#:on-crash expr`; when absent, `expr` is #f (a plain value,
;; not a syntax object), so users test it with `(attribute oncrash.expr)`.
(define-splicing-syntax-class on-crash-option
(pattern (~seq #:on-crash expr))
(pattern (~seq) #:attr expr #f))
;; Optional `#:let clauses`; when absent, `clauses` is the empty list `()`.
(define-splicing-syntax-class let-option
(pattern (~seq #:let clauses))
(pattern (~seq) #:attr clauses #'()))
;; Optional `#:when Pred`; when absent, `Pred` is the literal `#t`.
(define-splicing-syntax-class when-pred
(pattern (~seq #:when Pred))
(pattern (~seq) #:attr Pred #'#t))
;; Optional `#:priority level`; when absent, `level` is `*normal-priority*`.
(define-splicing-syntax-class priority
(pattern (~seq #:priority level))
(pattern (~seq) #:attr level #'*normal-priority*)))
;; (spawn [#:name name-expr] [#:assertions [expr ...]] [#:linkage [expr ...]] O ...)
;;
;; The three keyword options may appear in any order, each at most once
;; (they are `~optional` alternatives of a repeated `~or`). The string
;; given with `#:name` on each `~optional` is the label syntax/parse uses
;; when reporting that option in an error.
;;
;; Expands to `spawn**`, passing the name expression (default `#f`), the
;; assertion expressions (default: an empty `[]`), and a body made of the
;; linkage expressions followed by O ....
(define-syntax (spawn stx)
(syntax-parse stx
[(_ (~or (~optional (~seq #:name name-expr) #:defaults ([name-expr #'#f])
#:name "#:name")
(~optional (~seq #:assertions [assertion-exprs ...])
#:name "#:assertions")
(~optional (~seq #:linkage [linkage-expr ...]) #:defaults ([(linkage-expr 1) '()])
#:name "#:linkage"))
...
O ...)
(quasisyntax/loc stx
(spawn** #:name name-expr
;; `assertion-exprs` has no default, so its attribute is #f when the
;; option was omitted; substitute an empty bracketed list then.
#:assertions #,(cond [(attribute assertion-exprs) #'[assertion-exprs ...]]
[else #'[]])
linkage-expr ... O ...))]))
;; (spawn* <name> <assertions> body ...)
;;
;; Like `spawn**`, but the body forms are wrapped in a single `on-start`
;; before being handed on. The name and assertions are parsed by the
;; `name` and `assertions` syntax classes.
(define-syntax (spawn* stx)
  (syntax-parse stx
    [(_ nm:name as:assertions body ...)
     (syntax/loc stx
       (spawn** #:name nm.N
                #:assertions [as.exprs ...]
                (on-start body ...)))]))
;; (spawn** <name> <assertions> body ...)
;;
;; Core spawning form: calls `dataspace-spawn!` on the current dataspace
;; with the parsed name, a thunk running the body (or `(void)` when the
;; body is empty), and a set built from the assertion expressions.
(define-syntax (spawn** stx)
  (syntax-parse stx
    [(_ nm:name as:assertions body ...)
     (syntax/loc stx
       (dataspace-spawn! (current-dataspace)
                         nm.N
                         (lambda () (begin/void-default body ...))
                         (set as.exprs ...)))]))
;; Runtime half of `react`: adds a facet to the current dataspace under
;; the current actor and current facet. `where` is a source-location
;; string; `boot-proc` is the thunk passed on to `add-facet!`.
(define (react* where boot-proc)
  (add-facet! (current-dataspace)
              where
              (current-actor)
              (current-facet)
              boot-proc))
;; (react body ...)
;;
;; Expands to a `react*` call carrying the source location of the form
;; and a thunk that runs the body (yielding void if the body is empty).
(define-syntax (react stx)
  (syntax-parse stx
    [(_ body ...)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (react* #,where
               (lambda () (begin/void-default body ...))))]))
;; (react/suspend (resume-id) body ...)
;;
;; Suspends the running script via `suspend-script*`; the procedure given
;; to it binds `resume-id` and creates a facet (through `react*`) whose
;; body is `body ...`. Both calls receive this form's source location.
(define-syntax (react/suspend stx)
  (syntax-parse stx
    [(_ (resume-id) body ...)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (suspend-script* #,where
                        (lambda (resume-id)
                          (react* #,where
                                  (lambda () (begin/void-default body ...))))))]))
;; (until event body ...)
;;
;; Suspends the script in a `react/suspend` facet containing `body ...`
;; plus a `stop-when` on `event` that resumes the suspended script with
;; `(void)`.
(define-syntax (until stx)
  (syntax-parse stx
    [(_ event body ...)
     (syntax/loc stx
       (react/suspend (resume)
         (stop-when event (resume (void)))
         body ...))]))
;; Builds a field handle from `name`, a fresh id generated by the current
;; dataspace, the actor-level facet id derived from the current facet id,
;; and the initial value `init`.
;; NOTE(review): a second top-level definition named `make-field` appears
;; later in this file with a different body -- confirm which is intended.
(define (make-field name init)
  (let* ([fresh-id (generate-id! (current-dataspace))]
         [owner (fid->actor-fid (current-facet-id))])
    (field-handle name fresh-id owner init)))
;; (define-field name init-expr)
;;
;; Defines `name` as the result of `make-field`, called with the quoted
;; name and the initial-value expression.
(define-syntax (define-field stx)
  (syntax-parse stx
    [(_ name init-expr)
     #'(define name (make-field 'name init-expr))]))
;; (field [name init-expr] ...)
;;
;; Declares any number of fields; each clause becomes one `define-field`.
;; Every `name` must be an identifier.
(define-syntax (field stx)
  (syntax-parse stx
    [(_ [name:id init-expr] ...)
     (syntax/loc stx
       (begin (define-field name init-expr)
              ...))]))
;; (assert [#:when Pred] P)
;;
;; Adds an endpoint on the current dataspace whose assertion thunk yields
;; P while Pred holds, and void otherwise. Pred defaults to #t when the
;; `#:when` option is omitted.
;;
;; The predicate is spliced in as an expression (`w.Pred`). Writing
;; `#'w.Pred` here would place a `(syntax ...)` form in the expansion,
;; which is always a true value at run time and would make `#:when`
;; ineffective.
(define-syntax (assert stx)
  (syntax-parse stx
    [(_ w:when-pred P)
     (quasisyntax/loc stx
       (add-endpoint! (current-dataspace)
                      #,(source-location->string stx)
                      (lambda () (when w.Pred P))
                      #f))]))
;; (stop-facet fid-expr body ...)
;;
;; Evaluates `fid-expr` once, raises an error unless `fid-ancestor?`
;; accepts the current facet id together with that id, and otherwise
;; calls `stop-facet!` on the current dataspace with a thunk running
;; `body ...`.
(define-syntax (stop-facet stx)
  (syntax-parse stx
    [(_ target-expr body ...)
     (syntax/loc stx
       (let ([target target-expr])
         (unless (fid-ancestor? (current-facet-id) target)
           (error 'stop-facet "Attempt to stop non-ancestor facet ~a" target))
         (stop-facet! (current-dataspace)
                      target
                      (lambda () (begin/void-default body ...)))))]))
;; (stop-current-facet body ...)
;;
;; Shorthand for `stop-facet` applied to the current facet id.
(define-syntax-rule (stop-current-facet body ...)
  (stop-facet (current-facet-id)
              body ...))
;; (stop-when-true test body ...)
;;
;; Installs a dataflow block that stops the current facet (running
;; `body ...` as its script) whenever `test` evaluates to a true value.
(define-syntax-rule (stop-when-true test body ...)
  (begin/dataflow
    (when test
      (stop-current-facet body ...))))
;; (on-start body ...)
;;
;; Schedules a script on the current dataspace that runs `body ...`
;; (or yields void when the body is empty).
(define-syntax (on-start stx)
  (syntax-parse stx
    [(_ body ...)
     (syntax/loc stx
       (schedule-script! (current-dataspace)
                         (lambda () (begin/void-default body ...))))]))
;; (on-stop body ...)
;;
;; Registers a stop script on the current dataspace that runs `body ...`
;; (or yields void when the body is empty).
(define-syntax (on-stop stx)
  (syntax-parse stx
    [(_ body ...)
     (syntax/loc stx
       (add-stop-script! (current-dataspace)
                         (lambda () (begin/void-default body ...))))]))
;; (stop-when [#:when Pred] event [#:priority level] body ...)
;;
;; Event handler whose script stops the current facet, running
;; `body ...` as part of `stop-current-facet`. The expansion itself is
;; produced by `analyze-event`.
(define-syntax (stop-when stx)
  (syntax-parse stx
    [(_ guard:when-pred event prio:priority body ...)
     (define handler-stx
       (syntax/loc stx (stop-current-facet body ...)))
     (analyze-event stx
                    #'guard.Pred
                    #'event
                    handler-stx
                    #'prio.level)]))
;; (on [#:when Pred] event [#:priority level] body ...)
;;
;; General event handler: the script is `body ...` (void if empty). The
;; expansion itself is produced by `analyze-event`.
(define-syntax (on stx)
  (syntax-parse stx
    [(_ guard:when-pred event prio:priority body ...)
     (define handler-stx
       (syntax/loc stx (begin/void-default body ...)))
     (analyze-event stx
                    #'guard.Pred
                    #'event
                    handler-stx
                    #'prio.level)]))
;; (begin/dataflow [#:priority level] expr ...)
;;
;; Adds an endpoint whose procedure, each time it is called, records the
;; value of `current-dataflow-subject-id` at that moment and schedules a
;; script (at the given priority) that re-establishes that subject id via
;; `parameterize` before evaluating `expr ...`. The endpoint procedure
;; itself returns `(void)`; the final `#f` is passed as the last argument
;; to `add-endpoint!`.
;; NOTE(review): `schedule-script!` is called here without a dataspace
;; argument, unlike in `on-start` -- confirm against the intended API.
(define-syntax (begin/dataflow stx)
(syntax-parse stx
[(_ prio:priority expr ...)
(quasisyntax/loc stx
(let ()
(add-endpoint! (current-dataspace)
#,(source-location->string stx)
(lambda ()
;; Capture the subject id now; the script runs later.
(define subject-id (current-dataflow-subject-id))
(schedule-script!
#:priority prio.level
(lambda ()
(parameterize ((current-dataflow-subject-id subject-id))
expr ...)))
(void))
#f)))]))
;; (define/dataflow name value-expr [#:default initial-expr])
;;
;; Declares a field `name` starting at `initial-expr` (default #f) and
;; installs a dataflow block that stores the value of `value-expr` into
;; the field.
(define-syntax (define/dataflow stx)
  (syntax-parse stx
    [(_ name value-expr)
     ;; Two-argument form: re-dispatch with an explicit #f default.
     (syntax/loc stx
       (define/dataflow name value-expr #:default #f))]
    [(_ name value-expr #:default initial-expr)
     (syntax/loc stx
       (begin
         (field [name initial-expr])
         (begin/dataflow (name value-expr))))]))
;; `asserted` is only meaningful as a literal inside an event
;; specification; any other use is a syntax error.
(define-syntax (asserted form)
  (raise-syntax-error #f
                      "asserted: Used outside event spec"
                      form))
;; `retracted` is only meaningful as a literal inside an event
;; specification; any other use is a syntax error.
(define-syntax (retracted form)
  (raise-syntax-error #f
                      "retracted: Used outside event spec"
                      form))
;; (suspend-script proc-expr)
;;
;; Calls `suspend-script*` with this form's source location and the
;; given procedure expression.
(define-syntax (suspend-script stx)
  (syntax-parse stx
    [(_ proc-expr)
     (define where (source-location->string stx))
     (quasisyntax/loc stx
       (suspend-script* #,where proc-expr))]))
;; (let-event [event ...] body ...)
;;
;; Suspends the script until each event, nested in order by `-let-event`,
;; has fired. The innermost handler stops the current facet and resumes
;; the suspended script with a thunk wrapping `body ...`; the value that
;; comes back from `react/suspend` is then applied (hence the extra pair
;; of parentheses around the `react/suspend` form).
(define-syntax (let-event stx)
  (syntax-parse stx
    [(_ [event ...] body ...)
     (syntax/loc stx
       ((react/suspend (resume)
          (on-start
           (-let-event [event ...]
                       (stop-current-facet (resume (lambda () body ...))))))))]))
;; (-let-event [event ...] final)
;;
;; Helper for `let-event`: with no events, expands to `final`; otherwise
;; wraps the remainder in a `react` facet with a `stop-when` on the first
;; event, recursing on the rest. The expansion carries the source
;; location of the first event.
(define-syntax (-let-event stx)
  (syntax-parse stx
    [(_ [] final)
     #'final]
    [(_ [head tail ...] final)
     (syntax/loc #'head
       (react (stop-when head
                         (-let-event [tail ...] final))))]))
;; HERE
;; (during P body ...)
;;
;; On each assertion matching P, binds the instantiated form of the
;; pattern (the fourth value returned by `analyze-pattern`) and creates a
;; facet that contains `body ...` and stops when that instantiated
;; assertion is retracted.
(define-syntax (during stx)
  (syntax-parse stx
    [(_ P body ...)
     (define event-stx (syntax/loc #'P (asserted P)))
     (define-values (_proj _pat _bindings concrete-stx)
       (analyze-pattern event-stx #'P))
     (quasisyntax/loc stx
       (on #,event-stx
           (let ([demand #,concrete-stx])
             (react (stop-when (retracted demand))
                    body ...))))]))
;; (during/spawn P [#:spawn wrapper] <name> <assertions> [#:let clauses]
;;               [#:on-crash expr] O ...)
;;
;; On each assertion matching P: creates a fresh gensym id, builds the
;; "supply" assertion `(instance id p)` for the concrete "demand" `p`,
;; starts a monitoring facet (see inline comments), and then spawns an
;; actor through `wrapper` (default `spawn`) whose linkage asserts `inst`
;; and stops once `(observe inst)` is retracted.
;; NOTE(review): this expansion passes `#:assertions* assertions.P`,
;; whereas `spawn*`/`spawn**` in this file read `assertions.exprs` --
;; confirm the attribute name against syntax-classes.rkt.
(define-syntax (during/spawn stx)
(syntax-parse stx
[(_ P w:actor-wrapper name:name assertions:assertions parent-let:let-option
oncrash:on-crash-option
O ...)
(define E-stx (syntax/loc #'P (asserted P)))
(define-values (_proj _pat _bindings instantiated)
(analyze-pattern E-stx #'P))
(quasisyntax/loc stx
(on #,E-stx
(let* ((id (gensym 'during/spawn))
(p #,instantiated) ;; this is the concrete assertion corresponding to demand
(inst (instance id p))) ;; this is the assertion representing supply
(react (stop-when (asserted inst)
;; Supply (inst) appeared before demand (p) retracted.
;; Transition to a state where we monitor demand, but also
;; express interest in supply: this latter acts as a signal
;; to the supply that it should stick around. We react to
;; retraction of supply before retraction of demand by
;; invoking the on-crash expression, if supplied. Once
;; demand is retracted, this facet terminates, retracting
;; its interest in supply, thereby signalling to the supply
;; that it is no longer wanted.
(react (stop-when (retracted inst) ;; NOT OPTIONAL
#,@(if (attribute oncrash.expr)
#'(oncrash.expr)
#'()))
(stop-when (retracted p))))
(stop-when (retracted p)
;; Demand (p) retracted before supply (inst) appeared. We
;; MUST wait for the supply to fully appear so that we can
;; reliably tell it to shut down. We must maintain interest
;; in supply until we see supply, and then terminate, thus
;; signalling to supply that it is no longer wanted.
(react (stop-when (asserted inst)))))
(let parent-let.clauses
(w.wrapper #:linkage [(assert inst)
(stop-when (retracted (observe inst)))]
#:name name.N
#:assertions* assertions.P
O ...)))))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Queries
;; Phase-1 helpers shared by the query-* macros.
(begin-for-syntax
;; Optional `#:on-add expr`; the `expr` attribute is #f when omitted.
(define-splicing-syntax-class on-add
(pattern (~optional (~seq #:on-add expr) #:defaults ([expr #f]))))
;; Optional `#:on-remove expr`; the `expr` attribute is #f when omitted.
(define-splicing-syntax-class on-remove
(pattern (~optional (~seq #:on-remove expr) #:defaults ([expr #f]))))
;; Returns a syntax LIST intended for splicing with `#,@`: either a
;; single `schedule-script!` call (at `*query-handler-priority*`) that
;; runs the handler expression, or the empty list when no handler
;; expression was supplied (maybe-expr-stx is #f).
(define (schedule-query-handler-stxs maybe-expr-stx)
(if maybe-expr-stx
(quasisyntax/loc maybe-expr-stx
((schedule-script! #:priority *query-handler-priority*
(lambda () #,maybe-expr-stx))))
#'())))
;; (query-value name absent-expr more ...)
;;
;; Declares a local field `name` initialised to `absent-expr`, then
;; delegates to `query-value*` with the same arguments. Note that
;; `absent-expr` appears twice in the expansion.
(define-syntax (query-value stx)
  (syntax-parse stx
    [(_ name absent-expr more ...)
     (syntax/loc stx
       (let ()
         (field [name absent-expr])
         (query-value* name absent-expr more ...)))]))
;; (query-value* field-name absent-expr P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains an existing field: when P is asserted the field is set to
;; `expr`; when P is retracted it is set back to `absent-expr`. Optional
;; add/remove handlers are scheduled (via schedule-query-handler-stxs)
;; before the field is updated. The whole form evaluates to the field.
(define-syntax (query-value* stx)
(syntax-parse stx
[(_ field-name absent-expr P expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ((F field-name))
(on (asserted P) #:priority *query-priority*
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(F expr))
;; Retractions use a different priority constant than assertions.
(on (retracted P) #:priority *query-priority-high*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(F absent-expr))
F))]))
;; (query-set name more ...)
;;
;; Declares a local field `name` holding an empty set, then delegates to
;; `query-set*`.
(define-syntax (query-set stx)
  (syntax-parse stx
    [(_ name more ...)
     (syntax/loc stx
       (let ()
         (field [name (set)])
         (query-set* name more ...)))]))
;; (query-set* field-name P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a set-valued field: on assertion of P the value of `expr`
;; is added if not already a member; on retraction it is removed if
;; present. The optional handlers are scheduled only when the set
;; actually changes. The whole form evaluates to the field.
(define-syntax (query-set* stx)
  (syntax-parse stx
    [(_ field-name P expr adder:on-add remover:on-remove)
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted P) #:priority *query-priority*
             (let ([item expr])
               (unless (set-member? (fld) item)
                 #,@(schedule-query-handler-stxs (attribute adder.expr))
                 (fld (set-add (fld) item)))))
         (on (retracted P) #:priority *query-priority-high*
             (let ([item expr])
               (when (set-member? (fld) item)
                 #,@(schedule-query-handler-stxs (attribute remover.expr))
                 (fld (set-remove (fld) item)))))
         fld))]))
;; (query-hash name more ...)
;;
;; Declares a local field `name` holding an empty hash, then delegates
;; to `query-hash*`.
(define-syntax (query-hash stx)
  (syntax-parse stx
    [(_ name more ...)
     (syntax/loc stx
       (let ()
         (field [name (hash)])
         (query-hash* name more ...)))]))
;; (query-hash* field-name P key-expr value-expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a hash-valued field: on assertion of P, maps the value of
;; `key-expr` to the value of `value-expr`; on retraction, removes the
;; key. The whole form evaluates to the field.
(define-syntax (query-hash* stx)
(syntax-parse stx
[(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ((F field-name))
(on (asserted P) #:priority *query-priority*
(let ((key key-expr))
;; An existing entry is overwritten, but a warning is logged first.
(when (hash-has-key? (F) key)
(log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v"
'field-name
'P
key))
#,@(schedule-query-handler-stxs (attribute on-add.expr))
(F (hash-set (F) key value-expr))))
(on (retracted P) #:priority *query-priority-high*
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
;; Unlike the assertion branch, key-expr is evaluated inline here.
(F (hash-remove (F) key-expr)))
F))]))
;; (query-hash-set name more ...)
;;
;; Declares a local field `name` holding an empty hash, then delegates
;; to `query-hash-set*`.
(define-syntax (query-hash-set stx)
  (syntax-parse stx
    [(_ name more ...)
     (syntax/loc stx
       (let ()
         (field [name (hash)])
         (query-hash-set* name more ...)))]))
;; (query-hash-set* field-name P key-expr value-expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a field updated through the hashset-* operations: on
;; assertion of P the (key, value) pair is added if absent; on
;; retraction it is removed if present. The optional handlers are
;; scheduled only when membership actually changes. The whole form
;; evaluates to the field.
(define-syntax (query-hash-set* stx)
  (syntax-parse stx
    [(_ field-name P key-expr value-expr adder:on-add remover:on-remove)
     (quasisyntax/loc stx
       (let ([fld field-name])
         (on (asserted P) #:priority *query-priority*
             (let ([k key-expr] [v value-expr])
               (unless (hashset-member? (fld) k v)
                 #,@(schedule-query-handler-stxs (attribute adder.expr))
                 (fld (hashset-add (fld) k v)))))
         (on (retracted P) #:priority *query-priority-high*
             (let ([k key-expr] [v value-expr])
               (when (hashset-member? (fld) k v)
                 #,@(schedule-query-handler-stxs (attribute remover.expr))
                 (fld (hashset-remove (fld) k v)))))
         fld))]))
;; (query-count name more ...)
;;
;; Declares a local field `name` holding an empty hash, then delegates
;; to `query-count*`.
(define-syntax (query-count stx)
  (syntax-parse stx
    [(_ name more ...)
     (syntax/loc stx
       (let ()
         (field [name (hash)])
         (query-count* name more ...)))]))
;; (query-count* field-name P expr [#:on-add e] [#:on-remove e])
;;
;; Maintains a hash-valued field mapping each value of `expr` to the
;; number of times it is currently counted: incremented on assertion of
;; P, decremented on retraction, with the entry removed when the count
;; would drop from 1 to 0. The whole form evaluates to the field.
(define-syntax (query-count* stx)
(syntax-parse stx
[(_ field-name P expr on-add:on-add on-remove:on-remove)
(quasisyntax/loc stx
(let ((F field-name))
(on (asserted P) #:priority *query-priority*
(let ((E expr))
#,@(schedule-query-handler-stxs (attribute on-add.expr))
;; A missing entry counts as 0.
(F (hash-set (F) E (+ 1 (hash-ref (F) E 0))))))
(on (retracted P) #:priority *query-priority-high*
(let ((E expr))
#,@(schedule-query-handler-stxs (attribute on-remove.expr))
(let ((F0 (F)))
(F (match (hash-ref F0 E 0)
;; Retraction of something never counted: leave the hash unchanged.
[0 F0] ;; huh
[1 (hash-remove F0 E)]
[n (hash-set F0 E (- n 1))])))))
F))]))
;; (define/query-value name absent pat more ...)
;; Binds `name` to the field produced by the matching `query-value` form.
(define-syntax-rule (define/query-value name absent pat more ...)
  (define name
    (query-value name absent pat more ...)))
;; (define/query-set name pat more ...)
;; Binds `name` to the field produced by the matching `query-set` form.
(define-syntax-rule (define/query-set name pat more ...)
  (define name
    (query-set name pat more ...)))
;; (define/query-hash name pat more ...)
;; Binds `name` to the field produced by the matching `query-hash` form.
(define-syntax-rule (define/query-hash name pat more ...)
  (define name
    (query-hash name pat more ...)))
;; (define/query-hash-set name pat more ...)
;; Binds `name` to the field produced by the matching `query-hash-set` form.
(define-syntax-rule (define/query-hash-set name pat more ...)
  (define name
    (query-hash-set name pat more ...)))
;; (define/query-count name pat more ...)
;; Binds `name` to the field produced by the matching `query-count` form.
(define-syntax-rule (define/query-count name pat more ...)
  (define name
    (query-count name pat more ...)))
;; (immediate-query [op args ...] ...)
;;
;; Runs each query form `op` inside a suspended facet and resumes the
;; script with the current value of every query field once a `flush!`
;; has completed. One fresh temporary name is generated per query; it is
;; passed to `op` as the field name and also bound to the `op` result.
(define-syntax (immediate-query stx)
(syntax-case stx ()
[(_ [op args ...] ...)
(with-syntax [((query-result ...) (generate-temporaries #'(op ...)))]
(syntax/loc stx
(react/suspend (k)
(define query-result (op query-result args ...)) ...
;; `(query-result)` reads each field; the values are passed to k together.
(on-start (flush!) (k (query-result) ...)))))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(require auxiliary-macro-context)
;; Instantiates `define-auxiliary-macro-context` for event expanders.
;; Each keyword names one of the identifiers the form is asked to
;; generate; several of these names are re-exported by the `provide`
;; that follows, and `event-expander-id?` / `event-expander-transform`
;; are used by `analyze-event`.
;; NOTE(review): the exact semantics of each generated binding are
;; defined by the auxiliary-macro-context library, not visible here.
(define-auxiliary-macro-context
#:context-name event-expander
#:prop-name prop:event-expander
#:prop-predicate-name event-expander?
#:prop-accessor-name event-expander-proc
#:macro-definer-name define-event-expander
#:introducer-parameter-name current-event-expander-introducer
#:local-introduce-name syntax-local-event-expander-introduce
#:expander-id-predicate-name event-expander-id?
#:expander-transform-name event-expander-transform)
(provide (for-syntax
prop:event-expander
event-expander?
event-expander-proc
syntax-local-event-expander-introduce
event-expander-id?
event-expander-transform)
define-event-expander)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Syntax-time support
;; Looks `pat` up in the actor's previous and current knowledge tries and
;; returns both lookup results as two values (old, new). For a synthetic
;; event the previous knowledge is taken to be `trie-empty`. Wildcard
;; matches are combined with a binary `or`.
(define (interests-pre-and-post-patch pat synthetic?)
  (define state (current-actor-state))
  (define (lookup-in knowledge)
    (trie-lookup knowledge pat #f #:wildcard-union (lambda (x y) (or x y))))
  (define before
    (lookup-in (if synthetic?
                   trie-empty
                   (actor-state-previous-knowledge state))))
  (define after
    (lookup-in (actor-state-knowledge state)))
  (values before after))
;; True-ish when `pat` was absent from the previous knowledge but is
;; present now: returns the new lookup result in that case, #f otherwise.
(define (interest-just-appeared-matching? pat synthetic?)
  (let-values ([(before after) (interests-pre-and-post-patch pat synthetic?)])
    (if before
        #f
        after)))
;; #t when `pat` was present in the previous knowledge and is absent now;
;; #f otherwise.
(define (interest-just-disappeared-matching? pat synthetic?)
  (let-values ([(before after) (interests-pre-and-post-patch pat synthetic?)])
    (if before
        (not after)
        #f)))
;; Phase-1 helper producing the expansion for `(asserted P)` and
;; `(retracted P)` event handlers.
;;
;;   outer-expr-stx : the whole handler form (source location / messages)
;;   when-pred-stx  : guard expression; the endpoint subscribes to the
;;                    pattern only while it is true, else `patch-empty`
;;   event-stx      : the event specification, passed to analyze-pattern
;;   script-stx     : handler body, run with the pattern's bindings
;;   asserted?      : #t for asserted, #f for retracted
;;   P-stx          : the pattern
;;   priority-stx   : priority expression for the scheduled script
;;
;; The generated event handler projects the added (or removed) part of
;; an incoming patch, and for each projected entry whose instantiated
;; assertion has just appeared (or disappeared) schedules the script.
;;
;; The error raised on a wildcard projection is labelled with the kind
;; of event actually being analysed (`asserted` or `retracted`), rather
;; than always `asserted`.
(define-for-syntax (analyze-asserted/retracted outer-expr-stx
                                               when-pred-stx
                                               event-stx
                                               script-stx
                                               asserted?
                                               P-stx
                                               priority-stx)
  (define-values (proj-stx pat bindings _instantiated)
    (analyze-pattern event-stx P-stx))
  (define event-predicate-stx (if asserted? #'patch/added? #'patch/removed?))
  (define patch-accessor-stx (if asserted? #'patch-added #'patch-removed))
  (define change-detector-stx
    (if asserted? #'interest-just-appeared-matching? #'interest-just-disappeared-matching?))
  ;; Symbol used as the `who` argument of the runtime error below.
  (define who-stx (if asserted? #'asserted #'retracted))
  (quasisyntax/loc outer-expr-stx
    (add-endpoint! #,(source-location->string outer-expr-stx)
                   (lambda () (if #,when-pred-stx
                                  (core:sub #,pat)
                                  patch-empty))
                   (lambda (e current-interests synthetic?)
                     ;; Ignore events while this endpoint has no interests.
                     (when (not (trie-empty? current-interests))
                       (core:match-event e
                         [(? #,event-predicate-stx p)
                          (define proj #,proj-stx)
                          (define proj-arity (projection-arity proj))
                          (define entry-set (trie-project/set #:take proj-arity
                                                              (#,patch-accessor-stx p)
                                                              proj))
                          ;; A #f result is reported as a wildcard interest.
                          (when (not entry-set)
                            (error '#,who-stx
                                   "Wildcard interest discovered while projecting by ~v at ~a"
                                   proj
                                   #,(source-location->string P-stx)))
                          (for [(entry (in-set entry-set))]
                            (let ((instantiated (instantiate-projection proj entry)))
                              (and (#,change-detector-stx instantiated synthetic?)
                                   (schedule-script!
                                    #:priority #,priority-stx
                                    (lambda ()
                                      (match-define (list #,@bindings) entry)
                                      #,script-stx)))))]))))))
;; Phase-1 constant: the module-declaration inspector of this module,
;; used by `analyze-event` when disarming event syntax.
(define-for-syntax orig-insp
(variable-reference->module-declaration-inspector (#%variable-reference)))
;; Phase-1 helper shared by `on` and `stop-when`: turns an event
;; specification into the `add-endpoint!` expression implementing it.
;;
;;   outer-expr-stx  : the whole handler form (source location)
;;   when-pred-stx   : guard expression for the subscription
;;   armed-event-stx : the event specification (disarmed before parsing)
;;   script-stx      : the handler body
;;   priority-stx    : priority expression for the scheduled script
;;
;; Recognised event forms, tried in order:
;;   (expander args ...) where `expander` is an event expander -- the
;;                        expansion result is re-armed and re-analysed
;;   (core:message P)    -- message matching pattern P
;;   (asserted P)        -- delegated to analyze-asserted/retracted
;;   (retracted P)       -- delegated to analyze-asserted/retracted
;;   (rising-edge Pred)  -- Pred changing to a true value
(define-for-syntax (analyze-event outer-expr-stx
when-pred-stx
armed-event-stx
script-stx
priority-stx)
(define event-stx (syntax-disarm armed-event-stx orig-insp))
(syntax-parse event-stx
#:literals [core:message asserted retracted rising-edge]
[(expander args ...)
#:when (event-expander-id? #'expander)
(event-expander-transform
event-stx
(lambda (result)
(analyze-event outer-expr-stx
when-pred-stx
(syntax-rearm result event-stx)
script-stx
priority-stx)))]
[(core:message P)
(define-values (proj pat bindings _instantiated)
(analyze-pattern event-stx #'P))
(quasisyntax/loc outer-expr-stx
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda () (if #,when-pred-stx
(core:sub #,pat)
patch-empty))
(lambda (e current-interests _synthetic?)
;; Ignore events while this endpoint has no interests.
(when (not (trie-empty? current-interests))
(core:match-event e
[(core:message body)
(define capture-vals
(match-value/captures
body
#,proj))
;; capture-vals is #f when the message does not match.
(and capture-vals
(schedule-script!
#:priority #,priority-stx
(lambda ()
(apply (lambda #,bindings #,script-stx)
capture-vals))))])))))]
[(asserted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx
#t #'P priority-stx)]
[(retracted P)
(analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx
#f #'P priority-stx)]
[(rising-edge Pred)
;; A field named after the event's source location remembers the
;; previous value of Pred.
(define field-name
(datum->syntax event-stx
(string->symbol
(format "~a:rising-edge" (source-location->string event-stx)))))
(quasisyntax/loc outer-expr-stx
(let ()
(field [#,field-name #f])
(add-endpoint! #,(source-location->string outer-expr-stx)
(lambda ()
(when #,when-pred-stx
(define old-val (#,field-name))
(define new-val Pred)
;; Only act when the value changed, and only schedule the
;; script when it changed to a true value.
(when (not (eq? old-val new-val))
(#,field-name new-val)
(when new-val
(schedule-script! #:priority #,priority-stx
(lambda () #,script-stx)))))
;; This endpoint never contributes assertions of its own.
patch-empty)
void)))]))
;; (begin/void-default expr ...)
;;
;; Like `begin`, except that an empty body is allowed and evaluates to
;; `(void)`.
(define-syntax (begin/void-default stx)
  (syntax-parse stx
    [(_ body ...+)
     (syntax/loc stx (begin body ...))]
    [(_)
     (syntax/loc stx (void))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Field Construction and Access
;; Module-level counter giving each field descriptor a distinct number.
(define field-counter 0)

;; Creates a field descriptor from `name` and the current counter value,
;; bumps the counter, records the initial value in the current actor's
;; field table (wrapped in an ephemeron keyed on the descriptor), and
;; returns a field handle for the descriptor.
;; NOTE(review): another top-level definition named `make-field` appears
;; earlier in this file with a different body -- confirm which is intended.
(define (make-field name initial-value)
  (let ([desc (field-descriptor name field-counter)])
    (set! field-counter (add1 field-counter))
    (hash-set! (actor-state-field-table (current-actor-state))
               desc
               (make-ephemeron desc initial-value))
    (field-handle desc)))
;; Raises an error, attributed to `who`, reporting that the field named
;; by descriptor `desc` was used out of scope.
(define (field-scope-error who desc)
  (let ([field-name (field-descriptor-name desc)])
    (error who "Field ~a used out-of-scope" field-name)))
;; Returns the current value stored for descriptor `desc` in the current
;; actor's field table, or raises a scope error if there is no entry.
(define (field-ref desc)
  (define table (actor-state-field-table (current-actor-state)))
  (define (missing) (field-scope-error 'field-ref desc))
  (ephemeron-value (hash-ref table desc missing)))
;; Replaces the value stored for descriptor `desc` in the current actor's
;; field table with `v`. Raises a scope error if the table has no entry
;; for `desc`.
(define (field-set! desc v)
  (let ([table (actor-state-field-table (current-actor-state))])
    (when (not (hash-has-key? table desc))
      (field-scope-error 'field-set! desc))
    (hash-set! table desc (make-ephemeron desc v))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Facet Storage in an Actor
;; #t iff the current actor state has a facet stored under `fid`.
(define (facet-live? fid)
  (let ([facets (actor-state-facets (current-actor-state))])
    (hash-has-key? facets fid)))
;; Returns the facet stored under `fid` in the current actor state, or
;; #f when there is none.
(define (lookup-facet fid)
  (let ([facets (actor-state-facets (current-actor-state))])
    (hash-ref facets fid #f)))
;; True when a facet exists under `fid` but has neither endpoints nor
;; children; #f when the facet does not exist.
(define (facet-live-but-inert? fid)
  (let ([f (lookup-facet fid)])
    (cond
      [(not f) #f]
      [(not (hash-empty? (facet-endpoints f))) #f]
      [else (set-empty? (facet-children f))])))
;; Applies `proc` to the facet currently stored under `fid` (or to #f if
;; there is none) and stores the result back under `fid`.
(define (update-facet! fid proc)
  (store-facet! fid (proc (lookup-facet fid))))
;; Replaces the current actor state with a copy whose facet table has
;; been updated for `fid` via `hash-set/remove`.
;; NOTE(review): presumably `hash-set/remove` deletes the entry when
;; `new-facet` is #f (terminate-facet! stores #f) -- confirm.
(define (store-facet! fid new-facet)
  (let* ([state (current-actor-state)]
         [facets* (hash-set/remove (actor-state-facets state) fid new-facet)])
    (current-actor-state
     (struct-copy actor-state state [facets facets*]))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Entering and Leaving Facet Context; Queueing of Work Items
;; (with-current-facet facet-id script? body ...)
;;
;; Evaluates `body ...` with `current-facet-id` parameterized to
;; `facet-id` and `in-script?` parameterized to `script?`.
(define-syntax-rule (with-current-facet facet-id script? body ...)
  (parameterize ([current-facet-id facet-id]
                 [in-script? script?])
    body ...))
;; Wraps `proc` so that, whenever the wrapper is later called, `proc`
;; runs as a script (`in-script?` = #t) in the facet that was current
;; when the wrapper was created, inside `call-with-syndicate-effects`.
(define (capture-facet-context proc)
  (define captured-fid (current-facet-id))
  (lambda args
    (with-current-facet captured-fid #t
      (call-with-syndicate-effects
       (lambda () (apply proc args))))))
;; Queues `thunk` to run later as a script in the current facet context,
;; at the given priority level (default `*normal-priority*`).
(define (schedule-script! #:priority [level *normal-priority*] thunk)
  (push-script! level
                (capture-facet-context thunk)))
;; Appends an already context-wrapped thunk to the pending-script queue
;; stored at index `priority` of the current pending-scripts vector.
(define (push-script! priority thunk-with-context)
  (let* ([levels (current-pending-scripts)]
         [extended (enqueue (vector-ref levels priority) thunk-with-context)])
    (vector-set! levels priority extended)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Action Queue Management
;; Queues one action. A non-empty patch is composed into the pending
;; patch; an empty patch is ignored. Any other action first flushes the
;; pending patch and is then appended (after passing through the current
;; action transformer) to the pending actions.
(define (schedule-action! ac)
  (cond
    [(not (patch? ac))
     (flush-pending-patch!)
     (current-pending-actions
      (list (current-pending-actions)
            ((current-action-transformer) ac)))]
    [(patch-non-empty? ac)
     (current-pending-patch
      (compose-patch ac (current-pending-patch)))]))
;; Variadic form of `schedule-action!`: the arguments are passed through
;; `core:clean-actions` and each resulting action is scheduled in order.
(define (schedule-actions! . acs)
  (for ([action (core:clean-actions acs)])
    (schedule-action! action)))
;; If the pending patch is non-empty, resets it to `patch-empty` and
;; appends the old patch (after the current action transformer) to the
;; pending actions. Does nothing otherwise.
(define (flush-pending-patch!)
  (let ([pending (current-pending-patch)])
    (when (patch-non-empty? pending)
      (current-pending-patch patch-empty)
      (current-pending-actions
       (list (current-pending-actions)
             ((current-action-transformer) pending))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Endpoint Creation
;; Adds an endpoint to the current facet.
;;
;;   where      : source-location string, used in the error message
;;   patch-fn   : thunk computing the endpoint's patch
;;   handler-fn : event handler stored with the endpoint
;;
;; Raises an error when called from within a script. Otherwise a new
;; endpoint id is taken from the actor's mux, `patch-fn` is run with
;; `current-dataflow-subject-id` set to (list facet-id endpoint-id), the
;; resulting stream is added to the mux, the endpoint is recorded on the
;; facet, and the aggregate delta is scheduled as an action.
;; NOTE(review): `assert` and `begin/dataflow` in this file call
;; `add-endpoint!` with four arguments (dataspace first); this definition
;; takes three -- confirm which API is intended.
(define (add-endpoint! where patch-fn handler-fn)
(when (in-script?)
(error 'add-endpoint!
"~a: Cannot add endpoint in script; are you missing a (react ...)?"
where))
(define-values (new-eid delta-aggregate)
(let ()
(define a (current-actor-state))
(define new-eid (mux-next-pid (actor-state-mux a)))
(define-values (new-mux _new-eid _delta delta-aggregate)
(mux-add-stream (actor-state-mux a)
(parameterize ((current-dataflow-subject-id
(list (current-facet-id) new-eid)))
(patch-fn))))
;; Re-read the actor state here rather than reusing `a`.
(current-actor-state (struct-copy actor-state (current-actor-state) [mux new-mux]))
(values new-eid delta-aggregate)))
(update-facet! (current-facet-id)
(lambda (f)
;; If the facet no longer exists, leave it absent.
(and f
(struct-copy facet f
[endpoints
(hash-set (facet-endpoints f)
new-eid
(endpoint new-eid patch-fn handler-fn))]))))
(schedule-action! delta-aggregate))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Facet Lifecycle
;; Module-level counter supplying the unique head of each new facet id.
(define next-fid-uid 0)
;; Creates a new facet as a child of the current facet.
;;
;;   where      : source-location string, used in the error message
;;   setup-proc : thunk run (outside script context) with the new facet
;;                current, to install its endpoints
;;
;; Raises an error unless called from within a script. A facet id is the
;; pair (uid . parent-fid). After setup, a script is scheduled that
;; terminates the new facet if its (non-root) parent has died or if the
;; facet ended up with no endpoints and no children. Finally the new
;; facet is delivered a synthetic patch event containing the actor's
;; entire current knowledge.
;; NOTE(review): `react*` in this file calls `add-facet!` with five
;; arguments; this definition takes two -- confirm which API is intended.
(define (add-facet! where setup-proc)
(when (not (in-script?))
(error 'add-facet!
"~a: Cannot add facet outside script; are you missing an (on ...)?"
where))
(define parent-fid (current-facet-id))
(define fid-uid next-fid-uid)
(define fid (cons fid-uid parent-fid))
(set! next-fid-uid (+ next-fid-uid 1))
(update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set))))
(update-facet! parent-fid
(lambda (pf)
(and pf (struct-copy facet pf
[children (set-add (facet-children pf) fid)]))))
(with-current-facet fid #f
(setup-proc)
(schedule-script!
(lambda ()
(when (and (facet-live? fid)
(or (and (pair? parent-fid) (not (facet-live? parent-fid)))
(facet-live-but-inert? fid)))
(terminate-facet! fid)))))
(facet-handle-event! fid
(lookup-facet fid)
(patch (actor-state-knowledge (current-actor-state)) trie-empty)
#t))
;; If the named facet is live, terminate it.
;;
;; Steps, in order: detach the facet from its parent's child set (when
;; the parent id is a pair), remove the facet from the actor state,
;; recursively terminate its children, schedule its stop scripts,
;; schedule a script that removes each of its endpoints from the mux and
;; from the dataflow graph, and finally schedule (at `*gc-priority*`) a
;; check that terminates the parent if it has become inert.
(define (terminate-facet! fid)
(define f (lookup-facet fid))
(when f
(define parent-fid (cdr fid))
(when (pair? parent-fid)
(update-facet! parent-fid
(lambda (f)
(and f
(struct-copy facet f
[children (set-remove (facet-children f)
fid)])))))
(store-facet! fid #f)
(for [(child-fid (in-set (facet-children f)))]
(terminate-facet! child-fid))
;; Run stop-scripts after terminating children. This means that
;; children's stop-scripts run before ours.
(with-current-facet fid #t
;; Stop scripts are stored newest-first (see add-stop-script!), so
;; reversing schedules them in registration order.
(map schedule-script! (reverse (facet-stop-scripts f))))
(schedule-script!
(lambda ()
(for [((eid ep) (in-hash (facet-endpoints f)))]
(define a (current-actor-state))
(dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid))
(define-values (new-mux _eid _delta delta-aggregate)
(mux-remove-stream (actor-state-mux a) eid))
(current-actor-state (struct-copy actor-state a [mux new-mux]))
(schedule-action! delta-aggregate))))
(schedule-script!
#:priority *gc-priority*
(lambda ()
(when (facet-live-but-inert? parent-fid)
(log-info "terminating ~v because it's dead and child ~v terminated" parent-fid fid)
(terminate-facet! parent-fid))))))
;; Registers `script-proc` as a stop script of the current facet by
;; consing it onto the front of the facet's stop-script list. Does
;; nothing to the facet table if the facet no longer exists.
(define (add-stop-script! script-proc)
  (define (with-script f)
    (and f
         (struct-copy facet f
                      [stop-scripts (cons script-proc (facet-stop-scripts f))])))
  (update-facet! (current-facet-id) with-script))
;; Returns a fresh vector with `priority-count` slots, every slot holding
;; the same initial queue value produced by one call to `make-queue`.
(define (make-empty-pending-scripts)
  (let ([empty-queue (make-queue)])
    (make-vector priority-count empty-queue)))
;; Boots an actor: installs a fresh actor state and empty pending
;; patch/actions/scripts, then, in the root facet context `'()` and
;; outside script context, schedules a retraction of everything, queues
;; `script-proc` as the first script and runs all scripts. The result is
;; whatever `run-scripts!` returns.
(define (boot-actor script-proc)
(with-store [(current-actor-state
(actor-state (mux)
(hash)
trie-empty
trie-empty
(make-weak-hasheq)
(make-dataflow-graph)))
(current-pending-patch patch-empty)
(current-pending-actions '())
(current-pending-scripts (make-empty-pending-scripts))
(current-action-transformer values)]
(with-current-facet '() #f
(schedule-action! (core:retract ?))
;; Retract any initial-assertions we might have been given. We
;; must ensure that we explicitly maintain them: retracting them
;; here prevents us from accidentally relying on their
;; persistence from our creation.
(schedule-script! script-proc)
(run-scripts!))))
;; Removes and returns the next pending script, scanning the priority
;; levels from index 0 upwards and taking from the first non-empty
;; queue. Returns #f when every queue is empty.
(define (pop-next-script!)
  (define levels (current-pending-scripts))
  (define level-count (vector-length levels))
  (let scan ([i 0])
    (cond
      [(>= i level-count) #f]
      [(queue-empty? (vector-ref levels i)) (scan (add1 i))]
      [else
       (define-values (script remaining) (dequeue (vector-ref levels i)))
       (vector-set! levels i remaining)
       script])))
;; Runs pending scripts until none remain. After each script, facet
;; assertions are refreshed before the next script is popped.
(define (run-all-pending-scripts!)
  (let loop ()
    (define next (pop-next-script!))
    (when next
      (next)
      (refresh-facet-assertions!)
      (loop))))
;; Drains the script queues, flushes the pending patch, takes (and
;; clears) the accumulated actions, and returns either a `core:quit`
;; (when the actor has no facets left) or a `core:transition` carrying
;; the current actor state.
(define (run-scripts!)
  (run-all-pending-scripts!)
  (flush-pending-patch!)
  (let ([actions (current-pending-actions)])
    (current-pending-actions '())
    (cond
      [(hash-empty? (actor-state-facets (current-actor-state)))
       (core:quit actions)]
      [else
       (core:transition (current-actor-state) actions)])))
;; Repairs dataflow damage: for each subject id (facet id, endpoint id)
;; passed to the callback by `dataflow-repair-damage!`, and provided the
;; facet still exists, recomputes the endpoint's patch, replaces the
;; endpoint's stream in the mux, and delivers to the facet a synthetic
;; patch of the already-known assertions that match the endpoint's newly
;; added interests.
(define (refresh-facet-assertions!)
(dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state))
(lambda (subject-id)
(match-define (list fid eid) subject-id)
(define f (lookup-facet fid))
(when f
(with-current-facet fid #f
(define ep (hash-ref (facet-endpoints f) eid))
(define new-patch ((endpoint-patch-fn ep)))
(define a (current-actor-state))
;; Interests in the new patch that the endpoint did not already have.
(define new-interests
(trie-subtract (patch-added new-patch)
(mux-interests-of (actor-state-mux a) eid)
#:combiner (lambda (v1 v2) trie-empty)))
(define newly-relevant-knowledge
(biased-intersection (actor-state-knowledge a) new-interests))
;; Composed with a retract-everything patch so the endpoint's
;; previous assertions are replaced rather than extended.
(update-stream! eid (compose-patch new-patch (core:retract ?)))
(facet-handle-event! fid
(lookup-facet fid)
(patch newly-relevant-knowledge trie-empty)
#t))))))
;; Applies `patch` to stream `eid` in the current actor's mux, stores the
;; updated mux in the actor state, and schedules the resulting aggregate
;; delta as an action.
(define (update-stream! eid patch)
  (define state (current-actor-state))
  (let-values ([(mux* _eid _delta aggregate)
                (mux-update-stream (actor-state-mux state) eid patch)])
    (current-actor-state (struct-copy actor-state state [mux mux*]))
    (schedule-action! aggregate)))
;; Event handler for a running actor. Returns #f when the event `e` is
;; #f. Otherwise installs actor state `a` (with knowledge and
;; previous-knowledge updated first if `e` is a patch), dispatches `e`
;; as a non-synthetic event to every facet present in `a`, then runs all
;; scripts and returns the result of `run-scripts!`.
(define (actor-behavior e a)
(and e
(with-store [(current-actor-state
(if (patch? e)
(struct-copy actor-state a
[previous-knowledge (actor-state-knowledge a)]
[knowledge (update-interests (actor-state-knowledge a) e)])
a))
(current-pending-patch patch-empty)
(current-pending-actions '())
(current-pending-scripts (make-empty-pending-scripts))
(current-action-transformer values)]
;; Iterates over the facets of the incoming state `a`.
(for [((fid f) (in-hash (actor-state-facets a)))]
(facet-handle-event! fid f e #f))
(run-scripts!))))
;; Delivers event `e` to every endpoint of facet `f`, with `fid` as the
;; current facet and outside script context. Each handler receives the
;; event, the interests the mux (as read on entry) records for that
;; endpoint, and the `synthetic?` flag.
(define (facet-handle-event! fid f e synthetic?)
  (define the-mux (actor-state-mux (current-actor-state)))
  (with-current-facet fid #f
    (for ([ep (in-hash-values (facet-endpoints f))])
      (define handler (endpoint-handler-fn ep))
      (define interests (mux-interests-of the-mux (endpoint-id ep)))
      (handler e interests synthetic?))))
;; Submodule exporting internals (actor entry points, the state
;; structs, and low-level field/suspension operations) that are not part
;; of the main `provide` list at the top of the file.
(module+ implementation-details
(provide actor-behavior
boot-actor
make-field
(struct-out field-descriptor)
(struct-out field-handle)
(struct-out actor-state)
(struct-out facet)
(struct-out endpoint)
field-ref
field-set!
suspend-script
suspend-script*
capture-actor-actions))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Script suspend-and-resume.
;; Continuation prompt tag delimiting script execution; used by
;; call-with-syndicate-effects, syndicate-effects-available? and
;; suspend-script*.
(define prompt-tag (make-continuation-prompt-tag 'syndicate))
;; #t iff a prompt for this module's prompt tag is installed in the
;; current continuation.
(define syndicate-effects-available?
  (lambda ()
    (continuation-prompt-available? prompt-tag)))
;; Calls `thunk` inside a continuation prompt for this module's prompt
;; tag.
(define call-with-syndicate-effects
  (lambda (thunk)
    (call-with-continuation-prompt thunk prompt-tag)))
;; Runs `thunk` inside a syndicate prompt with fresh pending-action and
;; pending-patch stores, and returns a pair: the list of values `thunk`
;; produced, consed onto the actions accumulated while it ran.
;; Raises an error if `thunk` returns more than one value.
(define (capture-actor-actions thunk)
(call-with-syndicate-effects
(lambda ()
(with-store [(current-pending-actions '())
(current-pending-patch patch-empty)
(current-action-transformer values)]
(call-with-values thunk
(lambda results
;; Move any pending patch into the action list before reading it.
(flush-pending-patch!)
(when (> (length results) 1)
(error 'capture-actor-actions
"~a values supplied in top-level Syndicate action; more than one is unacceptable"
(length results)))
(cons results (current-pending-actions))))))))
;; Submodule exporting only `capture-actor-actions`.
(module+ for-module-begin
(provide capture-actor-actions))
;; Suspends the running script and hands `proc` a one-shot resumption
;; procedure.
;;
;;   where : source-location string, used in error messages
;;   proc  : called with `resume-parent`; calling `resume-parent` with
;;           some values later resumes the suspended script with those
;;           values as the result of this call
;;
;; Raises an error when not in a script. The continuation up to the
;; syndicate prompt is captured, then the current script is aborted to
;; the prompt, where `proc` is run. Resuming a second time raises an
;; error. On resumption, if the facet doing the resuming differs from the
;; suspended facet, the resuming facet is terminated; the captured
;; continuation is then queued as a normal-priority script.
(define (suspend-script* where proc)
(when (not (in-script?))
(error 'suspend-script
"~a: Cannot suspend script outside script; are you missing an (on ...)?"
where))
(call-with-composable-continuation
(lambda (k)
(abort-current-continuation
prompt-tag
(lambda ()
(define suspended-fid (current-facet-id))
(define in? (in-script?))
(define stale? #f)
;; Re-enters the captured continuation in the suspended facet's
;; context, restoring the in-script? flag seen at suspension.
(define raw-resume-parent
(capture-facet-context
(lambda results
(parameterize ((in-script? in?))
(apply k results)))))
(define resume-parent
(lambda results
(when stale? (error 'suspend-script
"Attempt to resume suspension (suspended at ~a) more than once"
where))
(set! stale? #t)
;; Abandon the resuming script itself before queueing the
;; resumption.
(abort-current-continuation
prompt-tag
(lambda ()
(let ((invoking-fid (current-facet-id)))
(when (not (equal? invoking-fid suspended-fid))
(terminate-facet! invoking-fid)))
(push-script! *normal-priority*
(lambda () (apply raw-resume-parent results)))))))
(proc resume-parent))))
prompt-tag))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Immediate actions
;; Raises an error attributed to `who` unless called from within a
;; script.
(define (ensure-in-script! who)
  (unless (in-script?)
    (error who "Attempt to perform action outside script; are you missing an (on ...)?")))
;; Schedules a message action carrying `M`. Must be called from a script.
(define (send! M)
  (ensure-in-script! 'send!)
  (let ([action (core:message M)])
    (schedule-action! action)))
;; Stream id passed to `update-stream!` by assert!, retract!, patch! and
;; perform-actions! for patches issued directly from scripts.
(define *adhoc-label* -1)
;; Applies an assertion of `P` to the ad-hoc stream. Must be called from
;; a script.
(define (assert! P)
  (ensure-in-script! 'assert!)
  (let ([assertion-patch (core:assert P)])
    (update-stream! *adhoc-label* assertion-patch)))
;; Applies a retraction of `P` to the ad-hoc stream. Must be called from
;; a script.
(define (retract! P)
  (ensure-in-script! 'retract!)
  (let ([retraction-patch (core:retract P)])
    (update-stream! *adhoc-label* retraction-patch)))
;; Applies the patch `p` to the ad-hoc stream. Must be called from a
;; script.
(define patch!
  (lambda (p)
    (ensure-in-script! 'patch!)
    (update-stream! *adhoc-label* p)))
;; Performs each action in `acs` (after `core:clean-actions`): patches
;; are applied to the ad-hoc stream, anything else is scheduled as an
;; action. Must be called from a script.
(define (perform-actions! acs)
  (ensure-in-script! 'perform-actions!)
  (for ([action (core:clean-actions acs)])
    (if (patch? action)
        (update-stream! *adhoc-label* action)
        (schedule-action! action))))
;; Suspends the calling script until a freshly generated token, sent as
;; a message from this facet, has been received back. Must be called
;; from a script.
(define (flush!)
  (ensure-in-script! 'flush!)
  (let ([token (gensym 'flush!)])
    (until (core:message token)
      (on-start (send! token)))))
;; Schedules a `core:quit-dataspace` action. Must be called from a
;; script.
(define (quit-dataspace!)
  (ensure-in-script! 'quit-dataspace!)
  (let ([action (core:quit-dataspace)])
    (schedule-action! action)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Renders a field descriptor as "<name>/<id>".
(define (format-field-descriptor d)
  (match-define (field-descriptor label serial) d)
  (format "~a/~a" label serial))
;; Writes a human-readable dump of actor state `a` to output port `p`:
;; the mux, the current knowledge trie, one line per facet (with its
;; children and, if any, its endpoint ids), and, when the field table is
;; non-empty, one line per field showing its value and any dataflow
;; subjects recorded for it in the graph's forward edges.
(define (pretty-print-actor-state a p)
(match-define (actor-state mux facets _ knowledge field-table dfg) a)
(fprintf p "ACTOR:\n")
(fprintf p " - ")
(display (indented-port-output 3 (lambda (p) (syndicate-pretty-print mux p)) #:first-line? #f) p)
(newline p)
(fprintf p " - Knowledge:\n ~a" (trie->pretty-string knowledge #:indent 3))
(fprintf p " - Facets:\n")
(for ([(fid f) (in-hash facets)])
(match-define (facet _fid endpoints _ children) f)
(fprintf p " ---- facet ~a, children=~a" fid (set->list children))
(when (not (hash-empty? endpoints))
(fprintf p ", endpoints=~a" (hash-keys endpoints)))
(newline p))
(when (not (hash-empty? field-table))
(fprintf p " - Fields:\n")
(for ([(d ve) (in-hash field-table)])
;; `set` is passed as the failure thunk, so a field with no recorded
;; subjects yields an empty set.
(define subject-ids (hash-ref (dataflow-graph-edges-forward dfg) d set))
;; Field values are stored wrapped in ephemerons (see make-field).
(define v (ephemeron-value ve))
(define v*
(indented-port-output 6 (lambda (p) (syndicate-pretty-print v p)) #:first-line? #f))
(if (set-empty? subject-ids)
(fprintf p " - ~a: ~a\n" (format-field-descriptor d) v*)
(fprintf p " - ~a: ~a ~a\n"
(format-field-descriptor d)
(for/list [(subject-id subject-ids)]
(match-define (list fid eid) subject-id)
(format "~a:~a" fid eid))
v*)))))