From 94e27e1cbc67085606eb6959ab39e6afc84c244c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 16 May 2019 22:28:42 +0100 Subject: [PATCH] Change federation protocol to be transport-neutral, carried via Syndicate itself --- syndicate/distributed/client/loopback.rkt | 12 +- syndicate/distributed/federation.rkt | 504 +++++++++++---------- syndicate/distributed/main.rkt | 21 +- syndicate/distributed/protocol.rkt | 6 +- syndicate/distributed/server.rkt | 13 +- syndicate/distributed/server/tcp.rkt | 3 +- syndicate/distributed/server/websocket.rkt | 3 +- syndicate/distributed/wire-protocol.rkt | 8 - syndicate/examples/server-chat-client.rkt | 2 +- 9 files changed, 299 insertions(+), 273 deletions(-) diff --git a/syndicate/distributed/client/loopback.rkt b/syndicate/distributed/client/loopback.rkt index 91f80e5..2c19588 100644 --- a/syndicate/distributed/client/loopback.rkt +++ b/syndicate/distributed/client/loopback.rkt @@ -14,9 +14,9 @@ #:name address (assert (server-poa address)) (on (message (message-server->poa address $p)) (send! (server-packet address p))) - (define !! (make-flow-controlled-sender message-poa->server address)) - (on (asserted (observe (message-poa->server address _))) - (react - (generic-client-session-facet address - scope - (lambda (x) (!! (message-poa->server address x)))))))) + (on-start (react + (stop-when (asserted (observe (message-poa->server address _))) + (react (generic-client-session-facet + address + scope + (lambda (x) (send! (message-poa->server address x)))))))))) diff --git a/syndicate/distributed/federation.rkt b/syndicate/distributed/federation.rkt index 6695f95..6064646 100644 --- a/syndicate/distributed/federation.rkt +++ b/syndicate/distributed/federation.rkt @@ -27,21 +27,34 @@ ;; connection IDs in (namely, dataspace-unique). Links ;; are stateful. ;; -;; Inbound links are set up in response to incoming connections; the -;; process on the other end of an inbound link is the part that -;; implements the TCP service. It too is stateful. At connection setup -;; time, the remote client instructs it on which local federated scope -;; it should create a link to. +;; The link protocol is enacted in special non-federated, local +;; federation-management server scopes, identified by +;; `federation-management-scope` assertions. The code in this module +;; responds to assertions and messages in these scopes. Besides its +;; scoped nature, the protocol is otherwise ordinary. By reusing +;; Syndicate itself for management and operation of federation, we are +;; able to address transport independently of federation. +;; +;; Inbound links are set up by code outside this module in response to +;; the appearance of some new federated peer "downstream" of this one. +;; For example, after establishing a new client-server connection to a +;; federation-management scope, a remote peer may begin using the link +;; protocol. ;; ;; Outbound links are created in response to an assertion of a -;; `federated-uplink` record, which contains a pair of a local scope -;; ID and a client transport address (such as `server-tcp-connection` -;; from ). Together, these map from a local federated -;; scope to a remote transport endpoint, and within that endpoint, a -;; remote federated scope. +;; `federated-uplink` record in a federation-management scope. Each +;; such record contains a triple of a local scope ID, a client +;; transport address (such as `server-tcp-connection` from +;; ), and a remote scope ID. Together, these federate +;; the local and remote scope IDs via a client-server connection to +;; the given address. ;; -;; Local links are created automatically whenever there is an active -;; server scope of the same name as a federated scope. +;; Local links are a special case of inbound link. They are created +;; automatically whenever there is an active server scope of the same +;; name as a federated scope. +;; +;; Local federation-management scopes must not be federated. +;; TODO: Enforce this? ;; Subscription IDs (== "endpoint IDs") must be connection-unique AND ;; must correspond one-to-one with a specific subscription spec. That @@ -62,70 +75,79 @@ ;; peer nodes. ;;--------------------------------------------------------------------------- -;; Inbound links: the heavy lifting is done by server.rkt along with some -;; transport-specific code e.g. server/tcp.rkt +;; Outbound links. (Really, they end up being a kind of "inbound link" +;; too! Ultimately we have just *links*, connected to arbitrary +;; things. For traditional "inbound", it's some remote party that has +;; connected to us; for "local", it's a local server scope; for +;; "outbound", it's a connection to another server that we reached out +;; to.) -;;--------------------------------------------------------------------------- -;; Outbound links. +(spawn #:name 'federated-uplink-factory + (during (federation-management-scope $management-scope) + (during/spawn (server-envelope management-scope + ($ link (federated-uplink $local-scope + $peer-addr + $remote-scope))) + #:name link + (during (server-connected peer-addr) -(spawn #:name 'federated-tcp-uplink-factory - (during/spawn ($ link (federated-uplink $local-scope - (server-tcp-connection $host $port $remote-scope))) - #:name link - (reassert-on (tcp-connection link (tcp-address host port)) - (retracted (tcp-accepted link)) - (asserted (tcp-rejected link _))) + (assert (server-proposal management-scope (federated-uplink-connected link))) + ;; ^ out to local requester - (during (tcp-accepted link) - (on-start (issue-unbounded-credit! tcp-in link)) - (assert (federated-uplink-connected link)) ;; out to local requester + (define session-id (strong-gensym 'peer-)) + (assert (server-proposal management-scope (federated-link session-id local-scope))) + (assert (to-server peer-addr (federated-link session-id remote-scope))) - (define session-id (gensym 'uplink)) - (assert (federated-link session-id local-scope)) ;; in to federated scope + (on (message (from-server peer-addr (message-server->poa session-id $p))) + (send! (server-proposal management-scope (message-poa->server session-id p)))) - (define !! (make-flow-controlled-sender message-poa->server session-id)) - (define accumulate! - (packet-accumulator (lambda (p) (!! (message-poa->server session-id p))))) - (on (message (tcp-in link $bs)) (accumulate! bs)) - - (on (message (message-server->poa session-id $p)) - (send! (tcp-out link (encode p)))) - - (on-start (send! (message-server->poa session-id (Peer remote-scope))))))) + (on (message (server-envelope management-scope (message-server->poa session-id $p))) + (send! (to-server peer-addr (message-poa->server session-id p)))))))) ;;--------------------------------------------------------------------------- ;; Local links. (spawn #:name 'federated-local-link-factory - (during (federated-link _ $scope) - (during/spawn (server-active scope) - #:name (list 'local-link scope) + (during (federation-management-scope $management-scope) + (during (server-envelope management-scope (federated-link _ $scope)) + (during/spawn (server-active scope) + #:name (list 'local-link management-scope scope) - (define session-id (gensym 'local-link)) - (assert (federated-link session-id scope)) + (define session-id (gensym 'local-link)) + (assert (server-proposal management-scope (federated-link session-id scope))) - (define !! (make-flow-controlled-sender message-poa->server session-id)) + (define (!! m) + (send! (server-proposal management-scope (message-poa->server session-id m)))) - (on (message (message-server->poa session-id (Assert $subid (observe $spec)))) - (react - (define ((! ctor) cs) (!! (message-poa->server session-id (ctor subid cs)))) - (add-observer-endpoint! (lambda () (server-proposal scope spec)) - #:on-add (! Add) - #:on-remove (! Del) - #:on-message (! Msg)) - (assert (server-envelope scope (observe spec))) - (stop-when (message (message-server->poa session-id (Clear subid)))))) + (on (message (server-envelope management-scope + (message-server->poa session-id + (Assert $subid (observe $spec))))) + (react + (define ((! ctor) cs) (!! (ctor subid cs))) + (add-observer-endpoint! (lambda () (server-proposal scope spec)) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg)) + (assert (server-envelope scope (observe spec))) + (stop-when (message + (server-envelope management-scope + (message-server->poa session-id (Clear subid))))))) - (during (observe ($ pat (server-envelope scope $spec))) - (define ep (gensym 'ep)) - (on-start (!! (message-poa->server session-id (Assert ep (observe spec))))) - (on-stop (!! (message-poa->server session-id (Clear ep)))) - (assert (server-envelope scope (observe spec))) - (on (message (message-server->poa session-id (Add ep $captures))) - (react (assert (instantiate-term->value pat captures)) - (stop-when (message (message-server->poa session-id (Del ep captures)))))) - (on (message (message-server->poa session-id (Msg ep $captures))) - (send! (instantiate-term->value pat captures))))))) + (during (observe ($ pat (server-envelope scope $spec))) + (define ep (gensym 'ep)) + (on-start (!! (Assert ep (observe spec)))) + (on-stop (!! (Clear ep))) + (assert (server-envelope scope (observe spec))) + (on (message (server-envelope management-scope + (message-server->poa session-id (Add ep $captures)))) + (react (assert (instantiate-term->value pat captures)) + (stop-when (message + (server-envelope management-scope + (message-server->poa session-id + (Del ep captures))))))) + (on (message (server-envelope management-scope + (message-server->poa session-id (Msg ep $captures)))) + (send! (instantiate-term->value pat captures)))))))) ;;--------------------------------------------------------------------------- ;; Federated scopes. @@ -139,197 +161,203 @@ #:transparent) (spawn #:name 'federated-scope-factory - (during/spawn (federated-link _ $scope) - #:name (list 'federated-scope scope) + (during (federation-management-scope $management-scope) - ;; Generates a fresh local ID naming a subscription propagated to our peers. - (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) + (define (send-to-link! linkid m) + (send! (server-proposal management-scope (message-server->poa linkid m)))) - (field [peers (set)] ;; (Set LinkID) - [specs (hash)] ;; (Hash Spec LocalID) - [subs (hasheq)] ;; (Hash LocalID Subscription) - ) + (during/spawn (server-envelope management-scope (federated-link _ $scope)) + #:name (list 'federated-scope management-scope scope) - (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "~a peers:" scope) - (for [(peer (in-set (peers)))] - (log-syndicate/federation-debug " link ~v" peer)) - (log-syndicate/federation-debug "-")) - (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope) - (for [((spec local) (in-hash (specs)))] - (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) - (log-syndicate/federation-debug "-")) - (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope) - (for [((local sub) (in-hash (subs)))] - (match-define (subscription _id spec holders matches) sub) - (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) - (when (not (hash-empty? holders)) - (log-syndicate/federation-debug " holders:") - (for [((link ep) (in-hash holders))] - (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) - (when (not (bag-empty? matches)) - (log-syndicate/federation-debug " matches:") - (for [((captures count) (in-bag/count matches))] - (log-syndicate/federation-debug " captures ~v count ~a" - captures count)))) - (log-syndicate/federation-debug "-"))) + ;; Generates a fresh local ID naming a subscription propagated to our peers. + (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) - (define (call-with-sub localid linkid f) - (match (hash-ref (subs) localid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent local ID ~v from link ~v. Ignoring." - localid - linkid)] - [sub (f sub)])) - - (define (unsubscribe! localid linkid) - (call-with-sub - localid linkid - (lambda (sub) - (define new-holders (hash-remove (subscription-holders sub) linkid)) - (if (hash-empty? new-holders) - (begin (specs (hash-remove (specs) (subscription-spec sub))) - (subs (hash-remove (subs) localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders new-holders])))) - - ;; The messages we send depend on (hash-count new-holders): - ;; - if >1, there are enough other active subscribers that we don't need to send - ;; any messages. - ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid) - ;; - if =0, we retract the subscription from all peers except linkid - - (match (hash-count new-holders) - [0 (for [(peer (in-set (peers)))] - (when (not (equal? peer linkid)) - (send! (message-server->poa peer (Clear localid)))))] - [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid - (send! (message-server->poa peer (Clear localid))))] - [_ (void)])))) - - (define (adjust-matches localid linkid captures delta expected-outcome ctor) - (call-with-sub - localid linkid - (lambda (sub) - (define-values (new-matches outcome) - (bag-change (subscription-matches sub) captures delta #:clamp? #t)) - (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) - (when (eq? outcome expected-outcome) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer linkid)) - (send! (message-server->poa peer (ctor peer-subid captures))))))))) - - (during (federated-link $linkid scope) - - (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) - (peers (set-add (peers) linkid))) - (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) - (peers (set-remove (peers) linkid))) - - (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) - [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) + (field [peers (set)] ;; (Set LinkID) + [specs (hash)] ;; (Hash Spec LocalID) + [subs (hasheq)] ;; (Hash LocalID Subscription) ) (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid) - (for [((sub local) (in-hash (link-subs)))] - (log-syndicate/federation-debug " sub ~a -> local ~a" sub local)) + (begin/dataflow (log-syndicate/federation-debug "~a peers:" scope) + (for [(peer (in-set (peers)))] + (log-syndicate/federation-debug " link ~v" peer)) (log-syndicate/federation-debug "-")) - (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid) - (for [((item count) (in-bag/count (link-matches)))] - (match-define (cons local captures) item) - (log-syndicate/federation-debug " local ~a captures ~v count ~a" - local captures count)) + (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope) + (for [((spec local) (in-hash (specs)))] + (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) + (log-syndicate/federation-debug "-")) + (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope) + (for [((local sub) (in-hash (subs)))] + (match-define (subscription _id spec holders matches) sub) + (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) + (when (not (hash-empty? holders)) + (log-syndicate/federation-debug " holders:") + (for [((link ep) (in-hash holders))] + (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) + (when (not (bag-empty? matches)) + (log-syndicate/federation-debug " matches:") + (for [((captures count) (in-bag/count matches))] + (log-syndicate/federation-debug " captures ~v count ~a" + captures count)))) (log-syndicate/federation-debug "-"))) - (on-start (for ([(spec localid) (in-hash (specs))]) - (send! (message-server->poa linkid (Assert localid (observe spec)))))) + (define (call-with-sub localid linkid f) + (match (hash-ref (subs) localid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent local ID ~v from link ~v. Ignoring." + localid + linkid)] + [sub (f sub)])) - (on-stop (for ([item (in-bag (link-matches))]) - (match-define (cons localid captures) item) - (adjust-matches localid linkid captures -1 'present->absent Del)) - (for ([localid (in-hash-values (link-subs))]) - (unsubscribe! localid linkid))) + (define (unsubscribe! localid linkid) + (call-with-sub + localid linkid + (lambda (sub) + (define new-holders (hash-remove (subscription-holders sub) linkid)) + (if (hash-empty? new-holders) + (begin (specs (hash-remove (specs) (subscription-spec sub))) + (subs (hash-remove (subs) localid))) + (subs (hash-set (subs) localid (struct-copy subscription sub + [holders new-holders])))) - (on-start (issue-unbounded-credit! message-poa->server linkid)) + ;; The messages we send depend on (hash-count new-holders): + ;; - if >1, there are enough other active subscribers that we don't need to send + ;; any messages. + ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid) + ;; - if =0, we retract the subscription from all peers except linkid - (on (message (message-poa->server linkid (Assert $subid (observe $spec)))) - (define known? (hash-has-key? (specs) spec)) - (define localid (if known? (hash-ref (specs) spec) (make-localid))) - (define sub - (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) - (define holders (subscription-holders sub)) - (cond - [(hash-has-key? holders linkid) - (log-syndicate/federation-error - "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring." - spec - subid - linkid)] - [else - (link-subs (hash-set (link-subs) subid localid)) - (when (not known?) (specs (hash-set (specs) spec localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders (hash-set holders linkid subid)]))) - - ;; If not known, then relay the subscription to all peers except `linkid`. - ;; - ;; If known, then one or more links that aren't this one have previously - ;; subscribed with this spec. If exactly one other link has previously - ;; subscribed, the only subscription that needs sent is to that peer; - ;; otherwise, no subscriptions at all need sent, since everyone has already - ;; been informed of this subscription. - - (cond - [(not known?) - (for [(peer (in-set (peers)))] + (match (hash-count new-holders) + [0 (for [(peer (in-set (peers)))] (when (not (equal? peer linkid)) - (send! (message-server->poa peer (Assert localid (observe spec))))))] - [(= (hash-count holders) 1) - (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid - (send! (message-server->poa peer (Assert localid (observe spec)))))] - [else - (void)]) + (send-to-link! peer (Clear localid))))] + [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid + (send-to-link! peer (Clear localid)))] + [_ (void)])))) - ;; Once subscription relaying has taken place, send up matches to the active - ;; link. - (for [(captures (in-bag (subscription-matches sub)))] - (send! (message-server->poa linkid (Add subid captures)))) - - ])) - - (on (message (message-poa->server linkid (Clear $subid))) - (match (hash-ref (link-subs) subid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent subscription ID ~v from link ~v. Ignoring." - subid - linkid)] - [localid - (link-subs (hash-remove (link-subs) subid)) - (unsubscribe! localid linkid)])) - - (define (relay-add-or-del localid captures delta expected-outcome ctor) - (define-values (new-link-matches link-outcome) - (bag-change (link-matches) (cons localid captures) delta #:clamp? #t)) - (link-matches new-link-matches) - (when (eq? link-outcome expected-outcome) - (adjust-matches localid linkid captures delta expected-outcome ctor))) - - (on (message (message-poa->server linkid (Add $localid $captures))) - (relay-add-or-del localid captures +1 'absent->present Add)) - - (on (message (message-poa->server linkid (Del $localid $captures))) - (relay-add-or-del localid captures -1 'present->absent Del)) - - (on (message (message-poa->server linkid (Ping))) - (send! (message-server->poa linkid (Pong)))) - - (on (message (message-poa->server linkid (Msg $localid $captures))) - (call-with-sub - localid linkid - (lambda (sub) + (define (adjust-matches localid linkid captures delta expected-outcome ctor) + (call-with-sub + localid linkid + (lambda (sub) + (define-values (new-matches outcome) + (bag-change (subscription-matches sub) captures delta #:clamp? #t)) + (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) + (when (eq? outcome expected-outcome) (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) (when (not (equal? peer linkid)) - (send! (message-server->poa peer (Msg peer-subid captures)))))))) + (send-to-link! peer (ctor peer-subid captures)))))))) - ))) + (during (server-envelope management-scope (federated-link $linkid scope)) + + (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) + (peers (set-add (peers) linkid))) + (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) + (peers (set-remove (peers) linkid))) + + (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) + [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) + ) + + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid) + (for [((sub local) (in-hash (link-subs)))] + (log-syndicate/federation-debug " sub ~a -> local ~a" sub local)) + (log-syndicate/federation-debug "-")) + (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid) + (for [((item count) (in-bag/count (link-matches)))] + (match-define (cons local captures) item) + (log-syndicate/federation-debug " local ~a captures ~v count ~a" + local captures count)) + (log-syndicate/federation-debug "-"))) + + (on-start (for ([(spec localid) (in-hash (specs))]) + (send-to-link! linkid (Assert localid (observe spec))))) + + (on-stop (for ([item (in-bag (link-matches))]) + (match-define (cons localid captures) item) + (adjust-matches localid linkid captures -1 'present->absent Del)) + (for ([localid (in-hash-values (link-subs))]) + (unsubscribe! localid linkid))) + + (on (message (server-envelope management-scope + (message-poa->server linkid + (Assert $subid (observe $spec))))) + (define known? (hash-has-key? (specs) spec)) + (define localid (if known? (hash-ref (specs) spec) (make-localid))) + (define sub + (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) + (define holders (subscription-holders sub)) + (cond + [(hash-has-key? holders linkid) + (log-syndicate/federation-error + "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring." + spec + subid + linkid)] + [else + (link-subs (hash-set (link-subs) subid localid)) + (when (not known?) (specs (hash-set (specs) spec localid))) + (subs (hash-set (subs) localid (struct-copy subscription sub + [holders (hash-set holders linkid subid)]))) + + ;; If not known, then relay the subscription to all peers except `linkid`. + ;; + ;; If known, then one or more links that aren't this one have previously + ;; subscribed with this spec. If exactly one other link has previously + ;; subscribed, the only subscription that needs sent is to that peer; + ;; otherwise, no subscriptions at all need sent, since everyone has already + ;; been informed of this subscription. + + (cond + [(not known?) + (for [(peer (in-set (peers)))] + (when (not (equal? peer linkid)) + (send-to-link! peer (Assert localid (observe spec)))))] + [(= (hash-count holders) 1) + (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid + (send-to-link! peer (Assert localid (observe spec))))] + [else + (void)]) + + ;; Once subscription relaying has taken place, send up matches to the active + ;; link. + (for [(captures (in-bag (subscription-matches sub)))] + (send-to-link! linkid (Add subid captures))) + + ])) + + (on (message (server-envelope management-scope + (message-poa->server linkid (Clear $subid)))) + (match (hash-ref (link-subs) subid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent subscription ID ~v from link ~v. Ignoring." + subid + linkid)] + [localid + (link-subs (hash-remove (link-subs) subid)) + (unsubscribe! localid linkid)])) + + (define (relay-add-or-del localid captures delta expected-outcome ctor) + (define-values (new-link-matches link-outcome) + (bag-change (link-matches) (cons localid captures) delta #:clamp? #t)) + (link-matches new-link-matches) + (when (eq? link-outcome expected-outcome) + (adjust-matches localid linkid captures delta expected-outcome ctor))) + + (on (message (server-envelope management-scope + (message-poa->server linkid (Add $localid $captures)))) + (relay-add-or-del localid captures +1 'absent->present Add)) + + (on (message (server-envelope management-scope + (message-poa->server linkid (Del $localid $captures)))) + (relay-add-or-del localid captures -1 'present->absent Del)) + + (on (message (server-envelope management-scope + (message-poa->server linkid (Msg $localid $captures)))) + (call-with-sub + localid linkid + (lambda (sub) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer linkid)) + (send-to-link! peer (Msg peer-subid captures))))))) + + )))) diff --git a/syndicate/distributed/main.rkt b/syndicate/distributed/main.rkt index 3d8c92e..4a1ba15 100644 --- a/syndicate/distributed/main.rkt +++ b/syndicate/distributed/main.rkt @@ -25,7 +25,9 @@ (require racket/cmdline) (define tcp-port default-tcp-server-port) (define http-port default-http-server-port) + (define default-management-scope "local") (define uplinks '()) + (define management-scope default-management-scope) (command-line #:once-any ["--tcp" port ((format "Listen on plain TCP port (default ~a)" default-tcp-server-port)) @@ -39,8 +41,14 @@ ["--no-http" "Do not listen on any websocket HTTP port" (set! http-port #f)] #:multi + [("--management-scope" "-m") scope + ("Set the management scope for future `--uplink`s and, " + "ultimately, for local federation management use. " + (format "(default ~v)" default-management-scope)) + (set! management-scope scope)] ["--uplink" local-scope host port remote-scope - "Connect the named local scope to the named scope at the server at host:port" + ("Connect the named local-scope to the named remote-scope" + "via the management scope in the server at host:port") (define port-number (string->number port)) (when (not port-number) (eprintf "Invalid --uplink port number: ~v" port) @@ -48,14 +56,19 @@ (set! uplinks (cons (federated-uplink local-scope (server-tcp-connection host port-number - remote-scope)) + management-scope) + remote-scope) uplinks))]) (extend-ground-boot! (lambda () + (spawn (assert (federation-management-scope management-scope))) + ;; ^ for inbound as well as outbound links (when tcp-port (spawn-tcp-server! tcp-port)) (when http-port (spawn-websocket-server! http-port)) (when (pair? uplinks) - (spawn (for [(u uplinks)] - (assert u))))))) + (spawn (define a (server-loopback-connection management-scope)) + (assert (server-connection a)) + (for [(u uplinks)] + (assert (to-server a u)))))))) (define-logger syndicate/distributed) diff --git a/syndicate/distributed/protocol.rkt b/syndicate/distributed/protocol.rkt index 8f7c540..1e1b832 100644 --- a/syndicate/distributed/protocol.rkt +++ b/syndicate/distributed/protocol.rkt @@ -17,6 +17,8 @@ (message-struct force-server-disconnect (address)) ;; Federation configuration -;; e.g. (federated-uplink "scope1" (server-tcp-connection "peer.example" 8001 "scope2")) -(assertion-struct federated-uplink (scope peer)) +;; e.g. (federated-uplink "scope1" (server-tcp-connection "peer.example" 8001 "local") "scope2") +(assertion-struct federated-uplink (local-scope peer remote-scope)) (assertion-struct federated-uplink-connected (link)) + +(assertion-struct federation-management-scope (name)) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index 08cf926..84eb27b 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -4,8 +4,6 @@ (require "internal-protocol.rkt") (require racket/set) -(require imperative-syndicate/protocol/credit) - (spawn #:name 'server-factory ;; Previously, we just had server-envelope. Now, we have both @@ -20,12 +18,9 @@ (during/spawn (server-poa $id) (on-start - (issue-credit! message-poa->server id) - (let-event [(message (message-poa->server id $p))] - (match p - [(Connect scope) (react (connected id scope))] - [(Peer scope) (react (assert (federated-link id scope)))] - [_ (send-error! id 'connection-not-setup)]))))) + (match (let-event [(message (message-poa->server id $p))] p) + [(Connect scope) (react (connected id scope))] + [_ (send-error! id 'connection-not-setup)])))) (define (send-error! id detail) (send! (message-server->poa id (Err detail)))) @@ -33,7 +28,6 @@ (define (connected id scope) (define endpoints (set)) (assert (server-active scope)) - (on-start (issue-unbounded-credit! message-poa->server id)) (on (message (message-poa->server id $p)) (match p [(Assert ep a) #:when (not (set-member? endpoints ep)) @@ -68,7 +62,6 @@ (define (unhandled-message id p) (match p [(Connect _) (send-error! id 'duplicate-connection-setup)] - [(Peer _) (send-error! id 'duplicate-connection-setup)] [(Ping) (send! (message-server->poa id (Pong)))] [(Pong) (void)] [_ (send-error! id 'invalid-message)])) diff --git a/syndicate/distributed/server/tcp.rkt b/syndicate/distributed/server/tcp.rkt index 1459ec1..260d19d 100644 --- a/syndicate/distributed/server/tcp.rkt +++ b/syndicate/distributed/server/tcp.rkt @@ -14,8 +14,7 @@ (assert (tcp-accepted id)) (assert (server-poa id)) (on-start (issue-unbounded-credit! tcp-in id)) - (define !! (make-flow-controlled-sender message-poa->server id)) - (define accumulate! (packet-accumulator (lambda (p) (!! (message-poa->server id p))))) + (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p))))) (on (message (tcp-in id $bs)) (accumulate! bs)) (on (message (message-server->poa id $p)) diff --git a/syndicate/distributed/server/websocket.rkt b/syndicate/distributed/server/websocket.rkt index 59a986a..313ab6a 100644 --- a/syndicate/distributed/server/websocket.rkt +++ b/syndicate/distributed/server/websocket.rkt @@ -23,12 +23,11 @@ (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) (send! (message-server->poa id (Ping)))) - (define !! (make-flow-controlled-sender message-poa->server id)) (on (message (websocket-in id $body)) (define-values (packet remainder) (decode body)) (when (not (equal? remainder #"")) (error 'server-facet/websocket "Multiple packets in a single websocket message")) - (!! (message-poa->server id packet))) + (send! (message-poa->server id packet))) (on (message (message-server->poa id $p)) (send! (websocket-out id (encode p))) (when (Err? p) (stop-current-facet)))) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index 0b2e9a7..e33d707 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -5,11 +5,9 @@ (require (prefix-in preserves: preserves)) (require bitsyntax) (require (only-in net/rfc6455 ws-idle-timeout)) -(require imperative-syndicate/protocol/credit) ;; Enrolment (message-struct Connect (scope)) ;; Client --> Server -(message-struct Peer (scope)) ;; Peer --> Peer ;; Actions; Client --> Server (and Peer --> Peer, except for Message) (message-struct Assert (endpoint-name assertion)) @@ -26,12 +24,6 @@ (message-struct Ping ()) (message-struct Pong ()) -;; Peering -;; ======= -;; -;; To peer, send `(Peer Scope)` at the start of a connection instead -;; of the usual `(Connect Scope)`. -;; ;; In peer mode, *actions* and *events* travel in *both* directions, ;; but `Message`s do not appear and (for now) `Assert` is only used to ;; establish `observe`s, i.e. subscriptions. diff --git a/syndicate/examples/server-chat-client.rkt b/syndicate/examples/server-chat-client.rkt index 0b23855..6e405ed 100644 --- a/syndicate/examples/server-chat-client.rkt +++ b/syndicate/examples/server-chat-client.rkt @@ -9,7 +9,7 @@ (define host (make-parameter "localhost")) (define port (make-parameter 8001)) -(define scope (make-parameter "local")) +(define scope (make-parameter "chat")) (module+ main (require racket/cmdline)