add-observer-endpoint!, add-raw-observer-endpoint!
This commit is contained in:
parent
276042134d
commit
f23debf074
|
@ -31,22 +31,14 @@
|
|||
|
||||
(assert (server-envelope scope (assertion)))
|
||||
|
||||
(define (recompute-endpoint)
|
||||
(define a (assertion))
|
||||
(if (observe? a)
|
||||
(let ((spec (server-envelope scope (observe-specification a))))
|
||||
(values (observe spec)
|
||||
(term->skeleton-interest
|
||||
spec
|
||||
(capture-facet-context
|
||||
(lambda (op . captured-values)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (server-outbound id (ctor ep captured-values))))))))))
|
||||
(values (void) #f)))
|
||||
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
|
||||
(let ((! (lambda (ctor) (lambda (cs) (send! (server-outbound id (ctor ep cs)))))))
|
||||
(add-observer-endpoint! (lambda ()
|
||||
(let ((a (assertion)))
|
||||
(when (observe? a)
|
||||
(server-envelope scope (observe-specification a)))))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
|
||||
(on (message (server-inbound id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
|
|
@ -105,18 +105,12 @@
|
|||
(during (router-connection node name)
|
||||
(on (message (router-outbound name (Subscribe $subid $spec)))
|
||||
(react
|
||||
(define (update-fn)
|
||||
(values (observe (to-broker node spec))
|
||||
(term->skeleton-interest
|
||||
(to-broker node spec)
|
||||
(capture-facet-context
|
||||
(lambda (op . captures)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (router-inbound name (ctor subid captures))))))))))
|
||||
(add-endpoint! (current-facet) "router" #f update-fn)
|
||||
(let ((! (lambda (ctor)
|
||||
(lambda (cs) (send! (router-inbound name (ctor subid cs)))))))
|
||||
(add-observer-endpoint! (lambda () (to-broker node spec))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
(assert (from-broker node (observe spec)))
|
||||
(stop-when (message (router-outbound name (Unsubscribe subid))))))
|
||||
|
||||
|
|
|
@ -77,23 +77,17 @@
|
|||
(when (observe? assertion)
|
||||
(define pattern (observe-specification assertion))
|
||||
(define x (mcds-outbound pattern))
|
||||
(define i (term->skeleton-interest
|
||||
x
|
||||
(lambda (op . captured-values)
|
||||
(when (eq? op '+)
|
||||
(define term
|
||||
(instantiate-term->value pattern captured-values
|
||||
#:visibility-restriction-proj #f))
|
||||
;; TODO: flawed?? Needs visibility-restriction, or some other way
|
||||
;; of ignoring the opaque placeholders!
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(assert! (mcds-relevant term peer))))))))
|
||||
(add-endpoint! (current-facet)
|
||||
"udp-dataspace (mcds-inbound (observe ...))"
|
||||
#t
|
||||
(lambda () (values (observe x) i))))
|
||||
(add-observer-endpoint!
|
||||
(lambda () x)
|
||||
#:on-add
|
||||
(lambda (captured-values)
|
||||
;; TODO: flawed?? Needs visibility-restriction, or some other way of
|
||||
;; ignoring the opaque placeholders!
|
||||
(assert! (mcds-relevant (instantiate-term->value pattern
|
||||
captured-values
|
||||
#:visibility-restriction-proj
|
||||
#f)
|
||||
peer)))))
|
||||
|
||||
(stop-when (message (mcds-change peer '- assertion)))
|
||||
(stop-when (asserted (later-than expiry)))
|
||||
|
|
|
@ -74,44 +74,28 @@
|
|||
;; (log-info "~a (asserted (observe (inbound ~v)))" inner-actor x)
|
||||
(with-current-facet [outer-facet]
|
||||
(with-non-script-context
|
||||
(define i
|
||||
(let ((inner-capture-proj
|
||||
(define (make-endpoint)
|
||||
(define inner-capture-proj
|
||||
;; inner-capture-proj accounts for the extra (inbound ...) layer around
|
||||
;; assertions
|
||||
(let ((outer-capture-proj (term->capture-proj x)))
|
||||
(map (lambda (p) (cons 0 p)) outer-capture-proj))))
|
||||
(term->skeleton-interest
|
||||
x
|
||||
(lambda (op . captured-values)
|
||||
(define assertion
|
||||
(instantiate-term->value (inbound x) captured-values
|
||||
(map (lambda (p) (cons 0 p)) outer-capture-proj)))
|
||||
(define (rebuild cs)
|
||||
(instantiate-term->value (inbound x) cs
|
||||
#:visibility-restriction-proj inner-capture-proj))
|
||||
;; (log-info "~a => ~a ~a ~v"
|
||||
;; outer-facet
|
||||
;; inner-facet
|
||||
;; op
|
||||
;; assertion)
|
||||
(match op
|
||||
['+ (apply-patch! inner-ds inner-actor (bag assertion +1))]
|
||||
['- (apply-patch! inner-ds inner-actor (bag assertion -1))]
|
||||
['! (send-assertion! (dataspace-routing-table inner-ds) assertion)])
|
||||
(define ((wrap f) cs)
|
||||
(f (rebuild cs))
|
||||
(schedule-inner!))
|
||||
#:cleanup
|
||||
(lambda (cache)
|
||||
(apply-patch!
|
||||
inner-ds
|
||||
inner-actor
|
||||
(for/bag/count [(captured-values (in-bag cache))]
|
||||
;; (log-info "~a (cleanup) ~v" inner-actor term)
|
||||
(values (instantiate-term->value (inbound x) captured-values
|
||||
#:visibility-restriction-proj inner-capture-proj)
|
||||
-1)))
|
||||
(schedule-inner!)))))
|
||||
(add-endpoint-if-live! outer-facet
|
||||
inbound-endpoints
|
||||
x
|
||||
"dataspace-relay (observe (inbound ...))"
|
||||
(lambda () (values (observe x) i))))))
|
||||
(add-raw-observer-endpoint!
|
||||
(lambda () x)
|
||||
#:on-add (wrap (lambda (t) (apply-patch! inner-ds inner-actor (bag t +1))))
|
||||
#:on-remove (wrap (lambda (t) (apply-patch! inner-ds inner-actor (bag t -1))))
|
||||
#:on-message (wrap (lambda (t) (send-assertion! (dataspace-routing-table inner-ds) t)))
|
||||
#:cleanup (lambda (cache)
|
||||
(apply-patch! inner-ds inner-actor (for/bag/count [(cs (in-bag cache))]
|
||||
(values (rebuild cs) -1)))
|
||||
(schedule-inner!))))
|
||||
(record-endpoint-if-live! outer-facet inbound-endpoints x make-endpoint))))
|
||||
|
||||
(on (message (*quit-dataspace*))
|
||||
(with-current-facet [outer-facet]
|
||||
|
@ -128,11 +112,14 @@
|
|||
;; (log-info "~a (asserted (outbound ~v))" inner-actor x)
|
||||
(with-current-facet [outer-facet]
|
||||
(with-non-script-context
|
||||
(add-endpoint-if-live! outer-facet
|
||||
(record-endpoint-if-live! outer-facet
|
||||
outbound-endpoints
|
||||
x
|
||||
(lambda ()
|
||||
(add-endpoint! outer-facet
|
||||
"dataspace-relay (outbound ...)"
|
||||
(lambda () (values x #f))))))
|
||||
#t
|
||||
(lambda () (values x #f))))))))
|
||||
|
||||
(on (retracted (outbound $x))
|
||||
;; (log-info "~a (retracted (outbound ~v))" inner-actor x)
|
||||
|
@ -146,7 +133,7 @@
|
|||
(with-current-facet [outer-facet]
|
||||
(send! x))))
|
||||
|
||||
(define (add-endpoint-if-live! f table key desc update-fn)
|
||||
(define (record-endpoint-if-live! f table key ep-adder)
|
||||
(when (facet-live? f)
|
||||
;;
|
||||
;; ^ Check that `f` is still alive, because we're (carefully!!)
|
||||
|
@ -164,4 +151,4 @@
|
|||
;; the `facet-endpoints` table, ensuring they won't be processed
|
||||
;; again.
|
||||
;;
|
||||
(hash-set! table key (add-endpoint! f desc #t update-fn))))
|
||||
(hash-set! table key (ep-adder))))
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
on-start
|
||||
on-stop
|
||||
on
|
||||
add-raw-observer-endpoint!
|
||||
add-observer-endpoint!
|
||||
during
|
||||
during/spawn
|
||||
begin/dataflow
|
||||
|
@ -68,6 +70,7 @@
|
|||
(require "event-expander.rkt")
|
||||
(require "skeleton.rkt")
|
||||
(require "pattern.rkt")
|
||||
(require "term.rkt")
|
||||
|
||||
(require racket/match)
|
||||
(require racket/set)
|
||||
|
@ -240,6 +243,45 @@
|
|||
(syntax/loc stx (begin/void-default script ...))
|
||||
#'prio.level)]))
|
||||
|
||||
(define (add-raw-observer-endpoint! spec-thunk
|
||||
#:on-add [on-add void]
|
||||
#:on-remove [on-remove void]
|
||||
#:on-message [on-message void]
|
||||
#:cleanup [cleanup #f])
|
||||
(add-endpoint! (current-facet)
|
||||
"add-observer-endpoint!/add-raw-observer-endpoint!"
|
||||
#t
|
||||
(lambda ()
|
||||
(define spec (spec-thunk))
|
||||
(if (void? spec)
|
||||
(values (void) #f)
|
||||
(values (observe spec)
|
||||
(term->skeleton-interest
|
||||
spec
|
||||
(lambda (op . captured-values)
|
||||
(match op
|
||||
['+ (on-add captured-values)]
|
||||
['- (on-remove captured-values)]
|
||||
['! (on-message captured-values)]))
|
||||
#:cleanup cleanup))))))
|
||||
|
||||
(define (add-observer-endpoint! spec-thunk
|
||||
#:on-add [on-add void]
|
||||
#:on-remove [on-remove void]
|
||||
#:on-message [on-message void]
|
||||
#:cleanup [cleanup #f])
|
||||
(define (scriptify f)
|
||||
(if (eq? f void)
|
||||
void
|
||||
(capture-facet-context
|
||||
(lambda (captured-values)
|
||||
(schedule-script! (current-actor) (lambda () (f captured-values)))))))
|
||||
(add-raw-observer-endpoint! spec-thunk
|
||||
#:on-add (scriptify on-add)
|
||||
#:on-remove (scriptify on-remove)
|
||||
#:on-message (scriptify on-message)
|
||||
#:cleanup cleanup))
|
||||
|
||||
(define-syntax (begin/dataflow stx)
|
||||
(syntax-parse stx
|
||||
[(_ prio:priority expr ...)
|
||||
|
|
Loading…
Reference in New Issue