;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2010-2021 Tony Garnock-Jones #lang syndicate ;; Relays for federation, both "client" (outbound) and "server" (inbound) ends. (require "wire-protocol.rkt") (require "internal-protocol.rkt") (require "protocol.rkt") (require "buffer.rkt") (require "turn.rkt") (require syndicate/term) (require syndicate/reassert) (require racket/set) (require/activate syndicate/drivers/tcp) (define-logger syndicate/federation) ;; A federated scope (as distinct from a non-federated server scope) ;; communicates via "links" to "peers", which come in three flavours: ;; - inbound links, aka "downlinks" from the POV of this node, which ;; result from incoming TCP/websocket/etc connections ;; - outbound links, aka "uplinks", which reach out to a remote TCP/ ;; websocket/etc server ;; - local links, (usually? always?) just one per scope, which ;; connect the federated scope to its local server scope ;; ;; All links are identified by a link ID, scoped the same as ;; connection IDs in (namely, dataspace-unique). Links ;; are stateful. ;; ;; The link protocol is enacted in special non-federated, local ;; federation-management server scopes, identified by ;; `federation-management-scope` assertions. The code in this module ;; responds to assertions and messages in these scopes. Besides its ;; scoped nature, the protocol is otherwise ordinary. By reusing ;; Syndicate itself for management and operation of federation, we are ;; able to address transport independently of federation. ;; ;; Inbound links are set up by code outside this module in response to ;; the appearance of some new federated peer "downstream" of this one. ;; For example, after establishing a new client-server connection to a ;; federation-management scope, a remote peer may begin using the link ;; protocol. ;; ;; Outbound links are created in response to an assertion of a ;; `federated-uplink` record in a federation-management scope. Each ;; such record contains a triple of a local scope ID, a client ;; transport address (such as `server-tcp-connection` from ;; ), and a remote scope ID. Together, these federate ;; the local and remote scope IDs via a client-server connection to ;; the given address. ;; ;; Local links are a special case of inbound link. They are created ;; automatically whenever there is an active server scope of the same ;; name as a federated scope. ;; ;; Local federation-management scopes must not be federated. ;; TODO: Enforce this? ;; Subscription IDs (== "endpoint IDs") must be connection-unique AND ;; must correspond one-to-one with a specific subscription spec. That ;; is, a subscription ID is merely connection-local shorthand for its ;; spec, and two subscription IDs within a connection must be `equal?` ;; exactly when their corresponding specs are `equal?`. ;; ;; Local IDs must be scope-unique. They are used as subscription IDs ;; in outbound messages. ;; ;; Each federated scope maintains a bidirectional mapping between ;; subscription IDs (each scoped within its connection ID) and local ;; IDs. One local ID may map to multiple subscription IDs - this is ;; the place where aggregation pops up. ;; Unlike the client/server protocol, both Actions and Events are ;; BIDIRECTIONAL, travelling in both directions along edges linking ;; peer nodes. ;;--------------------------------------------------------------------------- ;; Outbound links. (Really, they end up being a kind of "inbound link" ;; too! Ultimately we have just *links*, connected to arbitrary ;; things. For traditional "inbound", it's some remote party that has ;; connected to us; for "local", it's a local server scope; for ;; "outbound", it's a connection to another server that we reached out ;; to.) (spawn #:name 'federated-uplink-factory (during (federation-management-scope $management-scope) (during/spawn (server-envelope management-scope ($ link (federated-uplink $local-scope $peer-addr $remote-scope))) #:name link (during (server-connected peer-addr) (assert (server-proposal management-scope (federated-uplink-connected link))) ;; ^ out to local requester (define session-id (strong-gensym 'peer-)) (assert (server-proposal management-scope (federated-link session-id local-scope))) (assert (to-server peer-addr (federated-link session-id remote-scope))) ;; We have to buffer in both directions, because at startup there's latency ;; between asserting a federated-link record and it being ready to receive ;; message-poa->server records. (define-values (push-in drain-in) (make-buffer)) (define-values (push-out drain-out) (make-buffer)) (on (message (from-server peer-addr (message-server->poa session-id $p))) (push-in p)) (on (message (server-envelope management-scope (message-server->poa session-id $p))) (push-out p)) (define (wrap p) (message-poa->server session-id p)) (during (server-envelope management-scope (federated-link-ready session-id)) (during (from-server peer-addr (federated-link-ready session-id)) (drain-in (lambda (p) (send! (server-proposal management-scope (wrap p))))) (drain-out (lambda (p) (send! (to-server peer-addr (wrap p))))))))))) ;;--------------------------------------------------------------------------- ;; Local links. (spawn #:name 'federated-local-link-factory (struct sub (spec [captures #:mutable]) #:transparent) (during (federation-management-scope $management-scope) (during (server-envelope management-scope (federated-link _ $scope)) (during/spawn (server-active scope) #:name (list 'local-link management-scope scope) (define session-id (gensym 'local-link)) (assert (server-proposal management-scope (federated-link session-id scope))) (define (!! m) (send! (server-proposal management-scope (message-poa->server session-id m)))) (define turn (turn-recorder (lambda (items) (!! (Turn items))))) (define remote-endpoints (hash)) (define local-endpoints (hash)) (define local-matches (hash)) (define (instantiate s vs) (instantiate-term->value (server-envelope scope (sub-spec s)) vs)) (on (asserted (observe (server-envelope scope $spec))) (define ep (gensym 'ep)) (extend-turn! turn (Assert ep (observe spec))) (set! local-endpoints (hash-set local-endpoints spec ep)) (set! local-matches (hash-set local-matches ep (sub spec (hash))))) (on (retracted (observe (server-envelope scope $spec))) (define ep (hash-ref local-endpoints spec)) (extend-turn! turn (Clear ep)) (set! local-endpoints (hash-remove local-endpoints spec))) (on (message (server-envelope management-scope (message-server->poa session-id (Turn $items)))) (for [(item (in-list items))] (match item [(Assert subid (observe spec)) (when (hash-has-key? remote-endpoints subid) (error 'local-link "Duplicate endpoint" subid)) (react (define ep-facet (current-facet)) (set! remote-endpoints (hash-set remote-endpoints subid ep-facet)) (on-stop (set! remote-endpoints (hash-remove remote-endpoints subid))) (assert (server-envelope scope (observe spec))) (define ((! ctor) cs) (extend-turn! turn (ctor subid cs))) (add-observer-endpoint! (lambda () (server-proposal scope spec)) #:on-add (! Add) #:on-remove (! Del) #:on-message (! Msg)))] [(Clear subid) (stop-facet (hash-ref remote-endpoints subid) (extend-turn! turn (End subid)))] [(Add ep vs) (let* ((s (hash-ref local-matches ep)) (a (instantiate s vs))) (set-sub-captures! s (hash-set (sub-captures s) vs a)) (assert! a))] [(Del ep vs) (let* ((s (hash-ref local-matches ep)) (a (hash-ref (sub-captures s) vs))) (retract! a) (set-sub-captures! s (hash-remove (sub-captures s) vs)))] [(Msg ep vs) (let* ((s (hash-ref local-matches ep))) (send! (instantiate s vs)))] [(End ep) (let* ((s (hash-ref local-matches ep #f))) (when s (for [(a (in-hash-values (sub-captures s)))] (retract! a)) (set! local-matches (hash-remove local-matches ep))))]))))))) ;;--------------------------------------------------------------------------- ;; Federated scopes. (spawn #:name 'federated-scope-factory (struct subscription (id ;; LocalID spec ;; Assertion holders ;; (Hash LinkID SubscriptionID) matches ;; (Hash (Listof Assertion) (Set LinkID)) ) #:transparent) (during (federation-management-scope $management-scope) (during/spawn (server-envelope management-scope (federated-link _ $scope)) #:name (list 'federated-scope management-scope scope) ;; Generates a fresh local ID naming a subscription propagated to our peers. (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) (field [turns (hash)] ;; (Map LinkID Turn) [specs (hash)] ;; (Hash Spec LocalID) [subs (hasheq)] ;; (Hash LocalID Subscription) ) (define (send-to-link! peer p) (extend-turn! (hash-ref (turns) peer) p)) (when (log-level? syndicate/federation-logger 'debug) (begin/dataflow (log-syndicate/federation-debug "~a turns:" scope) (for [((peer turn) (in-hash (turns)))] (log-syndicate/federation-debug " link ~v -> ~v" peer (turn 'debug))) (log-syndicate/federation-debug "-")) (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope) (for [((spec local) (in-hash (specs)))] (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) (log-syndicate/federation-debug "-")) (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope) (for [((local sub) (in-hash (subs)))] (match-define (subscription _id spec holders matches) sub) (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) (when (not (hash-empty? holders)) (log-syndicate/federation-debug " holders:") (for [((link ep) (in-hash holders))] (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) (when (not (hash-empty? matches)) (log-syndicate/federation-debug " matches:") (for [((captures holders) (in-hash matches))] (log-syndicate/federation-debug " captures ~v held by ~a" captures holders)))) (log-syndicate/federation-debug "-"))) (define (call-with-sub localid linkid f #:not-found-ok? [not-found-ok? #t]) (match (hash-ref (subs) localid #f) [#f (when (not not-found-ok?) (log-syndicate/federation-error "Mention of nonexistent local ID ~v from link ~v. Ignoring." localid linkid))] [sub (f sub)])) (define (store-sub! sub) (match-define (subscription localid spec holders matches) sub) (if (and (hash-empty? holders) (hash-empty? matches)) (begin (specs (hash-remove (specs) spec)) (subs (hash-remove (subs) localid))) (subs (hash-set (subs) localid sub)))) (define (unsubscribe! localid linkid) (call-with-sub #:not-found-ok? #f localid linkid (lambda (sub) (define new-holders (hash-remove (subscription-holders sub) linkid)) (store-sub! (struct-copy subscription sub [holders new-holders])) ;; The messages we send depend on (hash-count new-holders): ;; - if >1, there are enough other active subscribers that we don't need to send ;; any messages. ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid) ;; - if =0, we retract the subscription from all peers except linkid (match (hash-count new-holders) [0 (for [((peer turn) (in-hash (turns)))] (when (not (equal? peer linkid)) (extend-turn! turn (Clear localid))))] [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid (send-to-link! peer (Clear localid)))] [_ (void)])))) (define (remove-match! localid captures linkid) (call-with-sub localid linkid (lambda (sub) (define old-matches (subscription-matches sub)) (define old-match-holders (hash-ref old-matches captures set)) (define new-match-holders (set-remove old-match-holders linkid)) (define new-matches (if (set-empty? new-match-holders) (hash-remove old-matches captures) (hash-set old-matches captures new-match-holders))) (store-sub! (struct-copy subscription sub [matches new-matches])) (match (set-count new-match-holders) [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] (when (not (equal? peer linkid)) (send-to-link! peer (Del peer-subid captures))))] [1 (for [(peer (in-set new-match-holders))] ;; only one, ≠ linkid (define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f)) (when maybe-peer-subid (send-to-link! peer (Del maybe-peer-subid captures))))] [_ (void)])))) (during (server-envelope management-scope (federated-link $linkid scope)) (assert (server-proposal management-scope (federated-link-ready linkid))) (define turn (turn-recorder (lambda (items) (send! (server-proposal management-scope (message-server->poa linkid (Turn items))))))) (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) [link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion))) ) (define (err! detail [context #f]) (send! (server-proposal management-scope (message-server->poa linkid (Err detail context)))) (reset-turn! turn) (stop-current-facet)) (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) (turns (hash-set (turns) linkid turn)) (for ([(spec localid) (in-hash (specs))]) (when (not (hash-empty? (subscription-holders (hash-ref (subs) localid)))) (extend-turn! turn (Assert localid (observe spec))))) (commit-turn! turn)) (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) (turns (hash-remove (turns) linkid)) (for [((localid matches) (in-hash (link-matches)))] (for [(captures (in-set matches))] (remove-match! localid captures linkid))) (for ([localid (in-hash-values (link-subs))]) (unsubscribe! localid linkid)) (commit-turn! turn)) (when (log-level? syndicate/federation-logger 'debug) (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid) (for [((sub local) (in-hash (link-subs)))] (log-syndicate/federation-debug " sub ~a -> local ~a" sub local)) (log-syndicate/federation-debug "-")) (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid) (for [((local matches) (in-hash (link-matches)))] (for [(captures (in-set matches))] (log-syndicate/federation-debug " local ~a captures ~v" local captures))) (log-syndicate/federation-debug "-"))) (stop-when (message (server-envelope management-scope (message-poa->server linkid (Err $detail $context)))) (log-syndicate/federation-error "Received Err from peer link ~v: detail ~v; context ~v" linkid detail context) (reset-turn! turn)) (on (message (server-envelope management-scope (message-poa->server linkid (Turn $items)))) (for [(item (in-list items))] (match item [(Assert subid (observe spec)) (define known? (hash-has-key? (specs) spec)) (define localid (if known? (hash-ref (specs) spec) (make-localid))) (define sub (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (hash))))) (define holders (subscription-holders sub)) (cond [(hash-has-key? holders linkid) (log-syndicate/federation-error "Duplicate subscription ~a, ID ~a, from link ~a." spec subid linkid) (err! 'duplicate-endpoint item)] [else (link-subs (hash-set (link-subs) subid localid)) (when (not known?) (specs (hash-set (specs) spec localid))) (subs (hash-set (subs) localid (struct-copy subscription sub [holders (hash-set holders linkid subid)]))) ;; If not known, then relay the subscription to all peers except `linkid`. ;; ;; If known, then one or more links that aren't this one have previously ;; subscribed with this spec. If exactly one other link has previously ;; subscribed, the only subscription that needs sent is to that peer; ;; otherwise, no subscriptions at all need sent, since everyone has already ;; been informed of this subscription. (cond [(not known?) (for [((peer peer-turn) (in-hash (turns)))] (when (not (equal? peer linkid)) (extend-turn! peer-turn (Assert localid (observe spec)))))] [(= (hash-count holders) 1) (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid (send-to-link! peer (Assert localid (observe spec))))] [else (void)]) ;; Once subscription relaying has taken place, send up matches to the active ;; link. (for [((captures match-holders) (in-hash (subscription-matches sub)))] ;; Compute the number of times someone OTHER THAN this link has asserted ;; a match to this spec. If it's nonzero, we need to hear about it: (when (not (set-empty? (set-remove match-holders linkid))) (extend-turn! turn (Add subid captures)))) ])] [(Clear subid) (match (hash-ref (link-subs) subid #f) [#f (log-syndicate/federation-error "Mention of nonexistent subscription ID ~v from link ~v." subid linkid) (err! 'nonexistent-endpoint item)] [localid (link-subs (hash-remove (link-subs) subid)) (unsubscribe! localid linkid)]) (extend-turn! turn (End subid))] [(End localid) (for [(captures (in-set (hash-ref (link-matches) localid set)))] (remove-match! localid captures linkid)) (link-matches (hash-remove (link-matches) localid))] [(Add localid captures) (define matches (hash-ref (link-matches) localid set)) (cond [(set-member? matches captures) (err! 'duplicate-capture item)] [else (link-matches (hash-set (link-matches) localid (set-add matches captures))) (call-with-sub localid linkid (lambda (sub) (define old-matches (subscription-matches sub)) (define old-match-holders (hash-ref old-matches captures set)) (define new-match-holders (set-add old-match-holders linkid)) (define new-matches (hash-set old-matches captures new-match-holders)) (store-sub! (struct-copy subscription sub [matches new-matches])) (match (set-count old-match-holders) [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] (when (not (equal? peer linkid)) (send-to-link! peer (Add peer-subid captures))))] [1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid (define peer-subid (hash-ref (subscription-holders sub) peer #f)) (when peer-subid ;; the other holder may not itself subscribe! (send-to-link! peer (Add peer-subid captures))))] [_ (void)])))])] [(Del localid captures) (define matches (hash-ref (link-matches) localid set)) (if (not (set-member? matches captures)) (err! 'nonexistent-capture item) (let ((new-matches (set-remove matches captures))) (link-matches (if (set-empty? new-matches) (hash-remove (link-matches) localid) (hash-set (link-matches) localid new-matches))) (remove-match! localid captures linkid)))] [(Msg localid captures) (call-with-sub localid linkid (lambda (sub) (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) (when (not (equal? peer linkid)) (send-to-link! peer (Msg peer-subid captures))))))] )))))))