Refactor split in responsibilities between core and mux
This commit is contained in:
parent
e03645e682
commit
4db7c17dc8
|
@ -324,24 +324,27 @@
|
||||||
(exn->string exn))
|
(exn->string exn))
|
||||||
(transition w '())))]
|
(transition w '())))]
|
||||||
['quit
|
['quit
|
||||||
(define-values (new-mux _label patches meta-action) (mux-remove-stream (world-mux w) label))
|
(define-values (new-mux _label delta delta-aggregate) (mux-remove-stream (world-mux w) label))
|
||||||
(deliver-patches (struct-copy world w [mux new-mux])
|
;; behavior & state in w already removed by disable-process
|
||||||
;; ^ behavior & state already removed by disable-process
|
(deliver-patches w new-mux label delta delta-aggregate)]
|
||||||
patches
|
|
||||||
meta-action)]
|
|
||||||
[(quit-world)
|
[(quit-world)
|
||||||
(make-quit)]
|
(make-quit)]
|
||||||
[(? patch? delta-orig)
|
[(? patch? delta-orig)
|
||||||
(define-values (new-mux _label patches meta-action)
|
(define-values (new-mux _label delta delta-aggregate)
|
||||||
(mux-update-stream (world-mux w) label delta-orig))
|
(mux-update-stream (world-mux w) label delta-orig))
|
||||||
(deliver-patches (struct-copy world w [mux new-mux])
|
(deliver-patches w new-mux label delta delta-aggregate)]
|
||||||
patches
|
|
||||||
meta-action)]
|
|
||||||
[(and m (message body))
|
[(and m (message body))
|
||||||
(define-values (send-to-meta? affected-pids) (mux-route-message (world-mux w) label body))
|
(when (observe? body)
|
||||||
(transition (for/fold [(w w)] [(pid (in-list affected-pids))]
|
(log-warning "Stream ~a sent message containing query ~v"
|
||||||
(send-event m pid w))
|
(cons label (trace-pid-stack))
|
||||||
(and send-to-meta? (message (at-meta-claim body))))]))
|
body))
|
||||||
|
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
|
||||||
|
(at-meta? body)) ;; it relates to envt, not local
|
||||||
|
(transition w (message (at-meta-claim body)))
|
||||||
|
(transition (for/fold [(w w)]
|
||||||
|
[(pid (in-list (mux-route-message (world-mux w) body)))]
|
||||||
|
(send-event m pid w))
|
||||||
|
'()))]))
|
||||||
|
|
||||||
(define (create-process w behavior initial-transition)
|
(define (create-process w behavior initial-transition)
|
||||||
(if (not initial-transition)
|
(if (not initial-transition)
|
||||||
|
@ -363,18 +366,20 @@
|
||||||
(match initial-actions
|
(match initial-actions
|
||||||
[(cons (? patch? p) rest) (values p rest)]
|
[(cons (? patch? p) rest) (values p rest)]
|
||||||
[other (values empty-patch other)]))
|
[other (values empty-patch other)]))
|
||||||
(define-values (new-mux new-pid patches meta-action)
|
(define-values (new-mux new-pid delta delta-aggregate)
|
||||||
(mux-add-stream (world-mux w) initial-patch))
|
(mux-add-stream (world-mux w) initial-patch))
|
||||||
(let* ((w (struct-copy world w
|
(let* ((w (struct-copy world w
|
||||||
[mux new-mux]
|
|
||||||
[behaviors (hash-set (world-behaviors w)
|
[behaviors (hash-set (world-behaviors w)
|
||||||
new-pid
|
new-pid
|
||||||
behavior)]))
|
behavior)]))
|
||||||
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
|
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
|
||||||
(deliver-patches w patches meta-action)))))
|
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
|
||||||
|
|
||||||
(define (deliver-patches w patches meta-action)
|
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
|
||||||
(transition (for/fold [(w w)] [(entry (in-list patches))]
|
(define-values (patches meta-action)
|
||||||
|
(compute-patches (world-mux w) new-mux acting-label delta delta-aggregate))
|
||||||
|
(transition (for/fold [(w (struct-copy world w [mux new-mux]))]
|
||||||
|
[(entry (in-list patches))]
|
||||||
(match-define (cons label event) entry)
|
(match-define (cons label event) entry)
|
||||||
(send-event/guard event label w))
|
(send-event/guard event label w))
|
||||||
meta-action))
|
meta-action))
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
mux-update-stream
|
mux-update-stream
|
||||||
mux-route-message
|
mux-route-message
|
||||||
mux-interests-of
|
mux-interests-of
|
||||||
|
compute-patches
|
||||||
compute-affected-pids)
|
compute-affected-pids)
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
|
@ -44,36 +45,44 @@
|
||||||
|
|
||||||
(define (mux-update-stream m label delta-orig)
|
(define (mux-update-stream m label delta-orig)
|
||||||
(define old-interests (mux-interests-of m label))
|
(define old-interests (mux-interests-of m label))
|
||||||
|
(define old-routing-table (mux-routing-table m))
|
||||||
(define delta (limit-patch (label-patch delta-orig (datum-tset label)) old-interests))
|
(define delta (limit-patch (label-patch delta-orig (datum-tset label)) old-interests))
|
||||||
(define new-interests (apply-patch old-interests delta))
|
(define new-interests (apply-patch old-interests delta))
|
||||||
(let* ((m (struct-copy mux m
|
;; CONDITION at this point: delta has been labelled and limited to
|
||||||
[interest-table
|
;; be minimal with respect to existing interests of its label.
|
||||||
(if (matcher-empty? new-interests)
|
(define delta-aggregate (compute-aggregate-patch delta label old-routing-table))
|
||||||
(hash-remove (mux-interest-table m) label)
|
(define new-routing-table (apply-patch old-routing-table delta))
|
||||||
(hash-set (mux-interest-table m) label new-interests))])))
|
(values (struct-copy mux m
|
||||||
;; CONDITION at this point: delta has been labelled and limited to
|
[routing-table new-routing-table]
|
||||||
;; be minimal with respect to existing interests of its label.
|
[interest-table (if (matcher-empty? new-interests)
|
||||||
(define old-routing-table (mux-routing-table m))
|
(hash-remove (mux-interest-table m) label)
|
||||||
(define new-routing-table (apply-patch old-routing-table delta))
|
(hash-set (mux-interest-table m) label new-interests))])
|
||||||
(define delta-aggregate (compute-aggregate-patch delta label old-routing-table))
|
label
|
||||||
(define affected-pids (let ((pids (compute-affected-pids old-routing-table delta)))
|
delta
|
||||||
(tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird
|
delta-aggregate))
|
||||||
(values (struct-copy mux m [routing-table new-routing-table])
|
|
||||||
label
|
(define (compute-patches old-m new-m label delta delta-aggregate)
|
||||||
(for/list [(pid (tset->list affected-pids))]
|
(define old-routing-table (mux-routing-table old-m))
|
||||||
(cond [(equal? pid label)
|
(define new-routing-table (mux-routing-table new-m))
|
||||||
(define feedback
|
(define affected-pids
|
||||||
(patch-union
|
(let ((pids (compute-affected-pids old-routing-table delta)))
|
||||||
(patch (biased-intersection new-routing-table (patch-added delta))
|
(tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird
|
||||||
(biased-intersection old-routing-table (patch-removed delta)))
|
(values (for/list [(pid (tset->list affected-pids))]
|
||||||
(patch (biased-intersection (patch-added delta-aggregate) new-interests)
|
(cond [(equal? pid label)
|
||||||
(biased-intersection (patch-removed delta-aggregate) old-interests))))
|
(define feedback
|
||||||
(cons label feedback)]
|
(patch-union
|
||||||
[else
|
(patch (biased-intersection new-routing-table (patch-added delta))
|
||||||
(cons pid (view-patch delta-aggregate (mux-interests-of m pid)))]))
|
(biased-intersection old-routing-table (patch-removed delta)))
|
||||||
(and (not (meta-label? label))
|
(patch (biased-intersection (patch-added delta-aggregate)
|
||||||
(drop-patch
|
(mux-interests-of new-m label))
|
||||||
(compute-aggregate-patch delta label old-routing-table #:remove-meta? #t))))))
|
(biased-intersection (patch-removed delta-aggregate)
|
||||||
|
(mux-interests-of old-m label)))))
|
||||||
|
(cons label feedback)]
|
||||||
|
[else
|
||||||
|
(cons pid (view-patch delta-aggregate (mux-interests-of old-m pid)))]))
|
||||||
|
(and (not (meta-label? label))
|
||||||
|
(drop-patch
|
||||||
|
(compute-aggregate-patch delta label old-routing-table #:remove-meta? #t)))))
|
||||||
|
|
||||||
(define (compute-affected-pids routing-table delta)
|
(define (compute-affected-pids routing-table delta)
|
||||||
(define cover (matcher-union (patch-added delta) (patch-removed delta)))
|
(define cover (matcher-union (patch-added delta) (patch-removed delta)))
|
||||||
|
@ -84,19 +93,10 @@
|
||||||
#:left-short (lambda (v r acc)
|
#:left-short (lambda (v r acc)
|
||||||
(tset-union acc (success-value (matcher-step r EOS))))))
|
(tset-union acc (success-value (matcher-step r EOS))))))
|
||||||
|
|
||||||
(define (mux-route-message m label body)
|
(define (mux-route-message m body)
|
||||||
(when (observe? body)
|
(if (matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body
|
||||||
(log-warning "Stream ~a sent message containing query ~v"
|
'()
|
||||||
(cons label (trace-pid-stack))
|
(tset->list (matcher-match-value (mux-routing-table m) (observe body) (datum-tset)))))
|
||||||
body))
|
|
||||||
(cond
|
|
||||||
[(matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body
|
|
||||||
(values #f '())]
|
|
||||||
[(and (not (meta-label? label)) ;; it's from a local process, not envt
|
|
||||||
(at-meta? body)) ;; it relates to envt, not local
|
|
||||||
(values #t '())]
|
|
||||||
[else
|
|
||||||
(values #f (tset->list (matcher-match-value (mux-routing-table m) (observe body) (datum-tset))))]))
|
|
||||||
|
|
||||||
(define (mux-interests-of m label)
|
(define (mux-interests-of m label)
|
||||||
(hash-ref (mux-interest-table m) label (matcher-empty)))
|
(hash-ref (mux-interest-table m) label (matcher-empty)))
|
||||||
|
|
Loading…
Reference in New Issue