From 2b882cc10a512596514c800547bd88b587451fcf Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 9 May 2019 11:17:37 +0100 Subject: [PATCH] Integrate federation/peering with client/server code; almost but not quite working yet --- syndicate/distributed/client.rkt | 13 +- syndicate/distributed/client/loopback.rkt | 5 +- syndicate/distributed/client/tcp.rkt | 9 +- syndicate/distributed/federation.rkt | 299 ++++++++++++++++++++ syndicate/distributed/internal-protocol.rkt | 28 ++ syndicate/distributed/main.rkt | 23 +- syndicate/distributed/protocol.rkt | 12 + syndicate/distributed/server.rkt | 44 +-- syndicate/distributed/server/tcp.rkt | 1 + syndicate/distributed/server/websocket.rkt | 1 + 10 files changed, 380 insertions(+), 55 deletions(-) create mode 100644 syndicate/distributed/federation.rkt create mode 100644 syndicate/distributed/internal-protocol.rkt diff --git a/syndicate/distributed/client.rkt b/syndicate/distributed/client.rkt index 692026f..b5ed280 100644 --- a/syndicate/distributed/client.rkt +++ b/syndicate/distributed/client.rkt @@ -1,10 +1,9 @@ #lang imperative-syndicate -(provide generic-client-session-facet - (struct-out server-packet) - (struct-out server-transport-connected)) +(provide generic-client-session-facet) (require "wire-protocol.rkt") +(require "internal-protocol.rkt") (require "protocol.rkt") (require imperative-syndicate/term) @@ -15,14 +14,6 @@ (during (observe (from-server $a _)) (assert (server-connection a))) (during (observe (server-connected $a)) (assert (server-connection a)))) -;; Received packets from server are relayed via one of these. -(message-struct server-packet (address packet)) -;; Like `server-connected`, but for reflecting `tcp-accepted` to the -;; client end of a client-server connection without reordering wrt -;; `server-packet` messages. Implementation-facing, where -;; `server-connected` is part of the API. -(assertion-struct server-transport-connected (address)) - (define (generic-client-session-facet address scope w) (on-start (log-syndicate/distributed-info "Connected to ~v" address)) (on-stop (log-syndicate/distributed-info "Disconnected from ~v" address)) diff --git a/syndicate/distributed/client/loopback.rkt b/syndicate/distributed/client/loopback.rkt index f42f3eb..e1b6a84 100644 --- a/syndicate/distributed/client/loopback.rkt +++ b/syndicate/distributed/client/loopback.rkt @@ -1,15 +1,12 @@ #lang imperative-syndicate -(provide (struct-out server-loopback-connection)) - (require "../client.rkt") (require "../wire-protocol.rkt") +(require "../internal-protocol.rkt") (require "../protocol.rkt") (require/activate imperative-syndicate/distributed/server) -(assertion-struct server-loopback-connection (scope)) - (spawn #:name 'loopback-client-factory (during/spawn (server-connection ($ address (server-loopback-connection $scope))) #:name address diff --git a/syndicate/distributed/client/tcp.rkt b/syndicate/distributed/client/tcp.rkt index d1bc456..d9b0e6d 100644 --- a/syndicate/distributed/client/tcp.rkt +++ b/syndicate/distributed/client/tcp.rkt @@ -1,20 +1,13 @@ #lang imperative-syndicate -(provide standard-localhost-server/tcp - (struct-out server-tcp-connection)) - (require "../client.rkt") (require "../wire-protocol.rkt") +(require "../internal-protocol.rkt") (require "../protocol.rkt") (require imperative-syndicate/reassert) (require/activate imperative-syndicate/drivers/tcp) -(assertion-struct server-tcp-connection (host port scope)) - -(define (standard-localhost-server/tcp [scope "broker"]) - (server-tcp-connection "localhost" 8001 scope)) - (spawn #:name 'tcp-client-factory (during/spawn (server-connection ($ address (server-tcp-connection $host $port $scope))) #:name address diff --git a/syndicate/distributed/federation.rkt b/syndicate/distributed/federation.rkt new file mode 100644 index 0000000..76c6d07 --- /dev/null +++ b/syndicate/distributed/federation.rkt @@ -0,0 +1,299 @@ +#lang imperative-syndicate +;; Relays for federation, both "client" (outbound) and "server" (inbound) ends. + +(require "wire-protocol.rkt") +(require "internal-protocol.rkt") +(require "protocol.rkt") + +(require imperative-syndicate/term) +(require imperative-syndicate/bag) +(require imperative-syndicate/reassert) +(require racket/set) + +(require/activate imperative-syndicate/drivers/tcp) + +(define-logger syndicate/federation) + +;; A federated scope (as distinct from a non-federated server scope) +;; communicates via "links" to "peers", which come in three flavours: +;; - inbound links, aka "downlinks" from the POV of this node, which +;; result from incoming TCP/websocket/etc connections +;; - outbound links, aka "uplinks", which reach out to a remote TCP/ +;; websocket/etc server +;; - local links, (usually? always?) just one per scope, which +;; connect the federated scope to its local server scope +;; +;; All links are identified by a link ID, scoped the same as +;; connection IDs in (namely, dataspace-unique). Links +;; are stateful. +;; +;; Inbound links are set up in response to incoming connections; the +;; process on the other end of an inbound link is the part that +;; implements the TCP service. It too is stateful. At connection setup +;; time, the remote client instructs it on which local federated scope +;; it should create a link to. +;; +;; Outbound links are created in response to an assertion of a +;; `federated-uplink` record, which contains a pair of a local scope +;; ID and a client transport address (such as `server-tcp-connection` +;; from ). Together, these map from a local federated +;; scope to a remote transport endpoint, and within that endpoint, a +;; remote federated scope. +;; +;; Local links are created automatically whenever there is an active +;; server scope of the same name as a federated scope. + +;; Subscription IDs (== "endpoint IDs") must be connection-unique AND +;; must correspond one-to-one with a specific subscription spec. That +;; is, a subscription ID is merely connection-local shorthand for its +;; spec, and two subscription IDs within a connection must be `equal?` +;; exactly when their corresponding specs are `equal?`. +;; +;; Local IDs must be scope-unique. They are used as subscription IDs +;; in outbound messages. +;; +;; Each federated scope maintains a bidirectional mapping between +;; subscription IDs (each scoped within its connection ID) and local +;; IDs. One local ID may map to multiple subscription IDs - this is +;; the place where aggregation pops up. + +;; Unlike the client/server protocol, both Actions and Events are +;; BIDIRECTIONAL, travelling in both directions along edges linking +;; peer nodes. + +;;--------------------------------------------------------------------------- +;; Inbound links: the heavy lifting is done by server.rkt along with some +;; transport-specific code e.g. server/tcp.rkt + +;;--------------------------------------------------------------------------- +;; Outbound links. + +(spawn #:name 'federated-tcp-uplink-factory + (during/spawn ($ link (federated-uplink $local-scope + (server-tcp-connection $host $port $remote-scope))) + #:name link + (reassert-on (tcp-connection link (tcp-address host port)) + (retracted (tcp-accepted link)) + (asserted (tcp-rejected link _))) + + (during (tcp-accepted link) + (assert (federated-uplink-connected link)) ;; out to local requester + + (define session-id (gensym 'uplink)) + (assert (federated-link session-id local-scope)) ;; in to federated scope + + (define accumulate! + (packet-accumulator (lambda (p) (send! (message-poa->server session-id p))))) + (on (message (tcp-in link $bs)) (accumulate! bs)) + + (on (message (message-server->poa session-id $p)) + (send! (tcp-out link (encode p)))) + + (on-start (send! (message-server->poa session-id (Peer remote-scope))))))) + +;;--------------------------------------------------------------------------- +;; Local links. + +(spawn #:name 'federated-local-link-factory + (during (federated-link _ $scope) + (during/spawn (server-active scope) + #:name (list 'local-link scope) + + (define session-id (gensym 'local-link)) + (assert (federated-link session-id scope)) + + (on (message (message-server->poa session-id (Assert $subid (observe $spec)))) + (react + (define ((! ctor) cs) (send! (message-poa->server session-id (ctor subid cs)))) + (add-observer-endpoint! (lambda () (server-proposal scope spec)) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg)) + (assert (server-envelope scope (observe spec))) + (stop-when (message (message-server->poa session-id (Clear subid)))))) + + (during (observe ($ pat (server-envelope scope $spec))) + (define ep (gensym 'ep)) + (on-start (send! (message-poa->server session-id (Assert ep (observe spec))))) + (on-stop (send! (message-poa->server session-id (Clear ep)))) + (assert (server-envelope scope (observe spec))) + (on (message (message-server->poa session-id (Add ep $captures))) + (react (assert (instantiate-term->value pat captures)) + (stop-when (message (message-server->poa session-id (Del ep captures)))))) + (on (message (message-server->poa session-id (Msg ep $captures))) + (send! (instantiate-term->value pat captures))))))) + +;;--------------------------------------------------------------------------- +;; Federated scopes. + +;; Internal state +(struct subscription (id ;; LocalID + spec ;; Assertion + holders ;; (Hash ConnectionID SubscriptionID) + matches ;; (Bag (Listof Assertion)) + ) + #:transparent) + +(spawn #:name 'federated-scope-factory + (during/spawn (federated-link _ $scope) + #:name (list 'federated-scope scope) + + ;; Generates a fresh local ID naming a subscription propagated to our peers. + (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) + + (field [peers (set)] ;; (Set LinkID) + [specs (hash)] ;; (Hash Spec LocalID) + [subs (hasheq)] ;; (Hash LocalID Subscription) + ) + + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" scope (peers))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" scope (specs))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" scope (subs)))) + + (define (call-with-sub localid linkid f) + (match (hash-ref (subs) localid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent local ID ~v from link ~v. Ignoring." + localid + linkid)] + [sub (f sub)])) + + (define (unsubscribe! localid linkid) + (call-with-sub + localid linkid + (lambda (sub) + (define new-holders (hash-remove (subscription-holders sub) linkid)) + (if (hash-empty? new-holders) + (begin (specs (hash-remove (specs) (subscription-spec sub))) + (subs (hash-remove (subs) localid))) + (subs (hash-set (subs) localid (struct-copy subscription sub + [holders new-holders])))) + + ;; The messages we send depend on (hash-count new-holders): + ;; - if >1, there are enough other active subscribers that we don't need to send + ;; any messages. + ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid) + ;; - if =0, we retract the subscription from all peers except linkid + + (match (hash-count new-holders) + [0 (for [(peer (in-set (peers)))] + (when (not (equal? peer linkid)) + (send! (message-server->poa peer (Clear localid)))))] + [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid + (send! (message-server->poa peer (Clear localid))))] + [_ (void)])))) + + (define (adjust-matches localid linkid captures delta expected-outcome ctor) + (call-with-sub + localid linkid + (lambda (sub) + (define-values (new-matches outcome) + (bag-change (subscription-matches sub) captures delta #:clamp? #t)) + (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) + (when (eq? outcome expected-outcome) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer linkid)) + (send! (message-server->poa peer (ctor peer-subid captures))))))))) + + (during (federated-link $linkid scope) + + (on-start (peers (set-add (peers) linkid))) + (on-stop (peers (set-remove (peers) linkid))) + + (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) + [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) + ) + + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-subs ~v" + scope linkid (link-subs))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-matches ~v" + scope linkid (link-matches)))) + + (on-start (for ([(spec localid) (in-hash (specs))]) + (send! (message-server->poa linkid (Assert localid (observe spec)))))) + + (on-stop (for ([item (in-bag (link-matches))]) + (match-define (cons localid captures) item) + (adjust-matches localid linkid captures -1 'present->absent Del)) + (for ([localid (in-hash-values (link-subs))]) + (unsubscribe! localid linkid))) + + (on (message (message-poa->server linkid (Assert $subid (observe $spec)))) + (define known? (hash-has-key? (specs) spec)) + (define localid (if known? (hash-ref (specs) spec) (make-localid))) + (define sub + (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) + (define holders (subscription-holders sub)) + (cond + [(hash-has-key? holders linkid) + (log-syndicate/federation-error + "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring." + spec + subid + linkid)] + [else + (link-subs (hash-set (link-subs) subid localid)) + (when (not known?) (specs (hash-set (specs) spec localid))) + (subs (hash-set (subs) localid (struct-copy subscription sub + [holders (hash-set holders linkid subid)]))) + + ;; If not known, then relay the subscription to all peers except `linkid`. + ;; + ;; If known, then one or more links that aren't this one have previously + ;; subscribed with this spec. If exactly one other link has previously + ;; subscribed, the only subscription that needs sent is to that peer; + ;; otherwise, no subscriptions at all need sent, since everyone has already + ;; been informed of this subscription. + + (cond + [(not known?) + (for [(peer (in-set (peers)))] + (when (not (equal? peer linkid)) + (send! (message-server->poa peer (Assert localid (observe spec))))))] + [(= (hash-count holders) 1) + (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid + (send! (message-server->poa peer (Assert localid (observe spec)))))] + [else + (void)]) + + ;; Once subscription relaying has taken place, send up matches to the active + ;; link. + (for [(captures (in-bag (subscription-matches sub)))] + (send! (message-server->poa linkid (Add subid captures)))) + + ])) + + (on (message (message-poa->server linkid (Clear $subid))) + (match (hash-ref (link-subs) subid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent subscription ID ~v from link ~v. Ignoring." + subid + linkid)] + [localid + (link-subs (hash-remove (link-subs) subid)) + (unsubscribe! localid linkid)])) + + (define (relay-add-or-del localid captures delta expected-outcome ctor) + (define-values (new-link-matches link-outcome) + (bag-change (link-matches) (cons localid captures) delta #:clamp? #t)) + (link-matches new-link-matches) + (when (eq? link-outcome expected-outcome) + (adjust-matches localid linkid captures delta expected-outcome ctor))) + + (on (message (message-poa->server linkid (Add $localid $captures))) + (relay-add-or-del localid captures +1 'absent->present Add)) + + (on (message (message-poa->server linkid (Del $localid $captures))) + (relay-add-or-del localid captures -1 'present->absent Del)) + + (on (message (message-poa->server linkid (Msg $localid $captures))) + (call-with-sub + localid linkid + (lambda (sub) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer linkid)) + (send! (message-server->poa peer (Msg peer-subid captures)))))))) + + ))) diff --git a/syndicate/distributed/internal-protocol.rkt b/syndicate/distributed/internal-protocol.rkt new file mode 100644 index 0000000..d58df31 --- /dev/null +++ b/syndicate/distributed/internal-protocol.rkt @@ -0,0 +1,28 @@ +#lang imperative-syndicate +;; Internal server and federation protocol + +(provide (all-defined-out)) + +;; Client-server internal protocol +;; Received packets from server are relayed via one of these. +(message-struct server-packet (address packet)) +;; Like `server-connected`, but for reflecting `tcp-accepted` to the +;; client end of a client-server connection without reordering wrt +;; `server-packet` messages. Implementation-facing, where +;; `server-connected` is part of the API. +(assertion-struct server-transport-connected (address)) + +;; Internal connection protocol +(assertion-struct server-poa (connection-id)) ;; "Point of Attachment" +(assertion-struct message-poa->server (connection-id body)) +(assertion-struct message-server->poa (connection-id body)) + +;; Internal isolation -- these are isomorphic to `to-server` and `from-server`! +;; (and, for that matter, to `outbound` and `inbound`!) +(assertion-struct server-proposal (scope body)) ;; suggestions (~ actions) +(assertion-struct server-envelope (scope body)) ;; decisions (~ events) + +(assertion-struct server-active (scope)) + +;; Federated links generally +(assertion-struct federated-link (id scope)) diff --git a/syndicate/distributed/main.rkt b/syndicate/distributed/main.rkt index 17d3b7b..3d8c92e 100644 --- a/syndicate/distributed/main.rkt +++ b/syndicate/distributed/main.rkt @@ -8,6 +8,7 @@ (all-from-out "server/tcp.rkt") (all-from-out "server/websocket.rkt")) +(require "internal-protocol.rkt") (require "protocol.rkt") (require/activate "client.rkt") @@ -18,10 +19,13 @@ (require/activate "server/tcp.rkt") (require/activate "server/websocket.rkt") +(require/activate "federation.rkt") + (module+ main (require racket/cmdline) (define tcp-port default-tcp-server-port) (define http-port default-http-server-port) + (define uplinks '()) (command-line #:once-any ["--tcp" port ((format "Listen on plain TCP port (default ~a)" default-tcp-server-port)) @@ -33,10 +37,25 @@ ((format "Listen on websocket HTTP port (default ~a)" default-http-server-port)) (set! http-port (string->number port))] ["--no-http" "Do not listen on any websocket HTTP port" - (set! http-port #f)]) + (set! http-port #f)] + #:multi + ["--uplink" local-scope host port remote-scope + "Connect the named local scope to the named scope at the server at host:port" + (define port-number (string->number port)) + (when (not port-number) + (eprintf "Invalid --uplink port number: ~v" port) + (exit 1)) + (set! uplinks (cons (federated-uplink local-scope + (server-tcp-connection host + port-number + remote-scope)) + uplinks))]) (extend-ground-boot! (lambda () (when tcp-port (spawn-tcp-server! tcp-port)) - (when http-port (spawn-websocket-server! http-port))))) + (when http-port (spawn-websocket-server! http-port)) + (when (pair? uplinks) + (spawn (for [(u uplinks)] + (assert u))))))) (define-logger syndicate/distributed) diff --git a/syndicate/distributed/protocol.rkt b/syndicate/distributed/protocol.rkt index b1b1528..35a67e5 100644 --- a/syndicate/distributed/protocol.rkt +++ b/syndicate/distributed/protocol.rkt @@ -2,9 +2,21 @@ (provide (all-defined-out)) +;; Addressing +(assertion-struct server-tcp-connection (host port scope)) +(assertion-struct server-loopback-connection (scope)) + +(define (standard-localhost-server/tcp [scope "broker"]) + (server-tcp-connection "localhost" 8001 scope)) + ;; Client protocol (assertion-struct to-server (address assertion)) (assertion-struct from-server (address assertion)) (assertion-struct server-connection (address)) (assertion-struct server-connected (address)) (message-struct force-server-disconnect (address)) + +;; Federation configuration +;; e.g. (federated-uplink "broker" (server-tcp-connection "peer.example" 8001 "broker2")) +(assertion-struct federated-uplink (scope peer)) +(assertion-struct federated-uplink-connected (link)) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index a44888b..7360cbe 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -1,24 +1,9 @@ #lang imperative-syndicate -(provide (struct-out server-poa) - (struct-out message-poa->server) - (struct-out message-server->poa) - (struct-out server-proposal) - (struct-out server-envelope)) - (require "wire-protocol.rkt") +(require "internal-protocol.rkt") (require racket/set) -;; Internal connection protocol -(assertion-struct server-poa (connection-id)) ;; "Point of Attachment" -(assertion-struct message-poa->server (connection-id body)) -(assertion-struct message-server->poa (connection-id body)) - -;; Internal isolation -- these are isomorphic to `to-server` and `from-server`! -;; (and, for that matter, to `outbound` and `inbound`!) -(assertion-struct server-proposal (scope body)) ;; suggestions (~ actions) -(assertion-struct server-envelope (scope body)) ;; decisions (~ events) - (spawn #:name 'server-factory ;; Previously, we just had server-envelope. Now, we have both @@ -30,24 +15,19 @@ (assert (server-envelope scope assertion))) (during/spawn (server-poa $id) - (on (message (message-poa->server id $p)) - (match p - [(Connect scope) (stop-current-facet (react (connected id scope)))] - [(Peer scope) (stop-current-facet (react (peering id scope)))] - [_ (send-error! id 'connection-not-setup)])))) + (on-start + (let-event [(message (message-poa->server id $p))] + (match p + [(Connect scope) (react (connected id scope))] + [(Peer scope) (react (assert (federated-link id scope)))] + [_ (send-error! id 'connection-not-setup)]))))) (define (send-error! id detail) (send! (message-server->poa id (Err detail)))) -(define (unhandled-message id p) - (match p - [(Connect _) (send-error! id 'duplicate-connection-setup)] - [(Peer _) (send-error! id 'duplicate-connection-setup)] - [(Ping) (send! (message-server->poa id (Pong)))] - [_ (send-error! id 'invalid-message)])) - (define (connected id scope) (define endpoints (set)) + (assert (server-active scope)) (on (message (message-poa->server id $p)) (match p [(Assert ep a) #:when (not (set-member? endpoints ep)) @@ -79,5 +59,9 @@ [other (unhandled-message id other)]))) -(define (peering id scope) - (error 'peering "Not yet implemented")) +(define (unhandled-message id p) + (match p + [(Connect _) (send-error! id 'duplicate-connection-setup)] + [(Peer _) (send-error! id 'duplicate-connection-setup)] + [(Ping) (send! (message-server->poa id (Pong)))] + [_ (send-error! id 'invalid-message)])) diff --git a/syndicate/distributed/server/tcp.rkt b/syndicate/distributed/server/tcp.rkt index 9cc2272..24bbaf5 100644 --- a/syndicate/distributed/server/tcp.rkt +++ b/syndicate/distributed/server/tcp.rkt @@ -5,6 +5,7 @@ spawn-tcp-server!) (require "../wire-protocol.rkt") +(require "../internal-protocol.rkt") (require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/distributed/server) diff --git a/syndicate/distributed/server/websocket.rkt b/syndicate/distributed/server/websocket.rkt index 372cd07..a750622 100644 --- a/syndicate/distributed/server/websocket.rkt +++ b/syndicate/distributed/server/websocket.rkt @@ -5,6 +5,7 @@ spawn-websocket-server!) (require "../wire-protocol.rkt") +(require "../internal-protocol.rkt") (require/activate imperative-syndicate/drivers/web) (require/activate imperative-syndicate/drivers/timer)