Federation kernel
This commit is contained in:
parent
4fd2c9895c
commit
17d5b88784
|
@ -0,0 +1,223 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out Subscribe)
|
||||
(struct-out Unsubscribe)
|
||||
(struct-out Add)
|
||||
(struct-out Del)
|
||||
(struct-out Msg)
|
||||
(struct-out router-connection)
|
||||
(struct-out router-inbound)
|
||||
(struct-out router-outbound))
|
||||
|
||||
(require racket/set)
|
||||
(require imperative-syndicate/bag)
|
||||
|
||||
;; Node IDs must be dataspace-unique.
|
||||
;;
|
||||
;; Connection IDs must be dataspace-unique. They are properly
|
||||
;; node-unique, but for brevity assertions omit the node ID in most
|
||||
;; places.
|
||||
;;
|
||||
;; Subscription IDs must be connection-unique AND must correspond
|
||||
;; one-to-one with a specific subscription spec. That is, a
|
||||
;; subscription ID is merely shorthand for its spec, and two
|
||||
;; subscription IDs within a connection must be `equal?` exactly when
|
||||
;; their corresponding specs are `equal?`.
|
||||
;;
|
||||
;; Local IDs must be node-unique. They are used as subscription IDs in
|
||||
;; outbound messages.
|
||||
;;
|
||||
;; Nodes maintain a bidirectional mapping between subscription IDs
|
||||
;; (scoped within their connection ID) and local IDs. One local ID may
|
||||
;; map to multiple subscription IDs - this is the place where
|
||||
;; aggregation pops up.
|
||||
|
||||
;; Unlike the broker protocol, both Actions and Events are
|
||||
;; BIDIRECTIONAL, travelling in both directions along edges linking
|
||||
;; peer nodes.
|
||||
|
||||
;; Actions - like Asserts, but only for `(observe $spec)` assertions.
|
||||
(message-struct Subscribe (subscription-id spec))
|
||||
(message-struct Unsubscribe (subscription-id))
|
||||
|
||||
;; Events
|
||||
(message-struct Add (subscription-id captures))
|
||||
(message-struct Del (subscription-id captures))
|
||||
(message-struct Msg (subscription-id captures))
|
||||
|
||||
;; Connection protocol
|
||||
(assertion-struct router-connection (node-id connection-id))
|
||||
(message-struct router-inbound (connection-id body))
|
||||
(message-struct router-outbound (connection-id body))
|
||||
|
||||
;; Internal state
|
||||
(struct subscription (id ;; LocalID
|
||||
spec ;; Assertion
|
||||
holders ;; (Hash ConnectionID SubscriptionID)
|
||||
matches ;; (Bag (Listof Assertion))
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
(define-logger syndicate/federation)
|
||||
|
||||
(spawn #:name 'router
|
||||
(during/spawn (observe (router-connection $nodeid _))
|
||||
#:name (list 'router nodeid)
|
||||
|
||||
;; Generates a fresh local ID naming a subscription
|
||||
;; propagated to our peers.
|
||||
(define make-localid
|
||||
(let ((next 0))
|
||||
(lambda () (begin0 next (set! next (+ next 1))))))
|
||||
|
||||
(field [peers (set)] ;; (Set ConnID)
|
||||
[specs (hash)] ;; (Hash Spec LocalID)
|
||||
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs))))
|
||||
|
||||
(define-syntax with-localid->sub
|
||||
(syntax-rules (->)
|
||||
((_ (localid connid -> sub) body ...)
|
||||
(match (hash-ref (subs) localid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent local ID ~v from connection ~v. Ignoring."
|
||||
localid
|
||||
connid)]
|
||||
[sub body ...]))))
|
||||
|
||||
(define (unsubscribe! localid connid)
|
||||
(with-localid->sub [localid connid -> sub]
|
||||
(define new-holders (hash-remove (subscription-holders sub) connid))
|
||||
(specs (hash-remove (specs) (subscription-spec sub)))
|
||||
(subs (if (hash-empty? new-holders)
|
||||
(hash-remove (subs) localid)
|
||||
(hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders new-holders]))))
|
||||
|
||||
;; The messages we send depend on (hash-count new-holders):
|
||||
;; - if >1, there are enough other active subscribers that we don't need to send
|
||||
;; any messages.
|
||||
;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid)
|
||||
;; - if =0, we retract the subscription from all peers except connid
|
||||
|
||||
(match (hash-count new-holders)
|
||||
[0 (for [(peer (in-set (peers)))]
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Unsubscribe localid)))))]
|
||||
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid
|
||||
(send! (router-outbound peer (Unsubscribe localid))))]
|
||||
[_ (void)])))
|
||||
|
||||
(define (adjust-matches localid connid captures delta expected-outcome ctor)
|
||||
(with-localid->sub [localid connid -> sub]
|
||||
(define-values (new-matches outcome)
|
||||
(bag-change (subscription-matches sub) captures delta #:clamp? #t))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
|
||||
(when (eq? outcome expected-outcome)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (ctor peer-subid captures))))))))
|
||||
|
||||
(during (observe (router-connection nodeid $connid))
|
||||
(assert (router-connection nodeid connid))
|
||||
|
||||
(on-start (peers (set-add (peers) connid)))
|
||||
(on-stop (peers (set-remove (peers) connid)))
|
||||
|
||||
(field [conn-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
||||
[conn-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v"
|
||||
nodeid connid (conn-subs)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v"
|
||||
nodeid connid (conn-matches))))
|
||||
|
||||
(on-start (for ([(spec localid) (in-hash (specs))])
|
||||
(send! (router-outbound connid (Subscribe localid spec)))))
|
||||
|
||||
(on-stop (for ([item (in-bag (conn-matches))])
|
||||
(match-define (cons localid captures) item)
|
||||
(adjust-matches localid connid captures -1 'present->absent Del))
|
||||
(for ([localid (in-hash-values (conn-subs))])
|
||||
(unsubscribe! localid connid)))
|
||||
|
||||
(on (message (router-inbound connid (Subscribe $subid $spec)))
|
||||
(define known? (hash-has-key? (specs) spec))
|
||||
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
||||
(define sub
|
||||
(hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
|
||||
(define holders (subscription-holders sub))
|
||||
(cond
|
||||
[(hash-has-key? holders connid)
|
||||
(log-syndicate/federation-error
|
||||
"Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring."
|
||||
spec
|
||||
subid
|
||||
connid)]
|
||||
[else
|
||||
(conn-subs (hash-set (conn-subs) subid localid))
|
||||
(when (not known?) (specs (hash-set (specs) spec localid)))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders (hash-set holders connid subid)])))
|
||||
|
||||
;; If not known, then relay the subscription to all peers except `connid`.
|
||||
;;
|
||||
;; If known, then one or more connections that aren't this one have
|
||||
;; previously subscribed with this spec. If exactly one other connection has
|
||||
;; previously subscribed, the only subscription that needs sent is to that
|
||||
;; peer; otherwise, no subscriptions at all need sent, since everyone has
|
||||
;; already been informed of this subscription.
|
||||
|
||||
(cond
|
||||
[(not known?)
|
||||
(for [(peer (in-set (peers)))]
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Subscribe localid spec)))))]
|
||||
[(= (hash-count holders) 1)
|
||||
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid
|
||||
(send! (router-outbound peer (Subscribe localid spec))))]
|
||||
[else
|
||||
(void)])
|
||||
|
||||
;; Once subscription relaying has taken place, send up matches to the active
|
||||
;; connection.
|
||||
(for [(captures (in-bag (subscription-matches sub)))]
|
||||
(send! (router-outbound connid (Add subid captures))))
|
||||
|
||||
]))
|
||||
|
||||
(on (message (router-inbound connid (Unsubscribe $subid)))
|
||||
(match (hash-ref (conn-subs) subid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent subscription ID ~v from connection ~v. Ignoring."
|
||||
subid
|
||||
connid)]
|
||||
[localid (unsubscribe! localid connid)]))
|
||||
|
||||
(define (relay-add-or-del localid captures delta expected-outcome ctor)
|
||||
(define-values (new-conn-matches conn-outcome)
|
||||
(bag-change (conn-matches) (cons localid captures) delta #:clamp? #t))
|
||||
(conn-matches new-conn-matches)
|
||||
(when (eq? conn-outcome expected-outcome)
|
||||
(adjust-matches localid connid captures delta expected-outcome ctor)))
|
||||
|
||||
(on (message (router-inbound connid (Add $localid $captures)))
|
||||
(relay-add-or-del localid captures +1 'absent->present Add))
|
||||
|
||||
(on (message (router-inbound connid (Del $localid $captures)))
|
||||
(relay-add-or-del localid captures -1 'present->absent Del))
|
||||
|
||||
(on (message (router-inbound connid (Msg $localid $captures)))
|
||||
(with-localid->sub [localid connid -> sub]
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Msg peer-subid captures)))))))
|
||||
|
||||
)))
|
|
@ -0,0 +1,108 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require imperative-syndicate/pattern)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate "federation2.rkt")
|
||||
|
||||
(assertion-struct present (who))
|
||||
(message-struct says (who what))
|
||||
|
||||
(message-struct change-presence (who what))
|
||||
(message-struct terminate (who))
|
||||
|
||||
(spawn #:name 'monitor
|
||||
(on (message (router-outbound $name $body))
|
||||
(printf "-> ~a : ~v\n" name body))
|
||||
(on (message (router-inbound $name $body))
|
||||
(printf " ~a ->: ~v\n" name body)))
|
||||
|
||||
(define C (capture (discard)))
|
||||
|
||||
(define (leaf name node)
|
||||
(spawn #:name (list 'leaf name)
|
||||
(stop-when (message (terminate name)))
|
||||
|
||||
(during (router-connection node name)
|
||||
|
||||
(on-start
|
||||
(send! (router-inbound name (Subscribe (gensym (format "~a-P-" name)) (present C))))
|
||||
(send! (router-inbound name (Subscribe (gensym (format "~a-S-" name)) (says C C)))))
|
||||
|
||||
(on (message (router-outbound name (Subscribe $x (says C C))))
|
||||
(sleep 2)
|
||||
;; We won't see our own one of these, because routers expect us to have done
|
||||
;; local delivery ourselves. OHHH I am starting to get some insight into what is
|
||||
;; underneath the way multicast lets you choose whether or not to see your own
|
||||
;; transmissions! If you're in ~relay-node mode, you won't want to see them; if
|
||||
;; you're in ~leaf mode, you will!
|
||||
(send! (router-inbound name (Msg x (list name "Hello world!")))))
|
||||
|
||||
(on (message (router-outbound name (Subscribe $x (present C))))
|
||||
(react
|
||||
(field [present? #t])
|
||||
(stop-when (message (router-outbound name (Unsubscribe x))))
|
||||
(begin/dataflow
|
||||
;; We won't see our own one of these either! For the same reasons as
|
||||
;; explained above.
|
||||
(if (present?)
|
||||
(send! (router-inbound name (Add x (list name))))
|
||||
(send! (router-inbound name (Del x (list name))))))
|
||||
(on (message (change-presence name $new-presence))
|
||||
(present? new-presence))))
|
||||
|
||||
)))
|
||||
|
||||
(define (relay node1 node2)
|
||||
(spawn #:name (list 'relay node1 node2)
|
||||
(define node1-connid (string->symbol (format "~a->~a" node1 node2)))
|
||||
(define node2-connid (string->symbol (format "~a->~a" node2 node1)))
|
||||
(field [pending1 '()]
|
||||
[pending2 '()])
|
||||
|
||||
(during (router-connection node1 node1-connid)
|
||||
(on (message (router-outbound node1-connid $body))
|
||||
(pending1 (cons body (pending1)))))
|
||||
|
||||
(during (router-connection node2 node2-connid)
|
||||
(on (message (router-outbound node2-connid $body))
|
||||
(pending2 (cons body (pending2)))))
|
||||
|
||||
(during (router-connection node1 node1-connid)
|
||||
(during (router-connection node2 node2-connid)
|
||||
(begin/dataflow
|
||||
(when (pair? (pending1))
|
||||
(for [(body (reverse (pending1)))]
|
||||
(send! (router-inbound node2-connid body)))
|
||||
(pending1 '())))
|
||||
(begin/dataflow
|
||||
(when (pair? (pending2))
|
||||
(for [(body (reverse (pending2)))]
|
||||
(send! (router-inbound node1-connid body)))
|
||||
(pending2 '())))))))
|
||||
|
||||
(spawn* (define-syntax-rule (pause n action)
|
||||
(begin (sleep n)
|
||||
(newline)
|
||||
(printf "********** ~v\n" 'action)
|
||||
action))
|
||||
|
||||
(pause 0 (begin (leaf 'c1 'n1)
|
||||
(leaf 'c2 'n1)
|
||||
(leaf 'c3 'n2)
|
||||
(leaf 'c4 'n2)))
|
||||
|
||||
(pause 0.5 (relay 'n1 'n2))
|
||||
(pause 0.25 (leaf 'c5 'n3))
|
||||
(pause 0.25 (relay 'n2 'n3))
|
||||
(pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here
|
||||
|
||||
(pause 1 (send! (change-presence 'c1 #f)))
|
||||
(pause 0.2 (send! (change-presence 'c1 #t)))
|
||||
|
||||
(pause 0.2 (send! (terminate 'c1)))
|
||||
(pause 0.2 (send! (terminate 'c3)))
|
||||
(pause 0.2 (send! (terminate 'c2)))
|
||||
(pause 0.2 (send! (terminate 'c4)))
|
||||
|
||||
)
|
Loading…
Reference in New Issue