From a3a229532aaf670f075837250f7e4ee0b42d9c13 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 4 May 2019 22:58:45 +0100 Subject: [PATCH] add-observer-endpoint!, add-raw-observer-endpoint! --- syndicate/broker/server.rkt | 24 ++++------- syndicate/broker/tf.rkt | 18 +++----- syndicate/mc/udp-dataspace.rkt | 28 +++++-------- syndicate/relay.rkt | 77 ++++++++++++++-------------------- syndicate/syntax.rkt | 42 +++++++++++++++++++ 5 files changed, 99 insertions(+), 90 deletions(-) diff --git a/syndicate/broker/server.rkt b/syndicate/broker/server.rkt index 1f6a1c0..9b8542d 100644 --- a/syndicate/broker/server.rkt +++ b/syndicate/broker/server.rkt @@ -31,22 +31,14 @@ (assert (server-envelope scope (assertion))) - (define (recompute-endpoint) - (define a (assertion)) - (if (observe? a) - (let ((spec (server-envelope scope (observe-specification a)))) - (values (observe spec) - (term->skeleton-interest - spec - (capture-facet-context - (lambda (op . captured-values) - (schedule-script! - (current-actor) - (lambda () - (define ctor (match op ['+ Add] ['- Del] ['! Msg])) - (send! (server-outbound id (ctor ep captured-values)))))))))) - (values (void) #f))) - (add-endpoint! (current-facet) "server" #t recompute-endpoint) + (let ((! (lambda (ctor) (lambda (cs) (send! (server-outbound id (ctor ep cs))))))) + (add-observer-endpoint! (lambda () + (let ((a (assertion))) + (when (observe? a) + (server-envelope scope (observe-specification a))))) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg))) (on (message (server-inbound id (Assert ep $new-a))) (assertion new-a)) diff --git a/syndicate/broker/tf.rkt b/syndicate/broker/tf.rkt index 0cc70fb..f2d48d8 100644 --- a/syndicate/broker/tf.rkt +++ b/syndicate/broker/tf.rkt @@ -105,18 +105,12 @@ (during (router-connection node name) (on (message (router-outbound name (Subscribe $subid $spec))) (react - (define (update-fn) - (values (observe (to-broker node spec)) - (term->skeleton-interest - (to-broker node spec) - (capture-facet-context - (lambda (op . captures) - (schedule-script! - (current-actor) - (lambda () - (define ctor (match op ['+ Add] ['- Del] ['! Msg])) - (send! (router-inbound name (ctor subid captures)))))))))) - (add-endpoint! (current-facet) "router" #f update-fn) + (let ((! (lambda (ctor) + (lambda (cs) (send! (router-inbound name (ctor subid cs))))))) + (add-observer-endpoint! (lambda () (to-broker node spec)) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg))) (assert (from-broker node (observe spec))) (stop-when (message (router-outbound name (Unsubscribe subid)))))) diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt index 08aa28c..a840215 100644 --- a/syndicate/mc/udp-dataspace.rkt +++ b/syndicate/mc/udp-dataspace.rkt @@ -77,23 +77,17 @@ (when (observe? assertion) (define pattern (observe-specification assertion)) (define x (mcds-outbound pattern)) - (define i (term->skeleton-interest - x - (lambda (op . captured-values) - (when (eq? op '+) - (define term - (instantiate-term->value pattern captured-values - #:visibility-restriction-proj #f)) - ;; TODO: flawed?? Needs visibility-restriction, or some other way - ;; of ignoring the opaque placeholders! - (schedule-script! - (current-actor) - (lambda () - (assert! (mcds-relevant term peer)))))))) - (add-endpoint! (current-facet) - "udp-dataspace (mcds-inbound (observe ...))" - #t - (lambda () (values (observe x) i)))) + (add-observer-endpoint! + (lambda () x) + #:on-add + (lambda (captured-values) + ;; TODO: flawed?? Needs visibility-restriction, or some other way of + ;; ignoring the opaque placeholders! + (assert! (mcds-relevant (instantiate-term->value pattern + captured-values + #:visibility-restriction-proj + #f) + peer))))) (stop-when (message (mcds-change peer '- assertion))) (stop-when (asserted (later-than expiry))) diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index a07766f..913b691 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -74,44 +74,28 @@ ;; (log-info "~a (asserted (observe (inbound ~v)))" inner-actor x) (with-current-facet [outer-facet] (with-non-script-context - (define i - (let ((inner-capture-proj - ;; inner-capture-proj accounts for the extra (inbound ...) layer around - ;; assertions - (let ((outer-capture-proj (term->capture-proj x))) - (map (lambda (p) (cons 0 p)) outer-capture-proj)))) - (term->skeleton-interest - x - (lambda (op . captured-values) - (define assertion - (instantiate-term->value (inbound x) captured-values - #:visibility-restriction-proj inner-capture-proj)) - ;; (log-info "~a => ~a ~a ~v" - ;; outer-facet - ;; inner-facet - ;; op - ;; assertion) - (match op - ['+ (apply-patch! inner-ds inner-actor (bag assertion +1))] - ['- (apply-patch! inner-ds inner-actor (bag assertion -1))] - ['! (send-assertion! (dataspace-routing-table inner-ds) assertion)]) - (schedule-inner!)) - #:cleanup - (lambda (cache) - (apply-patch! - inner-ds - inner-actor - (for/bag/count [(captured-values (in-bag cache))] - ;; (log-info "~a (cleanup) ~v" inner-actor term) - (values (instantiate-term->value (inbound x) captured-values - #:visibility-restriction-proj inner-capture-proj) - -1))) - (schedule-inner!))))) - (add-endpoint-if-live! outer-facet - inbound-endpoints - x - "dataspace-relay (observe (inbound ...))" - (lambda () (values (observe x) i)))))) + (define (make-endpoint) + (define inner-capture-proj + ;; inner-capture-proj accounts for the extra (inbound ...) layer around + ;; assertions + (let ((outer-capture-proj (term->capture-proj x))) + (map (lambda (p) (cons 0 p)) outer-capture-proj))) + (define (rebuild cs) + (instantiate-term->value (inbound x) cs + #:visibility-restriction-proj inner-capture-proj)) + (define ((wrap f) cs) + (f (rebuild cs)) + (schedule-inner!)) + (add-raw-observer-endpoint! + (lambda () x) + #:on-add (wrap (lambda (t) (apply-patch! inner-ds inner-actor (bag t +1)))) + #:on-remove (wrap (lambda (t) (apply-patch! inner-ds inner-actor (bag t -1)))) + #:on-message (wrap (lambda (t) (send-assertion! (dataspace-routing-table inner-ds) t))) + #:cleanup (lambda (cache) + (apply-patch! inner-ds inner-actor (for/bag/count [(cs (in-bag cache))] + (values (rebuild cs) -1))) + (schedule-inner!)))) + (record-endpoint-if-live! outer-facet inbound-endpoints x make-endpoint)))) (on (message (*quit-dataspace*)) (with-current-facet [outer-facet] @@ -128,11 +112,14 @@ ;; (log-info "~a (asserted (outbound ~v))" inner-actor x) (with-current-facet [outer-facet] (with-non-script-context - (add-endpoint-if-live! outer-facet - outbound-endpoints - x - "dataspace-relay (outbound ...)" - (lambda () (values x #f)))))) + (record-endpoint-if-live! outer-facet + outbound-endpoints + x + (lambda () + (add-endpoint! outer-facet + "dataspace-relay (outbound ...)" + #t + (lambda () (values x #f)))))))) (on (retracted (outbound $x)) ;; (log-info "~a (retracted (outbound ~v))" inner-actor x) @@ -146,7 +133,7 @@ (with-current-facet [outer-facet] (send! x)))) -(define (add-endpoint-if-live! f table key desc update-fn) +(define (record-endpoint-if-live! f table key ep-adder) (when (facet-live? f) ;; ;; ^ Check that `f` is still alive, because we're (carefully!!) @@ -164,4 +151,4 @@ ;; the `facet-endpoints` table, ensuring they won't be processed ;; again. ;; - (hash-set! table key (add-endpoint! f desc #t update-fn)))) + (hash-set! table key (ep-adder)))) diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 8bc38b4..5a757e2 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -17,6 +17,8 @@ on-start on-stop on + add-raw-observer-endpoint! + add-observer-endpoint! during during/spawn begin/dataflow @@ -68,6 +70,7 @@ (require "event-expander.rkt") (require "skeleton.rkt") (require "pattern.rkt") +(require "term.rkt") (require racket/match) (require racket/set) @@ -240,6 +243,45 @@ (syntax/loc stx (begin/void-default script ...)) #'prio.level)])) +(define (add-raw-observer-endpoint! spec-thunk + #:on-add [on-add void] + #:on-remove [on-remove void] + #:on-message [on-message void] + #:cleanup [cleanup #f]) + (add-endpoint! (current-facet) + "add-observer-endpoint!/add-raw-observer-endpoint!" + #t + (lambda () + (define spec (spec-thunk)) + (if (void? spec) + (values (void) #f) + (values (observe spec) + (term->skeleton-interest + spec + (lambda (op . captured-values) + (match op + ['+ (on-add captured-values)] + ['- (on-remove captured-values)] + ['! (on-message captured-values)])) + #:cleanup cleanup)))))) + +(define (add-observer-endpoint! spec-thunk + #:on-add [on-add void] + #:on-remove [on-remove void] + #:on-message [on-message void] + #:cleanup [cleanup #f]) + (define (scriptify f) + (if (eq? f void) + void + (capture-facet-context + (lambda (captured-values) + (schedule-script! (current-actor) (lambda () (f captured-values))))))) + (add-raw-observer-endpoint! spec-thunk + #:on-add (scriptify on-add) + #:on-remove (scriptify on-remove) + #:on-message (scriptify on-message) + #:cleanup cleanup)) + (define-syntax (begin/dataflow stx) (syntax-parse stx [(_ prio:priority expr ...)