syndicate-rkt/syndicate/relay.rkt

167 lines
7.1 KiB
Racket

#lang racket/base
;; Cross-layer relaying between adjacent dataspaces
;; TODO: protocol for *clean* shutdown of a dataspace
;; TODO: Actually elide the need for relays entirely, by allowing an
;; actor to manifest in multiple dataspaces (multiple
;; points-of-attachment), and by placing assertions and subscriptions
;; directly in the dataspace concerned. (Done naively, this would
;; avoid manifesting observed assertions in intermediate nested
;; dataspaces; but then, if anyone cared, they'd be observing the
;; tuples themselves - right?? Oh, maybe observing the observers would
;; be an, er, observable difference.)
(provide quit-dataspace!
dataspace)
(require racket/match)
(require racket/set)
(require "assertions.rkt")
(require "dataspace.rkt")
(require "syntax.rkt")
(require "skeleton.rkt")
(require "term.rkt")
(require "bag.rkt")
(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
(require "syntax-classes.rkt")
(struct *quit-dataspace* () #:transparent)
;; TODO: inbound^n, outbound^n -- protocol/standard-relay, iow
(define (quit-dataspace!)
(send! (*quit-dataspace*)))
(define-syntax (dataspace stx)
(syntax-parse stx
[(_ name:name form ...)
(syntax/loc stx
(let ((ds-name name.N))
(spawn #:name ds-name
(define outer-facet (current-facet))
(begin/dataflow (void)) ;; eww. dummy endpoint to keep the root facet alive
(define (schedule-inner!)
(push-script!
(facet-actor outer-facet)
(lambda ()
(with-current-facet [outer-facet]
(when (facet-live? outer-facet)
(defer-turn! (lambda ()
(when (run-scripts! inner-ds)
(schedule-inner!)))))))))
(define inner-ds (make-dataspace
(lambda ()
(schedule-script!
(current-actor)
(lambda ()
(spawn #:name (list 'ds-link ds-name)
(boot-relay schedule-inner!
outer-facet))
(spawn* form ...))))))
(on-start (schedule-inner!)))))]))
(define (boot-relay schedule-inner! outer-facet)
(define inbound-endpoints (make-hash))
(define outbound-endpoints (make-hash))
(define inner-facet (current-facet))
(define inner-actor (current-actor))
(define inner-ds (actor-dataspace inner-actor))
(on (asserted (observe (inbound $x)))
;; (log-info "~a (asserted (observe (inbound ~v)))" inner-actor x)
(with-current-facet [outer-facet]
(with-non-script-context
(define outer-capture-proj (term->capture-proj x))
(define inner-capture-proj (map (lambda (p) (cons 0 p)) outer-capture-proj))
;; ^ inner-capture-proj accounts for the extra (inbound ...) layer around assertions
(define i (skeleton-interest
(term->skeleton x)
(term->skeleton-proj x)
(term->key x)
outer-capture-proj
(lambda (op . captured-values)
(define term (inbound (instantiate-term->value x captured-values)))
(define assertion (visibility-restriction inner-capture-proj term))
;; (log-info "~a => ~a ~a ~v"
;; outer-facet
;; inner-facet
;; op
;; assertion)
(match op
['+ (apply-patch! inner-ds inner-actor (bag assertion +1))]
['- (apply-patch! inner-ds inner-actor (bag assertion -1))]
['! (send-assertion! (dataspace-routing-table inner-ds) assertion)])
(schedule-inner!))
(lambda (cache)
(apply-patch!
inner-ds
inner-actor
(for/bag/count [(captured-values (in-bag cache))]
;; (log-info "~a (cleanup) ~v" inner-actor term)
(values (visibility-restriction
inner-capture-proj
(inbound (instantiate-term->value x captured-values)))
-1)))
(schedule-inner!))))
(add-endpoint-if-live! outer-facet
inbound-endpoints
x
"dataspace-relay (observe (inbound ...))"
(lambda () (values (observe x) i))))))
(on (message (*quit-dataspace*))
(with-current-facet [outer-facet]
(stop-current-facet)))
(on (retracted (observe (inbound $x)))
;; (log-info "~a (retracted (observe (inbound ~v)))" inner-actor x)
(with-current-facet [outer-facet]
(with-non-script-context
(remove-endpoint! outer-facet (hash-ref inbound-endpoints x))
(hash-remove! inbound-endpoints x))))
(on (asserted (outbound $x))
;; (log-info "~a (asserted (outbound ~v))" inner-actor x)
(with-current-facet [outer-facet]
(with-non-script-context
(add-endpoint-if-live! outer-facet
outbound-endpoints
x
"dataspace-relay (outbound ...)"
(lambda () (values x #f))))))
(on (retracted (outbound $x))
;; (log-info "~a (retracted (outbound ~v))" inner-actor x)
(with-current-facet [outer-facet]
(with-non-script-context
(remove-endpoint! outer-facet (hash-ref outbound-endpoints x))
(hash-remove! outbound-endpoints x))))
(on (message (outbound $x))
;; (log-info "~a (message (outbound ~v))" inner-actor x)
(with-current-facet [outer-facet]
(send! x))))
(define (add-endpoint-if-live! f table key desc update-fn)
(when (facet-live? f)
;;
;; ^ Check that `f` is still alive, because we're (carefully!!)
;; violating an invariant of `dataspace.rkt` by adding an endpoint
;; well after the construction of the facet we're in. We may be
;; executing this handler just after clean shutdown of the facet
;; by a `quit-dataspace!` request, and in that case we MUST NOT
;; add any further endpoints because their assertions will not get
;; removed, because cleanup (as part of `(quit)` processing) has
;; already been done.
;;
;; We don't have to do a similar check before calling
;; `remove-endpoint!`, because shortly after all (both) calls to
;; `destroy-endpoint!`, all destroyed endpoints are removed from
;; the `facet-endpoints` table, ensuring they won't be processed
;; again.
;;
(hash-set! table key (add-endpoint! f desc #t update-fn))))