#lang imperative-syndicate
;; Relays for federation, both "client" (outbound) and "server" (inbound) ends.

(require "wire-protocol.rkt")
(require "internal-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require imperative-syndicate/bag)
(require imperative-syndicate/reassert)
(require racket/set)

(require/activate imperative-syndicate/drivers/tcp)

(define-logger syndicate/federation)

;; A federated scope (as distinct from a non-federated server scope)
;; communicates via "links" to "peers", which come in three flavours:
;;  - inbound links, aka "downlinks" from the POV of this node, which
;;    result from incoming TCP/websocket/etc connections
;;  - outbound links, aka "uplinks", which reach out to a remote TCP/
;;    websocket/etc server
;;  - local links, (usually? always?) just one per scope, which
;;    connect the federated scope to its local server scope
;;
;; All links are identified by a link ID, scoped the same as
;; connection IDs (namely, dataspace-unique). Links are stateful.
;;
;; Inbound links are set up in response to incoming connections; the
;; process on the other end of an inbound link is the part that
;; implements the TCP service. It too is stateful. At connection setup
;; time, the remote client instructs it on which local federated scope
;; it should create a link to.
;;
;; Outbound links are created in response to an assertion of a
;; `federated-uplink` record, which contains a pair of a local scope
;; ID and a client transport address (such as `server-tcp-connection`).
;; Together, these map from a local federated scope to a remote
;; transport endpoint, and within that endpoint, a remote federated
;; scope.
;;
;; Local links are created automatically whenever there is an active
;; server scope of the same name as a federated scope.

;; Subscription IDs (== "endpoint IDs") must be connection-unique AND
;; must correspond one-to-one with a specific subscription spec.
;; That is, a subscription ID is merely connection-local shorthand for its
;; spec, and two subscription IDs within a connection must be `equal?`
;; exactly when their corresponding specs are `equal?`.
;;
;; Local IDs must be scope-unique. They are used as subscription IDs
;; in outbound messages.
;;
;; Each federated scope maintains a bidirectional mapping between
;; subscription IDs (each scoped within its connection ID) and local
;; IDs. One local ID may map to multiple subscription IDs - this is
;; the place where aggregation pops up.

;; Unlike the client/server protocol, both Actions and Events are
;; BIDIRECTIONAL, travelling in both directions along edges linking
;; peer nodes.

;;---------------------------------------------------------------------------
;; Inbound links: the heavy lifting is done by server.rkt along with some
;; transport-specific code e.g. server/tcp.rkt

;;---------------------------------------------------------------------------
;; Outbound links.

;; One actor per asserted `federated-uplink` record whose transport address
;; is a `server-tcp-connection`. The actor is named after the whole record
;; (`link`), which also serves as the TCP connection identifier.
(spawn #:name 'federated-tcp-uplink-factory
  (during/spawn ($ link (federated-uplink $local-scope
                                          (server-tcp-connection $host $port $remote-scope)))
    #:name link

    ;; Request the TCP connection. NOTE(review): presumably `reassert-on`
    ;; re-issues the request whenever `tcp-accepted` is retracted or
    ;; `tcp-rejected` is asserted -- confirm against the reassert library.
    (reassert-on (tcp-connection link (tcp-address host port))
                 (retracted (tcp-accepted link))
                 (asserted (tcp-rejected link _)))

    (during (tcp-accepted link)
      ;; Outward-facing: tell the local requester that the uplink is connected.
      (assert (federated-uplink-connected link))

      ;; Inward-facing: register this connection as a link of the federated
      ;; scope, under a fresh session ID.
      (define session-id (gensym 'uplink))
      (assert (federated-link session-id local-scope))

      ;; Hands one decoded packet from the wire to the federated scope.
      (define (deliver-to-scope! packet)
        (send! (message-poa->server session-id packet)))

      ;; Consumes raw input chunks, invoking `deliver-to-scope!` with packets.
      (define feed-bytes! (packet-accumulator deliver-to-scope!))

      ;; Socket -> federated scope.
      (on (message (tcp-in link $chunk))
        (feed-bytes! chunk))

      ;; Federated scope -> socket.
      (on (message (message-server->poa session-id $packet))
        (send! (tcp-out link (encode packet))))

      ;; Announce the remote scope we want. The message is addressed to our
      ;; own session, so it matches the scope -> socket handler above.
      (on-start (send! (message-server->poa session-id (Peer remote-scope)))))))

;;---------------------------------------------------------------------------
;; Local links.
;; For every scope that has at least one federated link AND an active local
;; server scope of the same name, run an actor joining the two.
(spawn #:name 'federated-local-link-factory
  (during (federated-link _ $scope)
    (during/spawn (server-active scope)
      #:name (list 'local-link scope)

      (define session-id (gensym 'local-link))

      ;; Sends `body` into the federated scope on behalf of this local link.
      (define (to-federation! body)
        (send! (message-poa->server session-id body)))

      (assert (federated-link session-id scope))

      ;; The federated scope subscribes to `spec`: observe matching
      ;; `server-proposal`s in the local scope and report each add / remove /
      ;; message back under the requested `subid`, until a `Clear` for that
      ;; `subid` arrives.
      (on (message (message-server->poa session-id (Assert $subid (observe $spec))))
        (react
          ;; Builds the callback reporting captures via record constructor `ctor`.
          (define (relay-as ctor)
            (lambda (captures) (to-federation! (ctor subid captures))))
          (add-observer-endpoint! (lambda () (server-proposal scope spec))
                                  #:on-add (relay-as Add)
                                  #:on-remove (relay-as Del)
                                  #:on-message (relay-as Msg))
          (assert (server-envelope scope (observe spec)))
          (stop-when (message (message-server->poa session-id (Clear subid))))))

      ;; Local interest in `(server-envelope scope spec)`: subscribe to `spec`
      ;; in the federated scope under a fresh endpoint ID for as long as the
      ;; interest lasts, and replay the scope's answers locally by
      ;; instantiating the observed pattern `pat` with the reported captures.
      (during (observe ($ pat (server-envelope scope $spec)))
        (define ep (gensym 'ep))
        (on-start (to-federation! (Assert ep (observe spec))))
        (on-stop (to-federation! (Clear ep)))
        (assert (server-envelope scope (observe spec)))
        (on (message (message-server->poa session-id (Add ep $captures)))
          ;; Hold the instantiated assertion until the matching `Del` arrives.
          (react
            (assert (instantiate-term->value pat captures))
            (stop-when (message (message-server->poa session-id (Del ep captures))))))
        (on (message (message-server->poa session-id (Msg ep $captures)))
          (send! (instantiate-term->value pat captures)))))))

;;---------------------------------------------------------------------------
;; Federated scopes.

;; Internal state: one record per distinct subscription spec known to a scope.
(struct subscription (id      ;; LocalID
                      spec    ;; Assertion
                      holders ;; (Hash ConnectionID SubscriptionID)
                      matches ;; (Bag (Listof Assertion))
                      )
  #:transparent)

;; One actor per scope that has at least one federated link. It aggregates the
;; subscriptions of all its links and relays matches between them.
(spawn #:name 'federated-scope-factory
  (during/spawn (federated-link _ $scope)
    #:name (list 'federated-scope scope)

    ;; Returns a fresh local ID (0, 1, 2, ...) naming a subscription
    ;; propagated to our peers.
    (define make-localid
      (let ([counter 0])
        (lambda ()
          (let ([fresh counter])
            (set! counter (add1 counter))
            fresh))))

    (field [peers (set)]    ;; (Set LinkID)
           [specs (hash)]   ;; (Hash Spec LocalID)
           [subs (hasheq)]  ;; (Hash LocalID Subscription)
           )

    (when (log-level? syndicate/federation-logger 'debug)
      (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" scope (peers)))
      (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" scope (specs)))
      (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" scope (subs))))

    ;; Applies `f` to the subscription registered under `localid`. If there is
    ;; none, logs an error mentioning `linkid` and does nothing else.
    (define (call-with-sub localid linkid f)
      (define found (hash-ref (subs) localid #f))
      (if found
          (f found)
          (log-syndicate/federation-error
           "Mention of nonexistent local ID ~v from link ~v. Ignoring."
           localid
           linkid)))

    ;; Sends `body` to every current peer other than `excluded`.
    (define (notify-peers-except! excluded body)
      (for ([peer (in-set (peers))])
        (unless (equal? peer excluded)
          (send! (message-server->poa peer body)))))

    ;; Sends `(ctor peer-subid captures)` to every holder of `sub` other than
    ;; `linkid`, each addressed by that holder's own subscription ID.
    (define (forward-to-other-holders! sub linkid ctor captures)
      (for ([(peer peer-subid) (in-hash (subscription-holders sub))])
        (unless (equal? peer linkid)
          (send! (message-server->poa peer (ctor peer-subid captures))))))

    ;; Removes `linkid` from the holders of subscription `localid`, dropping
    ;; the subscription entirely once nobody holds it, and retracts the
    ;; relayed subscription from peers that no longer need to serve it.
    (define (unsubscribe! localid linkid)
      (call-with-sub localid linkid
        (lambda (sub)
          (define remaining (hash-remove (subscription-holders sub) linkid))
          (cond
            [(hash-empty? remaining)
             (specs (hash-remove (specs) (subscription-spec sub)))
             (subs (hash-remove (subs) localid))]
            [else
             (subs (hash-set (subs) localid (struct-copy subscription sub [holders remaining])))])
          ;; The messages we send depend on how many holders remain:
          ;;  - more than one: enough other subscribers are still active, so
          ;;    nothing needs sending;
          ;;  - exactly one: retract the subscription from that holder
          ;;    (INVARIANT: it will not be linkid);
          ;;  - none: retract the subscription from all peers except linkid.
          (case (hash-count remaining)
            [(0) (notify-peers-except! linkid (Clear localid))]
            [(1) (for ([peer (in-hash-keys remaining)]) ;; there will only be one, ≠ linkid
                   (send! (message-server->poa peer (Clear localid))))]
            [else (void)]))))

    ;; Applies `delta` for `captures` to the match bag of subscription
    ;; `localid`. When the resulting bag transition equals `expected-outcome`,
    ;; informs every holder other than `linkid` using `ctor`.
    (define (adjust-matches localid linkid captures delta expected-outcome ctor)
      (call-with-sub localid linkid
        (lambda (sub)
          (define-values (updated-matches outcome)
            (bag-change (subscription-matches sub) captures delta #:clamp? #t))
          (subs (hash-set (subs) localid (struct-copy subscription sub [matches updated-matches])))
          (when (eq? outcome expected-outcome)
            (forward-to-other-holders! sub linkid ctor captures)))))

    ;; Per-link state and protocol handling.
    (during (federated-link $linkid scope)
      (on-start (peers (set-add (peers) linkid)))
      (on-stop (peers (set-remove (peers) linkid)))

      (field [link-subs (hash)]    ;; (Hash SubscriptionID LocalID)
             [link-matches (bag)]  ;; (Bag (Cons LocalID (Listof Assertion)))
             )

      (when (log-level? syndicate/federation-logger 'debug)
        (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-subs ~v" scope linkid (link-subs)))
        (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-matches ~v" scope linkid (link-matches))))

      ;; Bring the new link up to date with every subscription already known.
      (on-start
        (for ([(spec localid) (in-hash (specs))])
          (send! (message-server->poa linkid (Assert localid (observe spec))))))

      ;; When the link goes away: withdraw the matches it contributed, then
      ;; drop every subscription it held.
      (on-stop
        (for ([entry (in-bag (link-matches))])
          ;; Each entry is a (cons localid captures) pair.
          (adjust-matches (car entry) linkid (cdr entry) -1 'present->absent Del))
        (for ([localid (in-hash-values (link-subs))])
          (unsubscribe! localid linkid)))

      ;; The link subscribes to `spec` under its own ID `subid`.
      (on (message (message-poa->server linkid (Assert $subid (observe $spec))))
        (define known? (hash-has-key? (specs) spec))
        ;; `make-localid` is only invoked when `spec` has no local ID yet.
        (define localid (hash-ref (specs) spec make-localid))
        (define sub
          (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
        (define holders (subscription-holders sub))
        (if (hash-has-key? holders linkid)
            (log-syndicate/federation-error
             "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring."
             spec
             subid
             linkid)
            (let ([relayed (Assert localid (observe spec))])
              (link-subs (hash-set (link-subs) subid localid))
              (unless known?
                (specs (hash-set (specs) spec localid)))
              (subs (hash-set (subs)
                              localid
                              (struct-copy subscription sub
                                           [holders (hash-set holders linkid subid)])))
              ;; If not known, relay the subscription to all peers except
              ;; `linkid`.
              ;;
              ;; If known, one or more links other than this one have already
              ;; subscribed with this spec. If exactly one has, that peer is
              ;; the only one that still needs to be sent the subscription;
              ;; otherwise everyone has already been informed of it.
              (cond
                [known?
                 (when (= (hash-count holders) 1)
                   (for ([peer (in-hash-keys holders)]) ;; there will only be one, ≠ linkid
                     (send! (message-server->poa peer relayed))))]
                [else
                 (notify-peers-except! linkid relayed)])
              ;; Once subscription relaying has taken place, send the matches
              ;; recorded so far to the newly subscribed link.
              (for ([captures (in-bag (subscription-matches sub))])
                (send! (message-server->poa linkid (Add subid captures)))))))

      ;; The link drops the subscription it called `subid`.
      (on (message (message-poa->server linkid (Clear $subid)))
        (define localid (hash-ref (link-subs) subid #f))
        (cond
          [localid
           (link-subs (hash-remove (link-subs) subid))
           (unsubscribe! localid linkid)]
          [else
           (log-syndicate/federation-error
            "Mention of nonexistent subscription ID ~v from link ~v. Ignoring."
            subid
            linkid)]))

      ;; Records a match change reported by this link in `link-matches`, and
      ;; passes it on to the scope-wide subscription only when the per-link
      ;; bag transition equals `expected-outcome`.
      (define (relay-add-or-del localid captures delta expected-outcome ctor)
        (define-values (updated-link-matches link-outcome)
          (bag-change (link-matches) (cons localid captures) delta #:clamp? #t))
        (link-matches updated-link-matches)
        (when (eq? link-outcome expected-outcome)
          (adjust-matches localid linkid captures delta expected-outcome ctor)))

      (on (message (message-poa->server linkid (Add $localid $captures)))
        (relay-add-or-del localid captures +1 'absent->present Add))

      (on (message (message-poa->server linkid (Del $localid $captures)))
        (relay-add-or-del localid captures -1 'present->absent Del))

      ;; Messages carry no state: forward to every other holder immediately.
      (on (message (message-poa->server linkid (Msg $localid $captures)))
        (call-with-sub localid linkid
          (lambda (sub)
            (forward-to-other-holders! sub linkid Msg captures))))
      )))