attributed-action for improved causal influence tracking
This commit is contained in:
parent
7633174562
commit
025a96a167
|
@ -32,6 +32,7 @@
|
|||
(require syndicate)
|
||||
(require syndicate/trie)
|
||||
(require syndicate/ground)
|
||||
(require syndicate/hierarchy)
|
||||
|
||||
(require "texture.rkt")
|
||||
(require "affine.rkt")
|
||||
|
@ -390,7 +391,8 @@
|
|||
(process-fullscreen-requests! p)]
|
||||
[(message (request-gc))
|
||||
(perform-gc-request!)]
|
||||
[(message _) (void)]))
|
||||
[(message _) (void)]
|
||||
[(attributed-action inner-a _) (process-action! inner-a)]))
|
||||
|
||||
(define (process-scene-updates! p)
|
||||
(define-values (added removed) (patch-project/set/single p scene-projection))
|
||||
|
|
|
@ -69,6 +69,7 @@
|
|||
(require "trie.rkt")
|
||||
(require "patch.rkt")
|
||||
(require "mux.rkt")
|
||||
(require "hierarchy.rkt")
|
||||
|
||||
;; Events = Patches ∪ Messages
|
||||
(struct message (body) #:prefab)
|
||||
|
@ -150,8 +151,18 @@
|
|||
[(transition state actions) (transition state (clean-actions actions))]
|
||||
[(? void?) #f]))
|
||||
|
||||
(define (action-filter* x)
|
||||
(and (action? x) (not (patch-empty? x))))
|
||||
|
||||
(define (action-filter x)
|
||||
(match x
|
||||
[(attributed-action inner attribution)
|
||||
(define r (action-filter* inner))
|
||||
(and r (attributed-action r attribution))]
|
||||
[_ (action-filter* x)]))
|
||||
|
||||
(define (clean-actions actions)
|
||||
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
|
||||
(filter action-filter (flatten actions)))
|
||||
|
||||
(define (update-process-state i new-state)
|
||||
(struct-copy process i [state new-state]))
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
|
||||
;; VM private states
|
||||
(struct dataspace (mux ;; Multiplexer
|
||||
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
|
||||
pending-action-queue ;; (Queueof (Cons Label AttributedAction))
|
||||
runnable-pids ;; (Setof PID)
|
||||
process-table ;; (HashTable PID Process)
|
||||
)
|
||||
|
@ -36,13 +36,13 @@
|
|||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define (send-event origin-pid e pid w)
|
||||
(define (send-event attribution e pid w)
|
||||
(match-define (and the-process (process process-name behavior old-state))
|
||||
(hash-ref (dataspace-process-table w) pid missing-process))
|
||||
(if (not behavior)
|
||||
w
|
||||
(begin
|
||||
(when origin-pid (trace-causal-influence origin-pid pid e))
|
||||
(when attribution (trace-causal-influence attribution pid e))
|
||||
(trace-event-consumed pid e)
|
||||
(trace-turn-begin pid the-process)
|
||||
(invoke-process pid
|
||||
|
@ -75,10 +75,10 @@
|
|||
(define (update-state w pid s)
|
||||
(update-process-entry w pid (lambda (p) (update-process-state p s))))
|
||||
|
||||
(define (send-event/guard origin-pid e pid w)
|
||||
(define (send-event/guard attribution e pid w)
|
||||
(if (patch-empty? e)
|
||||
w
|
||||
(send-event origin-pid e pid w)))
|
||||
(send-event attribution e pid w)))
|
||||
|
||||
(define (disable-process pid exn w)
|
||||
(when exn
|
||||
|
@ -107,7 +107,7 @@
|
|||
(struct-copy dataspace w
|
||||
[pending-action-queue
|
||||
(queue-append-list (dataspace-pending-action-queue w)
|
||||
(for/list [(a actions)] (cons label a)))]))
|
||||
(for/list [(a actions)] (cons label (attribute-action a label))))]))
|
||||
|
||||
(define-syntax spawn-dataspace
|
||||
(syntax-rules ()
|
||||
|
@ -149,12 +149,15 @@
|
|||
(for/fold ([wt (transition (struct-copy dataspace w [pending-action-queue (make-queue)]) '())])
|
||||
((entry (in-list (queue->list (dataspace-pending-action-queue w)))))
|
||||
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
|
||||
(match-define [cons label a] entry)
|
||||
(define-values (label a attribution)
|
||||
(match entry
|
||||
[(cons label (attributed-action a attribution)) (values label a attribution)]
|
||||
[(cons label a) (values label a (cons label (current-actor-path-rev)))]))
|
||||
(when (or (event? a) (eq? a 'quit)) (trace-action-produced label a))
|
||||
(define wt1 (transition-bind (perform-action label a) wt))
|
||||
(define wt1 (transition-bind (perform-action label attribution a) wt))
|
||||
wt1))
|
||||
|
||||
(define ((perform-action label a) w)
|
||||
(define ((perform-action label attribution a) w)
|
||||
(match a
|
||||
[(<spawn> boot)
|
||||
(invoke-process (mux-next-pid (dataspace-mux w)) ;; anticipate pid allocation
|
||||
|
@ -180,23 +183,23 @@
|
|||
;; Clean up the "tombstone" left for us by disable-process
|
||||
(let ((w (struct-copy dataspace w
|
||||
[process-table (hash-remove (dataspace-process-table w) label)])))
|
||||
(deliver-patches w new-mux label delta delta-aggregate))]
|
||||
(deliver-patches w new-mux label attribution delta delta-aggregate))]
|
||||
[(quit-dataspace)
|
||||
(quit)]
|
||||
[(? patch? delta-orig)
|
||||
(define-values (new-mux _label delta delta-aggregate)
|
||||
(mux-update-stream (dataspace-mux w) label delta-orig))
|
||||
(deliver-patches w new-mux label delta delta-aggregate)]
|
||||
(deliver-patches w new-mux label attribution delta delta-aggregate)]
|
||||
[(and m (message body))
|
||||
(when (observe? body)
|
||||
(log-warning "Stream ~a sent message containing query ~v"
|
||||
(append (current-actor-path) (list label))
|
||||
body))
|
||||
(define-values (affected-pids meta-affected?) (mux-route-message (dataspace-mux w) body))
|
||||
(transition (for/fold [(w w)] [(pid (in-list affected-pids))] (send-event label m pid w))
|
||||
(and meta-affected? m))]
|
||||
(transition (for/fold [(w w)] [(pid (in-list affected-pids))] (send-event attribution m pid w))
|
||||
(and meta-affected? (attributed-action m attribution)))]
|
||||
[(targeted-event (cons pid remaining-path) e)
|
||||
(transition (send-event/guard label (target-event remaining-path e) pid w) '())]))
|
||||
(transition (send-event/guard attribution (target-event remaining-path e) pid w) '())]))
|
||||
|
||||
(define (create-process parent-label w behavior initial-transition name)
|
||||
(if (not initial-transition)
|
||||
|
@ -229,18 +232,19 @@
|
|||
(process name
|
||||
behavior
|
||||
initial-state))]))
|
||||
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
|
||||
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))
|
||||
(new-attribution (cons new-pid (current-actor-path-rev))))
|
||||
(trace-action-produced new-pid initial-patch)
|
||||
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
|
||||
(deliver-patches w new-mux new-pid new-attribution delta delta-aggregate)))))
|
||||
|
||||
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
|
||||
(define (deliver-patches w new-mux acting-label attribution delta delta-aggregate)
|
||||
(define-values (patches meta-action)
|
||||
(compute-patches (dataspace-mux w) new-mux acting-label delta delta-aggregate))
|
||||
(transition (for/fold [(w (struct-copy dataspace w [mux new-mux]))]
|
||||
[(entry (in-list patches))]
|
||||
(match-define (cons label event) entry)
|
||||
(send-event/guard acting-label event label w))
|
||||
(and (patch-non-empty? meta-action) meta-action)))
|
||||
(send-event/guard attribution event label w))
|
||||
(and (patch-non-empty? meta-action) (attributed-action meta-action attribution))))
|
||||
|
||||
(define (step-children w)
|
||||
(define runnable-pids (dataspace-runnable-pids w))
|
||||
|
|
|
@ -144,6 +144,7 @@
|
|||
(match actions
|
||||
['() (await-interrupt #f proc interests background-activity-count)]
|
||||
[(cons a actions)
|
||||
(when (attributed-action? a) (set! a (attributed-action-action a)))
|
||||
(match a
|
||||
[(? patch? p)
|
||||
(process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))]
|
||||
|
|
|
@ -4,7 +4,9 @@
|
|||
;; - injecting events from the outside world to specific locations in the tree
|
||||
|
||||
(provide (struct-out targeted-event)
|
||||
(struct-out attributed-action)
|
||||
target-event
|
||||
attribute-action
|
||||
current-actor-path-rev
|
||||
current-actor-path
|
||||
call/extended-actor-path
|
||||
|
@ -17,6 +19,10 @@
|
|||
;; Used to inject events from the outside world.
|
||||
(struct targeted-event (relative-path event) #:prefab)
|
||||
|
||||
;; A pair of an action and the full (absolute) hierarchical path to
|
||||
;; that actor (in reverse order, with the leaf actor ID leftmost).
|
||||
(struct attributed-action (action source-path-rev) #:prefab)
|
||||
|
||||
;; If a non-null path is provided, wraps event in a targeted-event
|
||||
;; struct.
|
||||
(define (target-event relative-path event)
|
||||
|
@ -24,6 +30,13 @@
|
|||
(targeted-event relative-path event)
|
||||
event))
|
||||
|
||||
;; (U AttributedAction Action 'quit) PID -> AttributedAction
|
||||
;; If the action IS NOT ALREADY ATTRIBUTED IT, label it.
|
||||
(define (attribute-action action pid)
|
||||
(if (attributed-action? action)
|
||||
action
|
||||
(attributed-action action (cons pid (current-actor-path-rev)))))
|
||||
|
||||
;; Storeof (Listof Any)
|
||||
;; Path to the active leaf in the process tree. The car end is the
|
||||
;; leaf; the cdr end, the root.
|
||||
|
|
|
@ -52,6 +52,9 @@
|
|||
[(patch a d)
|
||||
(define p (patch (relay-drop-interests a r) (relay-drop-interests d r)))
|
||||
(and (patch-non-empty? p) p)]
|
||||
[(attributed-action inner-ac attribution)
|
||||
(define v (relay-drop-action inner-ac r))
|
||||
(and v (attributed-action v attribution))]
|
||||
[_
|
||||
;; TODO: What should be done about spawn? Anything?
|
||||
;; TODO: How about quit-dataspace? Could this be a better place for it than core.rkt?
|
||||
|
|
|
@ -90,5 +90,5 @@
|
|||
(notify! (current-actor-path-rev) (cons-pid pid) 'event e))
|
||||
|
||||
;; PID PID Event
|
||||
(define (trace-causal-influence src-pid snk-pid e)
|
||||
(notify! (cons-pid src-pid) (cons-pid snk-pid) 'influence e))
|
||||
(define (trace-causal-influence attribution snk-pid e)
|
||||
(notify! attribution (cons-pid snk-pid) 'influence e))
|
||||
|
|
Loading…
Reference in New Issue