diff --git a/syndicate/dataspace.rkt b/syndicate/dataspace.rkt index f359cd8..9bf2ba9 100644 --- a/syndicate/dataspace.rkt +++ b/syndicate/dataspace.rkt @@ -1,5 +1,7 @@ #lang racket/base +(provide ) + (require syndicate/functional-queue) (require syndicate/dataflow) (require racket/match) @@ -23,15 +25,15 @@ ;; A `Dataspace` is a ... TODO ;; An `Action` is either `(patch (Deltaof Assertion))` or `(message -;; Assertion)` or `(spawn BootProc)`. +;; Assertion)` or `(spawn Any BootProc (Set Assertion))`. (struct patch (changes) #:prefab) (struct message (body) #:prefab) -(struct spawn (boot-proc) #:prefab) +(struct spawn (name boot-proc initial-assertions) #:prefab) (struct dataspace ([next-id #:mutable] ;; Nat routing-table ;; Skeleton facets ;; (MutableHash FID Facet) - actors ;; (MutableSetof FID) + actors ;; (MutableHash FID Any) ;; maps FID to actor name assertions ;; (Bagof Assertion) dataflow ;; DataflowGraph pending-scripts ;; (MutableVectorof (Queueof (-> Any))) @@ -116,15 +118,17 @@ ;;--------------------------------------------------------------------------- -(define (make-dataspace) - (dataspace 0 - (make-empty-skeleton) - (make-hash) - (mutable-set) - (make-bag) - (make-dataflow-graph) - (make-vector priority-count (make-queue)) - (make-queue))) +(define (make-dataspace name boot-proc) + (define ds (dataspace 0 + (make-empty-skeleton) + (make-hash) + (make-hash) + (make-bag) + (make-dataflow-graph) + (make-vector priority-count (make-queue)) + (make-queue))) + (dataspace-spawn! ds name boot-proc (set)) + ds) (define (generate-id! ds) (let ((id (dataspace-next-id ds))) @@ -140,19 +144,41 @@ (define (actor-fid? fid) (null? (fid-parent fid))) -(define (add-actor! ds boot-proc) +(define (fid->actor-fid fid) + (if (actor-fid? fid) + fid + (fid->actor-fid (fid-parent fid)))) + +(define (fid-ancestor? fid maybe-ancestor) + (and (pair? fid) ;; empty fid lists obviously no ancestors at all! + (or (equal? fid maybe-ancestor) + (fid-ancestor? (cdr fid) maybe-ancestor)))) + +(define (add-actor! ds name boot-proc initial-assertions) (define actor-fid (generate-fid! ds '())) - (set-add! (dataspace-actors ds) actor-fid) - (add-facet! ds actor-fid boot-proc)) + (hash-set! (dataspace-actors ds) actor-fid name) + (for [(a initial-assertions)] + (match (bag-change! (dataspace-assertions ds) a 1) + ['absent->present (add-assertion! (dataspace-routing-table ds) a)] + ;; 'absent->absent and 'present->absent absurd + ['present->present (void)])) ;; i.e. no visible change + (add-facet! ds #f actor-fid (lambda () + (boot-proc) + (for [(a initial-assertions)] (dataspace-retract! ds a))))) (define (lookup-facet ds fid) (hash-ref (dataspace-facets ds) fid #f)) -(define-syntax-rule (with-current-facet [ds fid script?] body ...) - (parameterize ((current-dataspace ds) - (current-facet-id fid) - (in-script? script?)) - body ...)) +(define-syntax-rule (with-current-facet [ds0 fid0 script?] body ...) + (let ((ds ds0) + (fid fid0)) + (parameterize ((current-dataspace ds) + (current-facet-id fid) + (in-script? script?)) + (with-handlers ([(lambda (e) (not (exn:break? e))) + (lambda (e) (terminate-actor! ds (fid->actor-fid fid)))]) ;; TODO: tracing + body ... + (void))))) (define (capture-facet-context proc) (let ((ds (current-dataspace)) @@ -209,8 +235,8 @@ ['present->present (void)]))] ;; i.e. no visible change [(message body) (send-assertion! (dataspace-routing-table ds) body)] - [(spawn boot-proc) - (add-actor! ds boot-proc)])) + [(spawn name boot-proc initial-assertions) + (add-actor! ds name boot-proc initial-assertions)])) (not (null? actions))) (define (run-scripts! ds) @@ -221,7 +247,11 @@ ;; being held elsewhere! (or ran-a-script performed-an-action)) -(define (add-facet! ds fid boot-proc) +(define (add-facet! ds where fid boot-proc) + (when (and (not (in-script?)) where) + (error 'add-facet! + "~a: Cannot add facet outside script; are you missing an (on ...)?" + where)) (define parent-fid (fid-parent fid)) (define f (facet fid (make-hash) @@ -233,11 +263,11 @@ (when pf (set-facet-children! pf (set-add (facet-children pf) fid)))) (with-current-facet [ds fid #f] (boot-proc)) - (schedule-script! ds (lambda () - (when (and (facet-live? ds fid) - (or (and (pair? parent-fid) (not (facet-live? ds parent-fid))) - (facet-live-but-inert? ds fid))) - (terminate-facet! ds fid))))) + (schedule-script!* ds (lambda () + (when (and (facet-live? ds fid) + (or (and (pair? parent-fid) (not (facet-live? ds parent-fid))) + (facet-live-but-inert? ds fid))) + (terminate-facet! ds fid))))) (define (facet-live? ds fid) (hash-has-key? (dataspace-facets ds) fid)) @@ -249,9 +279,34 @@ (set-empty? (facet-children f)))) (define (schedule-script! #:priority [priority *normal-priority*] ds thunk) - (define v (dataspace-pending-scripts ds)) - (vector-set! v priority (enqueue (vector-ref v priority) (capture-facet-context thunk)))) + (schedule-script!* #:priority priority ds (capture-facet-context thunk))) +(define (schedule-script!* #:priority [priority *normal-priority*] ds thunk) + (define v (dataspace-pending-scripts ds)) + (vector-set! v priority (enqueue (vector-ref v priority) thunk))) + +;; Precondition: `f` is the `facet` struct that is/was associated with `fid` in `ds` +(define (retract-facet-assertions-and-subscriptions! ds fid f) + (schedule-script!* ds (lambda () + (for [((eid ep) (in-hash (facet-endpoints f)))] + (dataflow-forget-subject! (dataspace-dataflow ds) (list fid eid)) + (dataspace-retract! ds (endpoint-assertion ep)) + (define h (endpoint-handler ep)) + (when h (dataspace-unsubscribe! ds h)))))) + +;; Abruptly terminates an entire actor, without running stop-scripts etc. +(define (terminate-actor! ds actor-fid) + (when (not (actor-fid? actor-fid)) + (error 'terminate-actor! "Attempt to terminate non-actor FID ~a" actor-fid)) + (hash-remove! (dataspace-actors ds) actor-fid) + (let abort-facet! ((fid actor-fid)) + (define f (lookup-facet ds fid)) + (when f + (hash-remove! (dataspace-facets ds) fid) + (for [(child-fid (in-set (facet-children f)))] (abort-facet! child-fid)) + (retract-facet-assertions-and-subscriptions! ds fid f)))) + +;; Cleanly terminates a facet and its children, running stop-scripts etc. (define (terminate-facet! ds fid) (define f (lookup-facet ds fid)) (when f @@ -269,24 +324,20 @@ ;; Run stop-scripts after terminating children. This means that ;; children's stop-scripts run before ours. (for [(script (reverse (facet-stop-scripts f)))] - (schedule-script! ds script)) + (schedule-script! ds + (lambda () + (with-current-facet [ds fid #t] + (script))))) - (schedule-script! - ds - (lambda () - (for [((eid ep) (in-hash (facet-endpoints f)))] - (dataflow-forget-subject! (dataspace-dataflow ds) (list fid eid)) - (dataspace-retract! ds (endpoint-assertion ep)) - (define h (endpoint-handler ep)) - (when h (dataspace-unsubscribe! ds h))))) + (retract-facet-assertions-and-subscriptions! ds fid f) - (schedule-script! + (schedule-script!* #:priority *gc-priority* ds (lambda () - (when (and (pair? parent-fid) (facet-live-but-inert? ds parent-fid)) - (log-info "terminating ~v because it's dead and child ~v terminated" parent-fid fid) - (terminate-facet! ds parent-fid)))))) + (if (pair? parent-fid) + (when (facet-live-but-inert? ds parent-fid) (terminate-facet! ds parent-fid)) + (terminate-actor! ds fid)))))) (define (stop-facet! ds fid stop-script) (with-current-facet [ds (fid-parent fid) #t] ;; run in parent context wrt terminating facet @@ -294,6 +345,10 @@ (terminate-facet! ds fid) (schedule-script! ds stop-script))))) +(define (add-stop-script! ds script-proc) + (define f (lookup-facet ds (current-facet-id))) + (when f (set-facet-stop-scripts! f (cons script-proc (facet-stop-scripts f))))) + (define (add-endpoint! ds where assertion-fn handler) (when (in-script?) (error 'add-endpoint! @@ -309,10 +364,13 @@ (when handler (dataspace-subscribe! ds handler)) (hash-set! (facet-endpoints (lookup-facet ds fid)) eid ep)) +(define (enqueue-action! ds action) + (set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) action))) + (define (ensure-patch-action! ds) - (define old-q (dataspace-pending-actions ds)) - (when (or (queue-empty? old-q) (not (patch? (queue-last old-q)))) - (set-dataspace-pending-actions! ds (enqueue old-q (patch (make-bag))))) + (let ((q (dataspace-pending-actions ds))) + (when (or (queue-empty? q) (not (patch? (queue-last q)))) + (enqueue-action! ds (patch (make-bag))))) (patch-changes (queue-last (dataspace-pending-actions ds)))) (define (dataspace-retract! ds assertion) @@ -330,7 +388,10 @@ (add-interest! (dataspace-routing-table ds) h)) (define (dataspace-send! ds body) - (set-dataspace-pending-actions! ds (enqueue (dataspace-pending-actions ds) (message body)))) + (enqueue-action! ds (message body))) + +(define (dataspace-spawn! ds name boot-proc initial-assertions) + (enqueue-action! ds (spawn name boot-proc initial-assertions))) (module+ test ;; TODO: move somewhere sensible @@ -344,82 +405,92 @@ ;; TODO: move somewhere sensible (assertion-struct observe (specification)) - (define ds (make-dataspace)) - (add-actor! ds - (lambda () - (define current-value (field-handle 'current-value - (generate-id! (current-dataspace)) - (current-facet-id) - 0)) - (add-endpoint! (current-dataspace) - 'stop-when-ten - (lambda () - (when (= (current-value) 10) - (stop-facet! (current-dataspace) - (current-facet-id) - (lambda () - (log-info "box: terminating")))) - (void)) - #f) - (add-endpoint! (current-dataspace) - 'assert-box-state - (lambda () (box-state (current-value))) - #f) - (add-endpoint! (current-dataspace) - 'on-message-set-box - (lambda () (observe (set-box (capture (discard))))) - (skeleton-interest (list struct:set-box #f) - '() - '() - '((0 0)) - (capture-facet-context - (lambda (op new-value) - (when (eq? '! op) - (schedule-script! - (current-dataspace) - (lambda () - (log-info "box: taking on new-value ~v" new-value) - (current-value new-value)))))))))) - (add-actor! ds - (lambda () - (add-endpoint! (current-dataspace) - 'stop-when-retracted-observe-set-box - (lambda () (observe (observe (set-box (discard))))) - (skeleton-interest (list struct:observe (list struct:set-box #f)) - '() - '() - '() - (capture-facet-context - (lambda (op) - (when (eq? '- op) - (stop-facet! - (current-dataspace) - (current-facet-id) - (lambda () - (log-info "client: box has gone")))))))) - (add-endpoint! (current-dataspace) - 'on-asserted-box-state - (lambda () (observe (box-state (capture (discard))))) - (skeleton-interest (list struct:box-state #f) - '() - '() - '((0 0)) - (capture-facet-context - (lambda (op v) - (when (eq? '+ op) - (schedule-script! - (current-dataspace) - (lambda () - (log-info "client: learned that box's value is now ~v" v) - (dataspace-send! (current-dataspace) - (set-box (+ v 1)))))))))))) + (define ds + (make-dataspace + 'ground + (lambda () + (dataspace-spawn! + ds + 'box + (lambda () + (define current-value (field-handle 'current-value + (generate-id! (current-dataspace)) + (fid->actor-fid (current-facet-id)) + 0)) + (add-endpoint! (current-dataspace) + 'stop-when-ten + (lambda () + (when (= (current-value) 10) + (stop-facet! (current-dataspace) + (current-facet-id) + (lambda () + (log-info "box: terminating")))) + (void)) + #f) + (add-endpoint! (current-dataspace) + 'assert-box-state + (lambda () (box-state (current-value))) + #f) + (add-endpoint! (current-dataspace) + 'on-message-set-box + (lambda () (observe (set-box (capture (discard))))) + (skeleton-interest (list struct:set-box #f) + '() + '() + '((0 0)) + (capture-facet-context + (lambda (op new-value) + (when (eq? '! op) + (schedule-script! + (current-dataspace) + (lambda () + (log-info "box: taking on new-value ~v" new-value) + (current-value new-value))))))))) + (set)) + (dataspace-spawn! + ds + 'client + (lambda () + (add-endpoint! (current-dataspace) + 'stop-when-retracted-observe-set-box + (lambda () (observe (observe (set-box (discard))))) + (skeleton-interest (list struct:observe (list struct:set-box #f)) + '() + '() + '() + (capture-facet-context + (lambda (op) + (when (eq? '- op) + (stop-facet! + (current-dataspace) + (current-facet-id) + (lambda () + (log-info "client: box has gone")))))))) + (add-endpoint! (current-dataspace) + 'on-asserted-box-state + (lambda () (observe (box-state (capture (discard))))) + (skeleton-interest (list struct:box-state #f) + '() + '() + '((0 0)) + (capture-facet-context + (lambda (op v) + (when (eq? '+ op) + (schedule-script! + (current-dataspace) + (lambda () + (log-info "client: learned that box's value is now ~v" v) + (dataspace-send! (current-dataspace) + (set-box (+ v 1))))))))))) + (set))))) (require racket/pretty) ;; (pretty-print ds) - (let loop ((i 0)) - ;; (printf "--- i = ~v\n" i) - (when (run-scripts! ds) - ;; (pretty-print ds) - (loop (+ i 1)))) + (#;time values + (let loop ((i 0)) + ;; (printf "--- i = ~v\n" i) + (when (run-scripts! ds) + ;; (pretty-print ds) + (loop (+ i 1))))) ;; (pretty-print ds) ) diff --git a/syndicate/syntax-classes.rkt b/syndicate/syntax-classes.rkt new file mode 100644 index 0000000..e67de9b --- /dev/null +++ b/syndicate/syntax-classes.rkt @@ -0,0 +1,20 @@ +#lang racket/base +;; Common syntax classes. + +(provide (for-syntax assertions + name)) + +(require racket/set) + +(require (for-syntax racket/base)) +(require (for-syntax syntax/parse)) +(require (for-syntax syntax/srcloc)) + +(begin-for-syntax + (define-splicing-syntax-class assertions + (pattern (~seq #:assertions [exprs ...])) + (pattern (~seq) #:attr (exprs 1) #'())) + + (define-splicing-syntax-class name + (pattern (~seq #:name N)) + (pattern (~seq) #:attr N #'#f))) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt new file mode 100644 index 0000000..45198cc --- /dev/null +++ b/syndicate/syntax.rkt @@ -0,0 +1,1126 @@ +#lang racket/base +;; DSL syntax over the API of dataspace.rkt + +(provide spawn + spawn* + + react + react/suspend + until + + current-facet-id + field + assert + stop-facet + stop-current-facet + stop-when + stop-when-true + on-start + on-stop + on + during + during/spawn + begin/dataflow + define/dataflow + + asserted + retracted + message + + let-event + + query-value + query-set + query-hash + query-hash-set + query-count + query-value* + query-set* + query-hash* + query-hash-set* + query-count* + define/query-value + define/query-set + define/query-hash + define/query-hash-set + define/query-count + immediate-query + + send! + assert! + retract! + patch! + perform-actions! + flush! + + syndicate-effects-available? + + ? ;; from pattern.rkt + + ;; + + current-action-transformer + schedule-action! + schedule-actions! + (for-syntax (rename-out [name actor-name])) + + pretty-print-actor-state + ) + +(require (for-syntax racket/base)) +(require (for-syntax syntax/parse)) +(require (for-syntax syntax/srcloc)) +(require "syntax-classes.rkt") + +(require "dataspace.rkt") +(require (submod "dataspace.rkt" priorities)) + +(begin-for-syntax + (define-splicing-syntax-class actor-wrapper + (pattern (~seq #:spawn wrapper)) + (pattern (~seq) #:attr wrapper #'spawn)) + + (define-splicing-syntax-class on-crash-option + (pattern (~seq #:on-crash expr)) + (pattern (~seq) #:attr expr #f)) + + (define-splicing-syntax-class let-option + (pattern (~seq #:let clauses)) + (pattern (~seq) #:attr clauses #'())) + + (define-splicing-syntax-class when-pred + (pattern (~seq #:when Pred)) + (pattern (~seq) #:attr Pred #'#t)) + + (define-splicing-syntax-class priority + (pattern (~seq #:priority level)) + (pattern (~seq) #:attr level #'*normal-priority*))) + +(define-syntax (spawn stx) + (syntax-parse stx + [(_ (~or (~optional (~seq #:name name-expr) #:defaults ([name-expr #'#f]) + #:name "#:name") + (~optional (~seq #:assertions [assertion-exprs ...]) + #:name "#:assertions") + (~optional (~seq #:linkage [linkage-expr ...]) #:defaults ([(linkage-expr 1) '()]) + #:name "#:linkage")) + ... + O ...) + (quasisyntax/loc stx + (spawn** #:name name-expr + #:assertions #,(cond [(attribute assertion-exprs) #'[assertion-exprs ...]] + [else #'[]]) + linkage-expr ... O ...))])) + +(define-syntax (spawn* stx) + (syntax-parse stx + [(_ name:name assertions:assertions script ...) + (quasisyntax/loc stx + (spawn** #:name name.N + #:assertions [assertions.exprs ...] + (on-start script ...)))])) + +(define-syntax (spawn** stx) + (syntax-parse stx + [(_ name:name assertions:assertions script ...) + (quasisyntax/loc stx + (dataspace-spawn! + (current-dataspace) + name.N + (lambda () (begin/void-default script ...)) + (set assertions.exprs ...)))])) + +(define (react* where boot-proc) + (define ds (current-dataspace)) + (add-facet! ds + where + (generate-fid! ds (current-facet-id)) + boot-proc)) + +(define-syntax (react stx) + (syntax-parse stx + [(_ O ...) + (quasisyntax/loc stx + (react* #,(source-location->string stx) + (lambda () (begin/void-default O ...))))])) + +(define-syntax (react/suspend stx) + (syntax-parse stx + [(_ (resume-parent) O ...) + (quasisyntax/loc stx + (suspend-script* #,(source-location->string stx) + (lambda (resume-parent) + (react* #,(source-location->string stx) + (lambda () (begin/void-default O ...))))))])) + +(define-syntax (until stx) + (syntax-parse stx + [(_ E O ...) + (syntax/loc stx + (react/suspend (continue) + (stop-when E (continue (void))) + O ...))])) + +(define (make-field name init) + (field-handle name + (generate-id! (current-dataspace)) + (fid->actor-fid (current-facet-id)) + init)) + +(define-syntax (define-field stx) + (syntax-parse stx + [(_ id init) + #'(define id (make-field 'id init))])) + +(define-syntax (field stx) + (syntax-parse stx + [(_ [id:id init] ...) + (quasisyntax/loc stx + (begin (define-field id init) + ...))])) + +(define-syntax (assert stx) + (syntax-parse stx + [(_ w:when-pred P) + (quasisyntax/loc stx + (add-endpoint! (current-dataspace) + #,(source-location->string stx) + (lambda () (when #'w.Pred P)) + #f))])) + +(define-syntax (stop-facet stx) + (syntax-parse stx + [(_ fid-expr script ...) + (quasisyntax/loc stx + (let ((fid fid-expr)) + (when (not (fid-ancestor? (current-facet-id) fid)) + (error 'stop-facet "Attempt to stop non-ancestor facet ~a" fid)) + (stop-facet! (current-dataspace) fid (lambda () (begin/void-default script ...)))))])) + +(define-syntax-rule (stop-current-facet script ...) + (stop-facet (current-facet-id) script ...)) + +(define-syntax-rule (stop-when-true condition script ...) + (begin/dataflow + (when condition + (stop-facet (current-facet-id) script ...)))) + +(define-syntax (on-start stx) + (syntax-parse stx + [(_ script ...) + (quasisyntax/loc stx + (schedule-script! (current-dataspace) + (lambda () (begin/void-default script ...))))])) + +(define-syntax (on-stop stx) + (syntax-parse stx + [(_ script ...) + (quasisyntax/loc stx + (add-stop-script! (current-dataspace) + (lambda () (begin/void-default script ...))))])) + +(define-syntax (stop-when stx) + (syntax-parse stx + [(_ w:when-pred E prio:priority script ...) + (analyze-event stx + #'w.Pred + #'E + (syntax/loc stx (stop-current-facet script ...)) + #'prio.level)])) + +(define-syntax (on stx) + (syntax-parse stx + [(_ w:when-pred E prio:priority script ...) + (analyze-event stx + #'w.Pred + #'E + (syntax/loc stx (begin/void-default script ...)) + #'prio.level)])) + +(define-syntax (begin/dataflow stx) + (syntax-parse stx + [(_ prio:priority expr ...) + (quasisyntax/loc stx + (let () + (add-endpoint! (current-dataspace) + #,(source-location->string stx) + (lambda () + (define subject-id (current-dataflow-subject-id)) + (schedule-script! + #:priority prio.level + (lambda () + (parameterize ((current-dataflow-subject-id subject-id)) + expr ...))) + (void)) + #f)))])) + +(define-syntax (define/dataflow stx) + (syntax-parse stx + [(_ fieldname expr) + (quasisyntax/loc stx (define/dataflow fieldname expr #:default #f))] + [(_ fieldname expr #:default default-expr) + (quasisyntax/loc stx + (begin + (field [fieldname default-expr]) + (begin/dataflow (fieldname expr))))])) + +(define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx)) +(define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx)) + +(define-syntax (suspend-script stx) + (syntax-parse stx + [(_ proc) + (quasisyntax/loc stx + (suspend-script* #,(source-location->string stx) proc))])) + +(define-syntax (let-event stx) + (syntax-parse stx + [(_ [e ...] body ...) + (syntax/loc stx + ((react/suspend (k) + (on-start (-let-event [e ...] (stop-current-facet (k (lambda () body ...))))))))])) + +(define-syntax (-let-event stx) + (syntax-parse stx + [(_ [] expr) #'expr] + [(_ [e es ...] expr) (quasisyntax/loc #'e (react (stop-when e (-let-event [es ...] expr))))])) + +;; HERE + +(define-syntax (during stx) + (syntax-parse stx + [(_ P O ...) + (define E-stx (syntax/loc #'P (asserted P))) + (define-values (_proj _pat _bindings instantiated) + (analyze-pattern E-stx #'P)) + (quasisyntax/loc stx + (on #,E-stx + (let ((p #,instantiated)) + (react (stop-when (retracted p)) + O ...))))])) + +(define-syntax (during/spawn stx) + (syntax-parse stx + [(_ P w:actor-wrapper name:name assertions:assertions parent-let:let-option + oncrash:on-crash-option + O ...) + (define E-stx (syntax/loc #'P (asserted P))) + (define-values (_proj _pat _bindings instantiated) + (analyze-pattern E-stx #'P)) + (quasisyntax/loc stx + (on #,E-stx + (let* ((id (gensym 'during/spawn)) + (p #,instantiated) ;; this is the concrete assertion corresponding to demand + (inst (instance id p))) ;; this is the assertion representing supply + (react (stop-when (asserted inst) + ;; Supply (inst) appeared before demand (p) retracted. + ;; Transition to a state where we monitor demand, but also + ;; express interest in supply: this latter acts as a signal + ;; to the supply that it should stick around. We react to + ;; retraction of supply before retraction of demand by + ;; invoking the on-crash expression, if supplied. Once + ;; demand is retracted, this facet terminates, retracting + ;; its interest in supply, thereby signalling to the supply + ;; that it is no longer wanted. + (react (stop-when (retracted inst) ;; NOT OPTIONAL + #,@(if (attribute oncrash.expr) + #'(oncrash.expr) + #'())) + (stop-when (retracted p)))) + (stop-when (retracted p) + ;; Demand (p) retracted before supply (inst) appeared. We + ;; MUST wait for the supply to fully appear so that we can + ;; reliably tell it to shut down. We must maintain interest + ;; in supply until we see supply, and then terminate, thus + ;; signalling to supply that it is no longer wanted. + (react (stop-when (asserted inst))))) + (let parent-let.clauses + (w.wrapper #:linkage [(assert inst) + (stop-when (retracted (observe inst)))] + #:name name.N + #:assertions* assertions.P + O ...)))))])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Queries + +(begin-for-syntax + (define-splicing-syntax-class on-add + (pattern (~optional (~seq #:on-add expr) #:defaults ([expr #f])))) + (define-splicing-syntax-class on-remove + (pattern (~optional (~seq #:on-remove expr) #:defaults ([expr #f])))) + + (define (schedule-query-handler-stxs maybe-expr-stx) + (if maybe-expr-stx + (quasisyntax/loc maybe-expr-stx + ((schedule-script! #:priority *query-handler-priority* + (lambda () #,maybe-expr-stx)))) + #'()))) + +(define-syntax (query-value stx) + (syntax-parse stx + [(_ field-name absent-expr args ...) + (quasisyntax/loc stx + (let () + (field [field-name absent-expr]) + (query-value* field-name absent-expr args ...)))])) + +(define-syntax (query-value* stx) + (syntax-parse stx + [(_ field-name absent-expr P expr on-add:on-add on-remove:on-remove) + (quasisyntax/loc stx + (let ((F field-name)) + (on (asserted P) #:priority *query-priority* + #,@(schedule-query-handler-stxs (attribute on-add.expr)) + (F expr)) + (on (retracted P) #:priority *query-priority-high* + #,@(schedule-query-handler-stxs (attribute on-remove.expr)) + (F absent-expr)) + F))])) + +(define-syntax (query-set stx) + (syntax-parse stx + [(_ field-name args ...) + (quasisyntax/loc stx + (let () + (field [field-name (set)]) + (query-set* field-name args ...)))])) + +(define-syntax (query-set* stx) + (syntax-parse stx + [(_ field-name P expr on-add:on-add on-remove:on-remove) + (quasisyntax/loc stx + (let ((F field-name)) + (on (asserted P) #:priority *query-priority* + (let ((V expr)) + (when (not (set-member? (F) V)) + #,@(schedule-query-handler-stxs (attribute on-add.expr)) + (F (set-add (F) V))))) + (on (retracted P) #:priority *query-priority-high* + (let ((V expr)) + (when (set-member? (F) V) + #,@(schedule-query-handler-stxs (attribute on-remove.expr)) + (F (set-remove (F) V))))) + F))])) + +(define-syntax (query-hash stx) + (syntax-parse stx + [(_ field-name args ...) + (quasisyntax/loc stx + (let () + (field [field-name (hash)]) + (query-hash* field-name args ...)))])) + +(define-syntax (query-hash* stx) + (syntax-parse stx + [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove) + (quasisyntax/loc stx + (let ((F field-name)) + (on (asserted P) #:priority *query-priority* + (let ((key key-expr)) + (when (hash-has-key? (F) key) + (log-warning "query-hash: field ~v with pattern ~v: overwriting existing entry ~v" + 'field-name + 'P + key)) + #,@(schedule-query-handler-stxs (attribute on-add.expr)) + (F (hash-set (F) key value-expr)))) + (on (retracted P) #:priority *query-priority-high* + #,@(schedule-query-handler-stxs (attribute on-remove.expr)) + (F (hash-remove (F) key-expr))) + F))])) + +(define-syntax (query-hash-set stx) + (syntax-parse stx + [(_ field-name args ...) + (quasisyntax/loc stx + (let () + (field [field-name (hash)]) + (query-hash-set* field-name args ...)))])) + +(define-syntax (query-hash-set* stx) + (syntax-parse stx + [(_ field-name P key-expr value-expr on-add:on-add on-remove:on-remove) + (quasisyntax/loc stx + (let ((F field-name)) + (on (asserted P) #:priority *query-priority* + (let ((K key-expr) (V value-expr)) + (when (not (hashset-member? (F) K V)) + #,@(schedule-query-handler-stxs (attribute on-add.expr)) + (F (hashset-add (F) K V))))) + (on (retracted P) #:priority *query-priority-high* + (let ((K key-expr) (V value-expr)) + (when (hashset-member? (F) K V) + #,@(schedule-query-handler-stxs (attribute on-remove.expr)) + (F (hashset-remove (F) K V))))) + F))])) + +(define-syntax (query-count stx) + (syntax-parse stx + [(_ field-name args ...) + (quasisyntax/loc stx + (let () + (field [field-name (hash)]) + (query-count* field-name args ...)))])) + +(define-syntax (query-count* stx) + (syntax-parse stx + [(_ field-name P expr on-add:on-add on-remove:on-remove) + (quasisyntax/loc stx + (let ((F field-name)) + (on (asserted P) #:priority *query-priority* + (let ((E expr)) + #,@(schedule-query-handler-stxs (attribute on-add.expr)) + (F (hash-set (F) E (+ 1 (hash-ref (F) E 0)))))) + (on (retracted P) #:priority *query-priority-high* + (let ((E expr)) + #,@(schedule-query-handler-stxs (attribute on-remove.expr)) + (let ((F0 (F))) + (F (match (hash-ref F0 E 0) + [0 F0] ;; huh + [1 (hash-remove F0 E)] + [n (hash-set F0 E (- n 1))]))))) + F))])) + +(define-syntax-rule (define/query-value id ae P x ...) (define id (query-value id ae P x ...))) +(define-syntax-rule (define/query-set id P x ...) (define id (query-set id P x ...))) +(define-syntax-rule (define/query-hash id P x ...) (define id (query-hash id P x ...))) +(define-syntax-rule (define/query-hash-set id P x ...) (define id (query-hash-set id P x ...))) +(define-syntax-rule (define/query-count id P x ...) (define id (query-count id P x ...))) + +(define-syntax (immediate-query stx) + (syntax-case stx () + [(_ [op args ...] ...) + (with-syntax [((query-result ...) (generate-temporaries #'(op ...)))] + (syntax/loc stx + (react/suspend (k) + (define query-result (op query-result args ...)) ... + (on-start (flush!) (k (query-result) ...)))))])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(require auxiliary-macro-context) + +(define-auxiliary-macro-context + #:context-name event-expander + #:prop-name prop:event-expander + #:prop-predicate-name event-expander? + #:prop-accessor-name event-expander-proc + #:macro-definer-name define-event-expander + #:introducer-parameter-name current-event-expander-introducer + #:local-introduce-name syntax-local-event-expander-introduce + #:expander-id-predicate-name event-expander-id? + #:expander-transform-name event-expander-transform) + +(provide (for-syntax + prop:event-expander + event-expander? + event-expander-proc + syntax-local-event-expander-introduce + event-expander-id? + event-expander-transform) + define-event-expander) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Syntax-time support + +(define (interests-pre-and-post-patch pat synthetic?) + (define (or* x y) (or x y)) + (define a (current-actor-state)) + (define previous-knowledge (if synthetic? trie-empty (actor-state-previous-knowledge a))) + (define old (trie-lookup previous-knowledge pat #f #:wildcard-union or*)) + (define new (trie-lookup (actor-state-knowledge a) pat #f #:wildcard-union or*)) + (values old new)) + +(define (interest-just-appeared-matching? pat synthetic?) + (define-values (old new) (interests-pre-and-post-patch pat synthetic?)) + (and (not old) new)) + +(define (interest-just-disappeared-matching? pat synthetic?) + (define-values (old new) (interests-pre-and-post-patch pat synthetic?)) + (and old (not new))) + +(define-for-syntax (analyze-asserted/retracted outer-expr-stx + when-pred-stx + event-stx + script-stx + asserted? + P-stx + priority-stx) + (define-values (proj-stx pat bindings _instantiated) + (analyze-pattern event-stx P-stx)) + (define event-predicate-stx (if asserted? #'patch/added? #'patch/removed?)) + (define patch-accessor-stx (if asserted? #'patch-added #'patch-removed)) + (define change-detector-stx + (if asserted? #'interest-just-appeared-matching? #'interest-just-disappeared-matching?)) + (quasisyntax/loc outer-expr-stx + (add-endpoint! #,(source-location->string outer-expr-stx) + (lambda () (if #,when-pred-stx + (core:sub #,pat) + patch-empty)) + (lambda (e current-interests synthetic?) + (when (not (trie-empty? current-interests)) + (core:match-event e + [(? #,event-predicate-stx p) + (define proj #,proj-stx) + (define proj-arity (projection-arity proj)) + (define entry-set (trie-project/set #:take proj-arity + (#,patch-accessor-stx p) + proj)) + (when (not entry-set) + (error 'asserted + "Wildcard interest discovered while projecting by ~v at ~a" + proj + #,(source-location->string P-stx))) + (for [(entry (in-set entry-set))] + (let ((instantiated (instantiate-projection proj entry))) + (and (#,change-detector-stx instantiated synthetic?) + (schedule-script! + #:priority #,priority-stx + (lambda () + (match-define (list #,@bindings) entry) + #,script-stx)))))])))))) + +(define-for-syntax orig-insp + (variable-reference->module-declaration-inspector (#%variable-reference))) + +(define-for-syntax (analyze-event outer-expr-stx + when-pred-stx + armed-event-stx + script-stx + priority-stx) + (define event-stx (syntax-disarm armed-event-stx orig-insp)) + (syntax-parse event-stx + #:literals [core:message asserted retracted rising-edge] + [(expander args ...) + #:when (event-expander-id? #'expander) + (event-expander-transform + event-stx + (lambda (result) + (analyze-event outer-expr-stx + when-pred-stx + (syntax-rearm result event-stx) + script-stx + priority-stx)))] + [(core:message P) + (define-values (proj pat bindings _instantiated) + (analyze-pattern event-stx #'P)) + (quasisyntax/loc outer-expr-stx + (add-endpoint! #,(source-location->string outer-expr-stx) + (lambda () (if #,when-pred-stx + (core:sub #,pat) + patch-empty)) + (lambda (e current-interests _synthetic?) + (when (not (trie-empty? current-interests)) + (core:match-event e + [(core:message body) + (define capture-vals + (match-value/captures + body + #,proj)) + (and capture-vals + (schedule-script! + #:priority #,priority-stx + (lambda () + (apply (lambda #,bindings #,script-stx) + capture-vals))))])))))] + [(asserted P) + (analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx + #t #'P priority-stx)] + [(retracted P) + (analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx + #f #'P priority-stx)] + [(rising-edge Pred) + (define field-name + (datum->syntax event-stx + (string->symbol + (format "~a:rising-edge" (source-location->string event-stx))))) + (quasisyntax/loc outer-expr-stx + (let () + (field [#,field-name #f]) + (add-endpoint! #,(source-location->string outer-expr-stx) + (lambda () + (when #,when-pred-stx + (define old-val (#,field-name)) + (define new-val Pred) + (when (not (eq? old-val new-val)) + (#,field-name new-val) + (when new-val + (schedule-script! #:priority #,priority-stx + (lambda () #,script-stx))))) + patch-empty) + void)))])) + +(define-syntax (begin/void-default stx) + (syntax-parse stx + [(_) + (syntax/loc stx (void))] + [(_ expr0 expr ...) + (syntax/loc stx (begin expr0 expr ...))])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Field Construction and Access + +(define field-counter 0) +(define (make-field name initial-value) + (define desc (field-descriptor name field-counter)) + (set! field-counter (+ field-counter 1)) + (hash-set! (actor-state-field-table (current-actor-state)) + desc + (make-ephemeron desc initial-value)) + (field-handle desc)) + +(define (field-scope-error who desc) + (error who "Field ~a used out-of-scope" (field-descriptor-name desc))) + +(define (field-ref desc) + (ephemeron-value + (hash-ref (actor-state-field-table (current-actor-state)) + desc + (lambda () (field-scope-error 'field-ref desc))))) + +(define (field-set! desc v) + (define a (current-actor-state)) + (define ft (actor-state-field-table a)) + (unless (hash-has-key? ft desc) + (field-scope-error 'field-set! desc)) + (hash-set! ft desc (make-ephemeron desc v))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Facet Storage in an Actor + +(define (facet-live? fid) + (hash-has-key? (actor-state-facets (current-actor-state)) fid)) + +(define (lookup-facet fid) + (hash-ref (actor-state-facets (current-actor-state)) fid #f)) + +(define (facet-live-but-inert? fid) + (define f (lookup-facet fid)) + (and f + (hash-empty? (facet-endpoints f)) + (set-empty? (facet-children f)))) + +(define (update-facet! fid proc) + (define old-facet (lookup-facet fid)) + (define new-facet (proc old-facet)) + (store-facet! fid new-facet)) + +(define (store-facet! fid new-facet) + (define a (current-actor-state)) + (current-actor-state + (struct-copy actor-state a + [facets (hash-set/remove (actor-state-facets a) fid new-facet)]))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Entering and Leaving Facet Context; Queueing of Work Items + +(define-syntax-rule (with-current-facet fid in? body ...) + (parameterize ((current-facet-id fid) + (in-script? in?)) + body ...)) + +(define (capture-facet-context proc) + (let ((fid (current-facet-id))) + (lambda args + (with-current-facet fid #t + (call-with-syndicate-effects + (lambda () (apply proc args))))))) + +(define (schedule-script! #:priority [priority *normal-priority*] thunk) + (push-script! priority (capture-facet-context thunk))) + +(define (push-script! priority thunk-with-context) + (define v (current-pending-scripts)) + (vector-set! v priority (enqueue (vector-ref v priority) thunk-with-context))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Action Queue Management + +(define (schedule-action! ac) + (if (patch? ac) + (when (patch-non-empty? ac) + (current-pending-patch (compose-patch ac (current-pending-patch)))) + (begin (flush-pending-patch!) + (current-pending-actions (list (current-pending-actions) + ((current-action-transformer) ac)))))) + +(define (schedule-actions! . acs) + (for [(ac (core:clean-actions acs))] (schedule-action! ac))) + +(define (flush-pending-patch!) + (define p (current-pending-patch)) + (when (patch-non-empty? p) + (current-pending-patch patch-empty) + (current-pending-actions (list (current-pending-actions) + ((current-action-transformer) p))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Endpoint Creation + +(define (add-endpoint! where patch-fn handler-fn) + (when (in-script?) + (error 'add-endpoint! + "~a: Cannot add endpoint in script; are you missing a (react ...)?" + where)) + (define-values (new-eid delta-aggregate) + (let () + (define a (current-actor-state)) + (define new-eid (mux-next-pid (actor-state-mux a))) + (define-values (new-mux _new-eid _delta delta-aggregate) + (mux-add-stream (actor-state-mux a) + (parameterize ((current-dataflow-subject-id + (list (current-facet-id) new-eid))) + (patch-fn)))) + (current-actor-state (struct-copy actor-state (current-actor-state) [mux new-mux])) + (values new-eid delta-aggregate))) + (update-facet! (current-facet-id) + (lambda (f) + (and f + (struct-copy facet f + [endpoints + (hash-set (facet-endpoints f) + new-eid + (endpoint new-eid patch-fn handler-fn))])))) + (schedule-action! delta-aggregate)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Facet Lifecycle + +(define next-fid-uid 0) +(define (add-facet! where setup-proc) + (when (not (in-script?)) + (error 'add-facet! + "~a: Cannot add facet outside script; are you missing an (on ...)?" + where)) + (define parent-fid (current-facet-id)) + (define fid-uid next-fid-uid) + (define fid (cons fid-uid parent-fid)) + (set! next-fid-uid (+ next-fid-uid 1)) + (update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set)))) + (update-facet! parent-fid + (lambda (pf) + (and pf (struct-copy facet pf + [children (set-add (facet-children pf) fid)])))) + (with-current-facet fid #f + (setup-proc) + (schedule-script! + (lambda () + (when (and (facet-live? fid) + (or (and (pair? parent-fid) (not (facet-live? parent-fid))) + (facet-live-but-inert? fid))) + (terminate-facet! fid))))) + (facet-handle-event! fid + (lookup-facet fid) + (patch (actor-state-knowledge (current-actor-state)) trie-empty) + #t)) + +;; If the named facet is live, terminate it. +(define (terminate-facet! fid) + (define f (lookup-facet fid)) + (when f + (define parent-fid (cdr fid)) + + (when (pair? parent-fid) + (update-facet! parent-fid + (lambda (f) + (and f + (struct-copy facet f + [children (set-remove (facet-children f) + fid)]))))) + + (store-facet! fid #f) + + (for [(child-fid (in-set (facet-children f)))] + (terminate-facet! child-fid)) + + ;; Run stop-scripts after terminating children. This means that + ;; children's stop-scripts run before ours. + (with-current-facet fid #t + (map schedule-script! (reverse (facet-stop-scripts f)))) + + (schedule-script! + (lambda () + (for [((eid ep) (in-hash (facet-endpoints f)))] + (define a (current-actor-state)) + (dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid)) + (define-values (new-mux _eid _delta delta-aggregate) + (mux-remove-stream (actor-state-mux a) eid)) + (current-actor-state (struct-copy actor-state a [mux new-mux])) + (schedule-action! delta-aggregate)))) + + (schedule-script! + #:priority *gc-priority* + (lambda () + (when (facet-live-but-inert? parent-fid) + (log-info "terminating ~v because it's dead and child ~v terminated" parent-fid fid) + (terminate-facet! parent-fid)))))) + +(define (add-stop-script! script-proc) + (update-facet! (current-facet-id) + (lambda (f) + (and f + (struct-copy facet f + [stop-scripts (cons script-proc (facet-stop-scripts f))]))))) + +(define (make-empty-pending-scripts) + (make-vector priority-count (make-queue))) + +(define (boot-actor script-proc) + (with-store [(current-actor-state + (actor-state (mux) + (hash) + trie-empty + trie-empty + (make-weak-hasheq) + (make-dataflow-graph))) + (current-pending-patch patch-empty) + (current-pending-actions '()) + (current-pending-scripts (make-empty-pending-scripts)) + (current-action-transformer values)] + (with-current-facet '() #f + (schedule-action! (core:retract ?)) + ;; Retract any initial-assertions we might have been given. We + ;; must ensure that we explicitly maintain them: retracting them + ;; here prevents us from accidentally relying on their + ;; persistence from our creation. + (schedule-script! script-proc) + (run-scripts!)))) + +(define (pop-next-script!) + (define priority-levels (current-pending-scripts)) + (let loop ((level 0)) + (and (< level (vector-length priority-levels)) + (let ((q (vector-ref priority-levels level))) + (if (queue-empty? q) + (loop (+ level 1)) + (let-values (((script q) (dequeue q))) + (vector-set! priority-levels level q) + script)))))) + +(define (run-all-pending-scripts!) + (define script (pop-next-script!)) + (when script + (script) + (refresh-facet-assertions!) + (run-all-pending-scripts!))) + +(define (run-scripts!) + (run-all-pending-scripts!) + (flush-pending-patch!) + (define pending-actions (current-pending-actions)) + (current-pending-actions '()) + (if (hash-empty? (actor-state-facets (current-actor-state))) + (core:quit pending-actions) + (core:transition (current-actor-state) pending-actions))) + +(define (refresh-facet-assertions!) + (dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state)) + (lambda (subject-id) + (match-define (list fid eid) subject-id) + (define f (lookup-facet fid)) + (when f + (with-current-facet fid #f + (define ep (hash-ref (facet-endpoints f) eid)) + (define new-patch ((endpoint-patch-fn ep))) + (define a (current-actor-state)) + (define new-interests + (trie-subtract (patch-added new-patch) + (mux-interests-of (actor-state-mux a) eid) + #:combiner (lambda (v1 v2) trie-empty))) + (define newly-relevant-knowledge + (biased-intersection (actor-state-knowledge a) new-interests)) + (update-stream! eid (compose-patch new-patch (core:retract ?))) + (facet-handle-event! fid + (lookup-facet fid) + (patch newly-relevant-knowledge trie-empty) + #t)))))) + +(define (update-stream! eid patch) + (define a (current-actor-state)) + (define-values (new-mux _eid _delta delta-aggregate) + (mux-update-stream (actor-state-mux a) eid patch)) + (current-actor-state (struct-copy actor-state a [mux new-mux])) + (schedule-action! delta-aggregate)) + +(define (actor-behavior e a) + (and e + (with-store [(current-actor-state + (if (patch? e) + (struct-copy actor-state a + [previous-knowledge (actor-state-knowledge a)] + [knowledge (update-interests (actor-state-knowledge a) e)]) + a)) + (current-pending-patch patch-empty) + (current-pending-actions '()) + (current-pending-scripts (make-empty-pending-scripts)) + (current-action-transformer values)] + (for [((fid f) (in-hash (actor-state-facets a)))] + (facet-handle-event! fid f e #f)) + (run-scripts!)))) + +(define (facet-handle-event! fid f e synthetic?) + (define mux (actor-state-mux (current-actor-state))) + (with-current-facet fid #f + (for [(ep (in-hash-values (facet-endpoints f)))] + ((endpoint-handler-fn ep) e (mux-interests-of mux (endpoint-id ep)) synthetic?)))) + +(module+ implementation-details + (provide actor-behavior + boot-actor + make-field + (struct-out field-descriptor) + (struct-out field-handle) + (struct-out actor-state) + (struct-out facet) + (struct-out endpoint) + + field-ref + field-set! + + suspend-script + suspend-script* + + capture-actor-actions)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Script suspend-and-resume. + +(define prompt-tag (make-continuation-prompt-tag 'syndicate)) + +(define (syndicate-effects-available?) + (continuation-prompt-available? prompt-tag)) + +(define (call-with-syndicate-effects thunk) + (call-with-continuation-prompt thunk prompt-tag)) + +(define (capture-actor-actions thunk) + (call-with-syndicate-effects + (lambda () + (with-store [(current-pending-actions '()) + (current-pending-patch patch-empty) + (current-action-transformer values)] + (call-with-values thunk + (lambda results + (flush-pending-patch!) + (when (> (length results) 1) + (error 'capture-actor-actions + "~a values supplied in top-level Syndicate action; more than one is unacceptable" + (length results))) + (cons results (current-pending-actions)))))))) + +(module+ for-module-begin + (provide capture-actor-actions)) + +(define (suspend-script* where proc) + (when (not (in-script?)) + (error 'suspend-script + "~a: Cannot suspend script outside script; are you missing an (on ...)?" + where)) + (call-with-composable-continuation + (lambda (k) + (abort-current-continuation + prompt-tag + (lambda () + (define suspended-fid (current-facet-id)) + (define in? (in-script?)) + (define stale? #f) + (define raw-resume-parent + (capture-facet-context + (lambda results + (parameterize ((in-script? in?)) + (apply k results))))) + (define resume-parent + (lambda results + (when stale? (error 'suspend-script + "Attempt to resume suspension (suspended at ~a) more than once" + where)) + (set! stale? #t) + (abort-current-continuation + prompt-tag + (lambda () + (let ((invoking-fid (current-facet-id))) + (when (not (equal? invoking-fid suspended-fid)) + (terminate-facet! invoking-fid))) + (push-script! *normal-priority* + (lambda () (apply raw-resume-parent results))))))) + (proc resume-parent)))) + prompt-tag)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Immediate actions + +(define (ensure-in-script! who) + (when (not (in-script?)) + (error who "Attempt to perform action outside script; are you missing an (on ...)?"))) + +(define (send! M) + (ensure-in-script! 'send!) + (schedule-action! (core:message M))) + +(define *adhoc-label* -1) + +(define (assert! P) + (ensure-in-script! 'assert!) + (update-stream! *adhoc-label* (core:assert P))) + +(define (retract! P) + (ensure-in-script! 'retract!) + (update-stream! *adhoc-label* (core:retract P))) + +(define (patch! p) + (ensure-in-script! 'patch!) + (update-stream! *adhoc-label* p)) + +(define (perform-actions! acs) + (ensure-in-script! 'perform-actions!) + (for [(ac (core:clean-actions acs))] + (match ac + [(? patch? p) (update-stream! *adhoc-label* p)] + [_ (schedule-action! ac)]))) + +(define (flush!) + (ensure-in-script! 'flush!) + (define ack (gensym 'flush!)) + (until (core:message ack) + (on-start (send! ack)))) + +(define (quit-dataspace!) + (ensure-in-script! 'quit-dataspace!) + (schedule-action! (core:quit-dataspace))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (format-field-descriptor d) + (match-define (field-descriptor name id) d) + (format "~a/~a" name id)) + +(define (pretty-print-actor-state a p) + (match-define (actor-state mux facets _ knowledge field-table dfg) a) + (fprintf p "ACTOR:\n") + (fprintf p " - ") + (display (indented-port-output 3 (lambda (p) (syndicate-pretty-print mux p)) #:first-line? #f) p) + (newline p) + (fprintf p " - Knowledge:\n ~a" (trie->pretty-string knowledge #:indent 3)) + (fprintf p " - Facets:\n") + (for ([(fid f) (in-hash facets)]) + (match-define (facet _fid endpoints _ children) f) + (fprintf p " ---- facet ~a, children=~a" fid (set->list children)) + (when (not (hash-empty? endpoints)) + (fprintf p ", endpoints=~a" (hash-keys endpoints))) + (newline p)) + (when (not (hash-empty? field-table)) + (fprintf p " - Fields:\n") + (for ([(d ve) (in-hash field-table)]) + (define subject-ids (hash-ref (dataflow-graph-edges-forward dfg) d set)) + (define v (ephemeron-value ve)) + (define v* + (indented-port-output 6 (lambda (p) (syndicate-pretty-print v p)) #:first-line? #f)) + (if (set-empty? subject-ids) + (fprintf p " - ~a: ~a\n" (format-field-descriptor d) v*) + (fprintf p " - ~a: ~a ~a\n" + (format-field-descriptor d) + (for/list [(subject-id subject-ids)] + (match-define (list fid eid) subject-id) + (format "~a:~a" fid eid)) + v*)))))