syndicate-rkt/syndicate/distributed/federation.rkt

383 lines
20 KiB
Racket

#lang imperative-syndicate
;; Relays for federation, both "client" (outbound) and "server" (inbound) ends.
(require "wire-protocol.rkt")
(require "internal-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require imperative-syndicate/bag)
(require imperative-syndicate/reassert)
(require racket/set)
(require/activate imperative-syndicate/drivers/tcp)
(define-logger syndicate/federation)
;; A federated scope (as distinct from a non-federated server scope)
;; communicates via "links" to "peers", which come in three flavours:
;; - inbound links, aka "downlinks" from the POV of this node, which
;; result from incoming TCP/websocket/etc connections
;; - outbound links, aka "uplinks", which reach out to a remote TCP/
;; websocket/etc server
;; - local links, (usually? always?) just one per scope, which
;; connect the federated scope to its local server scope
;;
;; All links are identified by a link ID, scoped the same as
;; connection IDs in <server.rkt> (namely, dataspace-unique). Links
;; are stateful.
;;
;; The link protocol is enacted in special non-federated, local
;; federation-management server scopes, identified by
;; `federation-management-scope` assertions. The code in this module
;; responds to assertions and messages in these scopes. Besides its
;; scoped nature, the protocol is otherwise ordinary. By reusing
;; Syndicate itself for management and operation of federation, we are
;; able to address transport independently of federation.
;;
;; Inbound links are set up by code outside this module in response to
;; the appearance of some new federated peer "downstream" of this one.
;; For example, after establishing a new client-server connection to a
;; federation-management scope, a remote peer may begin using the link
;; protocol.
;;
;; Outbound links are created in response to an assertion of a
;; `federated-uplink` record in a federation-management scope. Each
;; such record contains a triple of a local scope ID, a client
;; transport address (such as `server-tcp-connection` from
;; <client/tcp.rkt>), and a remote scope ID. Together, these federate
;; the local and remote scope IDs via a client-server connection to
;; the given address.
;;
;; Local links are a special case of inbound link. They are created
;; automatically whenever there is an active server scope of the same
;; name as a federated scope.
;;
;; Local federation-management scopes must not be federated.
;; TODO: Enforce this?
;; Subscription IDs (== "endpoint IDs") must be connection-unique AND
;; must correspond one-to-one with a specific subscription spec. That
;; is, a subscription ID is merely connection-local shorthand for its
;; spec, and two subscription IDs within a connection must be `equal?`
;; exactly when their corresponding specs are `equal?`.
;;
;; Local IDs must be scope-unique. They are used as subscription IDs
;; in outbound messages.
;;
;; Each federated scope maintains a bidirectional mapping between
;; subscription IDs (each scoped within its connection ID) and local
;; IDs. One local ID may map to multiple subscription IDs - this is
;; the place where aggregation pops up.
;; Unlike the client/server protocol, both Actions and Events are
;; BIDIRECTIONAL, travelling in both directions along edges linking
;; peer nodes.
;;---------------------------------------------------------------------------
;; Outbound links. (Really, they end up being a kind of "inbound link"
;; too! Ultimately we have just *links*, connected to arbitrary
;; things. For traditional "inbound", it's some remote party that has
;; connected to us; for "local", it's a local server scope; for
;; "outbound", it's a connection to another server that we reached out
;; to.)
;; Uplink factory. For every `federated-uplink` record asserted (and
;; visible as a `server-envelope`) in any federation-management scope,
;; spawn an actor named after that record. While
;; `(server-connected peer-addr)` is asserted, the actor:
;;  - asserts `federated-uplink-connected` back into the management scope,
;;    so whoever asserted the uplink record can see that it is live;
;;  - registers itself as a link, under one fresh session ID, on BOTH
;;    sides: as `(federated-link session-id local-scope)` in the local
;;    management scope, and as `(federated-link session-id remote-scope)`
;;    at the remote server;
;;  - relays link-protocol packets between the two sides, buffering them
;;    until both sides are ready to receive.
(spawn #:name 'federated-uplink-factory
  (during (federation-management-scope $management-scope)
    (during/spawn (server-envelope management-scope
                                   ($ link (federated-uplink $local-scope
                                                            $peer-addr
                                                            $remote-scope)))
      #:name link
      (during (server-connected peer-addr)
        (assert (server-proposal management-scope (federated-uplink-connected link)))
        ;; ^ out to local requester
        ;; One ID per connection; it names this link on both sides.
        (define session-id (strong-gensym 'peer-))
        (assert (server-proposal management-scope (federated-link session-id local-scope)))
        (assert (to-server peer-addr (federated-link session-id remote-scope)))
        ;; We have to buffer in both directions, because at startup there's latency
        ;; between asserting a federated-link record and it being ready to receive
        ;; message-poa->server records.
        ;; Both buffers hold packets newest-first (consed onto the front). They
        ;; are reversed when flushed, so packets are delivered in arrival order.
        (field [pending-in '()]   ;; remote server -> local management scope
               [pending-out '()]) ;; local management scope -> remote server
        ;; Packets the remote server addresses to this session.
        (on (message (from-server peer-addr (message-server->poa session-id $p)))
            (pending-in (cons p (pending-in))))
        ;; Packets the local federated scope addresses to this session.
        (on (message (server-envelope management-scope (message-server->poa session-id $p)))
            (pending-out (cons p (pending-out))))
        ;; Flush only while BOTH sides observe `message-poa->server` for this
        ;; session, i.e. once both ends are ready to receive packets.
        (during (server-envelope management-scope (observe (message-poa->server session-id _)))
          (during (from-server peer-addr (observe (message-poa->server session-id _)))
            ;; Forward buffered remote packets into the local management
            ;; scope, oldest first, then empty the buffer. This is a
            ;; dataflow block, so it runs again whenever `pending-in` changes.
            (begin/dataflow
              (when (pair? (pending-in))
                (for [(p (reverse (pending-in)))]
                  (send! (server-proposal management-scope
                                          (message-poa->server session-id p))))
                (pending-in '())))
            ;; Same as above, for buffered local packets bound for the remote server.
            (begin/dataflow
              (when (pair? (pending-out))
                (for [(p (reverse (pending-out)))]
                  (send! (to-server peer-addr (message-poa->server session-id p))))
                (pending-out '())))))))))
;;---------------------------------------------------------------------------
;; Local links.
;; Local-link factory. Consider every scope named in a `federated-link`
;; record in a federation-management scope. While a server scope of that
;; name is also active (`server-active`), spawn one local-link actor. The
;; actor registers itself as a link of the federated scope, under a fresh
;; session ID, and joins it to the local server scope in both directions:
;;  - Subscriptions the federated scope sends to this link
;;    (`Assert subid (observe spec)`) are turned into observers of matching
;;    `server-proposal`s in the local server scope. What they see is
;;    reported back as `Add`/`Del`/`Msg` under the same `subid`, until the
;;    federated scope sends `Clear subid`.
;;  - Local observers of `(server-envelope scope spec)` are relayed to the
;;    federated scope as `Assert`/`Clear` under a fresh endpoint ID. The
;;    `Add`/`Del`/`Msg` replies for that endpoint are re-asserted or
;;    re-sent locally.
;; NOTE(review): this actor's own `federated-link` assertion (below) also
;; matches the enclosing `during` on `federated-link`. Once started, the
;; link may therefore outlive the record that created it, for as long as
;; the server scope stays active. Confirm that this is intended.
(spawn #:name 'federated-local-link-factory
  (during (federation-management-scope $management-scope)
    (during (server-envelope management-scope (federated-link _ $scope))
      (during/spawn (server-active scope)
        #:name (list 'local-link management-scope scope)
        ;; NOTE(review): the uplink factory uses `strong-gensym` for its
        ;; session IDs. Plain `gensym` is only unique within this Racket
        ;; process; confirm that is sufficient for link IDs.
        (define session-id (gensym 'local-link))
        (assert (server-proposal management-scope (federated-link session-id scope)))
        ;; Send link-protocol packet `m` from this link to the federated scope.
        (define (!! m)
          (send! (server-proposal management-scope (message-poa->server session-id m))))
        ;; Federated scope -> local: a subscription arriving from the federation.
        (on (message (server-envelope management-scope
                                      (message-server->poa session-id
                                                           (Assert $subid (observe $spec)))))
            (react
              ;; `(! ctor)` is a callback that takes captures `cs` and sends
              ;; `(ctor subid cs)` back to the federated scope.
              (define ((! ctor) cs) (!! (ctor subid cs)))
              (add-observer-endpoint! (lambda () (server-proposal scope spec))
                                      #:on-add (! Add)
                                      #:on-remove (! Del)
                                      #:on-message (! Msg))
              ;; Also make the observation itself visible inside the local
              ;; server scope (presumably so local publishers see the interest).
              (assert (server-envelope scope (observe spec)))
              ;; Tear this subscription down when the federation clears it.
              (stop-when (message
                          (server-envelope management-scope
                                           (message-server->poa session-id (Clear subid)))))))
        ;; Local -> federated scope: a local observer of `(server-envelope scope spec)`.
        ;; `pat` is the whole observed pattern. It is used below to rebuild
        ;; concrete assertions from the captures the federation reports.
        (during (observe ($ pat (server-envelope scope $spec)))
          (define ep (gensym 'ep))
          (on-start (!! (Assert ep (observe spec))))
          (on-stop (!! (Clear ep)))
          ;; A federated match appeared: assert it locally until the matching `Del`.
          (on (message (server-envelope management-scope
                                        (message-server->poa session-id (Add ep $captures))))
              (react (assert (instantiate-term->value pat captures))
                     (stop-when (message
                                 (server-envelope management-scope
                                                  (message-server->poa session-id
                                                                       (Del ep captures)))))))
          ;; A federated message matched: re-send it locally.
          (on (message (server-envelope management-scope
                                        (message-server->poa session-id (Msg ep $captures))))
              (send! (instantiate-term->value pat captures))))))))
;;---------------------------------------------------------------------------
;; Federated scopes.
;; Internal state
;; Bookkeeping for one aggregated subscription of a federated scope. One
;; record exists per LocalID, i.e. per distinct spec that some link has
;; subscribed to.
(struct subscription
  (id       ;; LocalID: scope-unique name used when relaying to peers
   spec     ;; Assertion: the subscribed pattern
   holders  ;; (Hash LinkID SubscriptionID): subscribing links and their own IDs
   matches) ;; (Bag (Listof Assertion)): captures currently matching `spec`
  #:transparent)
;; Federated-scope factory. For every scope named in a `federated-link`
;; record in a federation-management scope, spawn one actor that embodies
;; that federated scope. The actor:
;;  - tracks which links (peers) are attached to the scope;
;;  - aggregates their subscriptions by spec;
;;  - routes link-protocol packets between them.
;; Packets from a link arrive as `(message-poa->server linkid packet)` in
;; the management scope. Packets to a link are sent as
;; `(message-server->poa linkid packet)`.
(spawn #:name 'federated-scope-factory
  (during (federation-management-scope $management-scope)
    ;; Send link-protocol packet `m` to link `linkid` via the management scope.
    (define (send-to-link! linkid m)
      (send! (server-proposal management-scope (message-server->poa linkid m))))
    (during/spawn (server-envelope management-scope (federated-link _ $scope))
      #:name (list 'federated-scope management-scope scope)
      ;; Generates a fresh local ID naming a subscription propagated to our peers.
      ;; IDs are the consecutive integers 0, 1, 2, ... counted per actor instance.
      (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1))))))
      ;; `subs` can be eq-keyed because LocalIDs are small integers from `make-localid`.
      (field [peers (set)] ;; (Set LinkID)
             [specs (hash)] ;; (Hash Spec LocalID)
             [subs (hasheq)] ;; (Hash LocalID Subscription)
             )
      ;; Debug tracing. If debug logging is enabled when this actor starts,
      ;; dump each field above every time it changes.
      (when (log-level? syndicate/federation-logger 'debug)
        (begin/dataflow (log-syndicate/federation-debug "~a peers:" scope)
                        (for [(peer (in-set (peers)))]
                          (log-syndicate/federation-debug " link ~v" peer))
                        (log-syndicate/federation-debug "-"))
        (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope)
                        (for [((spec local) (in-hash (specs)))]
                          (log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
                        (log-syndicate/federation-debug "-"))
        (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope)
                        (for [((local sub) (in-hash (subs)))]
                          (match-define (subscription _id spec holders matches) sub)
                          (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
                          (when (not (hash-empty? holders))
                            (log-syndicate/federation-debug " holders:")
                            (for [((link ep) (in-hash holders))]
                              (log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
                          (when (not (bag-empty? matches))
                            (log-syndicate/federation-debug " matches:")
                            (for [((captures count) (in-bag/count matches))]
                              (log-syndicate/federation-debug " captures ~v count ~a"
                                                              captures count))))
                        (log-syndicate/federation-debug "-")))
      ;; Calls `f` with the subscription record for `localid`. If no such
      ;; record exists, logs an error naming `linkid` and does not call `f`.
      (define (call-with-sub localid linkid f)
        (match (hash-ref (subs) localid #f)
          [#f (log-syndicate/federation-error
               "Mention of nonexistent local ID ~v from link ~v. Ignoring."
               localid
               linkid)]
          [sub (f sub)]))
      ;; Removes link `linkid` as a holder of subscription `localid`. When no
      ;; holders remain, the subscription is forgotten entirely (both its
      ;; `specs` entry and its `subs` entry). Then sends whatever `Clear`
      ;; packets peers need, as described below.
      (define (unsubscribe! localid linkid)
        (call-with-sub
         localid linkid
         (lambda (sub)
           (define new-holders (hash-remove (subscription-holders sub) linkid))
           (if (hash-empty? new-holders)
               (begin (specs (hash-remove (specs) (subscription-spec sub)))
                      (subs (hash-remove (subs) localid)))
               (subs (hash-set (subs) localid (struct-copy subscription sub
                                                           [holders new-holders]))))
           ;; The messages we send depend on (hash-count new-holders):
           ;; - if >1, there are enough other active subscribers that we don't need to send
           ;; any messages.
           ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid)
           ;; - if =0, we retract the subscription from all peers except linkid
           (match (hash-count new-holders)
             [0 (for [(peer (in-set (peers)))]
                  (when (not (equal? peer linkid))
                    (send-to-link! peer (Clear localid))))]
             [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid
                  (send-to-link! peer (Clear localid)))]
             [_ (void)]))))
      ;; Changes the count of `captures` in subscription `localid`'s aggregate
      ;; `matches` bag by `delta`, clamped by `bag-change`. If the aggregate
      ;; transition equals `expected-outcome` (e.g. 'absent->present), sends
      ;; `(ctor peer-subid captures)` to every holder except `linkid`, the link
      ;; the change came from.
      ;; NOTE(review): transitions are tracked on the aggregate count only.
      ;; Consider this sequence: link A adds C, link B adds C, then A deletes C.
      ;; Holder A is never sent `Add C`, although B still asserts C. Confirm
      ;; whether links are expected to cope with this.
      (define (adjust-matches localid linkid captures delta expected-outcome ctor)
        (call-with-sub
         localid linkid
         (lambda (sub)
           (define-values (new-matches outcome)
             (bag-change (subscription-matches sub) captures delta #:clamp? #t))
           (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
           (when (eq? outcome expected-outcome)
             (for ([(peer peer-subid) (in-hash (subscription-holders sub))])
               (when (not (equal? peer linkid))
                 (send-to-link! peer (ctor peer-subid captures))))))))
      ;; Per-link state and packet handling. Lives for as long as link
      ;; `linkid` asserts `(federated-link linkid scope)` in the management scope.
      (during (server-envelope management-scope (federated-link $linkid scope))
        (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
                  (peers (set-add (peers) linkid)))
        (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
                 (peers (set-remove (peers) linkid)))
        (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
               [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
               )
        (when (log-level? syndicate/federation-logger 'debug)
          (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
                          (for [((sub local) (in-hash (link-subs)))]
                            (log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
                          (log-syndicate/federation-debug "-"))
          (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
                          (for [((item count) (in-bag/count (link-matches)))]
                            (match-define (cons local captures) item)
                            (log-syndicate/federation-debug " local ~a captures ~v count ~a"
                                                            local captures count))
                          (log-syndicate/federation-debug "-")))
        ;; Tell a newly attached link about every subscription currently known.
        (on-start (for ([(spec localid) (in-hash (specs))])
                    (send-to-link! linkid (Assert localid (observe spec)))))
        ;; When the link goes away, first retract every match it contributed,
        ;; then drop every subscription it held.
        (on-stop (for ([item (in-bag (link-matches))])
                   (match-define (cons localid captures) item)
                   (adjust-matches localid linkid captures -1 'present->absent Del))
                 (for ([localid (in-hash-values (link-subs))])
                   (unsubscribe! localid linkid)))
        ;; The link subscribes to `spec` under its own ID `subid`. All links
        ;; subscribing to an `equal?` spec share a single LocalID.
        (on (message (server-envelope management-scope
                                      (message-poa->server linkid
                                                           (Assert $subid (observe $spec)))))
            (define known? (hash-has-key? (specs) spec))
            (define localid (if known? (hash-ref (specs) spec) (make-localid)))
            (define sub
              (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
            ;; `holders` is the holder table BEFORE this link is added.
            (define holders (subscription-holders sub))
            (cond
              [(hash-has-key? holders linkid)
               (log-syndicate/federation-error
                "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring."
                spec
                subid
                linkid)]
              [else
               (link-subs (hash-set (link-subs) subid localid))
               (when (not known?) (specs (hash-set (specs) spec localid)))
               (subs (hash-set (subs) localid (struct-copy subscription sub
                                                           [holders (hash-set holders linkid subid)])))
               ;; If not known, then relay the subscription to all peers except `linkid`.
               ;;
               ;; If known, then one or more links that aren't this one have previously
               ;; subscribed with this spec. If exactly one other link has previously
               ;; subscribed, the only subscription that needs to be sent is to that peer;
               ;; otherwise, no subscriptions at all need to be sent, since everyone has
               ;; already been informed of this subscription.
               (cond
                 [(not known?)
                  (for [(peer (in-set (peers)))]
                    (when (not (equal? peer linkid))
                      (send-to-link! peer (Assert localid (observe spec)))))]
                 [(= (hash-count holders) 1)
                  (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
                    (send-to-link! peer (Assert localid (observe spec))))]
                 [else
                  (void)])
               ;; Once subscription relaying has taken place, send up matches to the active
               ;; link.
               ;; NOTE(review): this dump also includes captures that `linkid`
               ;; itself contributed, but `adjust-matches` never echoes changes
               ;; back to their source link. If `linkid` is the only contributor
               ;; and later sends `Del` for such captures, it is never told to
               ;; remove the match dumped here. Confirm whether this asymmetry
               ;; is intended.
               (for [(captures (in-bag (subscription-matches sub)))]
                 (send-to-link! linkid (Add subid captures)))
               ]))
        ;; The link withdraws its subscription `subid`.
        (on (message (server-envelope management-scope
                                      (message-poa->server linkid (Clear $subid))))
            (match (hash-ref (link-subs) subid #f)
              [#f (log-syndicate/federation-error
                   "Mention of nonexistent subscription ID ~v from link ~v. Ignoring."
                   subid
                   linkid)]
              [localid
               (link-subs (hash-remove (link-subs) subid))
               (unsubscribe! localid linkid)]))
        ;; Records in `link-matches` that this link added (+1) or removed (-1)
        ;; a match `captures` for our subscription `localid`. The scope-wide
        ;; aggregate is updated via `adjust-matches` only when this link's own
        ;; view of the match flips (`expected-outcome`). Repeated Adds or Dels
        ;; from one link therefore count once.
        (define (relay-add-or-del localid captures delta expected-outcome ctor)
          (define-values (new-link-matches link-outcome)
            (bag-change (link-matches) (cons localid captures) delta #:clamp? #t))
          (link-matches new-link-matches)
          (when (eq? link-outcome expected-outcome)
            (adjust-matches localid linkid captures delta expected-outcome ctor)))
        (on (message (server-envelope management-scope
                                      (message-poa->server linkid (Add $localid $captures))))
            (relay-add-or-del localid captures +1 'absent->present Add))
        (on (message (server-envelope management-scope
                                      (message-poa->server linkid (Del $localid $captures))))
            (relay-add-or-del localid captures -1 'present->absent Del))
        ;; A message matching our subscription `localid` arrived from this link.
        ;; Relay it to every other holder, under each holder's own subscription ID.
        (on (message (server-envelope management-scope
                                      (message-poa->server linkid (Msg $localid $captures))))
            (call-with-sub
             localid linkid
             (lambda (sub)
               (for ([(peer peer-subid) (in-hash (subscription-holders sub))])
                 (when (not (equal? peer linkid))
                   (send-to-link! peer (Msg peer-subid captures)))))))
        ))))