#lang racket/base
;; Cross-layer relaying between adjacent dataspaces
;; TODO: protocol for *clean* shutdown of a dataspace
;;
;; NOTE: `;` comments extend to the end of the physical line, so every
;; comment in this file must sit on its own line (or at the very end of
;; one); otherwise it silently swallows the code that follows it.

(provide (struct-out inbound)
         (struct-out outbound)
         quit-dataspace!
         dataspace)

(require racket/match)
(require racket/set)
(require "dataspace.rkt")
(require "syntax.rkt")
(require "skeleton.rkt")
(require "term.rkt")
(require "bag.rkt")

(require (for-syntax racket/base))
(require (for-syntax syntax/parse))
(require "syntax-classes.rkt")

;; Wrapper structs used by code inside a nested dataspace to refer to the
;; enclosing one:
;;  - (inbound a)  is what the relay asserts/sends inside when `a` appears
;;                 outside (see `boot-relay`, which wraps matched terms).
;;  - (outbound a) is asserted/sent inside to have the relay publish `a`
;;                 on the outer facet.
(struct inbound (assertion) #:prefab)
(struct outbound (assertion) #:prefab)

;; Private message type; only `quit-dataspace!` constructs it and only the
;; relay actor listens for it.
(struct *quit-dataspace* () #:transparent)

;; TODO: inbound^n, outbound^n -- protocol/standard-relay, iow

;; quit-dataspace! : -> void
;; Sends a `*quit-dataspace*` message in the current dataspace. The relay
;; actor started by `dataspace` reacts to it by stopping the outer facet.
(define (quit-dataspace!)
  (send! (*quit-dataspace*)))

;; (dataspace name form ...)
;;
;; Spawns an actor (named by `name.N`) in the current dataspace that hosts
;; a nested dataspace. The nested dataspace is booted with two actors: the
;; relay (`boot-relay`, named `(list 'ds-link ds-name)`) and one running
;; `form ...` via `spawn*`.
(define-syntax (dataspace stx)
  (syntax-parse stx
    [(_ name:name form ...)
     (syntax/loc stx
       (let ((ds-name name.N))
         (spawn #:name ds-name
                (define outer-facet (current-facet))
                ;; eww. dummy endpoint to keep the root facet alive
                (begin/dataflow (void))
                ;; Queue a script on the outer actor which, if the outer
                ;; facet is still live, defers a turn that runs the inner
                ;; dataspace's scripts and reschedules itself for as long
                ;; as `run-scripts!` returns a true value (presumably
                ;; meaning "more work pending" -- confirm in dataspace.rkt).
                (define (schedule-inner!)
                  (push-script!
                   (facet-actor outer-facet)
                   (lambda ()
                     (with-current-facet [outer-facet]
                       (when (facet-live? outer-facet)
                         (defer-turn!
                           (lambda ()
                             (when (run-scripts! inner-ds)
                               (schedule-inner!)))))))))
                ;; `inner-ds` is referenced by `schedule-inner!` above; that
                ;; is fine because the reference is only evaluated once a
                ;; scheduled script actually runs.
                (define inner-ds
                  (make-dataspace
                   (lambda ()
                     (schedule-script!
                      (current-actor)
                      (lambda ()
                        (spawn #:name (list 'ds-link ds-name)
                               (boot-relay schedule-inner!
                                           outer-facet))
                        (spawn* form ...))))))
                (on-start (schedule-inner!)))))]))

;; boot-relay : (-> any) facet -> void
;;
;; Body of the relay actor that runs *inside* the nested dataspace.
;;  - schedule-inner! : thunk that asks the outer actor to run the inner
;;                      dataspace's pending scripts.
;;  - outer-facet     : facet of the hosting actor in the outer dataspace;
;;                      all outer-side endpoints are attached to it.
;;
;; Endpoints added on the outer facet are remembered in two mutable hash
;; tables keyed by the relayed term, so the matching `retracted` handler
;; can remove them again.
(define (boot-relay schedule-inner! outer-facet)
  (define inbound-endpoints (make-hash))
  (define outbound-endpoints (make-hash))

  (define inner-actor (current-actor))
  ;; Only referenced by the commented-out logging below.
  (define inner-facet (current-facet))

  ;; Someone inside is interested in (inbound x): subscribe to x outside
  ;; and mirror every outer event as an (inbound ...) event inside.
  (on (asserted (observe (inbound $x)))
      ;; (log-info "~a (asserted (observe (inbound ~v)))" inner-actor x)
      (with-current-facet [outer-facet]
        (with-non-script-context
          (define i
            (skeleton-interest
             (term->skeleton x)
             (term->skeleton-proj x)
             (term->key x)
             (term->capture-proj x)
             ;; Event callback: `op` is one of '+ '- '! and the remaining
             ;; arguments are the captured values for this match.
             (lambda (op . captured-values)
               (define term
                 (inbound (instantiate-term->value x captured-values)))
               ;; (log-info "~a => ~a ~a ~v"
               ;;           outer-facet
               ;;           inner-facet
               ;;           op
               ;;           term)
               (push-script!
                inner-actor
                (lambda ()
                  ;; (log-info "~a (~a) ~v" inner-actor op term)
                  (match op
                    ['+ (adhoc-assert! inner-actor term)]
                    ['- (adhoc-retract! inner-actor term)]
                    ['! (enqueue-send! inner-actor term)])))
               (schedule-inner!))
             ;; Cleanup callback: retract inside every (inbound ...) term
             ;; corresponding to an entry still held in `cache`.
             (lambda (cache)
               (push-script!
                inner-actor
                (lambda ()
                  (for [(captured-values (in-bag cache))]
                    (define term
                      (inbound (instantiate-term->value x captured-values)))
                    ;; (log-info "~a (cleanup) ~v" inner-actor term)
                    (adhoc-retract! inner-actor term))))
               (schedule-inner!))))
          (hash-set! inbound-endpoints
                     x
                     (add-endpoint! outer-facet
                                    "dataspace-relay (observe (inbound ...))"
                                    #t
                                    (lambda () (values (observe x) i)))))))

  ;; Shutdown request from inside (see `quit-dataspace!`): stop the outer
  ;; facet.
  (on (message (*quit-dataspace*))
      (with-current-facet [outer-facet]
        (stop-current-facet)))

  ;; Interest in (inbound x) went away inside: drop the outer subscription.
  (on (retracted (observe (inbound $x)))
      ;; (log-info "~a (retracted (observe (inbound ~v)))" inner-actor x)
      (with-current-facet [outer-facet]
        (with-non-script-context
          (remove-endpoint! outer-facet (hash-ref inbound-endpoints x))
          (hash-remove! inbound-endpoints x))))

  ;; (outbound x) asserted inside: assert x on the outer facet.
  (on (asserted (outbound $x))
      ;; (log-info "~a (asserted (outbound ~v))" inner-actor x)
      (with-current-facet [outer-facet]
        (with-non-script-context
          (hash-set! outbound-endpoints
                     x
                     (add-endpoint! outer-facet
                                    "dataspace-relay (outbound ...)"
                                    #t
                                    (lambda () (values x #f)))))))

  ;; (outbound x) retracted inside: remove the outer assertion of x.
  (on (retracted (outbound $x))
      ;; (log-info "~a (retracted (outbound ~v))" inner-actor x)
      (with-current-facet [outer-facet]
        (with-non-script-context
          (remove-endpoint! outer-facet (hash-ref outbound-endpoints x))
          (hash-remove! outbound-endpoints x))))

  ;; (outbound x) sent as a message inside: send x from the outer facet.
  (on (message (outbound $x))
      ;; (log-info "~a (message (outbound ~v))" inner-actor x)
      (with-current-facet [outer-facet]
        (send! x))))