#lang imperative-syndicate (provide (struct-out Assert) (struct-out Clear) (struct-out Add) (struct-out Del) (struct-out Msg) (struct-out router-connection) (struct-out router-inbound) (struct-out router-outbound)) (require racket/set) (require imperative-syndicate/bag) ;; Node IDs must be dataspace-unique. ;; ;; Connection IDs must be dataspace-unique. They are properly ;; node-unique, but for brevity assertions omit the node ID in most ;; places. ;; ;; Subscription IDs must be connection-unique AND must correspond ;; one-to-one with a specific subscription spec. That is, a ;; subscription ID is merely shorthand for its spec, and two ;; subscription IDs within a connection must be `equal?` exactly when ;; their corresponding specs are `equal?`. ;; ;; Local IDs must be node-unique. They are used as subscription IDs in ;; outbound messages. ;; ;; Nodes maintain a bidirectional mapping between subscription IDs ;; (scoped within their connection ID) and local IDs. One local ID may ;; map to multiple subscription IDs - this is the place where ;; aggregation pops up. ;; Unlike the broker protocol, both Actions and Events are ;; BIDIRECTIONAL, travelling in both directions along edges linking ;; peer nodes. ;; Actions - like the client/server protocol, but lacking Message (message-struct Assert (endpoint-name assertion)) (message-struct Clear (endpoint-name)) ;; Events (message-struct Add (endpoint-name captures)) (message-struct Del (endpoint-name captures)) (message-struct Msg (endpoint-name captures)) ;; Connection protocol (assertion-struct router-connection (node-id connection-id)) (message-struct router-inbound (connection-id body)) (message-struct router-outbound (connection-id body)) ;; Internal state (struct subscription (id ;; LocalID spec ;; Assertion holders ;; (Hash ConnectionID SubscriptionID) matches ;; (Bag (Listof Assertion)) ) #:transparent) (define-logger syndicate/federation) (spawn #:name 'router (during/spawn (observe (router-connection $nodeid _)) #:name (list 'router nodeid) ;; Generates a fresh local ID naming a subscription ;; propagated to our peers. (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) (field [peers (set)] ;; (Set ConnID) [specs (hash)] ;; (Hash Spec LocalID) [subs (hasheq)] ;; (Hash LocalID Subscription) ) (when (log-level? syndicate/federation-logger 'debug) (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers))) (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs))) (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs)))) (define (call-with-sub localid connid f) (match (hash-ref (subs) localid #f) [#f (log-syndicate/federation-error "Mention of nonexistent local ID ~v from connection ~v. Ignoring." localid connid)] [sub (f sub)])) (define (unsubscribe! localid connid) (call-with-sub localid connid (lambda (sub) (define new-holders (hash-remove (subscription-holders sub) connid)) (if (hash-empty? new-holders) (begin (specs (hash-remove (specs) (subscription-spec sub))) (subs (hash-remove (subs) localid))) (subs (hash-set (subs) localid (struct-copy subscription sub [holders new-holders])))) ;; The messages we send depend on (hash-count new-holders): ;; - if >1, there are enough other active subscribers that we don't need to send ;; any messages. ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid) ;; - if =0, we retract the subscription from all peers except connid (match (hash-count new-holders) [0 (for [(peer (in-set (peers)))] (when (not (equal? peer connid)) (send! (router-outbound peer (Clear localid)))))] [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid (send! (router-outbound peer (Clear localid))))] [_ (void)])))) (define (adjust-matches localid connid captures delta expected-outcome ctor) (call-with-sub localid connid (lambda (sub) (define-values (new-matches outcome) (bag-change (subscription-matches sub) captures delta #:clamp? #t)) (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) (when (eq? outcome expected-outcome) (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) (when (not (equal? peer connid)) (send! (router-outbound peer (ctor peer-subid captures))))))))) (during (observe (router-connection nodeid $connid)) (assert (router-connection nodeid connid)) (on-start (peers (set-add (peers) connid))) (on-stop (peers (set-remove (peers) connid))) (field [conn-subs (hash)] ;; (Hash SubscriptionID LocalID) [conn-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) ) (when (log-level? syndicate/federation-logger 'debug) (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v" nodeid connid (conn-subs))) (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v" nodeid connid (conn-matches)))) (on-start (for ([(spec localid) (in-hash (specs))]) (send! (router-outbound connid (Assert localid (observe spec)))))) (on-stop (for ([item (in-bag (conn-matches))]) (match-define (cons localid captures) item) (adjust-matches localid connid captures -1 'present->absent Del)) (for ([localid (in-hash-values (conn-subs))]) (unsubscribe! localid connid))) (on (message (router-inbound connid (Assert $subid (observe $spec)))) (define known? (hash-has-key? (specs) spec)) (define localid (if known? (hash-ref (specs) spec) (make-localid))) (define sub (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) (define holders (subscription-holders sub)) (cond [(hash-has-key? holders connid) (log-syndicate/federation-error "Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring." spec subid connid)] [else (conn-subs (hash-set (conn-subs) subid localid)) (when (not known?) (specs (hash-set (specs) spec localid))) (subs (hash-set (subs) localid (struct-copy subscription sub [holders (hash-set holders connid subid)]))) ;; If not known, then relay the subscription to all peers except `connid`. ;; ;; If known, then one or more connections that aren't this one have ;; previously subscribed with this spec. If exactly one other connection has ;; previously subscribed, the only subscription that needs sent is to that ;; peer; otherwise, no subscriptions at all need sent, since everyone has ;; already been informed of this subscription. (cond [(not known?) (for [(peer (in-set (peers)))] (when (not (equal? peer connid)) (send! (router-outbound peer (Assert localid (observe spec))))))] [(= (hash-count holders) 1) (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid (send! (router-outbound peer (Assert localid (observe spec)))))] [else (void)]) ;; Once subscription relaying has taken place, send up matches to the active ;; connection. (for [(captures (in-bag (subscription-matches sub)))] (send! (router-outbound connid (Add subid captures)))) ])) (on (message (router-inbound connid (Clear $subid))) (match (hash-ref (conn-subs) subid #f) [#f (log-syndicate/federation-error "Mention of nonexistent subscription ID ~v from connection ~v. Ignoring." subid connid)] [localid (conn-subs (hash-remove (conn-subs) subid)) (unsubscribe! localid connid)])) (define (relay-add-or-del localid captures delta expected-outcome ctor) (define-values (new-conn-matches conn-outcome) (bag-change (conn-matches) (cons localid captures) delta #:clamp? #t)) (conn-matches new-conn-matches) (when (eq? conn-outcome expected-outcome) (adjust-matches localid connid captures delta expected-outcome ctor))) (on (message (router-inbound connid (Add $localid $captures))) (relay-add-or-del localid captures +1 'absent->present Add)) (on (message (router-inbound connid (Del $localid $captures))) (relay-add-or-del localid captures -1 'present->absent Del)) (on (message (router-inbound connid (Msg $localid $captures))) (call-with-sub localid connid (lambda (sub) (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) (when (not (equal? peer connid)) (send! (router-outbound peer (Msg peer-subid captures)))))))) )))