2019-05-02 14:59:25 +00:00
|
|
|
#lang imperative-syndicate
|
|
|
|
|
2019-05-05 14:57:51 +00:00
|
|
|
(provide (struct-out router-connection)
|
2019-05-02 14:59:25 +00:00
|
|
|
(struct-out router-inbound)
|
|
|
|
(struct-out router-outbound))
|
|
|
|
|
2019-05-05 14:57:51 +00:00
|
|
|
(require "wire-protocol.rkt")
|
2019-05-02 14:59:25 +00:00
|
|
|
(require imperative-syndicate/bag)
|
2019-05-05 14:57:51 +00:00
|
|
|
(require racket/set)
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
;; Node IDs must be dataspace-unique.
|
|
|
|
;;
|
|
|
|
;; Connection IDs must be dataspace-unique. They are properly
|
|
|
|
;; node-unique, but for brevity assertions omit the node ID in most
|
|
|
|
;; places.
|
|
|
|
;;
|
|
|
|
;; Subscription IDs must be connection-unique AND must correspond
|
|
|
|
;; one-to-one with a specific subscription spec. That is, a
|
|
|
|
;; subscription ID is merely shorthand for its spec, and two
|
|
|
|
;; subscription IDs within a connection must be `equal?` exactly when
|
|
|
|
;; their corresponding specs are `equal?`.
|
|
|
|
;;
|
|
|
|
;; Local IDs must be node-unique. They are used as subscription IDs in
|
|
|
|
;; outbound messages.
|
|
|
|
;;
|
|
|
|
;; Nodes maintain a bidirectional mapping between subscription IDs
|
|
|
|
;; (scoped within their connection ID) and local IDs. One local ID may
|
|
|
|
;; map to multiple subscription IDs - this is the place where
|
|
|
|
;; aggregation pops up.
|
|
|
|
|
2019-05-05 15:37:03 +00:00
|
|
|
;; Unlike the server protocol, both Actions and Events are
|
2019-05-02 14:59:25 +00:00
|
|
|
;; BIDIRECTIONAL, travelling in both directions along edges linking
|
|
|
|
;; peer nodes.
|
|
|
|
|
|
|
|
;; Connection protocol
|
|
|
|
;; Assertion naming one connection (connection-id) of one node (node-id).
;; The router in this file reacts to interest in (observers of) this record
;; and, for each observed node/connection pair, asserts the record itself.
(assertion-struct router-connection (node-id connection-id))
|
|
|
|
;; Message delivering a wire-protocol `body` to the router on behalf of
;; connection `connection-id`. The router in this file handles Assert, Clear,
;; Add, Del and Msg bodies.
(message-struct router-inbound (connection-id body))
|
|
|
|
;; Message emitted by the router carrying a wire-protocol `body` addressed to
;; connection `connection-id`.
(message-struct router-outbound (connection-id body))
|
|
|
|
|
|
|
|
;; Internal state
|
|
|
|
;; A Subscription is the router's record of one aggregated subscription:
;;
;;   id      : LocalID
;;   spec    : Assertion
;;   holders : (Hash ConnectionID SubscriptionID)
;;             -- each connection that has subscribed to `spec`, mapped to the
;;                subscription ID that connection uses for it
;;   matches : (Bag (Listof Assertion))
;;             -- the capture lists currently matching `spec`
(struct subscription (id spec holders matches) #:transparent)
|
|
|
|
|
|
|
|
;; Defines the `syndicate/federation-logger` logger together with the
;; `log-syndicate/federation-<level>` forms (e.g. -debug, -error) used below.
(define-logger syndicate/federation)
|
|
|
|
|
|
|
|
(spawn #:name 'router
|
|
|
|
(during/spawn (observe (router-connection $nodeid _))
|
|
|
|
#:name (list 'router nodeid)
|
|
|
|
|
|
|
|
;; make-localid : -> LocalID
;; Produces a fresh local ID naming a subscription propagated to our peers.
;; IDs are the consecutive integers 0, 1, 2, ...; the counter is private to
;; this closure.
(define make-localid
  (let ([counter 0])
    (lambda ()
      (let ([fresh counter])
        (set! counter (add1 counter))
        fresh))))
|
|
|
|
|
|
|
|
;; Router-wide state for this node.
(field
 ;; peers : (Set ConnID) -- the connections currently attached to this node
 [peers (set)]
 ;; specs : (Hash Spec LocalID) -- local ID allocated to each known spec
 [specs (hash)]
 ;; subs : (Hash LocalID Subscription) -- subscription record per local ID
 [subs (hasheq)])
|
|
|
|
|
|
|
|
;; Debug tracing of the router-wide fields. The log-level test is made once,
;; when this code runs; only if it succeeds is one dataflow block installed
;; per field. Each field must be read *inside* its begin/dataflow body.
;; NOTE(review): begin/dataflow presumably re-runs its body whenever a field
;; it read changes -- confirm against the imperative-syndicate documentation.
(when (log-level? syndicate/federation-logger 'debug)
  (begin/dataflow
    (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers)))
  (begin/dataflow
    (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs)))
  (begin/dataflow
    (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs))))
|
|
|
|
|
2019-05-02 22:12:02 +00:00
|
|
|
;; call-with-sub : LocalID ConnID (Subscription -> Any) -> Any
;; Looks `localid` up in the `subs` field. When a subscription is recorded
;; there, applies `f` to it and returns the result; otherwise logs an error
;; naming the offending local ID and connection, and does nothing else.
(define (call-with-sub localid connid f)
  (define found (hash-ref (subs) localid #f))
  (if found
      (f found)
      (log-syndicate/federation-error
       "Mention of nonexistent local ID ~v from connection ~v. Ignoring."
       localid
       connid)))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
;; unsubscribe! : LocalID ConnID -> Void
;; Removes `connid` from the holders of subscription `localid`, updates the
;; `specs`/`subs` fields accordingly, and tells peers that no longer need the
;; subscription to drop it. An unknown `localid` is logged and ignored (see
;; call-with-sub).
(define (unsubscribe! localid connid)
  ;; Retract our subscription `localid` from a single peer.
  (define (retract-from! peer)
    (send! (router-outbound peer (Clear localid))))
  (call-with-sub
   localid connid
   (lambda (sub)
     (define remaining (hash-remove (subscription-holders sub) connid))
     (define remaining-count (hash-count remaining))

     ;; Bookkeeping: forget the subscription entirely once nobody holds it,
     ;; otherwise just record the shrunken holder table.
     (cond
       [(zero? remaining-count)
        (specs (hash-remove (specs) (subscription-spec sub)))
        (subs (hash-remove (subs) localid))]
       [else
        (subs (hash-set (subs)
                        localid
                        (struct-copy subscription sub [holders remaining])))])

     ;; Messaging depends on how many holders remain:
     ;;  - 0: retract the subscription from all peers except connid;
     ;;  - 1: retract it from that one remaining holder
     ;;       (INVARIANT: that holder is not connid);
     ;;  - more: enough other active subscribers remain, so say nothing.
     (case remaining-count
       [(0) (for ([peer (in-set (peers))]
                  #:unless (equal? peer connid))
              (retract-from! peer))]
       [(1) (for ([peer (in-hash-keys remaining)]) ;; exactly one, ≠ connid
              (retract-from! peer))]
       [else (void)]))))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
;; adjust-matches : LocalID ConnID (Listof Assertion) Integer Symbol Ctor -> Void
;; Adds `delta` to the count of `captures` in subscription `localid`'s match
;; bag (clamped by bag-change) and stores the updated subscription. If the
;; bag transition equals `expected-outcome`, relays `(ctor subid captures)` to
;; every holder of the subscription other than `connid`, using each holder's
;; own subscription ID. An unknown `localid` is logged and ignored (see
;; call-with-sub).
(define (adjust-matches localid connid captures delta expected-outcome ctor)
  (call-with-sub
   localid connid
   (lambda (sub)
     (define-values (updated-matches transition)
       (bag-change (subscription-matches sub) captures delta #:clamp? #t))
     (define updated-sub
       (struct-copy subscription sub [matches updated-matches]))
     (subs (hash-set (subs) localid updated-sub))
     (when (eq? transition expected-outcome)
       (for ([(peer peer-subid) (in-hash (subscription-holders sub))]
             #:unless (equal? peer connid))
         (send! (router-outbound peer (ctor peer-subid captures))))))))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
(during (observe (router-connection nodeid $connid))
|
|
|
|
;; Answer the observer: assert the connection record for this node/connection
;; pair.
(assert (router-connection nodeid connid))
|
|
|
|
|
|
|
|
;; On start, add this connection to the router-wide `peers` set.
(on-start (peers (set-add (peers) connid)))
|
|
|
|
;; On stop, remove this connection from the router-wide `peers` set.
(on-stop (peers (set-remove (peers) connid)))
|
|
|
|
|
|
|
|
;; Per-connection state.
(field
 ;; conn-subs : (Hash SubscriptionID LocalID)
 ;;   -- the local ID behind each subscription ID this connection has asserted
 [conn-subs (hash)]
 ;; conn-matches : (Bag (Cons LocalID (Listof Assertion)))
 ;;   -- the (local ID . captures) matches this connection has reported
 [conn-matches (bag)])
|
|
|
|
|
|
|
|
;; Debug tracing of the per-connection fields. The log-level test is made
;; once, when this code runs; only if it succeeds is one dataflow block
;; installed per field. Each field must be read *inside* its begin/dataflow
;; body.
;; NOTE(review): begin/dataflow presumably re-runs its body whenever a field
;; it read changes -- confirm against the imperative-syndicate documentation.
(when (log-level? syndicate/federation-logger 'debug)
  (begin/dataflow
    (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v"
                                    nodeid
                                    connid
                                    (conn-subs)))
  (begin/dataflow
    (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v"
                                    nodeid
                                    connid
                                    (conn-matches))))
|
|
|
|
|
|
|
|
;; On start, bring the new connection up to date: send it one Assert per spec
;; currently recorded in `specs`, identified by that spec's local ID.
(on-start
 (for ([(known-spec known-id) (in-hash (specs))])
   (send! (router-outbound connid (Assert known-id (observe known-spec))))))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
;; On stop, undo everything this connection contributed:
;;  1. withdraw each (local ID . captures) match it had reported, relaying a
;;     Del to other holders when the match becomes absent;
;;  2. then drop each subscription it held.
(on-stop
 (for ([entry (in-bag (conn-matches))])
   ;; entry is (cons localid captures)
   (adjust-matches (car entry) connid (cdr entry) -1 'present->absent Del))
 (for ([held-id (in-hash-values (conn-subs))])
   (unsubscribe! held-id connid)))
|
|
|
|
|
2019-05-05 14:54:28 +00:00
|
|
|
;; Inbound Assert: connection `connid` subscribes to `spec` under its own
;; subscription ID `subid`.
(on (message (router-inbound connid (Assert $subid (observe $spec))))
  ;; Reuse the local ID already allocated to this spec, if any; otherwise
  ;; allocate a fresh one. (Local IDs are never #f, so `or` is safe here.)
  (define prior-id (hash-ref (specs) spec #f))
  (define known? (and prior-id #t))
  (define localid (or prior-id (make-localid)))
  (define sub
    (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
  ;; Holder table *before* this connection is added.
  (define holders (subscription-holders sub))
  ;; Relay our aggregated subscription to one peer.
  (define (announce-to! peer)
    (send! (router-outbound peer (Assert localid (observe spec)))))

  (cond
    [(hash-has-key? holders connid)
     (log-syndicate/federation-error
      "Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring."
      spec
      subid
      connid)]
    [else
     ;; Record the new holder.
     (conn-subs (hash-set (conn-subs) subid localid))
     (unless known? (specs (hash-set (specs) spec localid)))
     (subs (hash-set (subs)
                     localid
                     (struct-copy subscription sub
                                  [holders (hash-set holders connid subid)])))

     ;; Relay the subscription where it is newly needed.
     ;;
     ;; Unknown spec: relay to all peers except `connid`.
     ;;
     ;; Known spec: one or more connections other than this one subscribed
     ;; earlier. If exactly one did, it is the only peer that has not yet
     ;; been sent the subscription, so send it there; otherwise everyone has
     ;; already been informed and nothing needs to be sent.
     (cond
       [(not known?)
        (for ([peer (in-set (peers))]
              #:unless (equal? peer connid))
          (announce-to! peer))]
       [(= (hash-count holders) 1)
        (for ([peer (in-hash-keys holders)]) ;; exactly one, ≠ connid
          (announce-to! peer))]
       [else (void)])

     ;; With relaying done, send the matches already on record up to the
     ;; subscribing connection, under its own subscription ID.
     (for ([captures (in-bag (subscription-matches sub))])
       (send! (router-outbound connid (Add subid captures))))]))
|
|
|
|
|
2019-05-05 14:54:28 +00:00
|
|
|
;; Inbound Clear: connection `connid` drops the subscription it knows as
;; `subid`. A subscription ID with no entry in `conn-subs` is logged and
;; ignored; otherwise the entry is forgotten and the connection is removed
;; from the subscription's holders.
(on (message (router-inbound connid (Clear $subid)))
  (define localid (hash-ref (conn-subs) subid #f))
  (cond
    [localid
     (conn-subs (hash-remove (conn-subs) subid))
     (unsubscribe! localid connid)]
    [else
     (log-syndicate/federation-error
      "Mention of nonexistent subscription ID ~v from connection ~v. Ignoring."
      subid
      connid)]))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
;; relay-add-or-del : LocalID (Listof Assertion) Integer Symbol Ctor -> Void
;; Applies `delta` to this connection's count for the (localid . captures)
;; match (clamped by bag-change). Only when that per-connection transition
;; equals `expected-outcome` is the change pushed on to the shared
;; subscription via adjust-matches.
(define (relay-add-or-del localid captures delta expected-outcome ctor)
  (define match-key (cons localid captures))
  (define-values (updated-bag transition)
    (bag-change (conn-matches) match-key delta #:clamp? #t))
  (conn-matches updated-bag)
  (when (eq? transition expected-outcome)
    (adjust-matches localid connid captures delta expected-outcome ctor)))

;; Inbound Add: the peer reports a new match for our subscription `localid`.
(on (message (router-inbound connid (Add $localid $captures)))
  (relay-add-or-del localid captures +1 'absent->present Add))

;; Inbound Del: the peer withdraws a match for our subscription `localid`.
(on (message (router-inbound connid (Del $localid $captures)))
  (relay-add-or-del localid captures -1 'present->absent Del))
|
|
|
|
|
|
|
|
;; Inbound Msg: forward `captures` for our subscription `localid` to every
;; holder of that subscription other than the sender, using each holder's own
;; subscription ID. An unknown `localid` is logged and ignored (see
;; call-with-sub).
(on (message (router-inbound connid (Msg $localid $captures)))
  (call-with-sub
   localid connid
   (lambda (sub)
     (for ([(peer peer-subid) (in-hash (subscription-holders sub))]
           #:unless (equal? peer connid))
       (send! (router-outbound peer (Msg peer-subid captures)))))))
|
2019-05-02 14:59:25 +00:00
|
|
|
|
|
|
|
)))
|