Change federation protocol to be transport-neutral, carried via Syndicate itself
This commit is contained in:
parent
84361dcbaf
commit
94e27e1cbc
|
@ -14,9 +14,9 @@
|
||||||
#:name address
|
#:name address
|
||||||
(assert (server-poa address))
|
(assert (server-poa address))
|
||||||
(on (message (message-server->poa address $p)) (send! (server-packet address p)))
|
(on (message (message-server->poa address $p)) (send! (server-packet address p)))
|
||||||
(define !! (make-flow-controlled-sender message-poa->server address))
|
(on-start (react
|
||||||
(on (asserted (observe (message-poa->server address _)))
|
(stop-when (asserted (observe (message-poa->server address _)))
|
||||||
(react
|
(react (generic-client-session-facet
|
||||||
(generic-client-session-facet address
|
address
|
||||||
scope
|
scope
|
||||||
(lambda (x) (!! (message-poa->server address x))))))))
|
(lambda (x) (send! (message-poa->server address x))))))))))
|
||||||
|
|
|
@ -27,21 +27,34 @@
|
||||||
;; connection IDs in <server.rkt> (namely, dataspace-unique). Links
|
;; connection IDs in <server.rkt> (namely, dataspace-unique). Links
|
||||||
;; are stateful.
|
;; are stateful.
|
||||||
;;
|
;;
|
||||||
;; Inbound links are set up in response to incoming connections; the
|
;; The link protocol is enacted in special non-federated, local
|
||||||
;; process on the other end of an inbound link is the part that
|
;; federation-management server scopes, identified by
|
||||||
;; implements the TCP service. It too is stateful. At connection setup
|
;; `federation-management-scope` assertions. The code in this module
|
||||||
;; time, the remote client instructs it on which local federated scope
|
;; responds to assertions and messages in these scopes. Besides its
|
||||||
;; it should create a link to.
|
;; scoped nature, the protocol is otherwise ordinary. By reusing
|
||||||
|
;; Syndicate itself for management and operation of federation, we are
|
||||||
|
;; able to address transport independently of federation.
|
||||||
|
;;
|
||||||
|
;; Inbound links are set up by code outside this module in response to
|
||||||
|
;; the appearance of some new federated peer "downstream" of this one.
|
||||||
|
;; For example, after establishing a new client-server connection to a
|
||||||
|
;; federation-management scope, a remote peer may begin using the link
|
||||||
|
;; protocol.
|
||||||
;;
|
;;
|
||||||
;; Outbound links are created in response to an assertion of a
|
;; Outbound links are created in response to an assertion of a
|
||||||
;; `federated-uplink` record, which contains a pair of a local scope
|
;; `federated-uplink` record in a federation-management scope. Each
|
||||||
;; ID and a client transport address (such as `server-tcp-connection`
|
;; such record contains a triple of a local scope ID, a client
|
||||||
;; from <client/tcp.rkt>). Together, these map from a local federated
|
;; transport address (such as `server-tcp-connection` from
|
||||||
;; scope to a remote transport endpoint, and within that endpoint, a
|
;; <client/tcp.rkt>), and a remote scope ID. Together, these federate
|
||||||
;; remote federated scope.
|
;; the local and remote scope IDs via a client-server connection to
|
||||||
|
;; the given address.
|
||||||
;;
|
;;
|
||||||
;; Local links are created automatically whenever there is an active
|
;; Local links are a special case of inbound link. They are created
|
||||||
;; server scope of the same name as a federated scope.
|
;; automatically whenever there is an active server scope of the same
|
||||||
|
;; name as a federated scope.
|
||||||
|
;;
|
||||||
|
;; Local federation-management scopes must not be federated.
|
||||||
|
;; TODO: Enforce this?
|
||||||
|
|
||||||
;; Subscription IDs (== "endpoint IDs") must be connection-unique AND
|
;; Subscription IDs (== "endpoint IDs") must be connection-unique AND
|
||||||
;; must correspond one-to-one with a specific subscription spec. That
|
;; must correspond one-to-one with a specific subscription spec. That
|
||||||
|
@ -62,70 +75,79 @@
|
||||||
;; peer nodes.
|
;; peer nodes.
|
||||||
|
|
||||||
;;---------------------------------------------------------------------------
|
;;---------------------------------------------------------------------------
|
||||||
;; Inbound links: the heavy lifting is done by server.rkt along with some
|
;; Outbound links. (Really, they end up being a kind of "inbound link"
|
||||||
;; transport-specific code e.g. server/tcp.rkt
|
;; too! Ultimately we have just *links*, connected to arbitrary
|
||||||
|
;; things. For traditional "inbound", it's some remote party that has
|
||||||
|
;; connected to us; for "local", it's a local server scope; for
|
||||||
|
;; "outbound", it's a connection to another server that we reached out
|
||||||
|
;; to.)
|
||||||
|
|
||||||
;;---------------------------------------------------------------------------
|
(spawn #:name 'federated-uplink-factory
|
||||||
;; Outbound links.
|
(during (federation-management-scope $management-scope)
|
||||||
|
(during/spawn (server-envelope management-scope
|
||||||
|
($ link (federated-uplink $local-scope
|
||||||
|
$peer-addr
|
||||||
|
$remote-scope)))
|
||||||
|
#:name link
|
||||||
|
(during (server-connected peer-addr)
|
||||||
|
|
||||||
(spawn #:name 'federated-tcp-uplink-factory
|
(assert (server-proposal management-scope (federated-uplink-connected link)))
|
||||||
(during/spawn ($ link (federated-uplink $local-scope
|
;; ^ out to local requester
|
||||||
(server-tcp-connection $host $port $remote-scope)))
|
|
||||||
#:name link
|
|
||||||
(reassert-on (tcp-connection link (tcp-address host port))
|
|
||||||
(retracted (tcp-accepted link))
|
|
||||||
(asserted (tcp-rejected link _)))
|
|
||||||
|
|
||||||
(during (tcp-accepted link)
|
(define session-id (strong-gensym 'peer-))
|
||||||
(on-start (issue-unbounded-credit! tcp-in link))
|
(assert (server-proposal management-scope (federated-link session-id local-scope)))
|
||||||
(assert (federated-uplink-connected link)) ;; out to local requester
|
(assert (to-server peer-addr (federated-link session-id remote-scope)))
|
||||||
|
|
||||||
(define session-id (gensym 'uplink))
|
(on (message (from-server peer-addr (message-server->poa session-id $p)))
|
||||||
(assert (federated-link session-id local-scope)) ;; in to federated scope
|
(send! (server-proposal management-scope (message-poa->server session-id p))))
|
||||||
|
|
||||||
(define !! (make-flow-controlled-sender message-poa->server session-id))
|
(on (message (server-envelope management-scope (message-server->poa session-id $p)))
|
||||||
(define accumulate!
|
(send! (to-server peer-addr (message-poa->server session-id p))))))))
|
||||||
(packet-accumulator (lambda (p) (!! (message-poa->server session-id p)))))
|
|
||||||
(on (message (tcp-in link $bs)) (accumulate! bs))
|
|
||||||
|
|
||||||
(on (message (message-server->poa session-id $p))
|
|
||||||
(send! (tcp-out link (encode p))))
|
|
||||||
|
|
||||||
(on-start (send! (message-server->poa session-id (Peer remote-scope)))))))
|
|
||||||
|
|
||||||
;;---------------------------------------------------------------------------
|
;;---------------------------------------------------------------------------
|
||||||
;; Local links.
|
;; Local links.
|
||||||
|
|
||||||
(spawn #:name 'federated-local-link-factory
|
(spawn #:name 'federated-local-link-factory
|
||||||
(during (federated-link _ $scope)
|
(during (federation-management-scope $management-scope)
|
||||||
(during/spawn (server-active scope)
|
(during (server-envelope management-scope (federated-link _ $scope))
|
||||||
#:name (list 'local-link scope)
|
(during/spawn (server-active scope)
|
||||||
|
#:name (list 'local-link management-scope scope)
|
||||||
|
|
||||||
(define session-id (gensym 'local-link))
|
(define session-id (gensym 'local-link))
|
||||||
(assert (federated-link session-id scope))
|
(assert (server-proposal management-scope (federated-link session-id scope)))
|
||||||
|
|
||||||
(define !! (make-flow-controlled-sender message-poa->server session-id))
|
(define (!! m)
|
||||||
|
(send! (server-proposal management-scope (message-poa->server session-id m))))
|
||||||
|
|
||||||
(on (message (message-server->poa session-id (Assert $subid (observe $spec))))
|
(on (message (server-envelope management-scope
|
||||||
(react
|
(message-server->poa session-id
|
||||||
(define ((! ctor) cs) (!! (message-poa->server session-id (ctor subid cs))))
|
(Assert $subid (observe $spec)))))
|
||||||
(add-observer-endpoint! (lambda () (server-proposal scope spec))
|
(react
|
||||||
#:on-add (! Add)
|
(define ((! ctor) cs) (!! (ctor subid cs)))
|
||||||
#:on-remove (! Del)
|
(add-observer-endpoint! (lambda () (server-proposal scope spec))
|
||||||
#:on-message (! Msg))
|
#:on-add (! Add)
|
||||||
(assert (server-envelope scope (observe spec)))
|
#:on-remove (! Del)
|
||||||
(stop-when (message (message-server->poa session-id (Clear subid))))))
|
#:on-message (! Msg))
|
||||||
|
(assert (server-envelope scope (observe spec)))
|
||||||
|
(stop-when (message
|
||||||
|
(server-envelope management-scope
|
||||||
|
(message-server->poa session-id (Clear subid)))))))
|
||||||
|
|
||||||
(during (observe ($ pat (server-envelope scope $spec)))
|
(during (observe ($ pat (server-envelope scope $spec)))
|
||||||
(define ep (gensym 'ep))
|
(define ep (gensym 'ep))
|
||||||
(on-start (!! (message-poa->server session-id (Assert ep (observe spec)))))
|
(on-start (!! (Assert ep (observe spec))))
|
||||||
(on-stop (!! (message-poa->server session-id (Clear ep))))
|
(on-stop (!! (Clear ep)))
|
||||||
(assert (server-envelope scope (observe spec)))
|
(assert (server-envelope scope (observe spec)))
|
||||||
(on (message (message-server->poa session-id (Add ep $captures)))
|
(on (message (server-envelope management-scope
|
||||||
(react (assert (instantiate-term->value pat captures))
|
(message-server->poa session-id (Add ep $captures))))
|
||||||
(stop-when (message (message-server->poa session-id (Del ep captures))))))
|
(react (assert (instantiate-term->value pat captures))
|
||||||
(on (message (message-server->poa session-id (Msg ep $captures)))
|
(stop-when (message
|
||||||
(send! (instantiate-term->value pat captures)))))))
|
(server-envelope management-scope
|
||||||
|
(message-server->poa session-id
|
||||||
|
(Del ep captures)))))))
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-server->poa session-id (Msg ep $captures))))
|
||||||
|
(send! (instantiate-term->value pat captures))))))))
|
||||||
|
|
||||||
;;---------------------------------------------------------------------------
|
;;---------------------------------------------------------------------------
|
||||||
;; Federated scopes.
|
;; Federated scopes.
|
||||||
|
@ -139,197 +161,203 @@
|
||||||
#:transparent)
|
#:transparent)
|
||||||
|
|
||||||
(spawn #:name 'federated-scope-factory
|
(spawn #:name 'federated-scope-factory
|
||||||
(during/spawn (federated-link _ $scope)
|
(during (federation-management-scope $management-scope)
|
||||||
#:name (list 'federated-scope scope)
|
|
||||||
|
|
||||||
;; Generates a fresh local ID naming a subscription propagated to our peers.
|
(define (send-to-link! linkid m)
|
||||||
(define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1))))))
|
(send! (server-proposal management-scope (message-server->poa linkid m))))
|
||||||
|
|
||||||
(field [peers (set)] ;; (Set LinkID)
|
(during/spawn (server-envelope management-scope (federated-link _ $scope))
|
||||||
[specs (hash)] ;; (Hash Spec LocalID)
|
#:name (list 'federated-scope management-scope scope)
|
||||||
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
|
||||||
)
|
|
||||||
|
|
||||||
(when (log-level? syndicate/federation-logger 'debug)
|
;; Generates a fresh local ID naming a subscription propagated to our peers.
|
||||||
(begin/dataflow (log-syndicate/federation-debug "~a peers:" scope)
|
(define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1))))))
|
||||||
(for [(peer (in-set (peers)))]
|
|
||||||
(log-syndicate/federation-debug " link ~v" peer))
|
|
||||||
(log-syndicate/federation-debug "-"))
|
|
||||||
(begin/dataflow (log-syndicate/federation-debug "~a specs:" scope)
|
|
||||||
(for [((spec local) (in-hash (specs)))]
|
|
||||||
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
|
|
||||||
(log-syndicate/federation-debug "-"))
|
|
||||||
(begin/dataflow (log-syndicate/federation-debug "~a subs:" scope)
|
|
||||||
(for [((local sub) (in-hash (subs)))]
|
|
||||||
(match-define (subscription _id spec holders matches) sub)
|
|
||||||
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
|
|
||||||
(when (not (hash-empty? holders))
|
|
||||||
(log-syndicate/federation-debug " holders:")
|
|
||||||
(for [((link ep) (in-hash holders))]
|
|
||||||
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
|
||||||
(when (not (bag-empty? matches))
|
|
||||||
(log-syndicate/federation-debug " matches:")
|
|
||||||
(for [((captures count) (in-bag/count matches))]
|
|
||||||
(log-syndicate/federation-debug " captures ~v count ~a"
|
|
||||||
captures count))))
|
|
||||||
(log-syndicate/federation-debug "-")))
|
|
||||||
|
|
||||||
(define (call-with-sub localid linkid f)
|
(field [peers (set)] ;; (Set LinkID)
|
||||||
(match (hash-ref (subs) localid #f)
|
[specs (hash)] ;; (Hash Spec LocalID)
|
||||||
[#f (log-syndicate/federation-error
|
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
||||||
"Mention of nonexistent local ID ~v from link ~v. Ignoring."
|
|
||||||
localid
|
|
||||||
linkid)]
|
|
||||||
[sub (f sub)]))
|
|
||||||
|
|
||||||
(define (unsubscribe! localid linkid)
|
|
||||||
(call-with-sub
|
|
||||||
localid linkid
|
|
||||||
(lambda (sub)
|
|
||||||
(define new-holders (hash-remove (subscription-holders sub) linkid))
|
|
||||||
(if (hash-empty? new-holders)
|
|
||||||
(begin (specs (hash-remove (specs) (subscription-spec sub)))
|
|
||||||
(subs (hash-remove (subs) localid)))
|
|
||||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
|
||||||
[holders new-holders]))))
|
|
||||||
|
|
||||||
;; The messages we send depend on (hash-count new-holders):
|
|
||||||
;; - if >1, there are enough other active subscribers that we don't need to send
|
|
||||||
;; any messages.
|
|
||||||
;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid)
|
|
||||||
;; - if =0, we retract the subscription from all peers except linkid
|
|
||||||
|
|
||||||
(match (hash-count new-holders)
|
|
||||||
[0 (for [(peer (in-set (peers)))]
|
|
||||||
(when (not (equal? peer linkid))
|
|
||||||
(send! (message-server->poa peer (Clear localid)))))]
|
|
||||||
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid
|
|
||||||
(send! (message-server->poa peer (Clear localid))))]
|
|
||||||
[_ (void)]))))
|
|
||||||
|
|
||||||
(define (adjust-matches localid linkid captures delta expected-outcome ctor)
|
|
||||||
(call-with-sub
|
|
||||||
localid linkid
|
|
||||||
(lambda (sub)
|
|
||||||
(define-values (new-matches outcome)
|
|
||||||
(bag-change (subscription-matches sub) captures delta #:clamp? #t))
|
|
||||||
(subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
|
|
||||||
(when (eq? outcome expected-outcome)
|
|
||||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
|
||||||
(when (not (equal? peer linkid))
|
|
||||||
(send! (message-server->poa peer (ctor peer-subid captures)))))))))
|
|
||||||
|
|
||||||
(during (federated-link $linkid scope)
|
|
||||||
|
|
||||||
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
|
|
||||||
(peers (set-add (peers) linkid)))
|
|
||||||
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
|
|
||||||
(peers (set-remove (peers) linkid)))
|
|
||||||
|
|
||||||
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
|
||||||
[link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
|
|
||||||
)
|
)
|
||||||
|
|
||||||
(when (log-level? syndicate/federation-logger 'debug)
|
(when (log-level? syndicate/federation-logger 'debug)
|
||||||
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
|
(begin/dataflow (log-syndicate/federation-debug "~a peers:" scope)
|
||||||
(for [((sub local) (in-hash (link-subs)))]
|
(for [(peer (in-set (peers)))]
|
||||||
(log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
|
(log-syndicate/federation-debug " link ~v" peer))
|
||||||
(log-syndicate/federation-debug "-"))
|
(log-syndicate/federation-debug "-"))
|
||||||
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
|
(begin/dataflow (log-syndicate/federation-debug "~a specs:" scope)
|
||||||
(for [((item count) (in-bag/count (link-matches)))]
|
(for [((spec local) (in-hash (specs)))]
|
||||||
(match-define (cons local captures) item)
|
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
|
||||||
(log-syndicate/federation-debug " local ~a captures ~v count ~a"
|
(log-syndicate/federation-debug "-"))
|
||||||
local captures count))
|
(begin/dataflow (log-syndicate/federation-debug "~a subs:" scope)
|
||||||
|
(for [((local sub) (in-hash (subs)))]
|
||||||
|
(match-define (subscription _id spec holders matches) sub)
|
||||||
|
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
|
||||||
|
(when (not (hash-empty? holders))
|
||||||
|
(log-syndicate/federation-debug " holders:")
|
||||||
|
(for [((link ep) (in-hash holders))]
|
||||||
|
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
||||||
|
(when (not (bag-empty? matches))
|
||||||
|
(log-syndicate/federation-debug " matches:")
|
||||||
|
(for [((captures count) (in-bag/count matches))]
|
||||||
|
(log-syndicate/federation-debug " captures ~v count ~a"
|
||||||
|
captures count))))
|
||||||
(log-syndicate/federation-debug "-")))
|
(log-syndicate/federation-debug "-")))
|
||||||
|
|
||||||
(on-start (for ([(spec localid) (in-hash (specs))])
|
(define (call-with-sub localid linkid f)
|
||||||
(send! (message-server->poa linkid (Assert localid (observe spec))))))
|
(match (hash-ref (subs) localid #f)
|
||||||
|
[#f (log-syndicate/federation-error
|
||||||
|
"Mention of nonexistent local ID ~v from link ~v. Ignoring."
|
||||||
|
localid
|
||||||
|
linkid)]
|
||||||
|
[sub (f sub)]))
|
||||||
|
|
||||||
(on-stop (for ([item (in-bag (link-matches))])
|
(define (unsubscribe! localid linkid)
|
||||||
(match-define (cons localid captures) item)
|
(call-with-sub
|
||||||
(adjust-matches localid linkid captures -1 'present->absent Del))
|
localid linkid
|
||||||
(for ([localid (in-hash-values (link-subs))])
|
(lambda (sub)
|
||||||
(unsubscribe! localid linkid)))
|
(define new-holders (hash-remove (subscription-holders sub) linkid))
|
||||||
|
(if (hash-empty? new-holders)
|
||||||
|
(begin (specs (hash-remove (specs) (subscription-spec sub)))
|
||||||
|
(subs (hash-remove (subs) localid)))
|
||||||
|
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||||
|
[holders new-holders]))))
|
||||||
|
|
||||||
(on-start (issue-unbounded-credit! message-poa->server linkid))
|
;; The messages we send depend on (hash-count new-holders):
|
||||||
|
;; - if >1, there are enough other active subscribers that we don't need to send
|
||||||
|
;; any messages.
|
||||||
|
;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid)
|
||||||
|
;; - if =0, we retract the subscription from all peers except linkid
|
||||||
|
|
||||||
(on (message (message-poa->server linkid (Assert $subid (observe $spec))))
|
(match (hash-count new-holders)
|
||||||
(define known? (hash-has-key? (specs) spec))
|
[0 (for [(peer (in-set (peers)))]
|
||||||
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
|
||||||
(define sub
|
|
||||||
(hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
|
|
||||||
(define holders (subscription-holders sub))
|
|
||||||
(cond
|
|
||||||
[(hash-has-key? holders linkid)
|
|
||||||
(log-syndicate/federation-error
|
|
||||||
"Duplicate subscription ~a, ID ~a, from link ~a. Ignoring."
|
|
||||||
spec
|
|
||||||
subid
|
|
||||||
linkid)]
|
|
||||||
[else
|
|
||||||
(link-subs (hash-set (link-subs) subid localid))
|
|
||||||
(when (not known?) (specs (hash-set (specs) spec localid)))
|
|
||||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
|
||||||
[holders (hash-set holders linkid subid)])))
|
|
||||||
|
|
||||||
;; If not known, then relay the subscription to all peers except `linkid`.
|
|
||||||
;;
|
|
||||||
;; If known, then one or more links that aren't this one have previously
|
|
||||||
;; subscribed with this spec. If exactly one other link has previously
|
|
||||||
;; subscribed, the only subscription that needs sent is to that peer;
|
|
||||||
;; otherwise, no subscriptions at all need sent, since everyone has already
|
|
||||||
;; been informed of this subscription.
|
|
||||||
|
|
||||||
(cond
|
|
||||||
[(not known?)
|
|
||||||
(for [(peer (in-set (peers)))]
|
|
||||||
(when (not (equal? peer linkid))
|
(when (not (equal? peer linkid))
|
||||||
(send! (message-server->poa peer (Assert localid (observe spec))))))]
|
(send-to-link! peer (Clear localid))))]
|
||||||
[(= (hash-count holders) 1)
|
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid
|
||||||
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
|
(send-to-link! peer (Clear localid)))]
|
||||||
(send! (message-server->poa peer (Assert localid (observe spec)))))]
|
[_ (void)]))))
|
||||||
[else
|
|
||||||
(void)])
|
|
||||||
|
|
||||||
;; Once subscription relaying has taken place, send up matches to the active
|
(define (adjust-matches localid linkid captures delta expected-outcome ctor)
|
||||||
;; link.
|
(call-with-sub
|
||||||
(for [(captures (in-bag (subscription-matches sub)))]
|
localid linkid
|
||||||
(send! (message-server->poa linkid (Add subid captures))))
|
(lambda (sub)
|
||||||
|
(define-values (new-matches outcome)
|
||||||
]))
|
(bag-change (subscription-matches sub) captures delta #:clamp? #t))
|
||||||
|
(subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
|
||||||
(on (message (message-poa->server linkid (Clear $subid)))
|
(when (eq? outcome expected-outcome)
|
||||||
(match (hash-ref (link-subs) subid #f)
|
|
||||||
[#f (log-syndicate/federation-error
|
|
||||||
"Mention of nonexistent subscription ID ~v from link ~v. Ignoring."
|
|
||||||
subid
|
|
||||||
linkid)]
|
|
||||||
[localid
|
|
||||||
(link-subs (hash-remove (link-subs) subid))
|
|
||||||
(unsubscribe! localid linkid)]))
|
|
||||||
|
|
||||||
(define (relay-add-or-del localid captures delta expected-outcome ctor)
|
|
||||||
(define-values (new-link-matches link-outcome)
|
|
||||||
(bag-change (link-matches) (cons localid captures) delta #:clamp? #t))
|
|
||||||
(link-matches new-link-matches)
|
|
||||||
(when (eq? link-outcome expected-outcome)
|
|
||||||
(adjust-matches localid linkid captures delta expected-outcome ctor)))
|
|
||||||
|
|
||||||
(on (message (message-poa->server linkid (Add $localid $captures)))
|
|
||||||
(relay-add-or-del localid captures +1 'absent->present Add))
|
|
||||||
|
|
||||||
(on (message (message-poa->server linkid (Del $localid $captures)))
|
|
||||||
(relay-add-or-del localid captures -1 'present->absent Del))
|
|
||||||
|
|
||||||
(on (message (message-poa->server linkid (Ping)))
|
|
||||||
(send! (message-server->poa linkid (Pong))))
|
|
||||||
|
|
||||||
(on (message (message-poa->server linkid (Msg $localid $captures)))
|
|
||||||
(call-with-sub
|
|
||||||
localid linkid
|
|
||||||
(lambda (sub)
|
|
||||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||||
(when (not (equal? peer linkid))
|
(when (not (equal? peer linkid))
|
||||||
(send! (message-server->poa peer (Msg peer-subid captures))))))))
|
(send-to-link! peer (ctor peer-subid captures))))))))
|
||||||
|
|
||||||
)))
|
(during (server-envelope management-scope (federated-link $linkid scope))
|
||||||
|
|
||||||
|
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
|
||||||
|
(peers (set-add (peers) linkid)))
|
||||||
|
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
|
||||||
|
(peers (set-remove (peers) linkid)))
|
||||||
|
|
||||||
|
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
||||||
|
[link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
|
||||||
|
)
|
||||||
|
|
||||||
|
(when (log-level? syndicate/federation-logger 'debug)
|
||||||
|
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
|
||||||
|
(for [((sub local) (in-hash (link-subs)))]
|
||||||
|
(log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
|
||||||
|
(log-syndicate/federation-debug "-"))
|
||||||
|
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
|
||||||
|
(for [((item count) (in-bag/count (link-matches)))]
|
||||||
|
(match-define (cons local captures) item)
|
||||||
|
(log-syndicate/federation-debug " local ~a captures ~v count ~a"
|
||||||
|
local captures count))
|
||||||
|
(log-syndicate/federation-debug "-")))
|
||||||
|
|
||||||
|
(on-start (for ([(spec localid) (in-hash (specs))])
|
||||||
|
(send-to-link! linkid (Assert localid (observe spec)))))
|
||||||
|
|
||||||
|
(on-stop (for ([item (in-bag (link-matches))])
|
||||||
|
(match-define (cons localid captures) item)
|
||||||
|
(adjust-matches localid linkid captures -1 'present->absent Del))
|
||||||
|
(for ([localid (in-hash-values (link-subs))])
|
||||||
|
(unsubscribe! localid linkid)))
|
||||||
|
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-poa->server linkid
|
||||||
|
(Assert $subid (observe $spec)))))
|
||||||
|
(define known? (hash-has-key? (specs) spec))
|
||||||
|
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
||||||
|
(define sub
|
||||||
|
(hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
|
||||||
|
(define holders (subscription-holders sub))
|
||||||
|
(cond
|
||||||
|
[(hash-has-key? holders linkid)
|
||||||
|
(log-syndicate/federation-error
|
||||||
|
"Duplicate subscription ~a, ID ~a, from link ~a. Ignoring."
|
||||||
|
spec
|
||||||
|
subid
|
||||||
|
linkid)]
|
||||||
|
[else
|
||||||
|
(link-subs (hash-set (link-subs) subid localid))
|
||||||
|
(when (not known?) (specs (hash-set (specs) spec localid)))
|
||||||
|
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||||
|
[holders (hash-set holders linkid subid)])))
|
||||||
|
|
||||||
|
;; If not known, then relay the subscription to all peers except `linkid`.
|
||||||
|
;;
|
||||||
|
;; If known, then one or more links that aren't this one have previously
|
||||||
|
;; subscribed with this spec. If exactly one other link has previously
|
||||||
|
;; subscribed, the only subscription that needs sent is to that peer;
|
||||||
|
;; otherwise, no subscriptions at all need sent, since everyone has already
|
||||||
|
;; been informed of this subscription.
|
||||||
|
|
||||||
|
(cond
|
||||||
|
[(not known?)
|
||||||
|
(for [(peer (in-set (peers)))]
|
||||||
|
(when (not (equal? peer linkid))
|
||||||
|
(send-to-link! peer (Assert localid (observe spec)))))]
|
||||||
|
[(= (hash-count holders) 1)
|
||||||
|
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
|
||||||
|
(send-to-link! peer (Assert localid (observe spec))))]
|
||||||
|
[else
|
||||||
|
(void)])
|
||||||
|
|
||||||
|
;; Once subscription relaying has taken place, send up matches to the active
|
||||||
|
;; link.
|
||||||
|
(for [(captures (in-bag (subscription-matches sub)))]
|
||||||
|
(send-to-link! linkid (Add subid captures)))
|
||||||
|
|
||||||
|
]))
|
||||||
|
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-poa->server linkid (Clear $subid))))
|
||||||
|
(match (hash-ref (link-subs) subid #f)
|
||||||
|
[#f (log-syndicate/federation-error
|
||||||
|
"Mention of nonexistent subscription ID ~v from link ~v. Ignoring."
|
||||||
|
subid
|
||||||
|
linkid)]
|
||||||
|
[localid
|
||||||
|
(link-subs (hash-remove (link-subs) subid))
|
||||||
|
(unsubscribe! localid linkid)]))
|
||||||
|
|
||||||
|
(define (relay-add-or-del localid captures delta expected-outcome ctor)
|
||||||
|
(define-values (new-link-matches link-outcome)
|
||||||
|
(bag-change (link-matches) (cons localid captures) delta #:clamp? #t))
|
||||||
|
(link-matches new-link-matches)
|
||||||
|
(when (eq? link-outcome expected-outcome)
|
||||||
|
(adjust-matches localid linkid captures delta expected-outcome ctor)))
|
||||||
|
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-poa->server linkid (Add $localid $captures))))
|
||||||
|
(relay-add-or-del localid captures +1 'absent->present Add))
|
||||||
|
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-poa->server linkid (Del $localid $captures))))
|
||||||
|
(relay-add-or-del localid captures -1 'present->absent Del))
|
||||||
|
|
||||||
|
(on (message (server-envelope management-scope
|
||||||
|
(message-poa->server linkid (Msg $localid $captures))))
|
||||||
|
(call-with-sub
|
||||||
|
localid linkid
|
||||||
|
(lambda (sub)
|
||||||
|
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||||
|
(when (not (equal? peer linkid))
|
||||||
|
(send-to-link! peer (Msg peer-subid captures)))))))
|
||||||
|
|
||||||
|
))))
|
||||||
|
|
|
@ -25,7 +25,9 @@
|
||||||
(require racket/cmdline)
|
(require racket/cmdline)
|
||||||
(define tcp-port default-tcp-server-port)
|
(define tcp-port default-tcp-server-port)
|
||||||
(define http-port default-http-server-port)
|
(define http-port default-http-server-port)
|
||||||
|
(define default-management-scope "local")
|
||||||
(define uplinks '())
|
(define uplinks '())
|
||||||
|
(define management-scope default-management-scope)
|
||||||
(command-line #:once-any
|
(command-line #:once-any
|
||||||
["--tcp" port
|
["--tcp" port
|
||||||
((format "Listen on plain TCP port (default ~a)" default-tcp-server-port))
|
((format "Listen on plain TCP port (default ~a)" default-tcp-server-port))
|
||||||
|
@ -39,8 +41,14 @@
|
||||||
["--no-http" "Do not listen on any websocket HTTP port"
|
["--no-http" "Do not listen on any websocket HTTP port"
|
||||||
(set! http-port #f)]
|
(set! http-port #f)]
|
||||||
#:multi
|
#:multi
|
||||||
|
[("--management-scope" "-m") scope
|
||||||
|
("Set the management scope for future `--uplink`s and, "
|
||||||
|
"ultimately, for local federation management use. "
|
||||||
|
(format "(default ~v)" default-management-scope))
|
||||||
|
(set! management-scope scope)]
|
||||||
["--uplink" local-scope host port remote-scope
|
["--uplink" local-scope host port remote-scope
|
||||||
"Connect the named local scope to the named scope at the server at host:port"
|
("Connect the named local-scope to the named remote-scope"
|
||||||
|
"via the management scope in the server at host:port")
|
||||||
(define port-number (string->number port))
|
(define port-number (string->number port))
|
||||||
(when (not port-number)
|
(when (not port-number)
|
||||||
(eprintf "Invalid --uplink port number: ~v" port)
|
(eprintf "Invalid --uplink port number: ~v" port)
|
||||||
|
@ -48,14 +56,19 @@
|
||||||
(set! uplinks (cons (federated-uplink local-scope
|
(set! uplinks (cons (federated-uplink local-scope
|
||||||
(server-tcp-connection host
|
(server-tcp-connection host
|
||||||
port-number
|
port-number
|
||||||
remote-scope))
|
management-scope)
|
||||||
|
remote-scope)
|
||||||
uplinks))])
|
uplinks))])
|
||||||
(extend-ground-boot! (lambda ()
|
(extend-ground-boot! (lambda ()
|
||||||
|
(spawn (assert (federation-management-scope management-scope)))
|
||||||
|
;; ^ for inbound as well as outbound links
|
||||||
(when tcp-port (spawn-tcp-server! tcp-port))
|
(when tcp-port (spawn-tcp-server! tcp-port))
|
||||||
(when http-port (spawn-websocket-server! http-port))
|
(when http-port (spawn-websocket-server! http-port))
|
||||||
(when (pair? uplinks)
|
(when (pair? uplinks)
|
||||||
(spawn (for [(u uplinks)]
|
(spawn (define a (server-loopback-connection management-scope))
|
||||||
(assert u)))))))
|
(assert (server-connection a))
|
||||||
|
(for [(u uplinks)]
|
||||||
|
(assert (to-server a u))))))))
|
||||||
|
|
||||||
(define-logger syndicate/distributed)
|
(define-logger syndicate/distributed)
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
(message-struct force-server-disconnect (address))
|
(message-struct force-server-disconnect (address))
|
||||||
|
|
||||||
;; Federation configuration
|
;; Federation configuration
|
||||||
;; e.g. (federated-uplink "scope1" (server-tcp-connection "peer.example" 8001 "scope2"))
|
;; e.g. (federated-uplink "scope1" (server-tcp-connection "peer.example" 8001 "local") "scope2")
|
||||||
(assertion-struct federated-uplink (scope peer))
|
(assertion-struct federated-uplink (local-scope peer remote-scope))
|
||||||
(assertion-struct federated-uplink-connected (link))
|
(assertion-struct federated-uplink-connected (link))
|
||||||
|
|
||||||
|
(assertion-struct federation-management-scope (name))
|
||||||
|
|
|
@ -4,8 +4,6 @@
|
||||||
(require "internal-protocol.rkt")
|
(require "internal-protocol.rkt")
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
|
|
||||||
(require imperative-syndicate/protocol/credit)
|
|
||||||
|
|
||||||
(spawn #:name 'server-factory
|
(spawn #:name 'server-factory
|
||||||
|
|
||||||
;; Previously, we just had server-envelope. Now, we have both
|
;; Previously, we just had server-envelope. Now, we have both
|
||||||
|
@ -20,12 +18,9 @@
|
||||||
|
|
||||||
(during/spawn (server-poa $id)
|
(during/spawn (server-poa $id)
|
||||||
(on-start
|
(on-start
|
||||||
(issue-credit! message-poa->server id)
|
(match (let-event [(message (message-poa->server id $p))] p)
|
||||||
(let-event [(message (message-poa->server id $p))]
|
[(Connect scope) (react (connected id scope))]
|
||||||
(match p
|
[_ (send-error! id 'connection-not-setup)]))))
|
||||||
[(Connect scope) (react (connected id scope))]
|
|
||||||
[(Peer scope) (react (assert (federated-link id scope)))]
|
|
||||||
[_ (send-error! id 'connection-not-setup)])))))
|
|
||||||
|
|
||||||
(define (send-error! id detail)
|
(define (send-error! id detail)
|
||||||
(send! (message-server->poa id (Err detail))))
|
(send! (message-server->poa id (Err detail))))
|
||||||
|
@ -33,7 +28,6 @@
|
||||||
(define (connected id scope)
|
(define (connected id scope)
|
||||||
(define endpoints (set))
|
(define endpoints (set))
|
||||||
(assert (server-active scope))
|
(assert (server-active scope))
|
||||||
(on-start (issue-unbounded-credit! message-poa->server id))
|
|
||||||
(on (message (message-poa->server id $p))
|
(on (message (message-poa->server id $p))
|
||||||
(match p
|
(match p
|
||||||
[(Assert ep a) #:when (not (set-member? endpoints ep))
|
[(Assert ep a) #:when (not (set-member? endpoints ep))
|
||||||
|
@ -68,7 +62,6 @@
|
||||||
(define (unhandled-message id p)
|
(define (unhandled-message id p)
|
||||||
(match p
|
(match p
|
||||||
[(Connect _) (send-error! id 'duplicate-connection-setup)]
|
[(Connect _) (send-error! id 'duplicate-connection-setup)]
|
||||||
[(Peer _) (send-error! id 'duplicate-connection-setup)]
|
|
||||||
[(Ping) (send! (message-server->poa id (Pong)))]
|
[(Ping) (send! (message-server->poa id (Pong)))]
|
||||||
[(Pong) (void)]
|
[(Pong) (void)]
|
||||||
[_ (send-error! id 'invalid-message)]))
|
[_ (send-error! id 'invalid-message)]))
|
||||||
|
|
|
@ -14,8 +14,7 @@
|
||||||
(assert (tcp-accepted id))
|
(assert (tcp-accepted id))
|
||||||
(assert (server-poa id))
|
(assert (server-poa id))
|
||||||
(on-start (issue-unbounded-credit! tcp-in id))
|
(on-start (issue-unbounded-credit! tcp-in id))
|
||||||
(define !! (make-flow-controlled-sender message-poa->server id))
|
(define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
|
||||||
(define accumulate! (packet-accumulator (lambda (p) (!! (message-poa->server id p)))))
|
|
||||||
(on (message (tcp-in id $bs))
|
(on (message (tcp-in id $bs))
|
||||||
(accumulate! bs))
|
(accumulate! bs))
|
||||||
(on (message (message-server->poa id $p))
|
(on (message (message-server->poa id $p))
|
||||||
|
|
|
@ -23,12 +23,11 @@
|
||||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||||
(send! (message-server->poa id (Ping))))
|
(send! (message-server->poa id (Ping))))
|
||||||
|
|
||||||
(define !! (make-flow-controlled-sender message-poa->server id))
|
|
||||||
(on (message (websocket-in id $body))
|
(on (message (websocket-in id $body))
|
||||||
(define-values (packet remainder) (decode body))
|
(define-values (packet remainder) (decode body))
|
||||||
(when (not (equal? remainder #""))
|
(when (not (equal? remainder #""))
|
||||||
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
||||||
(!! (message-poa->server id packet)))
|
(send! (message-poa->server id packet)))
|
||||||
(on (message (message-server->poa id $p))
|
(on (message (message-server->poa id $p))
|
||||||
(send! (websocket-out id (encode p)))
|
(send! (websocket-out id (encode p)))
|
||||||
(when (Err? p) (stop-current-facet))))
|
(when (Err? p) (stop-current-facet))))
|
||||||
|
|
|
@ -5,11 +5,9 @@
|
||||||
(require (prefix-in preserves: preserves))
|
(require (prefix-in preserves: preserves))
|
||||||
(require bitsyntax)
|
(require bitsyntax)
|
||||||
(require (only-in net/rfc6455 ws-idle-timeout))
|
(require (only-in net/rfc6455 ws-idle-timeout))
|
||||||
(require imperative-syndicate/protocol/credit)
|
|
||||||
|
|
||||||
;; Enrolment
|
;; Enrolment
|
||||||
(message-struct Connect (scope)) ;; Client --> Server
|
(message-struct Connect (scope)) ;; Client --> Server
|
||||||
(message-struct Peer (scope)) ;; Peer --> Peer
|
|
||||||
|
|
||||||
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
|
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
|
||||||
(message-struct Assert (endpoint-name assertion))
|
(message-struct Assert (endpoint-name assertion))
|
||||||
|
@ -26,12 +24,6 @@
|
||||||
(message-struct Ping ())
|
(message-struct Ping ())
|
||||||
(message-struct Pong ())
|
(message-struct Pong ())
|
||||||
|
|
||||||
;; Peering
|
|
||||||
;; =======
|
|
||||||
;;
|
|
||||||
;; To peer, send `(Peer Scope)` at the start of a connection instead
|
|
||||||
;; of the usual `(Connect Scope)`.
|
|
||||||
;;
|
|
||||||
;; In peer mode, *actions* and *events* travel in *both* directions,
|
;; In peer mode, *actions* and *events* travel in *both* directions,
|
||||||
;; but `Message`s do not appear and (for now) `Assert` is only used to
|
;; but `Message`s do not appear and (for now) `Assert` is only used to
|
||||||
;; establish `observe`s, i.e. subscriptions.
|
;; establish `observe`s, i.e. subscriptions.
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
(define host (make-parameter "localhost"))
|
(define host (make-parameter "localhost"))
|
||||||
(define port (make-parameter 8001))
|
(define port (make-parameter 8001))
|
||||||
(define scope (make-parameter "local"))
|
(define scope (make-parameter "chat"))
|
||||||
|
|
||||||
(module+ main
|
(module+ main
|
||||||
(require racket/cmdline)
|
(require racket/cmdline)
|
||||||
|
|
Loading…
Reference in New Issue