diff --git a/racket/syndicate/actor.rkt b/racket/syndicate/actor.rkt index b16eb38..3a8e147 100644 --- a/racket/syndicate/actor.rkt +++ b/racket/syndicate/actor.rkt @@ -31,6 +31,9 @@ retracted rising-edge (rename-out [core:message message]) + know + forget + realize let-event @@ -58,6 +61,7 @@ perform-actions! flush! quit-dataspace! + realize! syndicate-effects-available? @@ -80,6 +84,7 @@ (require racket/set) (require racket/match) (require racket/contract) +(require (only-in racket/list flatten)) (require (for-syntax racket/base)) (require (for-syntax syntax/parse)) @@ -198,10 +203,15 @@ endpoints ;; (Hash EID Endpoint) stop-scripts ;; (Listof Script) -- IN REVERSE ORDER children ;; (Setof FID) + previous-knowledge ;; AssertionSet of internal knowledge + knowledge ;; AssertionSet of internal knowledge ) #:prefab) (struct endpoint (id patch-fn handler-fn) #:prefab) +(struct internal-knowledge (v) #:prefab) +(define internal-knowledge-parenthesis (open-parenthesis 1 struct:internal-knowledge)) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Script priorities. These are used to ensure that the results of ;; some *side effects* are visible to certain pieces of code. @@ -249,6 +259,12 @@ ;; Storeof (Constreeof Action) (define current-pending-actions (make-store)) +;; Storeof Patch +(define current-pending-internal-patch (make-store)) + +;; Storeof (Constreeof Action) +(define current-pending-internal-events (make-store)) + ;; Storeof (Vector (Queue Script) ...) ;; Mutates the vector! (define current-pending-scripts (make-store)) @@ -407,6 +423,7 @@ (analyze-pattern stx #'P)) (quasisyntax/loc stx (add-endpoint! #,(source-location->string stx) + #f (lambda () #,(let ((patch-stx #`(core:assert #,pat))) (if #'w.Pred @@ -414,6 +431,22 @@ patch-stx))) void))])) +(define-syntax (know stx) + (syntax-parse stx + [(_ w:when-pred P) + (define-values (proj pat bindings _instantiated) + (analyze-pattern stx #'P)) + (quasisyntax/loc stx + (add-endpoint! + #,(source-location->string stx) + #t + (lambda () + #,(let ((patch-stx #`(core:assert (internal-knowledge #,pat)))) + (if #'w.Pred + #`(if w.Pred #,patch-stx patch-empty) + patch-stx))) + void))])) + (define (fid-ancestor? fid maybe-ancestor) (and (pair? fid) ;; empty fid lists obviously no ancestors at all! (or (equal? fid maybe-ancestor) @@ -474,6 +507,7 @@ (define (on-event* where proc #:priority [priority *normal-priority*]) (add-endpoint! where + #f (lambda () patch-empty) (lambda (e _current-interests _synthetic?) (schedule-script! #:priority priority (lambda () (proc e)))))) @@ -547,6 +581,7 @@ (quasisyntax/loc stx (let () (add-endpoint! #,(source-location->string stx) + #f (lambda () (define subject-id (current-dataflow-subject-id)) (schedule-script! @@ -570,6 +605,8 @@ (define-syntax (asserted stx) (raise-syntax-error #f "asserted: Used outside event spec" stx)) (define-syntax (retracted stx) (raise-syntax-error #f "retracted: Used outside event spec" stx)) (define-syntax (rising-edge stx) (raise-syntax-error #f "rising-edge: Used outside event spec" stx)) +(define-syntax (forget stx) (raise-syntax-error #f "forget: Used outside event spec" stx)) +(define-syntax (realize stx) (raise-syntax-error #f "realize: Used outside event spec" stx)) (define-syntax (suspend-script stx) (syntax-parse stx @@ -772,62 +809,119 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Syntax-time support -(define (interests-pre-and-post-patch pat synthetic?) +(define (interests-pre-and-post-patch pat synthetic? retrieve-knowledge) (define (or* x y) (or x y)) - (define a (current-actor-state)) - (define previous-knowledge (if synthetic? trie-empty (actor-state-previous-knowledge a))) - (define old (trie-lookup previous-knowledge pat #f #:wildcard-union or*)) - (define new (trie-lookup (actor-state-knowledge a) pat #f #:wildcard-union or*)) + (define-values (prev current) (retrieve-knowledge synthetic?)) + (define old (trie-lookup prev pat #f #:wildcard-union or*)) + (define new (trie-lookup current pat #f #:wildcard-union or*)) (values old new)) -(define (interest-just-appeared-matching? pat synthetic?) - (define-values (old new) (interests-pre-and-post-patch pat synthetic?)) +(define (interest-just-appeared-matching? pat synthetic? retrieve-knowledge) + (define-values (old new) (interests-pre-and-post-patch pat synthetic? retrieve-knowledge)) (and (not old) new)) -(define (interest-just-disappeared-matching? pat synthetic?) - (define-values (old new) (interests-pre-and-post-patch pat synthetic?)) +(define (interest-just-disappeared-matching? pat synthetic? retrieve-knowledge) + (define-values (old new) (interests-pre-and-post-patch pat synthetic? retrieve-knowledge)) (and old (not new))) -(define-for-syntax (analyze-asserted/retracted outer-expr-stx - when-pred-stx - event-stx - script-stx - asserted? - P-stx - priority-stx) +;; Bool -> (Values AssertionSet AssertionSet) +;; retrieve the previous and current knowledge fields from the current actor state +(define (current-actor-state-knowledges synthetic?) + (define a (current-actor-state)) + (define previous-knowledge (if synthetic? trie-empty (actor-state-previous-knowledge a))) + (define current-knowledge (actor-state-knowledge a)) + (values previous-knowledge current-knowledge)) + +;; Bool -> (Values AssertionSet AssertionSet) +;; retrieve the previous and current knowledge fields from the current facet +(define (current-facet-knowledges synthetic?) + (define f (lookup-facet (current-facet-id))) + (define previous-knowledge (if synthetic? trie-empty (facet-previous-knowledge f))) + (define current-knowledge (facet-knowledge f)) + (values previous-knowledge current-knowledge)) + +(define-for-syntax (analyze-appear/disappear outer-expr-stx + when-pred-stx + event-stx + script-stx + asserted? + P-stx + priority-stx + internal?) + (define P+ + (if internal? #`(internal-knowledge #,P-stx) P-stx)) (define-values (proj-stx pat bindings _instantiated) - (analyze-pattern event-stx P-stx)) + (analyze-pattern event-stx P+)) (define event-predicate-stx (if asserted? #'patch/added? #'patch/removed?)) (define patch-accessor-stx (if asserted? #'patch-added #'patch-removed)) (define change-detector-stx (if asserted? #'interest-just-appeared-matching? #'interest-just-disappeared-matching?)) + (define knowledge-retriever + (if internal? #'current-facet-knowledges #'current-actor-state-knowledges)) (quasisyntax/loc outer-expr-stx - (add-endpoint! #,(source-location->string outer-expr-stx) - (lambda () (if #,when-pred-stx - (core:sub #,pat) - patch-empty)) - (lambda (e current-interests synthetic?) - (when (not (trie-empty? current-interests)) - (core:match-event e - [(? #,event-predicate-stx p) - (define proj #,proj-stx) - (define proj-arity (projection-arity proj)) - (define entry-set (trie-project/set #:take proj-arity - (#,patch-accessor-stx p) - proj)) - (when (not entry-set) - (error 'asserted - "Wildcard interest discovered while projecting by ~v at ~a" - proj - #,(source-location->string P-stx))) - (for [(entry (in-set entry-set))] - (let ((instantiated (instantiate-projection proj entry))) - (and (#,change-detector-stx instantiated synthetic?) - (schedule-script! - #:priority #,priority-stx - (lambda () - (match-define (list #,@bindings) entry) - #,script-stx)))))])))))) + (add-endpoint! + #,(source-location->string outer-expr-stx) + #,internal? + (lambda () (if #,when-pred-stx + (core:sub #,pat) + patch-empty)) + (lambda (e current-interests synthetic?) + (when (not (trie-empty? current-interests)) + (core:match-event e + [(? #,event-predicate-stx p) + (define proj #,proj-stx) + (define proj-arity (projection-arity proj)) + (define entry-set (trie-project/set #:take proj-arity + (#,patch-accessor-stx p) + proj)) + (when (not entry-set) + (error 'asserted + "Wildcard interest discovered while projecting by ~v at ~a" + proj + #,(source-location->string P-stx))) + (for [(entry (in-set entry-set))] + (let ((instantiated (instantiate-projection proj entry))) + (and (#,change-detector-stx instantiated synthetic? #,knowledge-retriever) + (schedule-script! + #:priority #,priority-stx + (lambda () + (match-define (list #,@bindings) entry) + #,script-stx)))))])))))) + +(define-for-syntax (analyze-message outer-expr-stx + when-pred-stx + event-stx + script-stx + P-stx + priority-stx + internal?) + (define-values (proj pat bindings _instantiated) + (analyze-pattern event-stx P-stx)) + (define sub + (if internal? #`(internal-knowledge #,pat) pat)) + (define matchp + (if internal? #'(internal-knowledge body) #'body)) + (quasisyntax/loc outer-expr-stx + (add-endpoint! + #,(source-location->string outer-expr-stx) + #,internal? + (lambda () (if #,when-pred-stx + (core:sub #,sub) + patch-empty)) + (lambda (e current-interests _synthetic?) + (when (not (trie-empty? current-interests)) + (core:match-event e + [(core:message #,matchp) + (define capture-vals + (match-value/captures + body + #,proj)) + (and capture-vals + (schedule-script! + #:priority #,priority-stx + (lambda () + (apply (lambda #,bindings #,script-stx) + capture-vals))))])))))) (define-for-syntax orig-insp (variable-reference->module-declaration-inspector (#%variable-reference))) @@ -839,7 +933,7 @@ priority-stx) (define event-stx (syntax-disarm armed-event-stx orig-insp)) (syntax-parse event-stx - #:literals [core:message asserted retracted rising-edge] + #:literals [core:message asserted retracted rising-edge know forget realize] [(expander args ...) #:when (event-expander-id? #'expander) (event-expander-transform @@ -851,33 +945,23 @@ script-stx priority-stx)))] [(core:message P) - (define-values (proj pat bindings _instantiated) - (analyze-pattern event-stx #'P)) - (quasisyntax/loc outer-expr-stx - (add-endpoint! #,(source-location->string outer-expr-stx) - (lambda () (if #,when-pred-stx - (core:sub #,pat) - patch-empty)) - (lambda (e current-interests _synthetic?) - (when (not (trie-empty? current-interests)) - (core:match-event e - [(core:message body) - (define capture-vals - (match-value/captures - body - #,proj)) - (and capture-vals - (schedule-script! - #:priority #,priority-stx - (lambda () - (apply (lambda #,bindings #,script-stx) - capture-vals))))])))))] + (analyze-message outer-expr-stx when-pred-stx event-stx script-stx + #'P priority-stx #f)] [(asserted P) - (analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx - #t #'P priority-stx)] + (analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx + #t #'P priority-stx #f)] [(retracted P) - (analyze-asserted/retracted outer-expr-stx when-pred-stx event-stx script-stx - #f #'P priority-stx)] + (analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx + #f #'P priority-stx #f)] + [(realize P) + (analyze-message outer-expr-stx when-pred-stx event-stx script-stx + #'P priority-stx #t)] + [(know P) + (analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx + #t #'P priority-stx #t)] + [(forget P) + (analyze-appear/disappear outer-expr-stx when-pred-stx event-stx script-stx + #f #'P priority-stx #t)] [(rising-edge Pred) (define field-name (datum->syntax event-stx @@ -887,6 +971,7 @@ (let () (field [#,field-name #f]) (add-endpoint! #,(source-location->string outer-expr-stx) + #f (lambda () (when #,when-pred-stx (define old-val (#,field-name)) @@ -1003,10 +1088,25 @@ (current-pending-actions (list (current-pending-actions) ((current-action-transformer) p))))) +(define (schedule-internal-event! ac) + (if (patch? ac) + (when (patch-non-empty? ac) + (current-pending-internal-patch (compose-patch ac (current-pending-internal-patch)))) + (begin (flush-pending-internal-patch!) + (current-pending-internal-events (list (current-pending-internal-events) + ((current-action-transformer) ac)))))) + +(define (flush-pending-internal-patch!) + (define p (current-pending-internal-patch)) + (when (patch-non-empty? p) + (current-pending-internal-patch patch-empty) + (current-pending-internal-events (list (current-pending-internal-events) + ((current-action-transformer) p))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Endpoint Creation -(define (add-endpoint! where patch-fn handler-fn) +(define (add-endpoint! where internal? patch-fn handler-fn) (when (in-script?) (error 'add-endpoint! "~a: Cannot add endpoint in script; are you missing a (react ...)?" @@ -1030,7 +1130,9 @@ (hash-set (facet-endpoints f) new-eid (endpoint new-eid patch-fn handler-fn))])))) - (schedule-action! delta-aggregate)) + (if internal? + (schedule-internal-event! delta-aggregate) + (schedule-action! delta-aggregate))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Facet Lifecycle @@ -1045,7 +1147,7 @@ (define fid-uid next-fid-uid) (define fid (cons fid-uid parent-fid)) (set! next-fid-uid (+ next-fid-uid 1)) - (update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set)))) + (update-facet! fid (lambda (_f) (facet fid (hasheqv) '() (set) trie-empty trie-empty))) (update-facet! parent-fid (lambda (pf) (and pf (struct-copy facet pf @@ -1094,8 +1196,24 @@ (dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid)) (define-values (new-mux _eid _delta delta-aggregate) (mux-remove-stream (actor-state-mux a) eid)) - (current-actor-state (struct-copy actor-state a [mux new-mux])) - (schedule-action! delta-aggregate)))) + (define internal (patch-step delta-aggregate internal-knowledge-parenthesis)) + (define external (patch (trie-subtract (patch-added delta-aggregate) (patch-added internal)) + (trie-subtract (patch-removed delta-aggregate) (patch-removed internal)))) + (current-actor-state (struct-copy actor-state a + [mux new-mux])) + (define internal-aggregate (patch-prepend internal-knowledge-parenthesis internal)) + (schedule-script! + #:priority *gc-priority* + ;; need to do this later for the forget change detector + (lambda () + (define a (current-actor-state)) + (define new-knowledge + (update-interests (actor-state-knowledge a) internal)) + (current-actor-state (struct-copy actor-state a + [knowledge new-knowledge])))) + + (schedule-internal-event! internal-aggregate) + (schedule-action! external)))) (schedule-script! #:priority *gc-priority* @@ -1124,6 +1242,8 @@ (make-dataflow-graph))) (current-pending-patch patch-empty) (current-pending-actions '()) + (current-pending-internal-patch patch-empty) + (current-pending-internal-events '()) (current-pending-scripts (make-empty-pending-scripts)) (current-action-transformer values)] (with-current-facet '() #f @@ -1151,6 +1271,7 @@ (when script (script) (refresh-facet-assertions!) + (dispatch-internal-events!) (run-all-pending-scripts!))) (define (run-scripts!) @@ -1162,6 +1283,16 @@ (core:quit pending-actions) (core:transition (current-actor-state) pending-actions))) +;; dispatch the internal events that have accumulated during script execution +(define (dispatch-internal-events!) + (flush-pending-internal-patch!) + (define pending (flatten (current-pending-internal-events))) + (current-pending-internal-events '()) + (define a (current-actor-state)) + (for* ([e (in-list pending)] + [(fid f) (in-hash (actor-state-facets a))]) + (facet-handle-event! fid f e #f))) + (define (refresh-facet-assertions!) (dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state)) (lambda (subject-id) @@ -1201,6 +1332,8 @@ a)) (current-pending-patch patch-empty) (current-pending-actions '()) + (current-pending-internal-patch patch-empty) + (current-pending-internal-events '()) (current-pending-scripts (make-empty-pending-scripts)) (current-action-transformer values)] (for [((fid f) (in-hash (actor-state-facets a)))] @@ -1210,6 +1343,16 @@ (define (facet-handle-event! fid f e synthetic?) (define mux (actor-state-mux (current-actor-state))) (with-current-facet fid #f + (when (patch? e) + ;; quick-and-dirty intersection with (internal-knowledge ?) + (define internal (patch-prepend internal-knowledge-parenthesis + (patch-step e internal-knowledge-parenthesis))) + (update-facet! fid + (lambda (f) + (and f + (struct-copy facet f + [previous-knowledge (facet-knowledge f)] + [knowledge (apply-patch (facet-knowledge f) internal)]))))) (for [(ep (in-hash-values (facet-endpoints f)))] ((endpoint-handler-fn ep) e (mux-interests-of mux (endpoint-id ep)) synthetic?)))) @@ -1306,6 +1449,10 @@ (ensure-in-script! 'send!) (schedule-action! (core:message M))) +(define (realize! M) + (ensure-in-script! 'realize!) + (schedule-internal-event! (core:message (internal-knowledge M)))) + (define *adhoc-label* -1) (define (assert! P) @@ -1352,7 +1499,7 @@ (fprintf p " - Knowledge:\n ~a" (trie->pretty-string knowledge #:indent 3)) (fprintf p " - Facets:\n") (for ([(fid f) (in-hash facets)]) - (match-define (facet _fid endpoints _ children) f) + (match-define (facet _fid endpoints _ children _ _) f) (fprintf p " ---- facet ~a, children=~a" fid (set->list children)) (when (not (hash-empty? endpoints)) (fprintf p ", endpoints=~a" (hash-keys endpoints))) diff --git a/racket/syndicate/examples/actor/internal-knowledge.rkt b/racket/syndicate/examples/actor/internal-knowledge.rkt new file mode 100644 index 0000000..68d3213 --- /dev/null +++ b/racket/syndicate/examples/actor/internal-knowledge.rkt @@ -0,0 +1,62 @@ +#lang syndicate + +;; Expected Output: +#| +balance = 0 +balance = 5 +balance = 0 +JEEPERS +know overdraft! +balance = -1 +balance = -2 +no longer in overdraft +balance = 8 +|# + +(assertion-struct balance (v)) +(message-struct deposit (v)) + +(spawn + ;; Internal Events + (message-struct new-transaction (old new)) + (assertion-struct overdraft ()) + + (field [account 0]) + + (assert (balance (account))) + + (on (message (deposit $v)) + (define prev (account)) + (account (+ v (account))) + (realize! (new-transaction prev (account)))) + + (on (realize (new-transaction $old $new)) + (when (and (negative? new) + (not (negative? old))) + (react + ;; (this print is to make sure only one of these facets is created) + (printf "JEEPERS\n") + (know (overdraft)) + (on (realize (new-transaction $old $new)) + (when (not (negative? new)) + (stop-current-facet)))))) + + (on (know (overdraft)) + (printf "know overdraft!\n")) + (on (forget (overdraft)) + (printf "no longer in overdraft\n"))) + +(spawn + (on (asserted (balance $v)) + (printf "balance = ~a\n" v))) + +(spawn* + (send! (deposit 5)) + (flush!) + (send! (deposit -5)) + (flush!) + (send! (deposit -1)) + (flush!) + (send! (deposit -1)) + (flush!) + (send! (deposit 10))) diff --git a/racket/syndicate/examples/actor/realize.rkt b/racket/syndicate/examples/actor/realize.rkt new file mode 100644 index 0000000..5b18d6c --- /dev/null +++ b/racket/syndicate/examples/actor/realize.rkt @@ -0,0 +1,19 @@ +#lang syndicate + +;; Expected Output: +#| +received message bad +realized good +|# + +(message-struct ping (v)) + +(spawn + (on (realize (ping $v)) + (printf "realized ~a\n" v)) + (on (message (ping $v)) + (printf "received message ~a\n" v) + (realize! (ping 'good)))) + +(spawn* + (send! (ping 'bad))) diff --git a/racket/syndicate/patch.rkt b/racket/syndicate/patch.rkt index 2b6e444..b24ddff 100644 --- a/racket/syndicate/patch.rkt +++ b/racket/syndicate/patch.rkt @@ -14,6 +14,7 @@ limit-patch patch-step patch-step* + patch-prepend compute-aggregate-patch apply-patch update-interests @@ -125,6 +126,13 @@ (define (patch-step* p keys) (foldl (lambda (key p) (patch-step p key)) p keys)) +;; (U Sigma OpenParenthesis) Trie -> Trie +;; Prepend both added and removed sets +(define (patch-prepend key p) + (match-define (patch added removed) p) + (patch (trie-prepend key added) + (trie-prepend key removed))) + ;; Entries labelled with `label` may already exist in `base`; the ;; patch `p` MUST already have been limited to add only where no ;; `label`-labelled portions of `base` exist, and to remove only where