2021-06-04 13:56:03 +00:00
|
|
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
;;; SPDX-FileCopyrightText: Copyright © 2010-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
2021-06-01 15:19:24 +00:00
|
|
|
|
2020-04-27 18:27:48 +00:00
|
|
|
#lang syndicate
|
2019-05-09 10:17:37 +00:00
|
|
|
;; Relays for federation, both "client" (outbound) and "server" (inbound) ends.
|
|
|
|
|
|
|
|
(require "wire-protocol.rkt")
|
|
|
|
(require "internal-protocol.rkt")
|
|
|
|
(require "protocol.rkt")
|
2019-06-11 17:33:37 +00:00
|
|
|
(require "buffer.rkt")
|
|
|
|
(require "turn.rkt")
|
2019-05-09 10:17:37 +00:00
|
|
|
|
2020-04-27 18:27:48 +00:00
|
|
|
(require syndicate/term)
|
|
|
|
(require syndicate/reassert)
|
2019-05-09 10:17:37 +00:00
|
|
|
(require racket/set)
|
|
|
|
|
2020-04-27 18:27:48 +00:00
|
|
|
(require/activate syndicate/drivers/tcp)
|
2019-05-09 10:17:37 +00:00
|
|
|
|
|
|
|
(define-logger syndicate/federation)
|
|
|
|
|
|
|
|
;; A federated scope (as distinct from a non-federated server scope)
|
|
|
|
;; communicates via "links" to "peers", which come in three flavours:
|
|
|
|
;; - inbound links, aka "downlinks" from the POV of this node, which
|
|
|
|
;; result from incoming TCP/websocket/etc connections
|
|
|
|
;; - outbound links, aka "uplinks", which reach out to a remote TCP/
|
|
|
|
;; websocket/etc server
|
|
|
|
;; - local links, (usually? always?) just one per scope, which
|
|
|
|
;; connect the federated scope to its local server scope
|
|
|
|
;;
|
|
|
|
;; All links are identified by a link ID, scoped the same as
|
|
|
|
;; connection IDs in <server.rkt> (namely, dataspace-unique). Links
|
|
|
|
;; are stateful.
|
|
|
|
;;
|
2019-05-16 21:28:42 +00:00
|
|
|
;; The link protocol is enacted in special non-federated, local
|
|
|
|
;; federation-management server scopes, identified by
|
|
|
|
;; `federation-management-scope` assertions. The code in this module
|
|
|
|
;; responds to assertions and messages in these scopes. Besides its
|
|
|
|
;; scoped nature, the protocol is otherwise ordinary. By reusing
|
|
|
|
;; Syndicate itself for management and operation of federation, we are
|
|
|
|
;; able to address transport independently of federation.
|
|
|
|
;;
|
|
|
|
;; Inbound links are set up by code outside this module in response to
|
|
|
|
;; the appearance of some new federated peer "downstream" of this one.
|
|
|
|
;; For example, after establishing a new client-server connection to a
|
|
|
|
;; federation-management scope, a remote peer may begin using the link
|
|
|
|
;; protocol.
|
2019-05-09 10:17:37 +00:00
|
|
|
;;
|
|
|
|
;; Outbound links are created in response to an assertion of a
|
2019-05-16 21:28:42 +00:00
|
|
|
;; `federated-uplink` record in a federation-management scope. Each
|
|
|
|
;; such record contains a triple of a local scope ID, a client
|
|
|
|
;; transport address (such as `server-tcp-connection` from
|
|
|
|
;; <client/tcp.rkt>), and a remote scope ID. Together, these federate
|
|
|
|
;; the local and remote scope IDs via a client-server connection to
|
|
|
|
;; the given address.
|
|
|
|
;;
|
|
|
|
;; Local links are a special case of inbound link. They are created
|
|
|
|
;; automatically whenever there is an active server scope of the same
|
|
|
|
;; name as a federated scope.
|
2019-05-09 10:17:37 +00:00
|
|
|
;;
|
2019-05-16 21:28:42 +00:00
|
|
|
;; Local federation-management scopes must not be federated.
|
|
|
|
;; TODO: Enforce this?
|
2019-05-09 10:17:37 +00:00
|
|
|
|
|
|
|
;; Subscription IDs (== "endpoint IDs") must be connection-unique AND
|
|
|
|
;; must correspond one-to-one with a specific subscription spec. That
|
|
|
|
;; is, a subscription ID is merely connection-local shorthand for its
|
|
|
|
;; spec, and two subscription IDs within a connection must be `equal?`
|
|
|
|
;; exactly when their corresponding specs are `equal?`.
|
|
|
|
;;
|
|
|
|
;; Local IDs must be scope-unique. They are used as subscription IDs
|
|
|
|
;; in outbound messages.
|
|
|
|
;;
|
|
|
|
;; Each federated scope maintains a bidirectional mapping between
|
|
|
|
;; subscription IDs (each scoped within its connection ID) and local
|
|
|
|
;; IDs. One local ID may map to multiple subscription IDs - this is
|
|
|
|
;; the place where aggregation pops up.
|
|
|
|
|
|
|
|
;; Unlike the client/server protocol, both Actions and Events are
|
|
|
|
;; BIDIRECTIONAL, travelling in both directions along edges linking
|
|
|
|
;; peer nodes.
|
|
|
|
|
|
|
|
;;---------------------------------------------------------------------------
|
2019-05-16 21:28:42 +00:00
|
|
|
;; Outbound links. (Really, they end up being a kind of "inbound link"
|
|
|
|
;; too! Ultimately we have just *links*, connected to arbitrary
|
|
|
|
;; things. For traditional "inbound", it's some remote party that has
|
|
|
|
;; connected to us; for "local", it's a local server scope; for
|
|
|
|
;; "outbound", it's a connection to another server that we reached out
|
|
|
|
;; to.)
|
|
|
|
|
|
|
|
(spawn #:name 'federated-uplink-factory
|
|
|
|
(during (federation-management-scope $management-scope)
|
|
|
|
(during/spawn (server-envelope management-scope
|
|
|
|
($ link (federated-uplink $local-scope
|
|
|
|
$peer-addr
|
|
|
|
$remote-scope)))
|
|
|
|
#:name link
|
|
|
|
(during (server-connected peer-addr)
|
|
|
|
|
|
|
|
(assert (server-proposal management-scope (federated-uplink-connected link)))
|
|
|
|
;; ^ out to local requester
|
|
|
|
|
|
|
|
(define session-id (strong-gensym 'peer-))
|
|
|
|
(assert (server-proposal management-scope (federated-link session-id local-scope)))
|
|
|
|
(assert (to-server peer-addr (federated-link session-id remote-scope)))
|
|
|
|
|
2019-05-16 22:14:32 +00:00
|
|
|
;; We have to buffer in both directions, because at startup there's latency
|
|
|
|
;; between asserting a federated-link record and it being ready to receive
|
|
|
|
;; message-poa->server records.
|
2019-06-11 17:33:37 +00:00
|
|
|
(define-values (push-in drain-in) (make-buffer))
|
|
|
|
(define-values (push-out drain-out) (make-buffer))
|
2019-05-16 22:14:32 +00:00
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(on (message (from-server peer-addr (message-server->poa session-id $p)))
|
2019-06-11 17:33:37 +00:00
|
|
|
(push-in p))
|
2019-05-16 21:28:42 +00:00
|
|
|
(on (message (server-envelope management-scope (message-server->poa session-id $p)))
|
2019-06-11 17:33:37 +00:00
|
|
|
(push-out p))
|
2019-05-16 22:14:32 +00:00
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(define (wrap p) (message-poa->server session-id p))
|
2019-06-11 17:47:56 +00:00
|
|
|
(during (server-envelope management-scope (federated-link-ready session-id))
|
|
|
|
(during (from-server peer-addr (federated-link-ready session-id))
|
2019-06-11 17:33:37 +00:00
|
|
|
(drain-in (lambda (p) (send! (server-proposal management-scope (wrap p)))))
|
|
|
|
(drain-out (lambda (p) (send! (to-server peer-addr (wrap p)))))))))))
|
2019-05-09 10:17:37 +00:00
|
|
|
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Local links.
|
|
|
|
|
|
|
|
(spawn #:name 'federated-local-link-factory
|
2019-06-11 17:33:37 +00:00
|
|
|
|
|
|
|
(struct sub (spec [captures #:mutable]) #:transparent)
|
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(during (federation-management-scope $management-scope)
|
|
|
|
(during (server-envelope management-scope (federated-link _ $scope))
|
|
|
|
(during/spawn (server-active scope)
|
|
|
|
#:name (list 'local-link management-scope scope)
|
|
|
|
|
|
|
|
(define session-id (gensym 'local-link))
|
|
|
|
(assert (server-proposal management-scope (federated-link session-id scope)))
|
|
|
|
|
|
|
|
(define (!! m)
|
|
|
|
(send! (server-proposal management-scope (message-poa->server session-id m))))
|
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(define turn (turn-recorder (lambda (items) (!! (Turn items)))))
|
|
|
|
|
|
|
|
(define remote-endpoints (hash))
|
|
|
|
(define local-endpoints (hash))
|
|
|
|
(define local-matches (hash))
|
|
|
|
|
|
|
|
(define (instantiate s vs)
|
|
|
|
(instantiate-term->value (server-envelope scope (sub-spec s)) vs))
|
|
|
|
|
|
|
|
(on (asserted (observe (server-envelope scope $spec)))
|
|
|
|
(define ep (gensym 'ep))
|
|
|
|
(extend-turn! turn (Assert ep (observe spec)))
|
|
|
|
(set! local-endpoints (hash-set local-endpoints spec ep))
|
|
|
|
(set! local-matches (hash-set local-matches ep (sub spec (hash)))))
|
|
|
|
|
|
|
|
(on (retracted (observe (server-envelope scope $spec)))
|
|
|
|
(define ep (hash-ref local-endpoints spec))
|
|
|
|
(extend-turn! turn (Clear ep))
|
|
|
|
(set! local-endpoints (hash-remove local-endpoints spec)))
|
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(on (message (server-envelope management-scope
|
2019-06-11 17:33:37 +00:00
|
|
|
(message-server->poa session-id (Turn $items))))
|
|
|
|
(for [(item (in-list items))]
|
|
|
|
(match item
|
|
|
|
[(Assert subid (observe spec))
|
|
|
|
(when (hash-has-key? remote-endpoints subid)
|
|
|
|
(error 'local-link "Duplicate endpoint" subid))
|
|
|
|
(react
|
|
|
|
(define ep-facet (current-facet))
|
|
|
|
(set! remote-endpoints (hash-set remote-endpoints subid ep-facet))
|
|
|
|
(on-stop (set! remote-endpoints (hash-remove remote-endpoints subid)))
|
|
|
|
(assert (server-envelope scope (observe spec)))
|
|
|
|
(define ((! ctor) cs) (extend-turn! turn (ctor subid cs)))
|
|
|
|
(add-observer-endpoint! (lambda () (server-proposal scope spec))
|
|
|
|
#:on-add (! Add)
|
|
|
|
#:on-remove (! Del)
|
|
|
|
#:on-message (! Msg)))]
|
|
|
|
[(Clear subid)
|
|
|
|
(stop-facet (hash-ref remote-endpoints subid)
|
|
|
|
(extend-turn! turn (End subid)))]
|
|
|
|
[(Add ep vs) (let* ((s (hash-ref local-matches ep))
|
|
|
|
(a (instantiate s vs)))
|
|
|
|
(set-sub-captures! s (hash-set (sub-captures s) vs a))
|
|
|
|
(assert! a))]
|
|
|
|
[(Del ep vs) (let* ((s (hash-ref local-matches ep))
|
|
|
|
(a (hash-ref (sub-captures s) vs)))
|
|
|
|
(retract! a)
|
|
|
|
(set-sub-captures! s (hash-remove (sub-captures s) vs)))]
|
|
|
|
[(Msg ep vs) (let* ((s (hash-ref local-matches ep)))
|
|
|
|
(send! (instantiate s vs)))]
|
|
|
|
[(End ep) (let* ((s (hash-ref local-matches ep #f)))
|
|
|
|
(when s
|
|
|
|
(for [(a (in-hash-values (sub-captures s)))] (retract! a))
|
|
|
|
(set! local-matches (hash-remove local-matches ep))))])))))))
|
2019-05-09 10:17:37 +00:00
|
|
|
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Federated scopes.
|
|
|
|
|
|
|
|
(spawn #:name 'federated-scope-factory
|
2019-05-16 21:28:42 +00:00
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(struct subscription (id ;; LocalID
|
|
|
|
spec ;; Assertion
|
|
|
|
holders ;; (Hash LinkID SubscriptionID)
|
|
|
|
matches ;; (Hash (Listof Assertion) (Set LinkID))
|
|
|
|
)
|
|
|
|
#:transparent)
|
2019-05-16 21:28:42 +00:00
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(during (federation-management-scope $management-scope)
|
2019-05-16 21:28:42 +00:00
|
|
|
(during/spawn (server-envelope management-scope (federated-link _ $scope))
|
|
|
|
#:name (list 'federated-scope management-scope scope)
|
|
|
|
|
|
|
|
;; Generates a fresh local ID naming a subscription propagated to our peers.
|
|
|
|
(define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1))))))
|
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(field [turns (hash)] ;; (Map LinkID Turn)
|
2019-05-16 21:28:42 +00:00
|
|
|
[specs (hash)] ;; (Hash Spec LocalID)
|
|
|
|
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
2019-05-09 10:17:37 +00:00
|
|
|
)
|
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(define (send-to-link! peer p)
|
|
|
|
(extend-turn! (hash-ref (turns) peer) p))
|
|
|
|
|
2019-05-09 10:17:37 +00:00
|
|
|
(when (log-level? syndicate/federation-logger 'debug)
|
2019-06-11 17:33:37 +00:00
|
|
|
(begin/dataflow
|
|
|
|
(log-syndicate/federation-debug "~a turns:" scope)
|
|
|
|
(for [((peer turn) (in-hash (turns)))]
|
|
|
|
(log-syndicate/federation-debug " link ~v -> ~v" peer (turn 'debug)))
|
|
|
|
(log-syndicate/federation-debug "-"))
|
|
|
|
(begin/dataflow
|
|
|
|
(log-syndicate/federation-debug "~a specs:" scope)
|
|
|
|
(for [((spec local) (in-hash (specs)))]
|
|
|
|
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
|
|
|
|
(log-syndicate/federation-debug "-"))
|
|
|
|
(begin/dataflow
|
|
|
|
(log-syndicate/federation-debug "~a subs:" scope)
|
|
|
|
(for [((local sub) (in-hash (subs)))]
|
|
|
|
(match-define (subscription _id spec holders matches) sub)
|
|
|
|
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
|
|
|
|
(when (not (hash-empty? holders))
|
|
|
|
(log-syndicate/federation-debug " holders:")
|
|
|
|
(for [((link ep) (in-hash holders))]
|
|
|
|
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
|
|
|
(when (not (hash-empty? matches))
|
|
|
|
(log-syndicate/federation-debug " matches:")
|
|
|
|
(for [((captures holders) (in-hash matches))]
|
|
|
|
(log-syndicate/federation-debug " captures ~v held by ~a"
|
|
|
|
captures holders))))
|
|
|
|
(log-syndicate/federation-debug "-")))
|
|
|
|
|
|
|
|
(define (call-with-sub localid linkid f #:not-found-ok? [not-found-ok? #t])
|
2019-05-16 21:28:42 +00:00
|
|
|
(match (hash-ref (subs) localid #f)
|
2019-06-11 17:33:37 +00:00
|
|
|
[#f (when (not not-found-ok?)
|
|
|
|
(log-syndicate/federation-error
|
|
|
|
"Mention of nonexistent local ID ~v from link ~v. Ignoring."
|
|
|
|
localid linkid))]
|
2019-05-16 21:28:42 +00:00
|
|
|
[sub (f sub)]))
|
|
|
|
|
2019-05-20 20:49:19 +00:00
|
|
|
(define (store-sub! sub)
|
|
|
|
(match-define (subscription localid spec holders matches) sub)
|
|
|
|
(if (and (hash-empty? holders) (hash-empty? matches))
|
|
|
|
(begin (specs (hash-remove (specs) spec))
|
|
|
|
(subs (hash-remove (subs) localid)))
|
|
|
|
(subs (hash-set (subs) localid sub))))
|
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(define (unsubscribe! localid linkid)
|
|
|
|
(call-with-sub
|
2019-06-11 17:33:37 +00:00
|
|
|
#:not-found-ok? #f
|
2019-05-16 21:28:42 +00:00
|
|
|
localid linkid
|
|
|
|
(lambda (sub)
|
|
|
|
(define new-holders (hash-remove (subscription-holders sub) linkid))
|
2019-05-20 20:49:19 +00:00
|
|
|
(store-sub! (struct-copy subscription sub [holders new-holders]))
|
2019-05-16 21:28:42 +00:00
|
|
|
|
|
|
|
;; The messages we send depend on (hash-count new-holders):
|
|
|
|
;; - if >1, there are enough other active subscribers that we don't need to send
|
|
|
|
;; any messages.
|
|
|
|
;; - if =1, we retract the subscription from that peer (INVARIANT: will not be linkid)
|
|
|
|
;; - if =0, we retract the subscription from all peers except linkid
|
|
|
|
|
|
|
|
(match (hash-count new-holders)
|
2019-06-11 17:33:37 +00:00
|
|
|
[0 (for [((peer turn) (in-hash (turns)))]
|
2019-05-09 10:17:37 +00:00
|
|
|
(when (not (equal? peer linkid))
|
2019-06-11 17:33:37 +00:00
|
|
|
(extend-turn! turn (Clear localid))))]
|
2019-05-16 21:28:42 +00:00
|
|
|
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid
|
|
|
|
(send-to-link! peer (Clear localid)))]
|
|
|
|
[_ (void)]))))
|
|
|
|
|
2019-05-20 20:49:19 +00:00
|
|
|
(define (remove-match! localid captures linkid)
|
2019-05-16 21:28:42 +00:00
|
|
|
(call-with-sub
|
|
|
|
localid linkid
|
|
|
|
(lambda (sub)
|
2019-05-20 20:49:19 +00:00
|
|
|
(define old-matches (subscription-matches sub))
|
|
|
|
(define old-match-holders (hash-ref old-matches captures set))
|
|
|
|
(define new-match-holders (set-remove old-match-holders linkid))
|
2019-05-20 21:07:02 +00:00
|
|
|
(define new-matches (if (set-empty? new-match-holders)
|
|
|
|
(hash-remove old-matches captures)
|
|
|
|
(hash-set old-matches captures new-match-holders)))
|
2019-05-20 20:49:19 +00:00
|
|
|
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
|
|
|
(match (set-count new-match-holders)
|
|
|
|
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
|
|
|
(when (not (equal? peer linkid))
|
|
|
|
(send-to-link! peer (Del peer-subid captures))))]
|
|
|
|
[1 (for [(peer (in-set new-match-holders))] ;; only one, ≠ linkid
|
|
|
|
(define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f))
|
|
|
|
(when maybe-peer-subid
|
|
|
|
(send-to-link! peer (Del maybe-peer-subid captures))))]
|
|
|
|
[_ (void)]))))
|
2019-05-16 21:28:42 +00:00
|
|
|
|
|
|
|
(during (server-envelope management-scope (federated-link $linkid scope))
|
2019-06-11 17:47:56 +00:00
|
|
|
(assert (server-proposal management-scope (federated-link-ready linkid)))
|
2019-05-16 21:28:42 +00:00
|
|
|
|
2019-06-11 17:33:37 +00:00
|
|
|
(define turn (turn-recorder
|
|
|
|
(lambda (items)
|
|
|
|
(send! (server-proposal management-scope
|
|
|
|
(message-server->poa linkid (Turn items)))))))
|
2019-05-16 21:28:42 +00:00
|
|
|
|
|
|
|
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
2019-05-20 20:49:19 +00:00
|
|
|
[link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion)))
|
2019-05-16 21:28:42 +00:00
|
|
|
)
|
|
|
|
|
2019-06-11 23:23:39 +00:00
|
|
|
(define (err! detail [context #f])
|
|
|
|
(send! (server-proposal management-scope (message-server->poa linkid
|
|
|
|
(Err detail context))))
|
2019-06-11 17:33:37 +00:00
|
|
|
(reset-turn! turn)
|
|
|
|
(stop-current-facet))
|
|
|
|
|
|
|
|
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
|
|
|
|
(turns (hash-set (turns) linkid turn))
|
|
|
|
(for ([(spec localid) (in-hash (specs))])
|
2019-06-20 21:54:59 +00:00
|
|
|
(when (not (hash-empty? (subscription-holders (hash-ref (subs) localid))))
|
|
|
|
(extend-turn! turn (Assert localid (observe spec)))))
|
2019-06-11 17:33:37 +00:00
|
|
|
(commit-turn! turn))
|
|
|
|
|
|
|
|
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
|
|
|
|
(turns (hash-remove (turns) linkid))
|
|
|
|
(for [((localid matches) (in-hash (link-matches)))]
|
|
|
|
(for [(captures (in-set matches))]
|
|
|
|
(remove-match! localid captures linkid)))
|
|
|
|
(for ([localid (in-hash-values (link-subs))])
|
|
|
|
(unsubscribe! localid linkid))
|
|
|
|
(commit-turn! turn))
|
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(when (log-level? syndicate/federation-logger 'debug)
|
|
|
|
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
|
|
|
|
(for [((sub local) (in-hash (link-subs)))]
|
|
|
|
(log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
|
|
|
|
(log-syndicate/federation-debug "-"))
|
|
|
|
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
|
2019-05-20 20:49:19 +00:00
|
|
|
(for [((local matches) (in-hash (link-matches)))]
|
|
|
|
(for [(captures (in-set matches))]
|
|
|
|
(log-syndicate/federation-debug " local ~a captures ~v"
|
|
|
|
local captures)))
|
2019-05-16 21:28:42 +00:00
|
|
|
(log-syndicate/federation-debug "-")))
|
|
|
|
|
2019-06-13 11:51:11 +00:00
|
|
|
(stop-when
|
|
|
|
(message (server-envelope management-scope
|
|
|
|
(message-poa->server linkid (Err $detail $context))))
|
|
|
|
(log-syndicate/federation-error
|
|
|
|
"Received Err from peer link ~v: detail ~v; context ~v"
|
|
|
|
linkid
|
|
|
|
detail
|
|
|
|
context)
|
|
|
|
(reset-turn! turn))
|
|
|
|
|
2019-05-16 21:28:42 +00:00
|
|
|
(on (message (server-envelope management-scope
|
2019-06-11 17:33:37 +00:00
|
|
|
(message-poa->server linkid (Turn $items))))
|
|
|
|
(for [(item (in-list items))]
|
|
|
|
(match item
|
|
|
|
[(Assert subid (observe spec))
|
|
|
|
(define known? (hash-has-key? (specs) spec))
|
|
|
|
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
|
|
|
(define sub (hash-ref (subs) localid (lambda () (subscription localid
|
|
|
|
spec
|
|
|
|
(hash)
|
|
|
|
(hash)))))
|
|
|
|
(define holders (subscription-holders sub))
|
|
|
|
(cond
|
|
|
|
[(hash-has-key? holders linkid)
|
|
|
|
(log-syndicate/federation-error
|
|
|
|
"Duplicate subscription ~a, ID ~a, from link ~a."
|
|
|
|
spec subid linkid)
|
2019-06-11 23:23:39 +00:00
|
|
|
(err! 'duplicate-endpoint item)]
|
2019-06-11 17:33:37 +00:00
|
|
|
[else
|
|
|
|
(link-subs (hash-set (link-subs) subid localid))
|
|
|
|
(when (not known?) (specs (hash-set (specs) spec localid)))
|
|
|
|
(subs (hash-set (subs)
|
|
|
|
localid
|
|
|
|
(struct-copy subscription sub
|
|
|
|
[holders (hash-set holders linkid subid)])))
|
|
|
|
|
|
|
|
;; If not known, then relay the subscription to all peers except `linkid`.
|
|
|
|
;;
|
|
|
|
;; If known, then one or more links that aren't this one have previously
|
|
|
|
;; subscribed with this spec. If exactly one other link has previously
|
|
|
|
;; subscribed, the only subscription that needs sent is to that peer;
|
|
|
|
;; otherwise, no subscriptions at all need sent, since everyone has already
|
|
|
|
;; been informed of this subscription.
|
|
|
|
|
|
|
|
(cond
|
|
|
|
[(not known?)
|
|
|
|
(for [((peer peer-turn) (in-hash (turns)))]
|
|
|
|
(when (not (equal? peer linkid))
|
|
|
|
(extend-turn! peer-turn (Assert localid (observe spec)))))]
|
|
|
|
[(= (hash-count holders) 1)
|
|
|
|
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid
|
|
|
|
(send-to-link! peer (Assert localid (observe spec))))]
|
|
|
|
[else
|
|
|
|
(void)])
|
|
|
|
|
|
|
|
;; Once subscription relaying has taken place, send up matches to the active
|
|
|
|
;; link.
|
|
|
|
(for [((captures match-holders) (in-hash (subscription-matches sub)))]
|
|
|
|
;; Compute the number of times someone OTHER THAN this link has asserted
|
|
|
|
;; a match to this spec. If it's nonzero, we need to hear about it:
|
|
|
|
(when (not (set-empty? (set-remove match-holders linkid)))
|
|
|
|
(extend-turn! turn (Add subid captures))))
|
|
|
|
|
|
|
|
])]
|
|
|
|
[(Clear subid)
|
|
|
|
(match (hash-ref (link-subs) subid #f)
|
|
|
|
[#f (log-syndicate/federation-error
|
|
|
|
"Mention of nonexistent subscription ID ~v from link ~v."
|
|
|
|
subid linkid)
|
2019-06-11 23:23:39 +00:00
|
|
|
(err! 'nonexistent-endpoint item)]
|
2019-06-11 17:33:37 +00:00
|
|
|
[localid
|
|
|
|
(link-subs (hash-remove (link-subs) subid))
|
2019-06-13 11:50:55 +00:00
|
|
|
(unsubscribe! localid linkid)])
|
|
|
|
(extend-turn! turn (End subid))]
|
2019-06-11 17:33:37 +00:00
|
|
|
[(End localid)
|
|
|
|
(for [(captures (in-set (hash-ref (link-matches) localid set)))]
|
|
|
|
(remove-match! localid captures linkid))
|
|
|
|
(link-matches (hash-remove (link-matches) localid))]
|
|
|
|
[(Add localid captures)
|
|
|
|
(define matches (hash-ref (link-matches) localid set))
|
|
|
|
(cond
|
|
|
|
[(set-member? matches captures)
|
2019-06-11 23:23:39 +00:00
|
|
|
(err! 'duplicate-capture item)]
|
2019-06-11 17:33:37 +00:00
|
|
|
[else
|
|
|
|
(link-matches (hash-set (link-matches) localid (set-add matches captures)))
|
|
|
|
(call-with-sub
|
|
|
|
localid linkid
|
|
|
|
(lambda (sub)
|
|
|
|
(define old-matches (subscription-matches sub))
|
|
|
|
(define old-match-holders (hash-ref old-matches captures set))
|
|
|
|
(define new-match-holders (set-add old-match-holders linkid))
|
|
|
|
(define new-matches (hash-set old-matches captures new-match-holders))
|
|
|
|
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
|
|
|
(match (set-count old-match-holders)
|
|
|
|
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
|
|
|
(when (not (equal? peer linkid))
|
|
|
|
(send-to-link! peer (Add peer-subid captures))))]
|
|
|
|
[1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid
|
|
|
|
(define peer-subid (hash-ref (subscription-holders sub) peer #f))
|
|
|
|
(when peer-subid ;; the other holder may not itself subscribe!
|
|
|
|
(send-to-link! peer (Add peer-subid captures))))]
|
|
|
|
[_ (void)])))])]
|
|
|
|
[(Del localid captures)
|
|
|
|
(define matches (hash-ref (link-matches) localid set))
|
|
|
|
(if (not (set-member? matches captures))
|
2019-06-11 23:23:39 +00:00
|
|
|
(err! 'nonexistent-capture item)
|
2019-06-11 17:33:37 +00:00
|
|
|
(let ((new-matches (set-remove matches captures)))
|
|
|
|
(link-matches (if (set-empty? new-matches)
|
|
|
|
(hash-remove (link-matches) localid)
|
|
|
|
(hash-set (link-matches) localid new-matches)))
|
|
|
|
(remove-match! localid captures linkid)))]
|
|
|
|
[(Msg localid captures)
|
|
|
|
(call-with-sub
|
|
|
|
localid linkid
|
|
|
|
(lambda (sub)
|
|
|
|
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
|
|
|
(when (not (equal? peer linkid))
|
|
|
|
(send-to-link! peer (Msg peer-subid captures))))))]
|
|
|
|
)))))))
|