From 25fb492083a95ac68cb9a8912dee2ca090654eb0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 2 May 2019 15:59:25 +0100 Subject: [PATCH] Federation kernel --- imperative/broker/federation2.rkt | 223 ++++++++++++++++++++++++++++++ imperative/broker/tf.rkt | 108 +++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 imperative/broker/federation2.rkt create mode 100644 imperative/broker/tf.rkt diff --git a/imperative/broker/federation2.rkt b/imperative/broker/federation2.rkt new file mode 100644 index 0000000..39d7a7f --- /dev/null +++ b/imperative/broker/federation2.rkt @@ -0,0 +1,223 @@ +#lang imperative-syndicate + +(provide (struct-out Subscribe) + (struct-out Unsubscribe) + (struct-out Add) + (struct-out Del) + (struct-out Msg) + (struct-out router-connection) + (struct-out router-inbound) + (struct-out router-outbound)) + +(require racket/set) +(require imperative-syndicate/bag) + +;; Node IDs must be dataspace-unique. +;; +;; Connection IDs must be dataspace-unique. They are properly +;; node-unique, but for brevity assertions omit the node ID in most +;; places. +;; +;; Subscription IDs must be connection-unique AND must correspond +;; one-to-one with a specific subscription spec. That is, a +;; subscription ID is merely shorthand for its spec, and two +;; subscription IDs within a connection must be `equal?` exactly when +;; their corresponding specs are `equal?`. +;; +;; Local IDs must be node-unique. They are used as subscription IDs in +;; outbound messages. +;; +;; Nodes maintain a bidirectional mapping between subscription IDs +;; (scoped within their connection ID) and local IDs. One local ID may +;; map to multiple subscription IDs - this is the place where +;; aggregation pops up. + +;; Unlike the broker protocol, both Actions and Events are +;; BIDIRECTIONAL, travelling in both directions along edges linking +;; peer nodes. + +;; Actions - like Asserts, but only for `(observe $spec)` assertions. +(message-struct Subscribe (subscription-id spec)) +(message-struct Unsubscribe (subscription-id)) + +;; Events +(message-struct Add (subscription-id captures)) +(message-struct Del (subscription-id captures)) +(message-struct Msg (subscription-id captures)) + +;; Connection protocol +(assertion-struct router-connection (node-id connection-id)) +(message-struct router-inbound (connection-id body)) +(message-struct router-outbound (connection-id body)) + +;; Internal state +(struct subscription (id ;; LocalID + spec ;; Assertion + holders ;; (Hash ConnectionID SubscriptionID) + matches ;; (Bag (Listof Assertion)) + ) + #:transparent) + +(define-logger syndicate/federation) + +(spawn #:name 'router + (during/spawn (observe (router-connection $nodeid _)) + #:name (list 'router nodeid) + + ;; Generates a fresh local ID naming a subscription + ;; propagated to our peers. + (define make-localid + (let ((next 0)) + (lambda () (begin0 next (set! next (+ next 1)))))) + + (field [peers (set)] ;; (Set ConnID) + [specs (hash)] ;; (Hash Spec LocalID) + [subs (hasheq)] ;; (Hash LocalID Subscription) + ) + + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs)))) + + (define-syntax with-localid->sub + (syntax-rules (->) + ((_ (localid connid -> sub) body ...) + (match (hash-ref (subs) localid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent local ID ~v from connection ~v. Ignoring." + localid + connid)] + [sub body ...])))) + + (define (unsubscribe! localid connid) + (with-localid->sub [localid connid -> sub] + (define new-holders (hash-remove (subscription-holders sub) connid)) + (specs (hash-remove (specs) (subscription-spec sub))) + (subs (if (hash-empty? new-holders) + (hash-remove (subs) localid) + (hash-set (subs) localid (struct-copy subscription sub + [holders new-holders])))) + + ;; The messages we send depend on (hash-count new-holders): + ;; - if >1, there are enough other active subscribers that we don't need to send + ;; any messages. + ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid) + ;; - if =0, we retract the subscription from all peers except connid + + (match (hash-count new-holders) + [0 (for [(peer (in-set (peers)))] + (when (not (equal? peer connid)) + (send! (router-outbound peer (Unsubscribe localid)))))] + [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid + (send! (router-outbound peer (Unsubscribe localid))))] + [_ (void)]))) + + (define (adjust-matches localid connid captures delta expected-outcome ctor) + (with-localid->sub [localid connid -> sub] + (define-values (new-matches outcome) + (bag-change (subscription-matches sub) captures delta #:clamp? #t)) + (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) + (when (eq? outcome expected-outcome) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer connid)) + (send! (router-outbound peer (ctor peer-subid captures)))))))) + + (during (observe (router-connection nodeid $connid)) + (assert (router-connection nodeid connid)) + + (on-start (peers (set-add (peers) connid))) + (on-stop (peers (set-remove (peers) connid))) + + (field [conn-subs (hash)] ;; (Hash SubscriptionID LocalID) + [conn-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) + ) + + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v" + nodeid connid (conn-subs))) + (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v" + nodeid connid (conn-matches)))) + + (on-start (for ([(spec localid) (in-hash (specs))]) + (send! (router-outbound connid (Subscribe localid spec))))) + + (on-stop (for ([item (in-bag (conn-matches))]) + (match-define (cons localid captures) item) + (adjust-matches localid connid captures -1 'present->absent Del)) + (for ([localid (in-hash-values (conn-subs))]) + (unsubscribe! localid connid))) + + (on (message (router-inbound connid (Subscribe $subid $spec))) + (define known? (hash-has-key? (specs) spec)) + (define localid (if known? (hash-ref (specs) spec) (make-localid))) + (define sub + (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) + (define holders (subscription-holders sub)) + (cond + [(hash-has-key? holders connid) + (log-syndicate/federation-error + "Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring." + spec + subid + connid)] + [else + (conn-subs (hash-set (conn-subs) subid localid)) + (when (not known?) (specs (hash-set (specs) spec localid))) + (subs (hash-set (subs) localid (struct-copy subscription sub + [holders (hash-set holders connid subid)]))) + + ;; If not known, then relay the subscription to all peers except `connid`. + ;; + ;; If known, then one or more connections that aren't this one have + ;; previously subscribed with this spec. If exactly one other connection has + ;; previously subscribed, the only subscription that needs sent is to that + ;; peer; otherwise, no subscriptions at all need sent, since everyone has + ;; already been informed of this subscription. + + (cond + [(not known?) + (for [(peer (in-set (peers)))] + (when (not (equal? peer connid)) + (send! (router-outbound peer (Subscribe localid spec)))))] + [(= (hash-count holders) 1) + (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid + (send! (router-outbound peer (Subscribe localid spec))))] + [else + (void)]) + + ;; Once subscription relaying has taken place, send up matches to the active + ;; connection. + (for [(captures (in-bag (subscription-matches sub)))] + (send! (router-outbound connid (Add subid captures)))) + + ])) + + (on (message (router-inbound connid (Unsubscribe $subid))) + (match (hash-ref (conn-subs) subid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent subscription ID ~v from connection ~v. Ignoring." + subid + connid)] + [localid (unsubscribe! localid connid)])) + + (define (relay-add-or-del localid captures delta expected-outcome ctor) + (define-values (new-conn-matches conn-outcome) + (bag-change (conn-matches) (cons localid captures) delta #:clamp? #t)) + (conn-matches new-conn-matches) + (when (eq? conn-outcome expected-outcome) + (adjust-matches localid connid captures delta expected-outcome ctor))) + + (on (message (router-inbound connid (Add $localid $captures))) + (relay-add-or-del localid captures +1 'absent->present Add)) + + (on (message (router-inbound connid (Del $localid $captures))) + (relay-add-or-del localid captures -1 'present->absent Del)) + + (on (message (router-inbound connid (Msg $localid $captures))) + (with-localid->sub [localid connid -> sub] + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer connid)) + (send! (router-outbound peer (Msg peer-subid captures))))))) + + ))) diff --git a/imperative/broker/tf.rkt b/imperative/broker/tf.rkt new file mode 100644 index 0000000..8753f1e --- /dev/null +++ b/imperative/broker/tf.rkt @@ -0,0 +1,108 @@ +#lang imperative-syndicate + +(require imperative-syndicate/pattern) + +(require/activate imperative-syndicate/drivers/timer) +(require/activate "federation2.rkt") + +(assertion-struct present (who)) +(message-struct says (who what)) + +(message-struct change-presence (who what)) +(message-struct terminate (who)) + +(spawn #:name 'monitor + (on (message (router-outbound $name $body)) + (printf "-> ~a : ~v\n" name body)) + (on (message (router-inbound $name $body)) + (printf " ~a ->: ~v\n" name body))) + +(define C (capture (discard))) + +(define (leaf name node) + (spawn #:name (list 'leaf name) + (stop-when (message (terminate name))) + + (during (router-connection node name) + + (on-start + (send! (router-inbound name (Subscribe (gensym (format "~a-P-" name)) (present C)))) + (send! (router-inbound name (Subscribe (gensym (format "~a-S-" name)) (says C C))))) + + (on (message (router-outbound name (Subscribe $x (says C C)))) + (sleep 2) + ;; We won't see our own one of these, because routers expect us to have done + ;; local delivery ourselves. OHHH I am starting to get some insight into what is + ;; underneath the way multicast lets you choose whether or not to see your own + ;; transmissions! If you're in ~relay-node mode, you won't want to see them; if + ;; you're in ~leaf mode, you will! + (send! (router-inbound name (Msg x (list name "Hello world!"))))) + + (on (message (router-outbound name (Subscribe $x (present C)))) + (react + (field [present? #t]) + (stop-when (message (router-outbound name (Unsubscribe x)))) + (begin/dataflow + ;; We won't see our own one of these either! For the same reasons as + ;; explained above. + (if (present?) + (send! (router-inbound name (Add x (list name)))) + (send! (router-inbound name (Del x (list name)))))) + (on (message (change-presence name $new-presence)) + (present? new-presence)))) + + ))) + +(define (relay node1 node2) + (spawn #:name (list 'relay node1 node2) + (define node1-connid (string->symbol (format "~a->~a" node1 node2))) + (define node2-connid (string->symbol (format "~a->~a" node2 node1))) + (field [pending1 '()] + [pending2 '()]) + + (during (router-connection node1 node1-connid) + (on (message (router-outbound node1-connid $body)) + (pending1 (cons body (pending1))))) + + (during (router-connection node2 node2-connid) + (on (message (router-outbound node2-connid $body)) + (pending2 (cons body (pending2))))) + + (during (router-connection node1 node1-connid) + (during (router-connection node2 node2-connid) + (begin/dataflow + (when (pair? (pending1)) + (for [(body (reverse (pending1)))] + (send! (router-inbound node2-connid body))) + (pending1 '()))) + (begin/dataflow + (when (pair? (pending2)) + (for [(body (reverse (pending2)))] + (send! (router-inbound node1-connid body))) + (pending2 '()))))))) + +(spawn* (define-syntax-rule (pause n action) + (begin (sleep n) + (newline) + (printf "********** ~v\n" 'action) + action)) + + (pause 0 (begin (leaf 'c1 'n1) + (leaf 'c2 'n1) + (leaf 'c3 'n2) + (leaf 'c4 'n2))) + + (pause 0.5 (relay 'n1 'n2)) + (pause 0.25 (leaf 'c5 'n3)) + (pause 0.25 (relay 'n2 'n3)) + (pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here + + (pause 1 (send! (change-presence 'c1 #f))) + (pause 0.2 (send! (change-presence 'c1 #t))) + + (pause 0.2 (send! (terminate 'c1))) + (pause 0.2 (send! (terminate 'c3))) + (pause 0.2 (send! (terminate 'c2))) + (pause 0.2 (send! (terminate 'c4))) + + )