From 4db7c17dc82c89ee3cdaf343d8f6638c08839c72 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 5 Dec 2015 05:21:13 +1300 Subject: [PATCH] Refactor split in responsibilities between core and mux --- prospect/core.rkt | 41 +++++++++++++----------- prospect/mux.rkt | 82 +++++++++++++++++++++++------------------------ 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/prospect/core.rkt b/prospect/core.rkt index 4eabd2c..6560c34 100644 --- a/prospect/core.rkt +++ b/prospect/core.rkt @@ -324,24 +324,27 @@ (exn->string exn)) (transition w '())))] ['quit - (define-values (new-mux _label patches meta-action) (mux-remove-stream (world-mux w) label)) - (deliver-patches (struct-copy world w [mux new-mux]) - ;; ^ behavior & state already removed by disable-process - patches - meta-action)] + (define-values (new-mux _label delta delta-aggregate) (mux-remove-stream (world-mux w) label)) + ;; behavior & state in w already removed by disable-process + (deliver-patches w new-mux label delta delta-aggregate)] [(quit-world) (make-quit)] [(? patch? delta-orig) - (define-values (new-mux _label patches meta-action) + (define-values (new-mux _label delta delta-aggregate) (mux-update-stream (world-mux w) label delta-orig)) - (deliver-patches (struct-copy world w [mux new-mux]) - patches - meta-action)] + (deliver-patches w new-mux label delta delta-aggregate)] [(and m (message body)) - (define-values (send-to-meta? affected-pids) (mux-route-message (world-mux w) label body)) - (transition (for/fold [(w w)] [(pid (in-list affected-pids))] - (send-event m pid w)) - (and send-to-meta? (message (at-meta-claim body))))])) + (when (observe? body) + (log-warning "Stream ~a sent message containing query ~v" + (cons label (trace-pid-stack)) + body)) + (if (and (not (meta-label? label)) ;; it's from a local process, not envt + (at-meta? body)) ;; it relates to envt, not local + (transition w (message (at-meta-claim body))) + (transition (for/fold [(w w)] + [(pid (in-list (mux-route-message (world-mux w) body)))] + (send-event m pid w)) + '()))])) (define (create-process w behavior initial-transition) (if (not initial-transition) @@ -363,18 +366,20 @@ (match initial-actions [(cons (? patch? p) rest) (values p rest)] [other (values empty-patch other)])) - (define-values (new-mux new-pid patches meta-action) + (define-values (new-mux new-pid delta delta-aggregate) (mux-add-stream (world-mux w) initial-patch)) (let* ((w (struct-copy world w - [mux new-mux] [behaviors (hash-set (world-behaviors w) new-pid behavior)])) (w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))) - (deliver-patches w patches meta-action))))) + (deliver-patches w new-mux new-pid delta delta-aggregate))))) -(define (deliver-patches w patches meta-action) - (transition (for/fold [(w w)] [(entry (in-list patches))] +(define (deliver-patches w new-mux acting-label delta delta-aggregate) + (define-values (patches meta-action) + (compute-patches (world-mux w) new-mux acting-label delta delta-aggregate)) + (transition (for/fold [(w (struct-copy world w [mux new-mux]))] + [(entry (in-list patches))] (match-define (cons label event) entry) (send-event/guard event label w)) meta-action)) diff --git a/prospect/mux.rkt b/prospect/mux.rkt index 1f6e191..b007bdf 100644 --- a/prospect/mux.rkt +++ b/prospect/mux.rkt @@ -9,6 +9,7 @@ mux-update-stream mux-route-message mux-interests-of + compute-patches compute-affected-pids) (require racket/set) @@ -44,36 +45,44 @@ (define (mux-update-stream m label delta-orig) (define old-interests (mux-interests-of m label)) + (define old-routing-table (mux-routing-table m)) (define delta (limit-patch (label-patch delta-orig (datum-tset label)) old-interests)) (define new-interests (apply-patch old-interests delta)) - (let* ((m (struct-copy mux m - [interest-table - (if (matcher-empty? new-interests) - (hash-remove (mux-interest-table m) label) - (hash-set (mux-interest-table m) label new-interests))]))) - ;; CONDITION at this point: delta has been labelled and limited to - ;; be minimal with respect to existing interests of its label. - (define old-routing-table (mux-routing-table m)) - (define new-routing-table (apply-patch old-routing-table delta)) - (define delta-aggregate (compute-aggregate-patch delta label old-routing-table)) - (define affected-pids (let ((pids (compute-affected-pids old-routing-table delta))) - (tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird - (values (struct-copy mux m [routing-table new-routing-table]) - label - (for/list [(pid (tset->list affected-pids))] - (cond [(equal? pid label) - (define feedback - (patch-union - (patch (biased-intersection new-routing-table (patch-added delta)) - (biased-intersection old-routing-table (patch-removed delta))) - (patch (biased-intersection (patch-added delta-aggregate) new-interests) - (biased-intersection (patch-removed delta-aggregate) old-interests)))) - (cons label feedback)] - [else - (cons pid (view-patch delta-aggregate (mux-interests-of m pid)))])) - (and (not (meta-label? label)) - (drop-patch - (compute-aggregate-patch delta label old-routing-table #:remove-meta? #t)))))) + ;; CONDITION at this point: delta has been labelled and limited to + ;; be minimal with respect to existing interests of its label. + (define delta-aggregate (compute-aggregate-patch delta label old-routing-table)) + (define new-routing-table (apply-patch old-routing-table delta)) + (values (struct-copy mux m + [routing-table new-routing-table] + [interest-table (if (matcher-empty? new-interests) + (hash-remove (mux-interest-table m) label) + (hash-set (mux-interest-table m) label new-interests))]) + label + delta + delta-aggregate)) + +(define (compute-patches old-m new-m label delta delta-aggregate) + (define old-routing-table (mux-routing-table old-m)) + (define new-routing-table (mux-routing-table new-m)) + (define affected-pids + (let ((pids (compute-affected-pids old-routing-table delta))) + (tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird + (values (for/list [(pid (tset->list affected-pids))] + (cond [(equal? pid label) + (define feedback + (patch-union + (patch (biased-intersection new-routing-table (patch-added delta)) + (biased-intersection old-routing-table (patch-removed delta))) + (patch (biased-intersection (patch-added delta-aggregate) + (mux-interests-of new-m label)) + (biased-intersection (patch-removed delta-aggregate) + (mux-interests-of old-m label))))) + (cons label feedback)] + [else + (cons pid (view-patch delta-aggregate (mux-interests-of old-m pid)))])) + (and (not (meta-label? label)) + (drop-patch + (compute-aggregate-patch delta label old-routing-table #:remove-meta? #t))))) (define (compute-affected-pids routing-table delta) (define cover (matcher-union (patch-added delta) (patch-removed delta))) @@ -84,19 +93,10 @@ #:left-short (lambda (v r acc) (tset-union acc (success-value (matcher-step r EOS)))))) -(define (mux-route-message m label body) - (when (observe? body) - (log-warning "Stream ~a sent message containing query ~v" - (cons label (trace-pid-stack)) - body)) - (cond - [(matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body - (values #f '())] - [(and (not (meta-label? label)) ;; it's from a local process, not envt - (at-meta? body)) ;; it relates to envt, not local - (values #t '())] - [else - (values #f (tset->list (matcher-match-value (mux-routing-table m) (observe body) (datum-tset))))])) +(define (mux-route-message m body) + (if (matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body + '() + (tset->list (matcher-match-value (mux-routing-table m) (observe body) (datum-tset))))) (define (mux-interests-of m label) (hash-ref (mux-interest-table m) label (matcher-empty)))