From fc43d5c4a03612196eb1aff6719162943be52063 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 18 Mar 2019 15:34:14 +0000 Subject: [PATCH] Initial broker implementation --- syndicate/broker/client.rkt | 5 ++ syndicate/broker/main.rkt | 32 +++++++++ syndicate/broker/protocol.rkt | 10 +++ syndicate/broker/server.rkt | 100 +++++++++++++++++++++++++++++ syndicate/broker/wire-protocol.rkt | 27 ++++++++ syndicate/mc/udp-dataspace.rkt | 10 +-- syndicate/relay.rkt | 9 ++- syndicate/term.rkt | 16 ++++- 8 files changed, 196 insertions(+), 13 deletions(-) create mode 100644 syndicate/broker/client.rkt create mode 100644 syndicate/broker/main.rkt create mode 100644 syndicate/broker/protocol.rkt create mode 100644 syndicate/broker/server.rkt create mode 100644 syndicate/broker/wire-protocol.rkt diff --git a/syndicate/broker/client.rkt b/syndicate/broker/client.rkt new file mode 100644 index 0000000..102ef9a --- /dev/null +++ b/syndicate/broker/client.rkt @@ -0,0 +1,5 @@ +#lang imperative-syndicate + +(require "wire-protocol.rkt") +(require "protocol.rkt") + diff --git a/syndicate/broker/main.rkt b/syndicate/broker/main.rkt new file mode 100644 index 0000000..aea6f68 --- /dev/null +++ b/syndicate/broker/main.rkt @@ -0,0 +1,32 @@ +#lang imperative-syndicate + +(provide (all-from-out "protocol.rkt") + (all-from-out "client.rkt") + (all-from-out "server.rkt")) + +(require "protocol.rkt") +(require/activate "client.rkt") +(require/activate "server.rkt") + +(require/activate imperative-syndicate/drivers/tcp) +(require/activate imperative-syndicate/drivers/web) + +(define (main) + (spawn #:name 'server-listener + (define tcp-scope "broker") + (during/spawn (tcp-connection $id (tcp-listener 8001)) + #:name `(server-connection ,tcp-scope ,id) + (server-facet/tcp id tcp-scope)) + + (during/spawn (http-request $id 'get (http-resource (http-server _ 8000 #f) + `(,$scope ())) _ _ _) + #:name `(server-connection ,scope ,id) + (server-facet/websocket id scope)))) + +(module+ main + (let ((go (current-ground-dataspace))) + (current-ground-dataspace + (lambda (boot-proc) + (go (lambda () + (boot-proc) + (main))))))) diff --git a/syndicate/broker/protocol.rkt b/syndicate/broker/protocol.rkt new file mode 100644 index 0000000..3b2a0a9 --- /dev/null +++ b/syndicate/broker/protocol.rkt @@ -0,0 +1,10 @@ +#lang imperative-syndicate + +(provide (all-defined-out)) + +;; Client protocol +(assertion-struct to-broker (url assertion)) +(assertion-struct from-broker (url assertion)) +(assertion-struct broker-connection (url)) +(assertion-struct broker-connected (url)) +(message-struct force-broker-disconnect (url)) diff --git a/syndicate/broker/server.rkt b/syndicate/broker/server.rkt new file mode 100644 index 0000000..0a3c0cc --- /dev/null +++ b/syndicate/broker/server.rkt @@ -0,0 +1,100 @@ +#lang imperative-syndicate + +(provide server-facet/tcp + server-facet/websocket) + +(require "wire-protocol.rkt") +(require "protocol.rkt") +(require imperative-syndicate/term) +(require racket/set) + +(require/activate imperative-syndicate/drivers/tcp) +(require/activate imperative-syndicate/drivers/web) + +;; Internal connection protocol +(assertion-struct server-connection (connection-id scope)) +(assertion-struct server-inbound (connection-id body)) +(assertion-struct server-outbound (connection-id body)) + +;; Internal isolation +(assertion-struct envelope (scope body)) + +(define-logger syndicate/broker) + +(spawn #:name 'server-connection-factory + (during/spawn (server-connection $id $scope) + (define endpoints (set)) + + (on (message (server-inbound id (Assert $ep $a))) + (when (not (set-member? endpoints ep)) + (set! endpoints (set-add endpoints ep)) + (react + (on-stop (set! endpoints (set-remove endpoints ep))) + + (field [assertion a]) + + (define (recompute-endpoint) + (define a (assertion)) + (if (observe? a) + (let* ((pattern (observe-specification a)) + (spec (envelope scope pattern))) + (values (observe spec) + (term->skeleton-interest + spec + (capture-facet-context + (lambda (op . captured-values) + (schedule-script! + (current-actor) + (lambda () + (define ctor (match op ['+ Add] ['- Del] ['! Msg])) + (send! (server-outbound id (ctor ep captured-values)))))))))) + (values (envelope scope a) #f))) + (add-endpoint! (current-facet) "server" #t recompute-endpoint) + + (on (message (server-inbound id (Assert ep $new-a))) + (assertion new-a)) + + (stop-when (message (server-inbound id (Clear ep))))))) + + (on (message (server-inbound id (Message $body))) + (send! (envelope scope body))))) + +(define (server-facet/tcp id scope) + (assert (tcp-accepted id)) + (assert (server-connection id scope)) + + (field [buffer #""]) + (begin/dataflow + (define-values (packet remainder) (decode (buffer))) + (when packet + (buffer remainder) + (send! (server-inbound id packet)))) + + (on (message (tcp-in id $bs)) + (buffer (bytes-append (buffer) bs))) + (on (message (server-outbound id $p)) + (send! (tcp-out id (encode p))))) + +(define (server-facet/websocket id scope) + (assert (http-accepted id)) + (assert (http-response-websocket id)) + (assert (server-connection id scope)) + + (on (message (websocket-in id $body)) + (define-values (packet remainder) (decode body)) + (when (not (equal? remainder #"")) + (error 'server-facet/websocket "Multiple packets in a single websocket message")) + (send! (server-inbound id packet))) + (on (message (server-outbound id $p)) + (send! (websocket-out id (encode p))))) + +(when (log-level? syndicate/broker-logger 'debug) + (spawn #:name 'server-debug + (on (asserted (server-connection $id $scope)) + (log-syndicate/broker-debug "C+ ~v ~v" id scope)) + (on (retracted (server-connection $id $scope)) + (log-syndicate/broker-debug "C- ~v ~v" id scope)) + (on (message (server-inbound $id $p)) + (log-syndicate/broker-debug "CIN ~v ~v" id p)) + (on (message (server-outbound $id $p)) + (log-syndicate/broker-debug "COUT ~v ~v" id p)))) diff --git a/syndicate/broker/wire-protocol.rkt b/syndicate/broker/wire-protocol.rkt new file mode 100644 index 0000000..c690386 --- /dev/null +++ b/syndicate/broker/wire-protocol.rkt @@ -0,0 +1,27 @@ +#lang imperative-syndicate + +(provide (all-defined-out)) + +(require (prefix-in preserves: preserves)) +(require bitsyntax) + +;; Client --> Broker +(message-struct Assert (endpoint-name assertion)) +(message-struct Clear (endpoint-name)) +(message-struct Message (body)) + +;; Broker --> Client +(message-struct Add (endpoint-name captures)) +(message-struct Del (endpoint-name captures)) +(message-struct Msg (endpoint-name captures)) + +(define (decode bs) + (parameterize ((preserves:short-form-labels '#(discard capture observe))) + (bit-string-case bs + #:on-short (lambda (fail) (values #f bs)) + ([ (v :: (preserves:wire-value)) (rest :: binary) ] (values v (bit-string->bytes rest))) + (else (error 'decode "Invalid wire message"))))) + +(define (encode v) + (parameterize ((preserves:short-form-labels '#(discard capture observe))) + (preserves:encode v))) diff --git a/syndicate/mc/udp-dataspace.rkt b/syndicate/mc/udp-dataspace.rkt index 962fabe..8062a83 100644 --- a/syndicate/mc/udp-dataspace.rkt +++ b/syndicate/mc/udp-dataspace.rkt @@ -77,19 +77,15 @@ (when (observe? assertion) (define pattern (observe-specification assertion)) (define x (mcds-outbound pattern)) - (define i (skeleton-interest - (term->skeleton x) - (term->skeleton-proj x) - (term->key x) - (term->capture-proj x) + (define i (term->skeleton-interest + x (lambda (op . captured-values) (when (eq? op '+) (define term (instantiate-term->value pattern captured-values)) (schedule-script! (current-actor) (lambda () - (assert! (mcds-relevant term peer)))))) - #f)) + (assert! (mcds-relevant term peer)))))))) (add-endpoint! (current-facet) "udp-dataspace (mcds-inbound (observe ...))" #t diff --git a/syndicate/relay.rkt b/syndicate/relay.rkt index 03bfbb0..c73217c 100644 --- a/syndicate/relay.rkt +++ b/syndicate/relay.rkt @@ -77,11 +77,9 @@ (define outer-capture-proj (term->capture-proj x)) (define inner-capture-proj (map (lambda (p) (cons 0 p)) outer-capture-proj)) ;; ^ inner-capture-proj accounts for the extra (inbound ...) layer around assertions - (define i (skeleton-interest - (term->skeleton x) - (term->skeleton-proj x) - (term->key x) - outer-capture-proj + (define i (term->skeleton-interest + x + #:capture-projection outer-capture-proj (lambda (op . captured-values) (define term (inbound (instantiate-term->value x captured-values))) (define assertion (visibility-restriction inner-capture-proj term)) @@ -95,6 +93,7 @@ ['- (apply-patch! inner-ds inner-actor (bag assertion -1))] ['! (send-assertion! (dataspace-routing-table inner-ds) assertion)]) (schedule-inner!)) + #:cleanup (lambda (cache) (apply-patch! inner-ds diff --git a/syndicate/term.rkt b/syndicate/term.rkt index 2dd2644..aa0d051 100644 --- a/syndicate/term.rkt +++ b/syndicate/term.rkt @@ -1,7 +1,9 @@ #lang racket/base ;; Like pattern.rkt, but for dynamic use rather than compile-time use. -(provide term->skeleton +(provide term->skeleton-interest + + term->skeleton term->skeleton-proj term->key term->capture-proj @@ -11,6 +13,18 @@ (require racket/match) (require syndicate/support/struct) (require "pattern.rkt") +(require "skeleton.rkt") + +(define (term->skeleton-interest x + #:capture-projection [capture-proj (term->capture-proj x)] + handler + #:cleanup [cleanup #f]) + (skeleton-interest (term->skeleton x) + (term->skeleton-proj x) + (term->key x) + capture-proj + handler + cleanup)) (define (term->skeleton t) (let walk ((t t))