diff --git a/racket/syndicate-gl/2d.rkt b/racket/syndicate-gl/2d.rkt index b3cc22e..3d3595e 100644 --- a/racket/syndicate-gl/2d.rkt +++ b/racket/syndicate-gl/2d.rkt @@ -32,6 +32,7 @@ (require syndicate) (require syndicate/trie) (require syndicate/ground) +(require syndicate/hierarchy) (require "texture.rkt") (require "affine.rkt") @@ -390,7 +391,8 @@ (process-fullscreen-requests! p)] [(message (request-gc)) (perform-gc-request!)] - [(message _) (void)])) + [(message _) (void)] + [(attributed-action inner-a _) (process-action! inner-a)])) (define (process-scene-updates! p) (define-values (added removed) (patch-project/set/single p scene-projection)) diff --git a/racket/syndicate/core.rkt b/racket/syndicate/core.rkt index 310f4e4..b602208 100644 --- a/racket/syndicate/core.rkt +++ b/racket/syndicate/core.rkt @@ -69,6 +69,7 @@ (require "trie.rkt") (require "patch.rkt") (require "mux.rkt") +(require "hierarchy.rkt") ;; Events = Patches ∪ Messages (struct message (body) #:prefab) @@ -150,8 +151,18 @@ [(transition state actions) (transition state (clean-actions actions))] [(? void?) #f])) +(define (action-filter* x) + (and (action? x) (not (patch-empty? x)))) + +(define (action-filter x) + (match x + [(attributed-action inner attribution) + (define r (action-filter* inner)) + (and r (attributed-action r attribution))] + [_ (action-filter* x)])) + (define (clean-actions actions) - (filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions))) + (filter action-filter (flatten actions))) (define (update-process-state i new-state) (struct-copy process i [state new-state])) diff --git a/racket/syndicate/dataspace.rkt b/racket/syndicate/dataspace.rkt index 511a689..7b66d0b 100644 --- a/racket/syndicate/dataspace.rkt +++ b/racket/syndicate/dataspace.rkt @@ -25,7 +25,7 @@ ;; VM private states (struct dataspace (mux ;; Multiplexer - pending-action-queue ;; (Queueof (Cons Label (U Action 'quit))) + pending-action-queue ;; (Queueof (Cons Label AttributedAction)) runnable-pids ;; (Setof PID) process-table ;; (HashTable PID Process) ) @@ -36,13 +36,13 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (send-event origin-pid e pid w) +(define (send-event attribution e pid w) (match-define (and the-process (process process-name behavior old-state)) (hash-ref (dataspace-process-table w) pid missing-process)) (if (not behavior) w (begin - (when origin-pid (trace-causal-influence origin-pid pid e)) + (when attribution (trace-causal-influence attribution pid e)) (trace-event-consumed pid e) (trace-turn-begin pid the-process) (invoke-process pid @@ -75,10 +75,10 @@ (define (update-state w pid s) (update-process-entry w pid (lambda (p) (update-process-state p s)))) -(define (send-event/guard origin-pid e pid w) +(define (send-event/guard attribution e pid w) (if (patch-empty? e) w - (send-event origin-pid e pid w))) + (send-event attribution e pid w))) (define (disable-process pid exn w) (when exn @@ -107,7 +107,7 @@ (struct-copy dataspace w [pending-action-queue (queue-append-list (dataspace-pending-action-queue w) - (for/list [(a actions)] (cons label a)))])) + (for/list [(a actions)] (cons label (attribute-action a label))))])) (define-syntax spawn-dataspace (syntax-rules () @@ -149,12 +149,15 @@ (for/fold ([wt (transition (struct-copy dataspace w [pending-action-queue (make-queue)]) '())]) ((entry (in-list (queue->list (dataspace-pending-action-queue w))))) #:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn? - (match-define [cons label a] entry) + (define-values (label a attribution) + (match entry + [(cons label (attributed-action a attribution)) (values label a attribution)] + [(cons label a) (values label a (cons label (current-actor-path-rev)))])) (when (or (event? a) (eq? a 'quit)) (trace-action-produced label a)) - (define wt1 (transition-bind (perform-action label a) wt)) + (define wt1 (transition-bind (perform-action label attribution a) wt)) wt1)) -(define ((perform-action label a) w) +(define ((perform-action label attribution a) w) (match a [( boot) (invoke-process (mux-next-pid (dataspace-mux w)) ;; anticipate pid allocation @@ -180,23 +183,23 @@ ;; Clean up the "tombstone" left for us by disable-process (let ((w (struct-copy dataspace w [process-table (hash-remove (dataspace-process-table w) label)]))) - (deliver-patches w new-mux label delta delta-aggregate))] + (deliver-patches w new-mux label attribution delta delta-aggregate))] [(quit-dataspace) (quit)] [(? patch? delta-orig) (define-values (new-mux _label delta delta-aggregate) (mux-update-stream (dataspace-mux w) label delta-orig)) - (deliver-patches w new-mux label delta delta-aggregate)] + (deliver-patches w new-mux label attribution delta delta-aggregate)] [(and m (message body)) (when (observe? body) (log-warning "Stream ~a sent message containing query ~v" (append (current-actor-path) (list label)) body)) (define-values (affected-pids meta-affected?) (mux-route-message (dataspace-mux w) body)) - (transition (for/fold [(w w)] [(pid (in-list affected-pids))] (send-event label m pid w)) - (and meta-affected? m))] + (transition (for/fold [(w w)] [(pid (in-list affected-pids))] (send-event attribution m pid w)) + (and meta-affected? (attributed-action m attribution)))] [(targeted-event (cons pid remaining-path) e) - (transition (send-event/guard label (target-event remaining-path e) pid w) '())])) + (transition (send-event/guard attribution (target-event remaining-path e) pid w) '())])) (define (create-process parent-label w behavior initial-transition name) (if (not initial-transition) @@ -229,18 +232,19 @@ (process name behavior initial-state))])) - (w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))) + (w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)) + (new-attribution (cons new-pid (current-actor-path-rev)))) (trace-action-produced new-pid initial-patch) - (deliver-patches w new-mux new-pid delta delta-aggregate))))) + (deliver-patches w new-mux new-pid new-attribution delta delta-aggregate))))) -(define (deliver-patches w new-mux acting-label delta delta-aggregate) +(define (deliver-patches w new-mux acting-label attribution delta delta-aggregate) (define-values (patches meta-action) (compute-patches (dataspace-mux w) new-mux acting-label delta delta-aggregate)) (transition (for/fold [(w (struct-copy dataspace w [mux new-mux]))] [(entry (in-list patches))] (match-define (cons label event) entry) - (send-event/guard acting-label event label w)) - (and (patch-non-empty? meta-action) meta-action))) + (send-event/guard attribution event label w)) + (and (patch-non-empty? meta-action) (attributed-action meta-action attribution)))) (define (step-children w) (define runnable-pids (dataspace-runnable-pids w)) diff --git a/racket/syndicate/ground.rkt b/racket/syndicate/ground.rkt index 64b17ec..69ca0e7 100644 --- a/racket/syndicate/ground.rkt +++ b/racket/syndicate/ground.rkt @@ -144,6 +144,7 @@ (match actions ['() (await-interrupt #f proc interests background-activity-count)] [(cons a actions) + (when (attributed-action? a) (set! a (attributed-action-action a))) (match a [(? patch? p) (process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))] diff --git a/racket/syndicate/hierarchy.rkt b/racket/syndicate/hierarchy.rkt index c246f78..cc22d60 100644 --- a/racket/syndicate/hierarchy.rkt +++ b/racket/syndicate/hierarchy.rkt @@ -4,7 +4,9 @@ ;; - injecting events from the outside world to specific locations in the tree (provide (struct-out targeted-event) + (struct-out attributed-action) target-event + attribute-action current-actor-path-rev current-actor-path call/extended-actor-path @@ -17,6 +19,10 @@ ;; Used to inject events from the outside world. (struct targeted-event (relative-path event) #:prefab) +;; A pair of an action and the full (absolute) hierarchical path to +;; that actor (in reverse order, with the leaf actor ID leftmost). +(struct attributed-action (action source-path-rev) #:prefab) + ;; If a non-null path is provided, wraps event in a targeted-event ;; struct. (define (target-event relative-path event) @@ -24,6 +30,13 @@ (targeted-event relative-path event) event)) +;; (U AttributedAction Action 'quit) PID -> AttributedAction +;; If the action IS NOT ALREADY ATTRIBUTED IT, label it. +(define (attribute-action action pid) + (if (attributed-action? action) + action + (attributed-action action (cons pid (current-actor-path-rev))))) + ;; Storeof (Listof Any) ;; Path to the active leaf in the process tree. The car end is the ;; leaf; the cdr end, the root. diff --git a/racket/syndicate/relay.rkt b/racket/syndicate/relay.rkt index 84abcb8..8d8a538 100644 --- a/racket/syndicate/relay.rkt +++ b/racket/syndicate/relay.rkt @@ -52,6 +52,9 @@ [(patch a d) (define p (patch (relay-drop-interests a r) (relay-drop-interests d r))) (and (patch-non-empty? p) p)] + [(attributed-action inner-ac attribution) + (define v (relay-drop-action inner-ac r)) + (and v (attributed-action v attribution))] [_ ;; TODO: What should be done about spawn? Anything? ;; TODO: How about quit-dataspace? Could this be a better place for it than core.rkt? diff --git a/racket/syndicate/trace.rkt b/racket/syndicate/trace.rkt index 091875c..7e53496 100644 --- a/racket/syndicate/trace.rkt +++ b/racket/syndicate/trace.rkt @@ -90,5 +90,5 @@ (notify! (current-actor-path-rev) (cons-pid pid) 'event e)) ;; PID PID Event -(define (trace-causal-influence src-pid snk-pid e) - (notify! (cons-pid src-pid) (cons-pid snk-pid) 'influence e)) +(define (trace-causal-influence attribution snk-pid e) + (notify! attribution (cons-pid snk-pid) 'influence e))