diff --git a/racket/syndicate/actor.rkt b/racket/syndicate/actor.rkt index 45403c4..d5febec 100644 --- a/racket/syndicate/actor.rkt +++ b/racket/syndicate/actor.rkt @@ -2,415 +2,130 @@ (provide actor dataspace - ;; background - state + react + react/independent + react/suspend until forever + field + assert + stop-when + on-start + on-stop + on-event + on-event* + on + during + during/actor + + asserted + retracted + rising-edge + + suspend-script + + send! assert! retract! patch! - send! - return! - return/no-link-result! - perform-core-action! - - ;; forall - - actor-body->spawn-action - - patch-without-linkage - - ? ;; from pattern.rkt - - ;;---------------------------------------- - (struct-out actor-state) - pretty-print-actor-state syndicate-effects-available? + suspend-script* + + ;; + + schedule-action! + + (struct-out field-descriptor) + (struct-out field-handle) + (struct-out actor-state) + (struct-out facet) + (struct-out endpoint) ) -(require (for-syntax racket/base)) -(require (for-syntax racket/sequence)) -(require "support/dsl.rkt") -(require "pretty.rkt") -(require "effect.rkt") - -(define&provide-dsl-helper-syntaxes "state/until/forever form" - [on - on-event - during - assert - query - - asserted - retracted - message - rising-edge - - exists - ]) - -(require (for-syntax racket/match)) -(require (for-syntax racket/list)) -(require (for-syntax syntax/parse)) -(require (for-syntax syntax/stx)) -(require "pattern.rkt") - (require racket/set) (require racket/match) -(require (except-in "core.rkt" assert dataspace) - (rename-in "core.rkt" [assert core:assert] [dataspace core:dataspace])) -(require "trie.rkt") +(require (for-syntax racket/base)) +(require (for-syntax syntax/parse)) +(require (for-syntax syntax/srcloc)) + +(require (prefix-in core: "core.rkt")) (require "mux.rkt") +(require "patch.rkt") +(require "trie.rkt") +(require "pattern.rkt") ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Actor State +;; Data Definitions and Structures -;; A Variables is a (Vectorof Any), storing the explicit state -;; variables of an actor. +;; A FieldTable is a (FieldDescriptor |-> Boxof Any) -;; An Aggregates is a (Hashtable Nat Any), storing implicit state of -;; an actor, including queried and implicit aggregates. +;; (field-descriptor Symbol UniqueNatural) +(struct field-descriptor (name id) #:prefab) -;; A Script is a (-> Variables). It is to be executed inside -;; the special syndicate-hll prompt, and so may have Instruction -;; side-effects. +;; (field-handle FieldDescriptor) +(struct field-handle (desc) + #:methods gen:custom-write + [(define (write-proc h p mode) + (fprintf p "#" (field-descriptor-name (field-handle-desc h))))] + #:property prop:procedure + (case-lambda + [(handle) (unbox (get-field-box (field-handle-desc handle)))] + [(handle v) (set-box! (get-field-box (field-handle-desc handle)) v)])) -;; An Instruction is one of -;; - (patch-instruction Patch) -;; - (action-instruction Action) -;; - (return-instruction (Option (Listof Any))) -;; - (spawn-instruction LinkageKind (Symbol Symbol -> Spawn)) -;; - (script-complete-instruction Variables) -;; and represents a side-effect for an actor to take in its -;; interactions with the outside world. -;; -;; A LinkageKind is one of -;; - 'call, a blocking, exception-linked connection -;; - 'actor, a non-blocking, non-exception-linked connection -;; - 'dataspace, a non-blocking, nested, non-exception-linked connection -;; -;; Patch Instructions are issued when the actor uses `assert!` and -;; `retract!`. Action instructions are issued when the actor uses -;; `perform-core-action!`, and return instructions when `return!` is -;; called. Script-complete instructions are automatically issued when -;; a Script terminates successfully. -;; -;; Spawn instructions are issued when `actor`, `dataspace`, and `state` -;; are used, directly or indirectly. (TODO: `background`?) The -;; spawn-action-producing function is given the IDs of the spawned and -;; spawning actors, and is to return an action which spawns the new -;; actor, which in turn engages in the appropriate linkage protocol -;; with the spawning actor. The (Void -> Instruction) continuation is -;; released when the spawned actor terminates (for blocking variants) -;; or immediately following the spawn (for non-blocking variants). -;; -;; (Background is done differently, with a new continuation for the -;; background script, and a self-send to activate it. (TODO)) -;; -(struct patch-instruction (patch) #:transparent) -(struct action-instruction (action) #:transparent) -(struct return-instruction (result-values) #:transparent) -(struct spawn-instruction (linkage-kind action-fn) #:transparent) -(struct script-complete-instruction (variables) #:transparent) +(struct actor-state (mux ;; Mux + facets ;; (Hash FID Facet) + previous-knowledge ;; AssertionSet + knowledge ;; AssertionSet + ) #:prefab) -;; An ActorState is an (actor-state ... as below), describing the -;; state of an HLL actor. -;; -(struct actor-state (continuation-table ;; (Hashtable Symbol (Variables Any ... -> Instruction)) - caller-id ;; Symbol - self-id ;; Symbol - variables ;; Variables - aggregates ;; Aggregates - pending-patch ;; (Option Patch) - aggregate patch being accumulated - mux ;; Mux - prev-assertions ;; Trie - assertions from envt at the start of this event - curr-assertions ;; Trie - prev-assertions, updated by the incoming event - ) - #:transparent - #:methods gen:syndicate-pretty-printable - [(define (syndicate-pretty-print s [p (current-output-port)]) - (pretty-print-actor-state s p))]) +(struct facet (field-table ;; FieldTable + endpoints ;; (Hash EID Endpoint) + stop-scripts ;; (Listof Script) -- IN REVERSE ORDER + children ;; (Setof FID) + parent ;; (Option FID) + ) #:prefab) + +(struct endpoint (id patch-fn handler-fn) #:prefab) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Linkage protocol -;; -;; Linkages are used to both propagate values from callee to caller -;; and to monitor callee presence for exception-propagation. -;; - assertion: LinkActive -;; - message: LinkResult -;; - role: Caller -;; Monitors LinkActive to detect termination of the Callee, -;; normal or abnormal. If LinkResult is received before -;; LinkActive vanishes, termination was normal; otherwise, it -;; was abnormal. -;; - role: Callee -;; Asserts LinkActive while it runs. Should send LinkResult -;; before termination to indicate success and communicate values -;; to Caller. Monitors (observe LinkActive) to detect -;; termination of the Caller; upon termination of the Caller, -;; terminates itself. -;; -;; A LinkActive is a (link-active Symbol Symbol), describing an -;; ongoing relationship between the indicated caller and callee. -(struct link-active (caller-id callee-id) #:transparent) -;; -;; A LinkResult is a (link-result Symbol Symbol (Listof Any)), -;; describing the final values yielded by a callee to its caller. -(struct link-result (caller-id callee-id values) #:transparent) ;; message +;; Parameters. Many of these are *updated* during facet execution! -;; Projection for observing LinkActive. -(define link-active-projection (link-active ? (?!))) +;; Parameterof FieldTable +(define current-field-table (make-parameter (hash))) -;; Assertions for patch-without-linkage to remove. TODO: this is gross. -(define linkage-assertions - (trie-union-all #:combiner (lambda (v1 v2) (trie-success #t)) - (list (pattern->trie #t (link-active ? ?)) - (pattern->trie #t (observe (link-active ? ?))) - (pattern->trie #t (link-result ? ? ?)) - (pattern->trie #t (observe (link-result ? ? ?)))))) +;; Parameterof ActorState +(define current-actor-state (make-parameter #f)) -;; Patch -> Patch -;; Remove linkage-related assertions. -(define (patch-without-linkage p) - (patch-pruned-by p linkage-assertions)) +;; Parameterof FID +(define current-facet-id (make-parameter #f)) + +;; Parameterof Patch +(define current-pending-patch (make-parameter patch-empty)) + +;; Parameterof (Constreeof Action) +(define current-pending-actions (make-parameter '())) + +(define (make-empty-pending-scripts) + (vector '() '())) + +;; Parameterof (Vector (List Script) (List Script)) +(define current-pending-scripts (make-parameter (make-empty-pending-scripts))) + +;; Parameterof Boolean +(define in-script? (make-parameter #f)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Producing Instruction side-effects - -(define syndicate-tag (make-effect-tag 'syndicate)) - -(define (syndicate-effects-available?) - (effect-available? syndicate-tag)) - -(define do! (perform syndicate-tag)) -(define do/abort! (perform/abort syndicate-tag)) - -;; Returns void -(define (assert! P #:meta-level [meta-level 0]) - (do! (patch-instruction (core:assert P #:meta-level meta-level)))) - -;; Returns void -(define (retract! P #:meta-level [meta-level 0]) - (do! (patch-instruction (retract P #:meta-level meta-level)))) - -;; Returns void -(define (patch! p) - (do! (patch-instruction p))) - -;; Returns void -(define (send! M #:meta-level [meta-level 0]) - (perform-core-action! (message (prepend-at-meta M meta-level)))) - -;; Returns void -(define (perform-core-action! A) - (do! (action-instruction A))) - -;; Does not return to caller; instead, terminates the current actor -;; after sending a link-result to the calling actor. -(define (return! . result-values) - (do/abort! (return-instruction result-values))) - -;; Does not return to caller; instead, terminates the current actor -;; without sending a link-result to the calling actor. -(define (return/no-link-result!) - (do/abort! (return-instruction #f))) - -;; Returns new variables, plus values from spawned actor if any. -(define (spawn! linkage-kind action-fn) - (do! (spawn-instruction linkage-kind action-fn))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Main behavior of HLL actors - -;; Special mux label used to track ad-hoc assertions -;; TODO: Revisit this, it is a bit ugly -(define *adhoc-label* -1) - -;; Special mux label used to track linkage between actors. -;; TODO: Revisit this, it is a bit ugly -(define *linkage-label* -2) - -;; Behavior -(define (generic-query-updater e s) - (transition (if (patch? e) - (let ((t (actor-state-curr-assertions s))) - (struct-copy actor-state s - [prev-assertions t] - [curr-assertions (update-interests t e)])) - s) - '())) - -(define (interests-pre-and-post-patch s pat) - (define (or* a b) (or a b)) - (define old (trie-lookup (actor-state-prev-assertions s) pat #f #:wildcard-union or*)) - (define new (trie-lookup (actor-state-curr-assertions s) pat #f #:wildcard-union or*)) - (values old new)) - -;; ActorState Pattern -> Boolean -(define (interest-just-appeared-matching? s pat) - (define-values (old new) (interests-pre-and-post-patch s pat)) - (and (not old) new)) - -;; ActorState Pattern -> Boolean -(define (interest-just-disappeared-matching? s pat) - (define-values (old new) (interests-pre-and-post-patch s pat)) - (and old (not new))) - -;; Behavior -(define (generic-actor-behavior e s) - (match e - [(? patch/removed? p) - (define caller-id (actor-state-caller-id s)) - (define self-id (actor-state-self-id s)) - (define continuation-table (actor-state-continuation-table s)) - (define quit? - (or (trie-lookup (patch-removed p) (observe (link-active caller-id self-id)) #f) - (for/or [(callee-id (trie-project/set/single (patch-removed p) link-active-projection))] - (hash-has-key? continuation-table callee-id)))) - (if quit? ;; TODO: raise exception instead? Signal the cause of the quit somehow? - (quit) - #f)] - [(message (link-result (== (actor-state-self-id s)) callee-id reply-values)) - ;; ^ NB. We, in principle, shouldn't need to check the - ;; link-result's caller against our own self-id here, because - ;; events should be routed to us only when generally falling - ;; within our interests. First, the current implementation - ;; overapproximates (though it could use a mux to be precise); - ;; second, *in principle*, overapproximation should perhaps be - ;; seen as OK, since routing may be able to be done much more - ;; efficiently by overapproximating interest slightly. Imagine - ;; using a bloom filter, for instance. - (invoke-stored-continuation s callee-id reply-values)] - [_ #f])) - -;; ActorState Symbol (Variables Any ... -> Instruction) -> ActorState -(define (store-continuation s callee-id get-next-instr) - (struct-copy actor-state s - [continuation-table - (hash-set (actor-state-continuation-table s) - callee-id - get-next-instr)])) - -;; ActorState Symbol (Listof Any) -> Transition -(define (invoke-stored-continuation s callee-id reply-values) - (define continuation-table (actor-state-continuation-table s)) - (define continuation (hash-ref continuation-table callee-id #f)) - (define new-table (hash-remove continuation-table callee-id)) - ;; (log-info "invoke-stored-continuation self=~a callee=~a values=~v k=~v" - ;; (actor-state-self-id s) - ;; callee-id - ;; reply-values - ;; continuation) - (handle-effects (transition (struct-copy actor-state s [continuation-table new-table]) '()) - (lambda (_void) - (apply continuation - (append reply-values (vector->list (actor-state-variables s))))))) - -;; ActorState -> Transition -(define (perform-pending-patch s) - (transition (struct-copy actor-state s [pending-patch #f]) (actor-state-pending-patch s))) - -;; Label Patch -> ActorState -> Transition -(define ((extend-pending-patch label p) s) - (define-values (new-mux _label _p p-aggregate) - (mux-update-stream (actor-state-mux s) label p)) - (define p0 (actor-state-pending-patch s)) - (define new-pending-patch (if p0 (patch-seq p0 p-aggregate) p-aggregate)) - (transition (struct-copy actor-state s - [pending-patch new-pending-patch] - [mux new-mux]) - '())) - -;; ActorState Script -> Transition -(define (run-script s script) - (handle-effects (transition s '()) - (lambda (_void) (do/abort! (script-complete-instruction (script)))))) - -(define (actor-body->spawn-action thunk) - (with-effect #:shallow syndicate-tag k - ([(spawn-instruction 'actor action-fn) - (action-fn (gensym 'root-actor) (gensym 'boot-actor))]) - (begin (actor (thunk)) - (error '%%boot "Reached end of boot thunk")))) - -;; Transition (Void -> Instruction) -> Transition -(define (handle-effects t get-this-instr) - (with-effect #:shallow syndicate-tag get-next-instr - ([(patch-instruction p) - (handle-effects (sequence-transitions t (extend-pending-patch *adhoc-label* p)) - get-next-instr)] - [(action-instruction a) - (handle-effects (sequence-transitions t - perform-pending-patch - (lambda (s) (transition s a))) - get-next-instr)] - [(return-instruction result-values) - (sequence-transitions t - perform-pending-patch - (lambda (s) - (if result-values - (quit (message (link-result (actor-state-caller-id s) - (actor-state-self-id s) - result-values))) - (quit))))] - [(spawn-instruction linkage-kind action-fn) - (define blocking? (eq? linkage-kind 'call)) - (define next-t - (sequence-transitions t - perform-pending-patch - (lambda (s) - (define callee-id (gensym linkage-kind)) - (define spawn-action - (action-fn callee-id (actor-state-self-id s))) - (transition (if blocking? - (store-continuation s callee-id get-next-instr) - s) - (if (eq? linkage-kind 'dataspace) - (spawn-dataspace spawn-action) - spawn-action))))) - (if blocking? - next-t - (handle-effects next-t get-next-instr))] - [(script-complete-instruction new-variables) - (sequence-transitions t - ;; NB: Does not perform-pending-patch here. - ;; Instead, the script runner will now - ;; update ongoing subscriptions and - ;; incorporate the pending patch into that - ;; process. - (lambda (s) - (transition (struct-copy actor-state s [variables new-variables]) - '())))]) - (get-this-instr (void)))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Compilation of HLL actors - -;; TODO: query -;; TODO: default to hll -;; TODO: some better means of keeping track of nested dataspace levels +;; Syntax; main entry points (begin-for-syntax (define-splicing-syntax-class name (pattern (~seq #:name N)) (pattern (~seq) #:attr N #'#f)) - (define-splicing-syntax-class init - (pattern (~seq #:init [I ...])) - (pattern (~seq) #:attr [I 1] '())) - - (define-splicing-syntax-class done - (pattern (~seq #:done [I ...])) - (pattern (~seq) #:attr [I 1] '())) - - (define-splicing-syntax-class bindings - (pattern (~seq #:collect [(id init) ...])) - (pattern (~seq) #:attr [id 1] '() #:attr [init 1] '())) - (define-splicing-syntax-class when-pred (pattern (~seq #:when Pred)) (pattern (~seq) #:attr Pred #'#t)) @@ -419,439 +134,590 @@ (pattern (~seq #:meta-level level:integer)) (pattern (~seq) #:attr level #'0))) -;; Syntax for spawning a 'call-linked actor. -(define-syntax (state stx) - (syntax-parse stx - [(_ init:init [bs:bindings O ...] [E Oe ...] ...) - (expand-state #'#f 'call #'(init.I ...) #'(bs.id ...) #'(bs.init ...) #'(O ...) #'([E Oe ...] ...))])) - -;; Sugar -(define-syntax (until stx) - (syntax-parse stx - [(_ E init:init done:done bs:bindings O ...) - #'(state #:init [init.I ...] [#:collect [(bs.id bs.init) ...] O ...] [E done.I ... (values)])])) - -;; Sugar -(define-syntax (forever stx) - (syntax-parse stx - [(_ init:init bs:bindings O ...) - #'(state #:init [init.I ...] [#:collect [(bs.id bs.init) ...] O ...])])) - -;; Spawn actors with 'actor linkage (define-syntax (actor stx) (syntax-parse stx - [(_ name:name I ...) - (expand-state #'name.N 'actor #'(I ... (return/no-link-result!)) #'() #'() #'() #'())])) + [(_ name:name script ...) + (quasisyntax/loc stx + (let* ((spawn-action (core: + (lambda () + (list actor-behavior + (boot-actor (lambda () (begin/void-default script ...))) + name.N))))) + (if (syndicate-effects-available?) + (schedule-action! spawn-action) + spawn-action)))])) -;; Spawn whole dataspaces (define-syntax (dataspace stx) (syntax-parse stx - [(_ I ...) - (expand-state #'#f - 'dataspace - #'(I - ... - (perform-core-action! (quit-dataspace)) - (return/no-link-result!)) - #'() - #'() - #'() - #'())])) + [(_ name:name script ...) + (quasisyntax/loc stx + (core:spawn-dataspace #:name name.N + (actor script ... + (schedule-action! (core:quit-dataspace)))))])) -(begin-for-syntax - (define (expand-state name-exp linkage-kind init-actions binding-names binding-inits ongoings edges) - ;; ---------------------------------------- - (define binding-count (length (syntax->list binding-names))) - ;; ---------------------------------------- - ;; A StageProducer is a ((Syntax ) -> (Syntax Transition)>)). - ;; It computes a behavior stage suitable for composition using sequence-transitions. - ;; It is given syntax for an expression yielding the actor's current event. +(define-syntax (react stx) + (syntax-parse stx + [(_ O ...) + (quasisyntax/loc stx + (add-facet! #:substate #t + #,(source-location->string stx) + (lambda () (begin/void-default O ...))))])) - ;; Records syntaxes for aggregate initializers. - ;; (Boxof (Listof (Syntax ))) - (define aggregate-init-stxs (box '())) +(define-syntax (react/independent stx) + (syntax-parse stx + [(_ O ...) + (quasisyntax/loc stx + (add-facet! #:substate #f + #,(source-location->string stx) + (lambda () (begin/void-default O ...))))])) - ;; Records aggregate updaters. - ;; (Boxof (Listof StageProducer)) - (define query-updaters (box '())) +(define-syntax (react/suspend stx) + (syntax-parse stx + [(_ (resume-parent) O ...) + (quasisyntax/loc stx + (suspend-script* #,(source-location->string stx) + (lambda (resume-parent) + (add-facet! #:substate #t + #,(source-location->string stx) + (lambda () (begin/void-default O ...))))))])) - ;; Records both actual event handlers and termination check handlers. - ;; (Boxof (Listof StageProducer)) - (define event-handlers (box '())) +(define-syntax (until stx) + (syntax-parse stx + [(_ E O ...) + (syntax/loc stx + (react/suspend (continue) + (stop-when E (continue (void))) + O ...))])) - ;; (Boxof (Listof StageProducer)) - (define assertion-maintainers (box '())) +(define-syntax (forever stx) + (syntax-parse stx + [(_ O ...) + (syntax/loc stx + (react/suspend (continue) O ...))])) - (define (box-adjoin! v val) (set-box! v (append (unbox v) (list val)))) - ;; ---------------------------------------- +(define-syntax (field stx) + (syntax-parse stx + [(_ [id:id init] ...) + (syntax/loc stx + (begin (define id (make-field 'id init)) + ...))])) - (define (allocate-aggregate! init-stx) - (box-adjoin! aggregate-init-stxs init-stx) - (- (length (unbox aggregate-init-stxs)) 1)) +(define-syntax (assert stx) + (syntax-parse stx + [(_ w:when-pred P L:meta-level) + (define-values (proj pat bindings _instantiated) + (analyze-pattern stx #'P)) + (quasisyntax/loc stx + (add-endpoint! #,(source-location->string stx) + (lambda () + #,(let ((patch-stx #`(core:assert #,pat #:meta-level L.level))) + (if #'w.Pred + #`(if w.Pred #,patch-stx patch-empty) + patch-stx))) + void))])) - ;; StageProducer -> Void - (define (add-query-updater! stage-producer) (box-adjoin! query-updaters stage-producer)) - (define (add-event-handler! stage-producer) (box-adjoin! event-handlers stage-producer)) +(define-syntax (stop-when stx) + (syntax-parse stx + [(_ E script ...) + (analyze-event stx #'E #t (syntax/loc stx (begin/void-default script ...)))])) - (define (mapply v fs) (map (lambda (f) (f v)) fs)) +(define-syntax (on-start stx) + (syntax-parse stx + [(_ script ...) + (quasisyntax/loc stx + (schedule-script! #f (lambda () (begin/void-default script ...))))])) - (define (make-run-script-call outer-expr-stx state-stx I-stxs) - (cond - [(zero? binding-count) - #`(run-script #,state-stx (lambda () - #,@I-stxs - (vector)))] - [(stx-null? I-stxs) - (raise-syntax-error #f "Empty expression sequence not permitted" outer-expr-stx I-stxs)] - [else - #`(run-script #,state-stx (lambda () - (call-with-values (lambda () #,@I-stxs) - vector)))])) +(define-syntax (on-stop stx) + (syntax-parse stx + [(_ script ...) + (quasisyntax/loc stx + (add-stop-script! (lambda () (begin/void-default script ...))))])) - (define (add-assertion-maintainer! endpoint-index - assert-stx - pat-stx - maybe-Pred-stx - L-stx) - (box-adjoin! assertion-maintainers - (lambda (evt-stx) - #`(lambda (s) - (match-define (vector #,@binding-names) - (actor-state-variables s)) - (define old-assertions - (strip-interests - (mux-interests-of (actor-state-mux s) #,endpoint-index))) - (define (compute-new-assertions) - (patch-added (#,assert-stx #,pat-stx #:meta-level #,L-stx))) - (define new-assertions - #,(if maybe-Pred-stx - #`(if #,maybe-Pred-stx - (compute-new-assertions) - trie-empty) - #`(compute-new-assertions))) - (and (not (eq? old-assertions new-assertions)) - ((extend-pending-patch - #,endpoint-index - (patch-seq (patch trie-empty old-assertions) - (patch new-assertions trie-empty))) - s)))))) +(define-syntax (on-event stx) + (syntax-parse stx + [(_ clause ...) + (quasisyntax/loc stx + (on-event* #,(source-location->string stx) + (lambda (e) + (core:match-event e + clause ...))))])) - (define (analyze-asserted-or-retracted! endpoint-index asserted? outer-expr-stx P-stx I-stxs L-stx) - (define-values (proj-stx pat bindings _instantiated) - (analyze-pattern outer-expr-stx P-stx)) - (add-assertion-maintainer! endpoint-index #'sub pat #f L-stx) - (add-event-handler! - (lambda (evt-stx) - #`(let* ((proj (prepend-at-meta #,proj-stx #,L-stx)) - (proj-arity (projection-arity proj))) - (lambda (s) - (match #,evt-stx - [(? #,(if asserted? #'patch/added? #'patch/removed?) p) - (define entry-set - (trie-project/set #:take proj-arity - #,(if asserted? #'(patch-added p) #'(patch-removed p)) - proj)) - (when (not entry-set) - (error #,(if asserted? #''asserted #''retracted) - "Wildcard interest discovered while projecting by ~v" proj)) - (sequence-transitions0* - s - (for/list [(entry (in-set entry-set))] - (lambda (s) - (define instantiated (instantiate-projection proj entry)) - (and (#,(if asserted? - #'interest-just-appeared-matching? - #'interest-just-disappeared-matching?) s instantiated) - (match (actor-state-variables s) - [(vector #,@binding-names) - (match-define (list #,@bindings) entry) - #,(make-run-script-call outer-expr-stx #'s I-stxs)])))))] - [_ #f])))))) +(define (on-event* where proc #:priority [priority 0]) + (add-endpoint! where + (lambda () patch-empty) + (lambda (e) + (schedule-script! #:priority priority #f (lambda () (proc e)))))) - (define (prepend-at-meta-stx context-stx stx level) - (if (zero? level) - stx - #`(at-meta #,(prepend-at-meta-stx context-stx stx (- level 1))))) +(define-syntax (on stx) + (syntax-parse stx + [(_ E script ...) + (analyze-event stx #'E #f (syntax/loc stx (begin/void-default script ...)))])) - (define (analyze-message-subscription! endpoint-index outer-expr-stx P-stx I-stxs L-stx) - (define-values (proj pat bindings _instantiated) - (analyze-pattern outer-expr-stx P-stx)) - (add-assertion-maintainer! endpoint-index #'sub pat #f L-stx) - (add-event-handler! - (lambda (evt-stx) - #`(lambda (s) - (match (actor-state-variables s) - [(vector #,@binding-names) - (match #,evt-stx - [(message body) - (define capture-vals - (match-value/captures body - #,(prepend-at-meta-stx outer-expr-stx - proj - (syntax-e L-stx)))) - (and capture-vals - (apply (lambda #,bindings - #,(make-run-script-call outer-expr-stx #'s I-stxs)) - capture-vals))] - [_ #f])]))))) +(define-syntax (during stx) + (syntax-parse stx + [(_ P L:meta-level O ...) + (define E-stx (syntax/loc #'P (asserted P #:meta-level L.level))) + (define-values (_proj _pat _bindings instantiated) + (analyze-pattern E-stx #'P)) + (quasisyntax/loc stx + (on #,E-stx + (let ((p #,instantiated)) + (react (stop-when (retracted p #:meta-level L.level)) + O ...))))])) - (define (analyze-event! index E-stx I-stxs) - (syntax-parse E-stx - #:literals [asserted retracted message rising-edge] - [(asserted P L:meta-level) - (analyze-asserted-or-retracted! index #t E-stx #'P I-stxs #'L.level)] - [(retracted P L:meta-level) - (analyze-asserted-or-retracted! index #f E-stx #'P I-stxs #'L.level)] - [(message P L:meta-level) - (analyze-message-subscription! index E-stx #'P I-stxs #'L.level)] - [(rising-edge Pred) - ;; TODO: more kinds of Pred than just expr - (define aggregate-index (allocate-aggregate! #'#f)) - (add-event-handler! - (lambda (evt-stx) - #`(lambda (s) - (match-define (vector #,@binding-names) (actor-state-variables s)) - (define old-val (hash-ref (actor-state-aggregates s) #,aggregate-index)) - (define new-val Pred) - (if (eq? old-val new-val) - #f - (let ((s (struct-copy actor-state s - [aggregates (hash-set (actor-state-aggregates s) - #,aggregate-index - new-val)]))) - (if new-val - #,(make-run-script-call E-stx #'s I-stxs) - (transition s '())))))))])) +(define-syntax (during/actor stx) + (syntax-parse stx + [(_ P L:meta-level O ...) + (define E-stx (syntax/loc #'P (asserted P #:meta-level L.level))) + (define-values (_proj _pat _bindings instantiated) + (analyze-pattern E-stx #'P)) + (quasisyntax/loc stx + (on #,E-stx + (let ((p #,instantiated)) + (actor + (react (stop-when (retracted p #:meta-level L.level)) + O ...)))))])) - (define (analyze-during! index P-stx L-stx O-stxs) - (define E-stx #`(asserted #,P-stx #:meta-level #,L-stx)) - (define-values (_proj _pat _bindings instantiated) (analyze-pattern E-stx P-stx)) - (define I-stx #`(let ((p #,instantiated)) - (until (retracted p #:meta-level #,L-stx) - #,@O-stxs))) - (analyze-event! index E-stx #`(#,I-stx))) +(define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx)) +(define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx)) +(define-syntax (rising-edge stx) (raise-syntax-error #f "rising-edge: Used outside event spec" stx)) - (define (analyze-assertion! index Pred-stx outer-expr-stx P-stx L-stx) - (define-values (proj pat bindings _instantiated) - (analyze-pattern outer-expr-stx P-stx)) - (add-assertion-maintainer! index #'core:assert pat Pred-stx L-stx)) - - (define (analyze-on-event! index clauses-stx outer-expr-stx) - (add-event-handler! - (lambda (evt-stx) - #`(lambda (s) - (match (actor-state-variables s) - [(vector #,@binding-names) - (match #,evt-stx - #,@(for/list [(clause-stx (syntax->list clauses-stx))] - (syntax-case clause-stx () - [(pat #:when cond-expr body ...) - #`(pat #:when cond-expr #,(make-run-script-call outer-expr-stx #'s #'(body ...)))] - [(pat body ...) - #`(pat #,(make-run-script-call outer-expr-stx #'s #'(body ...)))])) - [_ #f])]))))) - - (define (analyze-queries! index query-spec-stxs I-stxs) - (error 'analyze-queries! "unimplemented")) - - ;; Query analysis happens first, because we need the queried - ;; bindings to be in scope everywhere else. - (for [(ongoing (in-list (syntax->list ongoings))) - (ongoing-index (in-naturals))] - (syntax-parse ongoing - #:literals [query] - [(query [query-spec ...] I ...) - (analyze-queries! ongoing-index #'(query-spec ...) #'(I ...))] - [_ (void)])) - - ;; Now make another pass over the ongoings, ignoring queries this - ;; time. - (for [(ongoing (in-list (syntax->list ongoings))) - (ongoing-index (in-naturals))] - (syntax-parse ongoing - #:literals [on on-event during assert query] - [(on E I ...) - (analyze-event! ongoing-index #'E #'(I ...))] - [(on-event clause ...) - (analyze-on-event! ongoing-index #'(clause ...) ongoing)] - [(during P L:meta-level O ...) - (analyze-during! ongoing-index #'P #'L.level #'(O ...))] - [(assert w:when-pred P L:meta-level) - (analyze-assertion! ongoing-index #'w.Pred ongoing #'P #'L.level)] - [(query [query-spec ...] I ...) - (void)])) - - ;; Finally, add in the termination conditions... - (for [(edge (in-list (syntax->list edges))) - (edge-index (in-naturals (length (syntax->list ongoings))))] - (syntax-parse edge - [(E I0 I ...) - (analyze-event! edge-index #'E #'((call-with-values (lambda () I0 I ...) return!)))])) - - ;; ...the generic query-updater... - (add-query-updater! - (lambda (evt-stx) - #`(lambda (s) (generic-query-updater #,evt-stx s)))) - - ;; ...and generic linkage-related behaviors. - (add-event-handler! - (lambda (evt-stx) - #`(lambda (s) (generic-actor-behavior #,evt-stx s)))) - - (define action-fn-stx - #`(lambda (self-id caller-id) - ( - (lambda () - (define ((maintain-assertions e) s) - (sequence-transitions0 s #,@(mapply #'e (unbox assertion-maintainers)))) - - (define (behavior e s) - (and e - (sequence-transitions0 s - #,@(mapply #'e (unbox query-updaters)) - #,@(mapply #'e (unbox event-handlers)) - (maintain-assertions e) - perform-pending-patch))) - - (define initial-state - (actor-state (hasheq) - caller-id - self-id - (vector #,@binding-inits) - (make-immutable-hash - (list - #,@(for/list [(init-stx (unbox aggregate-init-stxs)) - (init-idx (in-naturals))] - #`(cons #,init-idx #,init-stx)))) - #f - (mux) - trie-empty - trie-empty)) - - (define (subscribe-to-linkage s) - (define sub-to-callees - (patch-seq (sub (link-active self-id ?)) - (sub (link-result self-id ? ?)))) - (define initial-subs - #,(if (eq? linkage-kind 'call) - #`(patch-seq sub-to-callees - (sub (observe (link-active caller-id self-id))) - (core:assert (link-active caller-id self-id))) - #`sub-to-callees)) - ((extend-pending-patch *linkage-label* initial-subs) s)) - - (define (run-init-actions s) - (match (actor-state-variables s) - [(vector #,@binding-names) - ;; TODO: At the moment we are *not* letting the - ;; init-actions update the variables. Is this the - ;; right thing? - ;; TODO: what about intermediate (state)s? How are the variables updated? - (run-script s (lambda () - #,@init-actions - (vector #,@binding-names)))])) - - (list behavior - (sequence-transitions0 initial-state - subscribe-to-linkage - (maintain-assertions #f) - perform-pending-patch - run-init-actions) - #,name-exp))))) - - ;; (local-require racket/pretty) - ;; (pretty-print (syntax->datum action-fn-stx)) - - #`(let ((do-spawn (lambda () (spawn! '#,linkage-kind #,action-fn-stx)))) - (if (syndicate-effects-available?) - (do-spawn) - (actor-body->spawn-action do-spawn)))) - ) - - ;; ;; Given a Pred, computes (and perhaps allocates): - ;; ;; - an optional StageProducer for taking on board information from the outside world - ;; ;; - syntax for retrieving the current value of the Pred - ;; ;; - syntax for evaluating a new value for the Pred - ;; ;; - optional syntax for an updater for an aggregate - ;; ;; (Syntax ) -> (Values (Option StageProducer) - ;; ;; (Syntax ) - ;; ;; (Syntax ) - ;; ;; (Option (Syntax ActorState)>))) - ;; (define (analyze-pred! Pred-stx) - ;; (syntax-parse Pred-stx - ;; #:literals [not or and exists] - ;; [(not Pred) - ;; (define-values (upd curr next store) (analyze-pred! #'Pred)) - ;; (values upd #`(not #,curr) #`(not ,next))] - ;; [((~and HEAD (~or or and)) PredN ...) - ;; (define-values (upds currs nexts) (analyze-preds! #'(PredN ...))) - ;; (values (and (not (null? upds)) - ;; (lambda (evt-stx) - ;; #`(lambda (s) (sequence-transitions0 s #,@(mapply evt-stx upds))))) - ;; #`(HEAD #,@currs) - ;; #`(HEAD #,@nexts))] - ;; [(exists P Pred) - ;; ...] - - ;; [expr - ;; (define index (allocate-aggregate!)) - ;; (values #f - ;; #' - ;; ...])) - - ;; (define (analyze-preds! Pred-stxs) - ;; (define-values (upds-rev currs-rev nexts-rev) - ;; (for/fold [(upds-rev '()) - ;; (currs-rev '()) - ;; (nexts-rev '())] - ;; [(Pred-stx (in-list (syntax->list Pred-stxs)))] - ;; (define-values (upd curr next) (analyze-pred! Pred-stx)) - ;; (values (if upd (cons upd upds-rev) upds-rev) - ;; (cons curr currs-rev) - ;; (cons next nexts-rev)))) - ;; (values (reverse upds-rev) - ;; (reverse currs-rev) - ;; (reverse nexts-rev))) +(define-syntax (suspend-script stx) + (syntax-parse stx + [(_ proc) + (quasisyntax/loc stx + (suspend-script* #,(source-location->string stx) proc))])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Syntax-time support -(define (pretty-print-actor-state s [p (current-output-port)]) - (match-define - (actor-state continuation-table - caller-id - self-id - variables - aggregates - pending-patch - mux - prev-assertions - curr-assertions) - s) - (fprintf p "ACTOR id ~a (caller-id ~a):\n" self-id caller-id) - (fprintf p " - ~a pending continuations\n" (hash-count continuation-table)) - (fprintf p " - variables:\n") - (for ((v variables)) - (fprintf p " - ") - (display (indented-port-output 6 (lambda (p) (syndicate-pretty-print v p)) #:first-line? #f) p) - (newline p)) - (fprintf p " - aggregates:\n") - (for (((index a) (in-hash aggregates))) - (define leader (format " - ~a: " index)) - (fprintf p "~a" leader) - (display (indented-port-output #:first-line? #f - (string-length leader) - (lambda (p) (syndicate-pretty-print a p))) - p) - (newline p)) - (fprintf p " - pending-patch:\n") - (display (indented-port-output 3 (lambda (p) (syndicate-pretty-print pending-patch p))) p) - (newline p) - (fprintf p " - previous assertions from the environment:\n ") - (pretty-print-trie prev-assertions p #:indent 3) - (newline p) - (fprintf p " - current assertions from the environment:\n ") - (pretty-print-trie curr-assertions p #:indent 3) - (newline p) - (fprintf p " - ") - (display (indented-port-output 3 (lambda (p) (syndicate-pretty-print mux p)) #:first-line? #f) p) - (newline p)) +(define (interests-pre-and-post-patch pat) + (define (or* x y) (or x y)) + (define a (current-actor-state)) + (define old (trie-lookup (actor-state-previous-knowledge a) pat #f #:wildcard-union or*)) + (define new (trie-lookup (actor-state-knowledge a) pat #f #:wildcard-union or*)) + (values old new)) + +(define (interest-just-appeared-matching? pat) + (define-values (old new) (interests-pre-and-post-patch pat)) + (and (not old) new)) + +(define (interest-just-disappeared-matching? pat) + (define-values (old new) (interests-pre-and-post-patch pat)) + (and old (not new))) + +(define-for-syntax (analyze-asserted/retracted outer-expr-stx + event-stx + terminal? + script-stx + asserted? + P-stx + meta-level) + (define-values (proj-stx pat bindings _instantiated) + (analyze-pattern event-stx P-stx)) + (define event-predicate-stx (if asserted? #'patch/added? #'patch/removed?)) + (define patch-accessor-stx (if asserted? #'patch-added #'patch-removed)) + (define change-detector-stx + (if asserted? #'interest-just-appeared-matching? #'interest-just-disappeared-matching?)) + (quasisyntax/loc outer-expr-stx + (add-endpoint! #,(source-location->string outer-expr-stx) + (lambda () (core:sub #,pat #:meta-level #,meta-level)) + (lambda (e) + (core:match-event e + [(? #,event-predicate-stx p) + (define proj (core:prepend-at-meta #,proj-stx #,meta-level)) + (define proj-arity (projection-arity proj)) + (define entry-set (trie-project/set #:take proj-arity + (#,patch-accessor-stx p) + proj)) + (when (not entry-set) + (error 'asserted + "Wildcard interest discovered while projecting by ~v at ~a" + proj + #,(source-location->string event-stx))) + #,(let ((entry-handler-stx + (quasisyntax/loc script-stx + (let ((instantiated (instantiate-projection proj entry))) + (and (#,change-detector-stx instantiated) + (schedule-script! + #,(if terminal? #'#t #'#f) + (lambda () + (match-define (list #,@bindings) entry) + #,script-stx))))))) + (if terminal? + #`(let ((entry-count (set-count entry-set))) + (cond + [(zero? entry-count)] + [(= entry-count 1) + (let ((entry (set-first entry-set))) + #,entry-handler-stx)] + [else + (error 'asserted + "Multiple assertions triggered stop-when at ~a" + #,(source-location->string event-stx))])) + #`(for [(entry (in-set entry-set))] + #,entry-handler-stx)))]))))) + +(define-for-syntax (prepend-at-meta-stx stx level) + (if (zero? level) + stx + #`(at-meta #,(prepend-at-meta-stx stx (- level 1))))) + +(define-for-syntax (analyze-event outer-expr-stx event-stx terminal? script-stx) + (syntax-parse event-stx + #:literals [core:message asserted retracted rising-edge] + [(core:message P L:meta-level) + (define-values (proj pat bindings _instantiated) + (analyze-pattern event-stx #'P)) + (quasisyntax/loc outer-expr-stx + (add-endpoint! #,(source-location->string outer-expr-stx) + (lambda () (core:sub #,pat #:meta-level L.level)) + (lambda (e) + (core:match-event e + [(core:message body) + (define capture-vals + (match-value/captures + body + #,(prepend-at-meta-stx proj (syntax-e #'L.level)))) + (and capture-vals + (schedule-script! + #,(if terminal? #'#t #'#f) + (lambda () + (apply (lambda #,bindings #,script-stx) + capture-vals))))]))))] + [(asserted P L:meta-level) + (analyze-asserted/retracted outer-expr-stx event-stx terminal? script-stx #t #'P #'L.level)] + [(retracted P L:meta-level) + (analyze-asserted/retracted outer-expr-stx event-stx terminal? script-stx #f #'P #'L.level)] + [(rising-edge Pred) + (define field-name (format "~a:rising-edge" (source-location->string event-stx))) + (quasisyntax/loc outer-expr-stx + (let () + (field [edge-state #f]) + (on-event* #,(source-location->string outer-expr-stx) + (lambda (e) + (define old-val (edge-state)) + (define new-val Pred) + (when (not (eq? old-val new-val)) + (edge-state new-val) + (when new-val + (schedule-script! #,(if terminal? #'#t #'#f) + (lambda () #,script-stx))))) + #:priority 1)))])) + +(define-syntax (begin/void-default stx) + (syntax-parse stx + [(_) + (syntax/loc stx (void))] + [(_ expr0 expr ...) + (syntax/loc stx (begin expr0 expr ...))])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Field Construction and Access + +(define field-counter 0) +(define (make-field name initial-value) + (define desc (field-descriptor name field-counter)) + (set! field-counter (+ field-counter 1)) + (current-field-table (hash-set (current-field-table) + desc + (box initial-value))) + (field-handle desc)) + +(define (get-field-box desc) + (hash-ref (current-field-table) + desc + (lambda () + (error 'get-field-box + "Field ~a used out-of-scope" + (field-descriptor-name desc))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Facet Storage in an Actor + +(define (facet-live? fid) + (hash-has-key? (actor-state-facets (current-actor-state)) fid)) + +(define (lookup-facet fid) + (hash-ref (actor-state-facets (current-actor-state)) fid #f)) + +(define (update-facet! fid proc) + (define old-facet (lookup-facet fid)) + (define new-facet (proc old-facet)) + (store-facet! fid new-facet)) + +(define (store-facet! fid new-facet) + (define a (current-actor-state)) + (current-actor-state + (struct-copy actor-state a + [facets + (if new-facet + (hash-set (actor-state-facets a) fid new-facet) + (hash-remove (actor-state-facets a) fid))]))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Entering and Leaving Facet Context; Queueing of Work Items + +(define-syntax-rule (with-current-facet fid field-table in? body ...) + (parameterize ((current-field-table field-table) + (current-facet-id fid) + (in-script? in?)) + body ...)) + +(define (capture-facet-context proc) + (let ((field-table (current-field-table)) + (fid (current-facet-id))) + (lambda args + (with-current-facet fid field-table #t + (call-with-continuation-prompt + (lambda () (apply proc args)) + prompt-tag))))) + +(define (schedule-script! #:priority [priority 0] terminal? thunk) + (if terminal? + (let ((f (terminate-facet! (current-facet-id)))) + (when f ;; only want to run a terminal script if we genuinely terminated + (push-script! priority + (parameterize ((current-facet-id (facet-parent f))) + (capture-facet-context thunk))))) + (push-script! priority (capture-facet-context thunk)))) + +(define (push-script! priority thunk-with-context) + (define v (current-pending-scripts)) + (vector-set! v priority (cons thunk-with-context (vector-ref v priority)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Action Queue Management + +(define (schedule-action! ac) + (if (patch? ac) + (when (patch-non-empty? ac) + (current-pending-patch (compose-patch ac (current-pending-patch)))) + (begin (flush-pending-patch!) + (current-pending-actions (list (current-pending-actions) ac))))) + +(define (flush-pending-patch!) + (define p (current-pending-patch)) + (when (patch-non-empty? p) + (current-pending-patch patch-empty) + (current-pending-actions (list (current-pending-actions) p)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Endpoint Creation + +(define (add-endpoint! where patch-fn handler-fn) + (when (in-script?) + (error 'add-endpoint! + "~a: Cannot add endpoint in script; are you missing a (react ...)?" + where)) + (define-values (new-eid delta-aggregate) + (let () + (define a (current-actor-state)) + (define-values (new-mux new-eid _delta delta-aggregate) + (mux-add-stream (actor-state-mux a) (patch-fn))) + (current-actor-state (struct-copy actor-state a [mux new-mux])) + (values new-eid delta-aggregate))) + (update-facet! (current-facet-id) + (lambda (f) + (and f + (struct-copy facet f + [endpoints + (hash-set (facet-endpoints f) + new-eid + (endpoint new-eid patch-fn handler-fn))])))) + (schedule-action! delta-aggregate)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Facet Lifecycle + +(define next-fid 0) +(define (add-facet! #:substate substate? where setup-proc) + (when (not (in-script?)) + (error 'add-facet! + "~a: Cannot add facet outside script; are you missing an (on ...)?" + where)) + (define parent-fid (and substate? (current-facet-id))) + (define fid next-fid) + (set! next-fid (+ next-fid 1)) + (update-facet! fid (lambda (_f) (facet 'not-yet-ready + (hasheqv) + '() + (seteqv) + parent-fid))) + (when parent-fid + (update-facet! parent-fid (lambda (f) + (and f + (struct-copy facet f + [children (set-add (facet-children f) fid)]))))) + (with-current-facet fid (current-field-table) #f + (setup-proc) + (update-facet! fid (lambda (f) (and f + (struct-copy facet f + [field-table (current-field-table)]))))) + (facet-handle-event! fid + (lookup-facet fid) + (patch (actor-state-knowledge (current-actor-state)) trie-empty)) + (when (and (facet-live? fid) parent-fid (not (facet-live? parent-fid))) + (terminate-facet! fid))) + +;; If the named facet is live, terminate it and return its facet +;; record; otherwise, return #f. +(define (terminate-facet! fid) + (define f (lookup-facet fid)) + (and f + (begin + (for [((eid ep) (in-hash (facet-endpoints f)))] + (define a (current-actor-state)) + (define-values (new-mux _eid _delta delta-aggregate) + (mux-remove-stream (actor-state-mux a) eid)) + (current-actor-state (struct-copy actor-state a [mux new-mux])) + (schedule-action! delta-aggregate)) + + (let ((parent-fid (facet-parent f))) + (when parent-fid + (update-facet! parent-fid + (lambda (f) + (and f + (struct-copy facet f + [children (set-remove (facet-children f) + fid)])))))) + (store-facet! fid #f) + + (for [(child-fid (in-set (facet-children f)))] + (terminate-facet! child-fid)) + + ;; Run stop-scripts after terminating children. This means that + ;; children's stop-scripts run before ours. + (with-current-facet fid (facet-field-table f) #t + (for [(script (in-list (reverse (facet-stop-scripts f))))] + (call-with-continuation-prompt script prompt-tag))) + + f))) + +(define (add-stop-script! script-proc) + (update-facet! (current-facet-id) + (lambda (f) + (and f + (struct-copy facet f + [stop-scripts (cons script-proc (facet-stop-scripts f))]))))) + +(define (boot-actor script-proc) + (parameterize ((current-actor-state + (actor-state (mux) + (hasheqv) + trie-empty + trie-empty)) + (current-pending-patch patch-empty) + (current-pending-actions '()) + (current-pending-scripts (make-empty-pending-scripts))) + (with-current-facet #f (hasheq) #f + (schedule-script! #f script-proc) + (run-scripts!)))) + +(define (run-scripts!) + (let loop () + (define pending-scripts (current-pending-scripts)) + (current-pending-scripts (make-empty-pending-scripts)) + (when (for*/fold [(did-something? #f)] + [(scripts (in-vector pending-scripts)) + (script (in-list (reverse scripts)))] + (script) + #t) + (loop))) + (for ([(fid f) (in-hash (actor-state-facets (current-actor-state)))]) + (refresh-facet! fid f)) + (flush-pending-patch!) + (define pending-actions (current-pending-actions)) + (current-pending-actions '()) + (if (hash-empty? (actor-state-facets (current-actor-state))) + (core:quit pending-actions) + (core:transition (current-actor-state) pending-actions))) + +(define (refresh-facet! fid f) + (with-current-facet fid (facet-field-table f) #f + (for [((eid ep) (in-hash (facet-endpoints f)))] + (update-stream! eid (compose-patch ((endpoint-patch-fn ep)) (core:retract ?)))))) + +(define (update-stream! eid patch) + (define a (current-actor-state)) + (define-values (new-mux _eid _delta delta-aggregate) + (mux-update-stream (actor-state-mux a) eid patch)) + (current-actor-state (struct-copy actor-state a [mux new-mux])) + (schedule-action! delta-aggregate)) + +(define (actor-behavior e a) + (and e + (parameterize ((current-actor-state + (if (patch? e) + (struct-copy actor-state a + [previous-knowledge (actor-state-knowledge a)] + [knowledge (update-interests (actor-state-knowledge a) e)]) + a)) + (current-pending-patch patch-empty) + (current-pending-actions '()) + (current-pending-scripts (make-empty-pending-scripts))) + (for [((fid f) (in-hash (actor-state-facets a)))] + (facet-handle-event! fid f e)) + (run-scripts!)))) + +(define (facet-handle-event! fid f e) + (with-current-facet fid (facet-field-table f) #f + (for [(ep (in-hash-values (facet-endpoints f)))] + ((endpoint-handler-fn ep) e)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Script suspend-and-resume. + +(define prompt-tag (make-continuation-prompt-tag 'syndicate)) + +(define (syndicate-effects-available?) + (continuation-prompt-available? prompt-tag)) + +(define (suspend-script* where proc) + (when (not (in-script?)) + (error 'suspend-script + "~a: Cannot suspend script outside script; are you missing an (on ...)?" + where)) + (call-with-composable-continuation + (lambda (k) + (abort-current-continuation + prompt-tag + (lambda () + (define suspended-fid (current-facet-id)) + (define in? (in-script?)) + (define raw-resume-parent + (capture-facet-context + (lambda results + (parameterize ((in-script? in?)) + (apply k results))))) + (define resume-parent + (lambda results + (let ((invoking-fid (current-facet-id))) + (when (not (equal? invoking-fid suspended-fid)) + (terminate-facet! invoking-fid))) + (push-script! 0 (lambda () (apply raw-resume-parent results))))) + (proc resume-parent)))) + prompt-tag)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Immediate actions + +(define (send! M #:meta-level [meta-level 0]) + (when (not (in-script?)) + (error + 'send! + "Attempt to send message outside script; are you missing an (on ...)? (msg: ~v, metalevel: ~v)" + M + meta-level)) + (schedule-action! (core:message (core:prepend-at-meta M meta-level)))) + +(define *adhoc-label* -1) + +(define (assert! P #:meta-level [meta-level 0]) + (update-stream! *adhoc-label* (core:assert P #:meta-level meta-level))) + +(define (retract! P #:meta-level [meta-level 0]) + (update-stream! *adhoc-label* (core:retract P #:meta-level meta-level))) + +(define (patch! p) + (update-stream! *adhoc-label* p)) diff --git a/racket/syndicate/broker/server.rkt b/racket/syndicate/broker/server.rkt index 3c853b7..1319ea1 100644 --- a/racket/syndicate/broker/server.rkt +++ b/racket/syndicate/broker/server.rkt @@ -121,7 +121,7 @@ [_ #f])) (run-ground (spawn-timer-driver) (spawn-websocket-driver) - (dataspace (perform-core-action! (spawn-broker-server 8000)) + (dataspace (schedule-action! (spawn-broker-server 8000)) (when ssl-options - (perform-core-action! (spawn-broker-server 8443 #:ssl-options ssl-options))) + (schedule-action! (spawn-broker-server 8443 #:ssl-options ssl-options))) (forever)))) diff --git a/racket/syndicate/examples/actor/bank-account.rkt b/racket/syndicate/examples/actor/bank-account.rkt index 407828f..1986f60 100644 --- a/racket/syndicate/examples/actor/bank-account.rkt +++ b/racket/syndicate/examples/actor/bank-account.rkt @@ -6,13 +6,13 @@ (struct account (balance) #:prefab) (struct deposit (amount) #:prefab) -(actor (forever #:collect [(balance 0)] - (assert (account balance)) - (on (message (deposit $amount)) - (+ balance amount)))) +(actor (react (field [balance 0]) + (assert (account (balance))) + (on (message (deposit $amount)) + (balance (+ (balance) amount))))) -(actor (forever (on (asserted (account $balance)) - (printf "Balance changed to ~a\n" balance)))) +(actor (react (on (asserted (account $balance)) + (printf "Balance changed to ~a\n" balance)))) (actor (until (asserted (observe (deposit _)))) (send! (deposit +100)) diff --git a/racket/syndicate/examples/actor/box-and-client.rkt b/racket/syndicate/examples/actor/box-and-client.rkt index 5f5a504..43a9c0d 100644 --- a/racket/syndicate/examples/actor/box-and-client.rkt +++ b/racket/syndicate/examples/actor/box-and-client.rkt @@ -6,12 +6,16 @@ (struct set-box (new-value) #:transparent) (struct box-state (value) #:transparent) -(actor (forever #:collect [(current-value 0)] - (assert (box-state current-value)) - (on (message (set-box $new-value)) - (log-info "box: taking on new-value ~v" new-value) - new-value))) +(actor (react (field [current-value 0]) + (assert (box-state (current-value))) + (stop-when (rising-edge (= (current-value) 10)) + (log-info "box: terminating")) + (on (message (set-box $new-value)) + (log-info "box: taking on new-value ~v" new-value) + (current-value new-value)))) -(actor (forever (on (asserted (box-state $v)) - (log-info "client: learned that box's value is now ~v" v) - (send! (set-box (+ v 1)))))) +(actor (react (stop-when (retracted (observe (set-box _))) + (log-info "client: box has gone")) + (on (asserted (box-state $v)) + (log-info "client: learned that box's value is now ~v" v) + (send! (set-box (+ v 1)))))) diff --git a/racket/syndicate/examples/actor/broadcast-messages.rkt b/racket/syndicate/examples/actor/broadcast-messages.rkt index f8293cf..9b9b785 100644 --- a/racket/syndicate/examples/actor/broadcast-messages.rkt +++ b/racket/syndicate/examples/actor/broadcast-messages.rkt @@ -5,11 +5,11 @@ (struct envelope (destination message) #:prefab) -(actor (forever (on (message (envelope 'alice $message)) - (log-info "Alice received ~v" message)))) +(actor (react (on (message (envelope 'alice $message)) + (log-info "Alice received ~v" message)))) -(actor (forever (on (message (envelope 'bob $message)) - (log-info "Bob received ~v" message)))) +(actor (react (on (message (envelope 'bob $message)) + (log-info "Bob received ~v" message)))) (actor (log-info "Waiting for Alice and Bob.") diff --git a/racket/syndicate/examples/actor/chain.rkt b/racket/syndicate/examples/actor/chain.rkt index eefc362..fc3a473 100644 --- a/racket/syndicate/examples/actor/chain.rkt +++ b/racket/syndicate/examples/actor/chain.rkt @@ -8,7 +8,7 @@ (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) (define (chain-step n) (printf "chain-step ~v\n" n) diff --git a/racket/syndicate/examples/actor/chat-client.rkt b/racket/syndicate/examples/actor/chat-client.rkt index e8ac33b..4dfeae2 100644 --- a/racket/syndicate/examples/actor/chat-client.rkt +++ b/racket/syndicate/examples/actor/chat-client.rkt @@ -10,13 +10,15 @@ (spawn-tcp-driver) -(forever (on (message (external-event stdin-evt (list $line)) #:meta-level 1) - (if (eof-object? line) - (return!) - (send! (tcp-channel local-handle remote-handle line)))) +(actor + (react/suspend (quit) + (on (message (external-event stdin-evt (list $line)) #:meta-level 1) + (if (eof-object? line) + (quit) + (send! (tcp-channel local-handle remote-handle line)))) - (assert (advertise (tcp-channel local-handle remote-handle _))) - (on (retracted (advertise (tcp-channel remote-handle local-handle _))) (return!)) - (on (message (tcp-channel remote-handle local-handle $bs)) - (write-bytes bs) - (flush-output))) + (assert (advertise (tcp-channel local-handle remote-handle _))) + (on (retracted (advertise (tcp-channel remote-handle local-handle _))) (quit)) + (on (message (tcp-channel remote-handle local-handle $bs)) + (write-bytes bs) + (flush-output)))) diff --git a/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt b/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt index 9d3b7f9..d9dba32 100644 --- a/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt +++ b/racket/syndicate/examples/actor/chat-no-quit-world-no-nesting.rkt @@ -31,6 +31,7 @@ (spawn-tcp-driver) (define us (tcp-listener 5999)) -(forever (assert (advertise (observe (tcp-channel _ us _)))) - (on (asserted (advertise (tcp-channel $them us _))) - (spawn-session them us))) +(actor + (forever (assert (advertise (observe (tcp-channel _ us _)))) + (on (asserted (advertise (tcp-channel $them us _))) + (spawn-session them us)))) diff --git a/racket/syndicate/examples/actor/chat-simplified-internals.rkt b/racket/syndicate/examples/actor/chat-simplified-internals.rkt index 499f389..d5a3412 100644 --- a/racket/syndicate/examples/actor/chat-simplified-internals.rkt +++ b/racket/syndicate/examples/actor/chat-simplified-internals.rkt @@ -40,13 +40,13 @@ (actor (forever (assert (advertise (observe (tcp-channel _ us _)))) (on (asserted (advertise (tcp-channel $them us _))) (define id (seal (list them us))) - (actor (state [(assert (tcp-remote-open id)) - (on (message (tcp-channel them us $bs)) - (send! (tcp-incoming-data id bs))) - (on (message (tcp-outgoing-data id $bs)) - (send! (tcp-channel us them bs)))] - [(retracted (advertise (tcp-channel them us _))) (void)] - [(retracted (tcp-local-open id)) (void)]))))) + (actor (react (stop-when (retracted (advertise (tcp-channel them us _)))) + (stop-when (retracted (tcp-local-open id))) + (assert (tcp-remote-open id)) + (on (message (tcp-channel them us $bs)) + (send! (tcp-incoming-data id bs))) + (on (message (tcp-outgoing-data id $bs)) + (send! (tcp-channel us them bs)))))))) -(forever (on (asserted (tcp-remote-open $id)) - (spawn-session id))) +(actor (forever (on (asserted (tcp-remote-open $id)) + (spawn-session id)))) diff --git a/racket/syndicate/examples/actor/echo.rkt b/racket/syndicate/examples/actor/echo.rkt index 8e92628..661d2a5 100644 --- a/racket/syndicate/examples/actor/echo.rkt +++ b/racket/syndicate/examples/actor/echo.rkt @@ -6,11 +6,11 @@ (spawn-tcp-driver) (define server-id (tcp-listener 5999)) -(forever (assert (advertise (observe (tcp-channel _ server-id _)))) - (on (asserted (advertise (tcp-channel $c server-id _))) - (printf "Accepted connection from ~v\n" c) - (actor (until (retracted (advertise (tcp-channel c server-id _))) - (assert (advertise (tcp-channel server-id c _))) - (on (message (tcp-channel c server-id $bs)) - (send! (tcp-channel server-id c bs)))) - (printf "Closed connection ~v\n" c)))) +(actor + (forever (assert (advertise (observe (tcp-channel _ server-id _)))) + (during/actor (advertise (tcp-channel $c server-id _)) + (on-start (printf "Accepted connection from ~v\n" c)) + (assert (advertise (tcp-channel server-id c _))) + (on (message (tcp-channel c server-id $bs)) + (send! (tcp-channel server-id c bs))) + (on-stop (printf "Closed connection ~v\n" c))))) diff --git a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt index 262f4a4..fefba48 100644 --- a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt +++ b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt @@ -19,9 +19,10 @@ (struct foo (x y) #:prefab) (actor (define x 123) - (forever + (react (assert (foo x 999)) (during (foo x $v) - #:init [(log-info "x=~a v=~a" x v) - (when (= x 123) (set! x 124))] - #:done [(log-info "finally for x=~a v=~a" x v)]))) + (log-info "x=~a v=~a" x v) + (when (= x 123) (set! x 124)) + (on-stop + (log-info "finally for x=~a v=~a" x v))))) diff --git a/racket/syndicate/examples/actor/example-partial-retraction.rkt b/racket/syndicate/examples/actor/example-partial-retraction.rkt index ad0895a..c63e98d 100644 --- a/racket/syndicate/examples/actor/example-partial-retraction.rkt +++ b/racket/syndicate/examples/actor/example-partial-retraction.rkt @@ -7,7 +7,7 @@ (struct ready (what) #:prefab) (struct entry (key val) #:prefab) -(actor (forever +(actor (react (assert (ready 'listener)) (on (asserted (entry $key _)) (log-info "key ~v asserted" key) @@ -18,19 +18,20 @@ (log-info "del binding: ~v -> ~v" key value))) (log-info "key ~v retracted" key)))) -(actor (forever +(actor (react (assert (ready 'other-listener)) (during (entry $key _) - #:init [(log-info "(other-listener) key ~v asserted" key)] - #:done [(log-info "(other-listener) key ~v retracted" key)] + (log-info "(other-listener) key ~v asserted" key) + (on-stop (log-info "(other-listener) key ~v retracted" key)) (during (entry key $value) - #:init [(log-info "(other-listener) ~v ---> ~v" key value)] - #:done [(log-info "(other-listener) ~v -/-> ~v" key value)])))) + (log-info "(other-listener) ~v ---> ~v" key value) + (on-stop (log-info "(other-listener) ~v -/-> ~v" key value)))))) (define (pause) (log-info "pause") - (until (asserted (ready 'pause)) - (assert (ready 'pause)))) + (define token (gensym 'pause)) ;; FIXME:: If we use the same token every time, need epochs! + (until (asserted (ready token)) + (assert (ready token)))) (actor (until (asserted (ready 'listener))) (until (asserted (ready 'other-listener))) diff --git a/racket/syndicate/examples/actor/file-system-during.rkt b/racket/syndicate/examples/actor/file-system-during.rkt index 368d37f..e8b5b00 100644 --- a/racket/syndicate/examples/actor/file-system-during.rkt +++ b/racket/syndicate/examples/actor/file-system-during.rkt @@ -13,21 +13,21 @@ (spawn-timer-driver) -(actor (forever #:collect [(files (hash))] - (during (observe (file $name _)) - #:init [(printf "At least one reader exists for ~v\n" name)] - #:done [(printf "No remaining readers exist for ~v\n" name)] - #:collect [(content (hash-ref files name #f))] - (assert (file name content)) - (on (message (save (file name $content))) content) - (on (message (delete name)) #f)) - (on (message (save (file $name $content))) (hash-set files name content)) - (on (message (delete $name)) (hash-remove files name)))) +(actor (react (field [files (hash)]) + (during (observe (file $name _)) + (on-start (printf "At least one reader exists for ~v\n" name)) + (on-stop (printf "No remaining readers exist for ~v\n" name)) + (field [content (hash-ref (files) name #f)]) + (assert (file name (content))) + (on (message (save (file name $new-content))) (content new-content)) + (on (message (delete name)) (content #f))) + (on (message (save (file $name $content))) (files (hash-set (files) name content))) + (on (message (delete $name)) (files (hash-remove (files) name))))) (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) ;; Shell (let ((e (read-bytes-line-evt (current-input-port) 'any))) diff --git a/racket/syndicate/examples/actor/file-system-lll.rkt b/racket/syndicate/examples/actor/file-system-lll.rkt index 1425077..c26237c 100644 --- a/racket/syndicate/examples/actor/file-system-lll.rkt +++ b/racket/syndicate/examples/actor/file-system-lll.rkt @@ -58,7 +58,7 @@ (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) ;; Shell (let ((e (read-bytes-line-evt (current-input-port) 'any))) diff --git a/racket/syndicate/examples/actor/file-system-lll2.rkt b/racket/syndicate/examples/actor/file-system-lll2.rkt index df1a4d7..fd17229 100644 --- a/racket/syndicate/examples/actor/file-system-lll2.rkt +++ b/racket/syndicate/examples/actor/file-system-lll2.rkt @@ -59,7 +59,7 @@ (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) ;; Shell (let ((e (read-bytes-line-evt (current-input-port) 'any))) diff --git a/racket/syndicate/examples/actor/file-system.rkt b/racket/syndicate/examples/actor/file-system.rkt index 2c51a5d..630ec03 100644 --- a/racket/syndicate/examples/actor/file-system.rkt +++ b/racket/syndicate/examples/actor/file-system.rkt @@ -13,22 +13,22 @@ (spawn-timer-driver) -(actor (forever #:collect [(files (hash))] - (on (asserted (observe (file $name _))) - (printf "At least one reader exists for ~v\n" name) - (begin0 (until (retracted (observe (file name _))) - #:collect [(content (hash-ref files name #f))] - (assert (file name content)) - (on (message (save (file name $content))) content) - (on (message (delete name)) #f)) - (printf "No remaining readers exist for ~v\n" name))) - (on (message (save (file $name $content))) (hash-set files name content)) - (on (message (delete $name)) (hash-remove files name)))) +(actor (react (field [files (hash)]) + (on (asserted (observe (file $name _))) + (printf "At least one reader exists for ~v\n" name) + (begin0 (until (retracted (observe (file name _))) + (field [content (hash-ref (files) name #f)]) + (assert (file name (content))) + (on (message (save (file name $new-content))) (content new-content)) + (on (message (delete name)) (content #f))) + (printf "No remaining readers exist for ~v\n" name))) + (on (message (save (file $name $content))) (files (hash-set (files) name content))) + (on (message (delete $name)) (files (hash-remove (files) name))))) (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) ;; Shell (let ((e (read-bytes-line-evt (current-input-port) 'any))) diff --git a/racket/syndicate/examples/actor/file-system2.rkt b/racket/syndicate/examples/actor/file-system2.rkt index b672201..27735cc 100644 --- a/racket/syndicate/examples/actor/file-system2.rkt +++ b/racket/syndicate/examples/actor/file-system2.rkt @@ -14,30 +14,30 @@ (spawn-timer-driver) -(actor (forever #:collect [(files (hash)) (monitored (set))] - (on (asserted (observe (file $name _))) - (printf "At least one reader exists for ~v\n" name) - (assert! (file name (hash-ref files name #f))) - (values files (set-add monitored name))) - (on (retracted (observe (file $name _))) - (printf "No remaining readers exist for ~v\n" name) - (retract! (file name (hash-ref files name #f))) - (values files (set-remove monitored name))) - (on (message (save (file $name $content))) - (when (set-member? monitored name) - (retract! (file name (hash-ref files name #f))) - (assert! (file name content))) - (values (hash-set files name content) monitored)) - (on (message (delete $name)) - (when (set-member? monitored name) - (retract! (file name (hash-ref files name #f))) - (assert! (file name #f))) - (values (hash-remove files name) monitored)))) +(actor (react (field [files (hash)] [monitored (set)]) + (on (asserted (observe (file $name _))) + (printf "At least one reader exists for ~v\n" name) + (assert! (file name (hash-ref (files) name #f))) + (monitored (set-add (monitored) name))) + (on (retracted (observe (file $name _))) + (printf "No remaining readers exist for ~v\n" name) + (retract! (file name (hash-ref (files) name #f))) + (monitored (set-remove (monitored) name))) + (on (message (save (file $name $content))) + (when (set-member? (monitored) name) + (retract! (file name (hash-ref (files) name #f))) + (assert! (file name content))) + (files (hash-set (files) name content))) + (on (message (delete $name)) + (when (set-member? (monitored) name) + (retract! (file name (hash-ref (files) name #f))) + (assert! (file name #f))) + (files (hash-remove (files) name))))) (define (sleep sec) (define timer-id (gensym 'sleep)) (until (message (timer-expired timer-id _)) - #:init [(send! (set-timer timer-id (* sec 1000.0) 'relative))])) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) ;; Shell (let ((e (read-bytes-line-evt (current-input-port) 'any))) diff --git a/racket/syndicate/examples/actor/forward-chaining-mini.rkt b/racket/syndicate/examples/actor/forward-chaining-mini.rkt index 6bd853f..a6418dd 100644 --- a/racket/syndicate/examples/actor/forward-chaining-mini.rkt +++ b/racket/syndicate/examples/actor/forward-chaining-mini.rkt @@ -3,17 +3,17 @@ (require syndicate/actor) -(actor (forever (assert `(parent john douglas)))) -(actor (forever (assert `(parent bob john)))) -(actor (forever (assert `(parent ebbon bob)))) +(actor (react (assert `(parent john douglas)))) +(actor (react (assert `(parent bob john)))) +(actor (react (assert `(parent ebbon bob)))) ;; This looks like an implication: ;; (parent A C) ⇒ ((ancestor A C) ∧ ((ancestor C B) ⇒ (ancestor A B))) ;; -(actor (forever (during `(parent ,$A ,$C) - (assert `(ancestor ,A ,C)) - (during `(ancestor ,C ,$B) - (assert `(ancestor ,A ,B)))))) +(actor (react (during `(parent ,$A ,$C) + (assert `(ancestor ,A ,C)) + (during `(ancestor ,C ,$B) + (assert `(ancestor ,A ,B)))))) -(actor (forever (on (asserted `(ancestor ,$A ,$B)) - (log-info "~a is an ancestor of ~a" A B)))) +(actor (react (on (asserted `(ancestor ,$A ,$B)) + (log-info "~a is an ancestor of ~a" A B)))) diff --git a/racket/syndicate/examples/actor/mini-echo.rkt b/racket/syndicate/examples/actor/mini-echo.rkt index 8f3c53c..7302db1 100644 --- a/racket/syndicate/examples/actor/mini-echo.rkt +++ b/racket/syndicate/examples/actor/mini-echo.rkt @@ -5,13 +5,13 @@ (struct echo-req (body) #:prefab) (struct echo-resp (body) #:prefab) -(actor (forever #:collect [(count 0)] - (on (message (echo-req $body)) - (send! (echo-resp body)) - (+ count 1)))) +(actor (react (field [count 0]) + (on (message (echo-req $body)) + (send! (echo-resp body)) + (count (+ (count) 1))))) -(actor (forever (on (message (echo-resp $body)) - (printf "Received: ~v\n" body)))) +(actor (react (on (message (echo-resp $body)) + (printf "Received: ~v\n" body)))) (actor (until (asserted (observe (echo-req _)))) (until (asserted (observe (echo-resp _)))) diff --git a/racket/syndicate/examples/actor/spreadsheet.rkt b/racket/syndicate/examples/actor/spreadsheet.rkt index 8d8bfa6..569557e 100644 --- a/racket/syndicate/examples/actor/spreadsheet.rkt +++ b/racket/syndicate/examples/actor/spreadsheet.rkt @@ -24,26 +24,28 @@ [(cons a d) (set-union (walk a) (walk d))] [_ (set)]))) -(define (non-void? v) (not (void? v))) +(define (non-void-field? f) (not (void? (f)))) (define (cell-expr->actor-expr name expr) (define bindings (set->list (extract-bindings expr))) `(actor (until (message (set-cell ',name _)) - #:collect [,@(for/list [(b bindings)] `(,b (void)))] - (assert #:when (andmap non-void? (list ,@bindings)) (cell ',name ,expr)) + (field ,@(for/list [(b bindings)] `[,b (void)])) + (assert #:when (andmap non-void-field? (list ,@bindings)) + (cell ',name + (let (,@(for/list [(b bindings)] `(,b (,b)))) + ,expr))) ,@(for/list [(b bindings)] `(on (asserted (cell ',b $value)) - (values ,@(for/list [(b1 bindings)] - (if (eq? b b1) 'value b1)))))))) + (,b value)))))) -(actor (forever (on (message (set-cell $name $expr)) - (define actor-expr (cell-expr->actor-expr name expr)) - ;; (local-require racket/pretty) (pretty-print actor-expr) - (eval actor-expr (namespace-anchor->namespace ns))))) +(actor (react (on (message (set-cell $name $expr)) + (define actor-expr (cell-expr->actor-expr name expr)) + ;; (local-require racket/pretty) (pretty-print actor-expr) + (eval actor-expr (namespace-anchor->namespace ns))))) -(actor (forever (on (asserted (cell $name $value)) - (printf ">>> ~a ~v\n" name value) - (flush-output)))) +(actor (react (on (asserted (cell $name $value)) + (printf ">>> ~a ~v\n" name value) + (flush-output)))) (actor (void (thread (lambda () (let loop () diff --git a/racket/syndicate/examples/actor/two-buyer-protocol.rkt b/racket/syndicate/examples/actor/two-buyer-protocol.rkt index 266dda2..1bd1cc5 100644 --- a/racket/syndicate/examples/actor/two-buyer-protocol.rkt +++ b/racket/syndicate/examples/actor/two-buyer-protocol.rkt @@ -3,6 +3,8 @@ ;; given in Honda/Yoshida/Carbone 2008, "Multiparty Asynchronous ;; Session Types". +;; TODO:: code this up in Syndicate/js. See whether the killing-child-facets problems exists there. + ;; SAMPLE OUTPUT: ;;--------------------------------------------------------------------------- ;; A learns that the price of "Catch 22" is 2.22 @@ -89,38 +91,43 @@ ;; SELLER ;; (define (seller) - (actor (forever #:collect [(books (hash "The Wind in the Willows" 3.95 - "Catch 22" 2.22 - "Candide" 34.95)) - (next-order-id 10001483)] + (actor (react (field [books (hash "The Wind in the Willows" 3.95 + "Catch 22" 2.22 + "Candide" 34.95)] + [next-order-id 10001483]) - ;; Give quotes to interested parties. - ;; - (during (observe (book-quote $title _)) - (assert (book-quote title (hash-ref books title #f)))) + ;; Give quotes to interested parties. + ;; + (during (observe (book-quote $title _)) + (assert (book-quote title (hash-ref (books) title #f)))) - ;; Respond to order requests. - ;; - (on (asserted (observe (order $title $offer-price _ _))) - (define asking-price (hash-ref books title #f)) - (cond + ;; Respond to order requests. + ;; + (on (asserted (observe (order $title $offer-price _ _))) + (define asking-price (hash-ref (books) title #f)) + (cond - [(or (not asking-price) (< offer-price asking-price)) - ;; We cannot sell a book we do not have, and we will not sell for less - ;; than our asking price. - ;; - (while-relevant-assert (order title offer-price #f #f))] + [(or (not asking-price) (< offer-price asking-price)) + ;; We cannot sell a book we do not have, and we will not sell for less + ;; than our asking price. + ;; + (while-relevant-assert (order title offer-price #f #f))] - [else - ;; Tell the ordering party their order ID and delivery date. - ;; - (actor - (while-relevant-assert - (order title offer-price next-order-id "March 9th"))) + [else + ;; Allocate an order ID. + ;; + (define order-id (next-order-id)) + (next-order-id (+ order-id 1)) - ;; Remove the book from our shelves, and increment our order ID. - ;; - (values (hash-remove books title) (+ next-order-id 1))]))))) + ;; Remove the book from our shelves. + ;; + (books (hash-remove (books) title)) + + ;; Tell the ordering party their order ID and delivery date. + ;; + (actor + (while-relevant-assert + (order title offer-price order-id "March 9th")))]))))) ;; Serial SPLIT-PROPOSER ;; @@ -136,7 +143,8 @@ ;; First, retrieve a quote for the title, and analyze the result. ;; - (match (state [] [(asserted (book-quote title $price)) price]) + (match (react/suspend (yield) + (stop-when (asserted (book-quote title $price)) (yield price))) [#f (log-info "A learns that ~v is out-of-stock." title) (try-to-buy remaining-titles)] @@ -161,31 +169,14 @@ [else ;; Make our proposal, and wait for a response. - ;; SEE NOTE (A). ;; - (match (state [] [(asserted (split-proposal title price contribution $accepted?)) - accepted?]) - [#t - (log-info "A learns that the split-proposal for ~v was accepted" title) - (try-to-buy remaining-titles)] - [#f - (log-info "A learns that the split-proposal for ~v was rejected" title) - ;; Offer to contribute a little more. - (try-to-split (+ contribution (/ (- price contribution) 2)))])]))])])) - - ;; NOTE (A): Wrote this originally where the anchor to this note is found. The code here - ;; doesn't release assertions properly; we fall foul of the "run the continuation clauses - ;; while the subscriptions are still active" property of the current actor.rkt - ;; implementation. - ;; - ;; (state [] - ;; [(asserted (split-proposal title price contribution #t)) - ;; (log-info "A learns that the split-proposal for ~v was accepted" title) - ;; (try-to-buy remaining-titles)] - ;; [(asserted (split-proposal title price contribution #f)) - ;; (log-info "A learns that the split-proposal for ~v was rejected" title) - ;; (try-to-split (+ contribution (/ (- price contribution) 2)))]) - ;; + (react + (stop-when (asserted (split-proposal title price contribution #t)) + (log-info "A learns that the split-proposal for ~v was accepted" title) + (try-to-buy remaining-titles)) + (stop-when (asserted (split-proposal title price contribution #f)) + (log-info "A learns that the split-proposal for ~v was rejected" title) + (try-to-split (+ contribution (/ (- price contribution) 2)))))]))])])) (actor (try-to-buy (list "Catch 22" "Encyclopaedia Brittannica" @@ -195,11 +186,11 @@ ;; Serial SPLIT-DISPOSER ;; (define (buyer-b) - (actor (forever + (actor (react ;; This actor maintains a record of the amount of money it has to spend. ;; - #:collect [(funds 5.00)] + (field [funds 5.00]) (on (asserted (observe (split-proposal $title $price $their-contribution _))) @@ -210,8 +201,8 @@ price) (cond - [(> my-contribution funds) - (log-info "B hasn't enough funds (~a remaining)" funds) + [(> my-contribution (funds)) + (log-info "B hasn't enough funds (~a remaining)" (funds)) (while-relevant-assert (split-proposal title price their-contribution #f))] [else @@ -221,16 +212,17 @@ ;; actual purchase now that we have agreed on a split. ;; (actor (define-values (order-id delivery-date) - (state - [;; While we are in this state, waiting for order confirmation, take - ;; the opportunity to signal to our SPLIT-PROPOSER that we accepted - ;; their proposal. - ;; - (assert (split-proposal title price their-contribution #t))] - [(asserted (order title price $id $date)) - ;; We have received order confirmation from the SELLER. - ;; - (values id date)])) + (react/suspend (yield) + ;; While we are in this state, waiting for order confirmation, take + ;; the opportunity to signal to our SPLIT-PROPOSER that we accepted + ;; their proposal. + ;; + (assert (split-proposal title price their-contribution #t)) + + (stop-when (asserted (order title price $id $date)) + ;; We have received order confirmation from the SELLER. + ;; + (yield id date)))) (log-info "The order for ~v has id ~a, and will be delivered on ~a" title order-id @@ -239,10 +231,10 @@ ;; Meanwhile, update our records of our available funds, and continue to wait ;; for more split-proposals to arrive. ;; - (define remaining-funds (- funds my-contribution)) + (define remaining-funds (- (funds) my-contribution)) (log-info "B accepts the offer, leaving them with ~a remaining funds" remaining-funds) - remaining-funds]))))) + (funds remaining-funds)]))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt index 9819289..b3057e4 100644 --- a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt +++ b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths.rkt @@ -29,18 +29,18 @@ (assert (path A C (+ link-cost path-cost))))))) (actor (forever (during (path-exists $from $to) - #:collect [(costs (set)) (least +inf.0)] - (assert (min-cost from to least)) + (field [costs (set)] [least +inf.0]) + (assert (min-cost from to (least))) (on (asserted (path from to $cost)) - (values (set-add costs cost) - (min least cost))) + (costs (set-add (costs) cost)) + (least (min (least) cost))) (on (retracted (path from to $cost)) - (define new-costs (set-remove costs cost)) - (values new-costs - (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) + (define new-costs (set-remove (costs) cost)) + (costs new-costs) + (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) ;; (actor (forever (during (path $from $to $cost) -;; #:init [(displayln `(+ ,(path from to cost)))] -;; #:done [(displayln `(- ,(path from to cost)))]))) +;; (on-start (displayln `(+ ,(path from to cost)))) +;; (on-stop (displayln `(- ,(path from to cost))))))) (actor (forever (on (asserted (min-cost $from $to $cost)) (displayln (min-cost from to cost))))) diff --git a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt index 641f4cf..f15fd5f 100644 --- a/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt +++ b/racket/syndicate/examples/all-pairs-shortest-paths/all-pairs-shortest-paths2.rkt @@ -29,18 +29,18 @@ (path A C (set-add seen A) (+ link-cost path-cost))))))) (actor (forever (during (path-exists $from $to) - #:collect [(costs (set)) (least +inf.0)] - (assert (min-cost from to least)) + (field [costs (set)] [least +inf.0]) + (assert (min-cost from to (least))) (on (asserted (path from to _ $cost)) - (values (set-add costs cost) - (min least cost))) + (costs (set-add (costs) cost)) + (least (min (least) cost))) (on (retracted (path from to _ $cost)) - (define new-costs (set-remove costs cost)) - (values new-costs - (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) + (define new-costs (set-remove (costs) cost)) + (costs new-costs) + (least (for/fold [(least +inf.0)] [(x new-costs)] (min x least))))))) (actor (forever (during (path $from $to $seen $cost) - #:init [(displayln `(+ ,(path from to seen cost)))] - #:done [(displayln `(- ,(path from to seen cost)))]))) + (on-start (displayln `(+ ,(path from to seen cost)))) + (on-stop (displayln `(- ,(path from to seen cost))))))) (actor (forever (on (asserted (min-cost $from $to $cost)) (displayln (min-cost from to cost))))) diff --git a/racket/syndicate/examples/ws-echo-client.rkt b/racket/syndicate/examples/ws-echo-client.rkt index c0c38d6..e56312b 100644 --- a/racket/syndicate/examples/ws-echo-client.rkt +++ b/racket/syndicate/examples/ws-echo-client.rkt @@ -7,7 +7,10 @@ (require syndicate/actor) (require racket/port) -(match-define (vector url) (current-command-line-arguments)) +(define url + (match (current-command-line-arguments) + [(vector url) url] + [(vector) "http://localhost:8081/ws-echo"])) (spawn-websocket-driver) @@ -19,14 +22,15 @@ (define (generate-reader-id) (begin0 reader-count (set! reader-count (+ reader-count 1)))) - (actor (state [(assert (advertise (websocket-message c s _))) - (on (asserted (websocket-peer-details c s $la _ $ra _)) - (log-info "~a: local ~v :: remote ~v" c la ra)) - (on (message (external-event e (list (? bytes? $bs))) #:meta-level 1) - (send! (websocket-message c s bs))) - (on (message (websocket-message s c $bs)) - (printf "(From server: ~v)\n" bs))] - [(message (external-event e (list (? eof-object? _))) #:meta-level 1) - (printf "Local EOF. Terminating.\n")] - [(retracted (advertise (websocket-message s c _))) - (printf "Server disconnected.\n")]))) + (actor (react (assert (advertise (websocket-message c s _))) + (on (asserted (websocket-peer-details c s $la _ $ra _)) + (log-info "~a: local ~v :: remote ~v" c la ra)) + (on (message (external-event e (list (? bytes? $bs))) #:meta-level 1) + (send! (websocket-message c s bs))) + (on (message (websocket-message s c $bs)) + (printf "(From server: ~v)\n" bs)) + (stop-when (message (external-event e (list (? eof-object? _))) + #:meta-level 1) + (printf "Local EOF. Terminating.\n")) + (stop-when (retracted (advertise (websocket-message s c _))) + (printf "Server disconnected.\n"))))) diff --git a/racket/syndicate/support/dsl.rkt b/racket/syndicate/support/dsl.rkt deleted file mode 100644 index 363d33f..0000000 --- a/racket/syndicate/support/dsl.rkt +++ /dev/null @@ -1,12 +0,0 @@ -#lang racket/base - -(require (for-syntax racket/base)) -(provide define&provide-dsl-helper-syntaxes) - -(define-for-syntax (illegal-use id context stx) - (raise-syntax-error #f (format "Illegal use of ~a outside ~a" id context) stx)) - -(define-syntax-rule (define&provide-dsl-helper-syntaxes context (identifier ...)) - (begin (provide identifier ...) - (define-syntax (identifier stx) (illegal-use 'identifier context stx)) - ...))