diff --git a/racket/syndicate/actor.rkt b/racket/syndicate/actor.rkt index d770fc5..11965d1 100644 --- a/racket/syndicate/actor.rkt +++ b/racket/syndicate/actor.rkt @@ -63,6 +63,7 @@ (require "patch.rkt") (require "trie.rkt") (require "pattern.rkt") +(require "dataflow.rkt") ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Data Definitions and Structures @@ -79,14 +80,21 @@ (fprintf p "#" (field-descriptor-name (field-handle-desc h))))] #:property prop:procedure (case-lambda - [(handle) (unbox (get-field-box (field-handle-desc handle)))] - [(handle v) (set-box! (get-field-box (field-handle-desc handle)) v)])) + [(handle) + (define desc (field-handle-desc handle)) + (dataflow-record-observation! (actor-state-field-dataflow (current-actor-state)) desc) + (unbox (get-field-box desc))] + [(handle v) + (define desc (field-handle-desc handle)) + (dataflow-record-damage! (actor-state-field-dataflow (current-actor-state)) desc) + (set-box! (get-field-box desc) v)])) (struct actor-state (mux ;; Mux facets ;; (Hash FID Facet) previous-knowledge ;; AssertionSet knowledge ;; AssertionSet field-table ;; FieldTable + field-dataflow ;; DataflowGraph ) #:prefab) (struct facet (field-table ;; FieldTable @@ -529,8 +537,11 @@ (define-values (new-eid delta-aggregate) (let () (define a (current-actor-state)) - (define-values (new-mux new-eid _delta delta-aggregate) - (mux-add-stream (actor-state-mux a) (patch-fn))) + (define new-eid (mux-next-pid (actor-state-mux a))) + (define-values (new-mux _new-eid _delta delta-aggregate) + (mux-add-stream (actor-state-mux a) + (parameterize ((current-dataflow-subject-id (list (current-facet-id) new-eid))) + (patch-fn)))) (current-actor-state (struct-copy actor-state a [mux new-mux])) (values new-eid delta-aggregate))) (update-facet! (current-facet-id) @@ -588,6 +599,7 @@ (begin (for [((eid ep) (in-hash (facet-endpoints f)))] (define a (current-actor-state)) + (dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid)) (define-values (new-mux _eid _delta delta-aggregate) (mux-remove-stream (actor-state-mux a) eid)) (current-actor-state (struct-copy actor-state a [mux new-mux])) @@ -627,7 +639,8 @@ (hasheqv) trie-empty trie-empty - (hash))) + (hash) + (make-dataflow-graph))) (current-pending-patch patch-empty) (current-pending-actions '()) (current-pending-scripts (make-empty-pending-scripts))) @@ -645,8 +658,7 @@ (script) #t) (loop))) - (for ([(fid f) (in-hash (actor-state-facets (current-actor-state)))]) - (refresh-facet! fid f)) + (refresh-facet-assertions!) (flush-pending-patch!) (define pending-actions (current-pending-actions)) (current-pending-actions '()) @@ -654,10 +666,17 @@ (core:quit pending-actions) (core:transition (current-actor-state) pending-actions))) -(define (refresh-facet! fid f) - (with-current-facet fid (facet-field-table f) #f - (for [((eid ep) (in-hash (facet-endpoints f)))] - (update-stream! eid (compose-patch ((endpoint-patch-fn ep)) (core:retract ?)))))) +(define (refresh-facet-assertions!) + (dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state)) + (lambda (subject-id) + (match-define (list fid eid) subject-id) + (define f (lookup-facet fid)) + (when f + (with-current-facet fid (facet-field-table f) #f + (define ep (hash-ref (facet-endpoints f) eid)) + (define new-patch ((endpoint-patch-fn ep))) + (update-stream! eid (compose-patch new-patch + (core:retract ?)))))))) (define (update-stream! eid patch) (define a (current-actor-state)) diff --git a/racket/syndicate/dataflow.rkt b/racket/syndicate/dataflow.rkt new file mode 100644 index 0000000..b26b08d --- /dev/null +++ b/racket/syndicate/dataflow.rkt @@ -0,0 +1,150 @@ +#lang racket/base +;; Simple lazy dataflow. + +(provide dataflow-graph? + make-dataflow-graph + + current-dataflow-subject-id + + dataflow-record-observation! + dataflow-record-damage! + dataflow-forget-subject! + dataflow-repair-damage!) + +(require racket/set) + +(struct dataflow-graph (edges-forward ;; object-id -> (Setof subject-id) + edges-reverse ;; subject-id -> (Setof object-id) + damaged-nodes ;; Setof object-id + ) + #:mutable) + +(define current-dataflow-subject-id (make-parameter #f)) + +(define (make-dataflow-graph) + (dataflow-graph (hash) + (hash) + (set))) + +(define (hash-set-add ht k v [set-ctor set]) + (hash-set ht k (set-add (hash-ref ht k set-ctor) v))) + +(define (hash-set-remove ht k v) + (define old (hash-ref ht k #f)) + (if old + (let ((new (set-remove old v))) + (if (set-empty? new) + (hash-remove ht k) + (hash-set ht k new))) + ht)) + +(define (dataflow-record-observation! g object-id) + (define subject-id (current-dataflow-subject-id)) + (when subject-id + (define fwd (dataflow-graph-edges-forward g)) + (set-dataflow-graph-edges-forward! g (hash-set-add fwd object-id subject-id)) + (define rev (dataflow-graph-edges-reverse g)) + (set-dataflow-graph-edges-reverse! g (hash-set-add rev subject-id object-id)))) + +(define (dataflow-record-damage! g object-id) + (set-dataflow-graph-damaged-nodes! g (set-add (dataflow-graph-damaged-nodes g) object-id))) + +(define (dataflow-forget-subject! g subject-id) + (define rev (dataflow-graph-edges-reverse g)) + (define subject-objects (hash-ref rev subject-id set)) + (set-dataflow-graph-edges-reverse! g (hash-remove rev subject-id)) + (for [(object-id (in-set subject-objects))] + (define fwd (dataflow-graph-edges-forward g)) + (set-dataflow-graph-edges-forward! g (hash-set-remove fwd object-id subject-id)))) + +(define (dataflow-repair-damage! g repair-node!) + (define repaired-this-round (set)) + (let loop () + (define workset (dataflow-graph-damaged-nodes g)) + (set-dataflow-graph-damaged-nodes! g (set)) + + (let ((already-damaged (set-intersect workset repaired-this-round))) + (when (not (set-empty? already-damaged)) + (log-warning "Cyclic dependencies involving ids ~v\n" already-damaged))) + + (set! workset (set-subtract workset repaired-this-round)) + (set! repaired-this-round (set-union repaired-this-round workset)) + + (when (not (set-empty? workset)) + (for [(object-id (in-set workset))] + (define subjects (hash-ref (dataflow-graph-edges-forward g) object-id set)) + (for [(subject-id (in-set subjects))] + (dataflow-forget-subject! g subject-id) + (parameterize ((current-dataflow-subject-id subject-id)) + (repair-node! subject-id)))) + (loop)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(module+ test + (require rackunit) + + (struct cell (value) #:mutable + #:methods gen:custom-write + [(define (write-proc c port mode) + (fprintf port "#" (cell-value c)))]) + + (define g (make-dataflow-graph)) + + (define (R b) + (dataflow-record-observation! g b) + (cell-value b)) + + (define (W b v) + (when (not (equal? (cell-value b) v)) + (dataflow-record-damage! g b) + (set-cell-value! b v))) + + (define (repair! b) + (b)) + + (define-syntax-rule (V expr) + (letrec ((node (cell #f)) + (handler (lambda () + (printf "Evaluating ~a\n" 'expr) + (W node expr)))) + (parameterize ((current-dataflow-subject-id handler)) + (handler) + node))) + + (define xs (V (list 1 2 3 4))) + (define sum (V (foldr + 0 (R xs)))) + (define len (V (length (R xs)))) + (define avg (V (if (zero? (R len)) + (void) + (/ (R sum) (R len))))) + (define scale (V 1)) + (define ans (V (if (zero? (R scale)) + (void) + (and (number? (R avg)) + (/ (R avg) (R scale)))))) + + (define (fix! stage) + (printf "\n----- Stage: ~a\n" stage) + (dataflow-repair-damage! g repair!) + (write `((xs ,(R xs)) + (sum ,(R sum)) + (len ,(R len)) + (avg ,(R avg)) + (scale ,(R scale)) + (ans ,(R ans)))) + (newline)) + + (fix! 'initial) + (W scale 0) + (fix! 'scale-zero) + (W xs (list* 9 0 (R xs))) + (fix! 'with-nine-and-zero) + (W xs (list* 5 4 (cddr (R xs)))) + (fix! 'with-five-and-four) + (W scale 1) + (fix! 'scale-one) + (W xs '()) + (fix! 'empty) + (W xs (list 4 5 6)) + (fix! 'four-five-six)) diff --git a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt index 8d96ad1..f4e0945 100644 --- a/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt +++ b/racket/syndicate/examples/actor/example-during-criterion-snapshotting.rkt @@ -16,11 +16,11 @@ (struct foo (x y) #:prefab) -(actor (define x 123) +(actor (field [x 123]) (react - (assert (foo x 999)) - (during (foo x $v) - (log-info "x=~a v=~a" x v) - (when (= x 123) (set! x 124)) + (assert (foo (x) 999)) + (during (foo (x) $v) + (log-info "x=~a v=~a" (x) v) + (when (= (x) 123) (x 124)) (on-stop - (log-info "finally for x=~a v=~a" x v))))) + (log-info "finally for x=~a v=~a" (x) v)))))