diff --git a/racket/syndicate/actor.rkt b/racket/syndicate/actor.rkt index 9e28385..3550df0 100644 --- a/racket/syndicate/actor.rkt +++ b/racket/syndicate/actor.rkt @@ -107,13 +107,22 @@ (case-lambda [(handle) (define desc (field-handle-desc handle)) - (dataflow-record-observation! (actor-state-field-dataflow (current-actor-state)) desc) + (with-current-dataflow dataflow-record-observation desc) (field-ref desc)] [(handle v) (define desc (field-handle-desc handle)) - (dataflow-record-damage! (actor-state-field-dataflow (current-actor-state)) desc) + (with-current-dataflow dataflow-record-damage desc) (field-set! desc v)])) +(define (current-dataflow-graph) + (actor-state-field-dataflow (current-actor-state))) + +(define (set-current-dataflow-graph! g) + (current-actor-state (struct-copy actor-state (current-actor-state) [field-dataflow g]))) + +(define-syntax-rule (with-current-dataflow proc arg ...) + (set-current-dataflow-graph! (proc (current-dataflow-graph) arg ...))) + (define (make-field-proxy field guard wrap) (case-lambda [() (wrap (field))] @@ -1012,8 +1021,8 @@ (and f (begin (for [((eid ep) (in-hash (facet-endpoints f)))] + (with-current-dataflow dataflow-forget-subject (list fid eid)) (define a (current-actor-state)) - (dataflow-forget-subject! (actor-state-field-dataflow a) (list fid eid)) (define-values (new-mux _eid _delta delta-aggregate) (mux-remove-stream (actor-state-mux a) eid)) (current-actor-state (struct-copy actor-state a [mux new-mux])) @@ -1106,8 +1115,9 @@ (core:transition (current-actor-state) pending-actions))) (define (refresh-facet-assertions!) - (dataflow-repair-damage! (actor-state-field-dataflow (current-actor-state)) - (lambda (subject-id) + (with-current-dataflow + dataflow-repair-damage (lambda (g subject-id) + (set-current-dataflow-graph! g) (match-define (list fid eid) subject-id) (define f (lookup-facet fid)) (when f @@ -1115,7 +1125,8 @@ (define ep (hash-ref (facet-endpoints f) eid)) (define new-patch ((endpoint-patch-fn ep))) (update-stream! eid (compose-patch new-patch - (core:retract ?)))))))) + (core:retract ?))))) + (current-dataflow-graph)))) (define (update-stream! eid patch) (define a (current-actor-state)) diff --git a/racket/syndicate/dataflow.rkt b/racket/syndicate/dataflow.rkt index 93eacde..dc93c4c 100644 --- a/racket/syndicate/dataflow.rkt +++ b/racket/syndicate/dataflow.rkt @@ -7,19 +7,21 @@ current-dataflow-subject-id - dataflow-record-observation! - dataflow-record-damage! - dataflow-forget-subject! - dataflow-repair-damage!) + dataflow-record-observation + dataflow-record-damage + dataflow-forget-subject + dataflow-repair-damage) +(require racket/match) (require racket/set) +(require "store.rkt") (require "support/hash.rkt") (struct dataflow-graph (edges-forward ;; object-id -> (Setof subject-id) edges-reverse ;; subject-id -> (Setof object-id) damaged-nodes ;; Setof object-id - ) - #:mutable) + ) #:transparent) ;; TODO <---- REMOVE +;; TODO ^^^ (define current-dataflow-subject-id (make-parameter #f)) @@ -28,46 +30,50 @@ (hash) (set))) -(define (dataflow-record-observation! g object-id) - (define subject-id (current-dataflow-subject-id)) - (when subject-id - (define fwd (dataflow-graph-edges-forward g)) - (set-dataflow-graph-edges-forward! g (hashset-add fwd object-id subject-id)) - (define rev (dataflow-graph-edges-reverse g)) - (set-dataflow-graph-edges-reverse! g (hashset-add rev subject-id object-id)))) +(define (dataflow-record-observation g object-id) + (match (current-dataflow-subject-id) + [#f g] + [subject-id + (match g + [(dataflow-graph fwd rev damaged) + (dataflow-graph (hashset-add fwd object-id subject-id) + (hashset-add rev subject-id object-id) + damaged)])])) -(define (dataflow-record-damage! g object-id) - (set-dataflow-graph-damaged-nodes! g (set-add (dataflow-graph-damaged-nodes g) object-id))) +(define (dataflow-record-damage g object-id) + (struct-copy dataflow-graph g + [damaged-nodes (set-add (dataflow-graph-damaged-nodes g) object-id)])) -(define (dataflow-forget-subject! g subject-id) +(define (dataflow-forget-subject g subject-id) (define rev (dataflow-graph-edges-reverse g)) (define subject-objects (hash-ref rev subject-id set)) - (set-dataflow-graph-edges-reverse! g (hash-remove rev subject-id)) - (for [(object-id (in-set subject-objects))] - (define fwd (dataflow-graph-edges-forward g)) - (set-dataflow-graph-edges-forward! g (hashset-remove fwd object-id subject-id)))) + (let ((g (struct-copy dataflow-graph g [edges-reverse (hash-remove rev subject-id)]))) + (for/fold [(g g)] [(object-id (in-set subject-objects))] + (define fwd (dataflow-graph-edges-forward g)) + (struct-copy dataflow-graph g + [edges-forward (hashset-remove fwd object-id subject-id)])))) -(define (dataflow-repair-damage! g repair-node!) +(define (dataflow-repair-damage g repair-node!) (define repaired-this-round (set)) - (let loop () + (let loop ((g g)) (define workset (dataflow-graph-damaged-nodes g)) - (set-dataflow-graph-damaged-nodes! g (set)) - (let ((already-damaged (set-intersect workset repaired-this-round))) (when (not (set-empty? already-damaged)) (log-warning "Cyclic dependencies involving ids ~v\n" already-damaged))) - (set! workset (set-subtract workset repaired-this-round)) - (set! repaired-this-round (set-union repaired-this-round workset)) + (let ((g (struct-copy dataflow-graph g [damaged-nodes (set)]))) + (set! workset (set-subtract workset repaired-this-round)) + (set! repaired-this-round (set-union repaired-this-round workset)) - (when (not (set-empty? workset)) - (for [(object-id (in-set workset))] - (define subjects (hash-ref (dataflow-graph-edges-forward g) object-id set)) - (for [(subject-id (in-set subjects))] - (dataflow-forget-subject! g subject-id) - (parameterize ((current-dataflow-subject-id subject-id)) - (repair-node! subject-id)))) - (loop)))) + (if (set-empty? workset) + g + (loop (for/fold [(g g)] [(object-id (in-set workset))] + (define subjects (hash-ref (dataflow-graph-edges-forward g) object-id set)) + + (for/fold [(g g)] [(subject-id (in-set subjects))] + (let ((g (dataflow-forget-subject g subject-id))) + (parameterize ((current-dataflow-subject-id subject-id)) + (repair-node! g subject-id)))))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -82,16 +88,18 @@ (define g #f) (define (R b) - (dataflow-record-observation! g b) + (set! g (dataflow-record-observation g b)) (cell-value b)) (define (W b v) (when (not (equal? (cell-value b) v)) - (dataflow-record-damage! g b) + (set! g (dataflow-record-damage g b)) (set-cell-value! b v))) - (define (repair! b) - (b)) + (define (repair! g* b) + (set! g g*) + (b) + g) (define-syntax-rule (V expr) (letrec ((node (cell #f)) @@ -106,15 +114,14 @@ (let* ((c (V 123)) (d (V (* (R c) 2)))) (check-equal? (list (R c) (R d)) (list 123 246)) - (dataflow-repair-damage! g repair!) + (set! g (dataflow-repair-damage g repair!)) (check-equal? (list (R c) (R d)) (list 123 246)) - (dataflow-repair-damage! g repair!) + (set! g (dataflow-repair-damage g repair!)) (check-equal? (list (R c) (R d)) (list 123 246)) (W c 124) (check-equal? (list (R c) (R d)) (list 124 246)) - (dataflow-repair-damage! g repair!) + (set! g (dataflow-repair-damage g repair!)) (check-equal? (list (R c) (R d)) (list 124 248))) - ;; (newline) (set! g (make-dataflow-graph)) @@ -132,7 +139,7 @@ (define (fix! stage) ;; (printf "\n----- Stage: ~a\n" stage) - (dataflow-repair-damage! g repair!) + (set! g (dataflow-repair-damage g repair!)) ;; (write `((xs ,(R xs)) ;; (sum ,(R sum)) ;; (len ,(R len))