Remove federation-prototype
This commit is contained in:
parent
81034e017e
commit
4d828a5ad2
|
@ -1,216 +0,0 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out router-connection)
|
||||
(struct-out router-inbound)
|
||||
(struct-out router-outbound))
|
||||
|
||||
(require "../wire-protocol.rkt")
|
||||
(require imperative-syndicate/bag)
|
||||
(require racket/set)
|
||||
|
||||
;; Node IDs must be dataspace-unique.
|
||||
;;
|
||||
;; Connection IDs must be dataspace-unique. They are properly
|
||||
;; node-unique, but for brevity assertions omit the node ID in most
|
||||
;; places.
|
||||
;;
|
||||
;; Subscription IDs must be connection-unique AND must correspond
|
||||
;; one-to-one with a specific subscription spec. That is, a
|
||||
;; subscription ID is merely shorthand for its spec, and two
|
||||
;; subscription IDs within a connection must be `equal?` exactly when
|
||||
;; their corresponding specs are `equal?`.
|
||||
;;
|
||||
;; Local IDs must be node-unique. They are used as subscription IDs in
|
||||
;; outbound messages.
|
||||
;;
|
||||
;; Nodes maintain a bidirectional mapping between subscription IDs
|
||||
;; (scoped within their connection ID) and local IDs. One local ID may
|
||||
;; map to multiple subscription IDs - this is the place where
|
||||
;; aggregation pops up.
|
||||
|
||||
;; Unlike the server protocol, both Actions and Events are
|
||||
;; BIDIRECTIONAL, travelling in both directions along edges linking
|
||||
;; peer nodes.
|
||||
|
||||
;; Connection protocol
|
||||
(assertion-struct router-connection (node-id connection-id))
|
||||
(message-struct router-inbound (connection-id body))
|
||||
(message-struct router-outbound (connection-id body))
|
||||
|
||||
;; Internal state
|
||||
(struct subscription (id ;; LocalID
|
||||
spec ;; Assertion
|
||||
holders ;; (Hash ConnectionID SubscriptionID)
|
||||
matches ;; (Bag (Listof Assertion))
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
(define-logger syndicate/federation)
|
||||
|
||||
(spawn #:name 'router
|
||||
(during/spawn (observe (router-connection $nodeid _))
|
||||
#:name (list 'router nodeid)
|
||||
|
||||
;; Generates a fresh local ID naming a subscription
|
||||
;; propagated to our peers.
|
||||
(define make-localid
|
||||
(let ((next 0))
|
||||
(lambda () (begin0 next (set! next (+ next 1))))))
|
||||
|
||||
(field [peers (set)] ;; (Set ConnID)
|
||||
[specs (hash)] ;; (Hash Spec LocalID)
|
||||
[subs (hasheq)] ;; (Hash LocalID Subscription)
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs))))
|
||||
|
||||
(define (call-with-sub localid connid f)
|
||||
(match (hash-ref (subs) localid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent local ID ~v from connection ~v. Ignoring."
|
||||
localid
|
||||
connid)]
|
||||
[sub (f sub)]))
|
||||
|
||||
(define (unsubscribe! localid connid)
|
||||
(call-with-sub
|
||||
localid connid
|
||||
(lambda (sub)
|
||||
(define new-holders (hash-remove (subscription-holders sub) connid))
|
||||
(if (hash-empty? new-holders)
|
||||
(begin (specs (hash-remove (specs) (subscription-spec sub)))
|
||||
(subs (hash-remove (subs) localid)))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders new-holders]))))
|
||||
|
||||
;; The messages we send depend on (hash-count new-holders):
|
||||
;; - if >1, there are enough other active subscribers that we don't need to send
|
||||
;; any messages.
|
||||
;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid)
|
||||
;; - if =0, we retract the subscription from all peers except connid
|
||||
|
||||
(match (hash-count new-holders)
|
||||
[0 (for [(peer (in-set (peers)))]
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Clear localid)))))]
|
||||
[1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid
|
||||
(send! (router-outbound peer (Clear localid))))]
|
||||
[_ (void)]))))
|
||||
|
||||
(define (adjust-matches localid connid captures delta expected-outcome ctor)
|
||||
(call-with-sub
|
||||
localid connid
|
||||
(lambda (sub)
|
||||
(define-values (new-matches outcome)
|
||||
(bag-change (subscription-matches sub) captures delta #:clamp? #t))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
|
||||
(when (eq? outcome expected-outcome)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (ctor peer-subid captures)))))))))
|
||||
|
||||
(during (observe (router-connection nodeid $connid))
|
||||
(assert (router-connection nodeid connid))
|
||||
|
||||
(on-start (peers (set-add (peers) connid)))
|
||||
(on-stop (peers (set-remove (peers) connid)))
|
||||
|
||||
(field [conn-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
||||
[conn-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v"
|
||||
nodeid connid (conn-subs)))
|
||||
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v"
|
||||
nodeid connid (conn-matches))))
|
||||
|
||||
(on-start (for ([(spec localid) (in-hash (specs))])
|
||||
(send! (router-outbound connid (Assert localid (observe spec))))))
|
||||
|
||||
(on-stop (for ([item (in-bag (conn-matches))])
|
||||
(match-define (cons localid captures) item)
|
||||
(adjust-matches localid connid captures -1 'present->absent Del))
|
||||
(for ([localid (in-hash-values (conn-subs))])
|
||||
(unsubscribe! localid connid)))
|
||||
|
||||
(on (message (router-inbound connid (Assert $subid (observe $spec))))
|
||||
(define known? (hash-has-key? (specs) spec))
|
||||
(define localid (if known? (hash-ref (specs) spec) (make-localid)))
|
||||
(define sub
|
||||
(hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag)))))
|
||||
(define holders (subscription-holders sub))
|
||||
(cond
|
||||
[(hash-has-key? holders connid)
|
||||
(log-syndicate/federation-error
|
||||
"Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring."
|
||||
spec
|
||||
subid
|
||||
connid)]
|
||||
[else
|
||||
(conn-subs (hash-set (conn-subs) subid localid))
|
||||
(when (not known?) (specs (hash-set (specs) spec localid)))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders (hash-set holders connid subid)])))
|
||||
|
||||
;; If not known, then relay the subscription to all peers except `connid`.
|
||||
;;
|
||||
;; If known, then one or more connections that aren't this one have
|
||||
;; previously subscribed with this spec. If exactly one other connection has
|
||||
;; previously subscribed, the only subscription that needs sent is to that
|
||||
;; peer; otherwise, no subscriptions at all need sent, since everyone has
|
||||
;; already been informed of this subscription.
|
||||
|
||||
(cond
|
||||
[(not known?)
|
||||
(for [(peer (in-set (peers)))]
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Assert localid (observe spec))))))]
|
||||
[(= (hash-count holders) 1)
|
||||
(for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid
|
||||
(send! (router-outbound peer (Assert localid (observe spec)))))]
|
||||
[else
|
||||
(void)])
|
||||
|
||||
;; Once subscription relaying has taken place, send up matches to the active
|
||||
;; connection.
|
||||
(for [(captures (in-bag (subscription-matches sub)))]
|
||||
(send! (router-outbound connid (Add subid captures))))
|
||||
|
||||
]))
|
||||
|
||||
(on (message (router-inbound connid (Clear $subid)))
|
||||
(match (hash-ref (conn-subs) subid #f)
|
||||
[#f (log-syndicate/federation-error
|
||||
"Mention of nonexistent subscription ID ~v from connection ~v. Ignoring."
|
||||
subid
|
||||
connid)]
|
||||
[localid
|
||||
(conn-subs (hash-remove (conn-subs) subid))
|
||||
(unsubscribe! localid connid)]))
|
||||
|
||||
(define (relay-add-or-del localid captures delta expected-outcome ctor)
|
||||
(define-values (new-conn-matches conn-outcome)
|
||||
(bag-change (conn-matches) (cons localid captures) delta #:clamp? #t))
|
||||
(conn-matches new-conn-matches)
|
||||
(when (eq? conn-outcome expected-outcome)
|
||||
(adjust-matches localid connid captures delta expected-outcome ctor)))
|
||||
|
||||
(on (message (router-inbound connid (Add $localid $captures)))
|
||||
(relay-add-or-del localid captures +1 'absent->present Add))
|
||||
|
||||
(on (message (router-inbound connid (Del $localid $captures)))
|
||||
(relay-add-or-del localid captures -1 'present->absent Del))
|
||||
|
||||
(on (message (router-inbound connid (Msg $localid $captures)))
|
||||
(call-with-sub
|
||||
localid connid
|
||||
(lambda (sub)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer connid))
|
||||
(send! (router-outbound peer (Msg peer-subid captures))))))))
|
||||
|
||||
)))
|
|
@ -1,191 +0,0 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require imperative-syndicate/pattern)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate "../wire-protocol.rkt")
|
||||
(require/activate "federation.rkt")
|
||||
|
||||
(assertion-struct present (who))
|
||||
(message-struct says (who what))
|
||||
|
||||
(message-struct change-presence (who what))
|
||||
(message-struct terminate (who))
|
||||
|
||||
(spawn #:name 'monitor
|
||||
(on (message (router-outbound $name $body))
|
||||
(log-info "-> ~a : ~v" name body))
|
||||
(on (message (router-inbound $name $body))
|
||||
(log-info " ~a ->: ~v" name body)))
|
||||
|
||||
(define C (capture (discard)))
|
||||
|
||||
(define (leaf name node)
|
||||
(spawn #:name (list 'leaf name)
|
||||
(stop-when (message (terminate name)))
|
||||
|
||||
(during (router-connection node name)
|
||||
|
||||
(on-start
|
||||
(send!
|
||||
(router-inbound name (Assert (gensym (format "~a-P-" name)) (observe (present C)))))
|
||||
(send!
|
||||
(router-inbound name (Assert (gensym (format "~a-S-" name)) (observe (says C C))))))
|
||||
|
||||
(on (message (router-outbound name (Assert $x (observe (says C C)))))
|
||||
(sleep 2)
|
||||
;; We won't see our own one of these, because routers expect us to have done
|
||||
;; local delivery ourselves. OHHH I am starting to get some insight into what is
|
||||
;; underneath the way multicast lets you choose whether or not to see your own
|
||||
;; transmissions! If you're in ~relay-node mode, you won't want to see them; if
|
||||
;; you're in ~leaf mode, you will!
|
||||
(send! (router-inbound name (Msg x (list name "Hello world!")))))
|
||||
|
||||
(on (message (router-outbound name (Assert $x (observe (present C)))))
|
||||
(react
|
||||
(field [present? #t])
|
||||
(stop-when (message (router-outbound name (Clear x))))
|
||||
(begin/dataflow
|
||||
;; We won't see our own one of these either! For the same reasons as
|
||||
;; explained above.
|
||||
(if (present?)
|
||||
(send! (router-inbound name (Add x (list name))))
|
||||
(send! (router-inbound name (Del x (list name))))))
|
||||
(on (message (change-presence name $new-presence))
|
||||
(present? new-presence))))
|
||||
|
||||
)))
|
||||
|
||||
(assertion-struct to-server (node assertion))
|
||||
(assertion-struct from-server (node assertion))
|
||||
|
||||
(define (leaf2 name node)
|
||||
(local-require imperative-syndicate/term)
|
||||
(spawn #:name (list 'leaf2 name)
|
||||
|
||||
;;----------------------------------------
|
||||
|
||||
(stop-when (message (terminate name)))
|
||||
|
||||
(field [present? #t])
|
||||
(assert #:when (present?) (to-server node (present name)))
|
||||
(on (message (change-presence name $new-presence))
|
||||
(present? new-presence))
|
||||
|
||||
;; TODO: Doing it this way, with the implementation in `leaf`
|
||||
;; above, causes missing "absent" messages because `leaf`
|
||||
;; processes don't respond to the specific `presence`
|
||||
;; interests generated by the way `during` is implemented,
|
||||
;; only to general ones.
|
||||
;;
|
||||
;; NB: There's a semantic difference here! In the
|
||||
;; commented-out version immediately below, we care about the
|
||||
;; start and stop events of the *facet*, so will get "absent"
|
||||
;; messages upon clean termination. In the other version,
|
||||
;; with separate asserted/retracted handlers, we will *not*
|
||||
;; get "absent" messages on clean termination, because the
|
||||
;; assertion is still there even as the actor terminates!
|
||||
;;
|
||||
;; (during (from-server node (present $who))
|
||||
;; (on-start (log-info "~a: ~a present" name who))
|
||||
;; (on-stop (log-info "~a: ~a absent" name who)))
|
||||
|
||||
(on (asserted (from-server node (present $who))) (log-info "~a: ~a present" name who))
|
||||
(on (retracted (from-server node (present $who))) (log-info "~a: ~a absent" name who))
|
||||
|
||||
(on (asserted (from-server node (observe (present _))))
|
||||
(log-info "~a: someone cares about presence!" name))
|
||||
|
||||
(on (message (from-server node (says $who $what)))
|
||||
(log-info "~a: ~a says ~v" name who what))
|
||||
|
||||
;;----------------------------------------
|
||||
|
||||
(during (to-server node $what)
|
||||
;; This takes care of the self-signalling discussed above.
|
||||
(assert (from-server node what)))
|
||||
|
||||
(during (router-connection node name)
|
||||
(on (message (router-outbound name (Assert $subid (observe $spec))))
|
||||
(react
|
||||
(let ((! (lambda (ctor)
|
||||
(lambda (cs) (send! (router-inbound name (ctor subid cs)))))))
|
||||
(add-observer-endpoint! (lambda () (to-server node spec))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
(assert (from-server node (observe spec)))
|
||||
(stop-when (message (router-outbound name (Clear subid))))))
|
||||
|
||||
(during (observe ($ pat (from-server node $spec)))
|
||||
(define ep (gensym 'ep))
|
||||
(on-start (send! (router-inbound name (Assert ep (observe spec)))))
|
||||
(on-stop (send! (router-inbound name (Clear ep))))
|
||||
(assert (from-server node (observe spec))) ;; more self-signalling
|
||||
(on (message (router-outbound name (Add ep $captures)))
|
||||
(react (assert (instantiate-term->value pat captures))
|
||||
(stop-when (message (router-outbound name (Del ep captures))))))
|
||||
(on (message (router-outbound name (Msg ep $captures)))
|
||||
(send! (instantiate-term->value pat captures))))
|
||||
|
||||
)))
|
||||
|
||||
(define (relay node1 node2)
|
||||
(spawn #:name (list 'relay node1 node2)
|
||||
(define node1-connid (string->symbol (format "~a(~a)" node1 node2)))
|
||||
(define node2-connid (string->symbol (format "~a(~a)" node2 node1)))
|
||||
(field [pending1 '()]
|
||||
[pending2 '()])
|
||||
|
||||
(stop-when (message (terminate (list 'relay node1 node2))))
|
||||
|
||||
(during (router-connection node1 node1-connid)
|
||||
(on (message (router-outbound node1-connid $body))
|
||||
(pending1 (cons body (pending1)))))
|
||||
|
||||
(during (router-connection node2 node2-connid)
|
||||
(on (message (router-outbound node2-connid $body))
|
||||
(pending2 (cons body (pending2)))))
|
||||
|
||||
(during (router-connection node1 node1-connid)
|
||||
(during (router-connection node2 node2-connid)
|
||||
(begin/dataflow
|
||||
(when (pair? (pending1))
|
||||
(for [(body (reverse (pending1)))]
|
||||
(send! (router-inbound node2-connid body)))
|
||||
(pending1 '())))
|
||||
(begin/dataflow
|
||||
(when (pair? (pending2))
|
||||
(for [(body (reverse (pending2)))]
|
||||
(send! (router-inbound node1-connid body)))
|
||||
(pending2 '())))))))
|
||||
|
||||
(spawn* (define-syntax-rule (pause n action)
|
||||
(begin (sleep n)
|
||||
(log-info "\n********** ~v" 'action)
|
||||
action))
|
||||
|
||||
(pause 0 (begin
|
||||
(leaf 'c1 'n1)
|
||||
(leaf2 'c2 'n1)
|
||||
(leaf 'c3 'n2)
|
||||
(leaf 'c4 'n2)
|
||||
))
|
||||
|
||||
(pause 0.5 (relay 'n1 'n2))
|
||||
(pause 0.25 (leaf2 'c5 'n3))
|
||||
(pause 0.25 (relay 'n2 'n3))
|
||||
(pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here
|
||||
|
||||
(pause 1 (send! (change-presence 'c1 #f)))
|
||||
(pause 0.2 (send! (change-presence 'c1 #t)))
|
||||
|
||||
(pause 0.4 (send! (terminate (list 'relay 'n1 'n2))))
|
||||
(pause 0.4 (relay 'n1 'n2))
|
||||
|
||||
(pause 0.2 (send! (terminate 'c1)))
|
||||
(pause 0.2 (send! (terminate 'c3)))
|
||||
(pause 0.2 (send! (terminate 'c2)))
|
||||
(pause 0.2 (send! (terminate 'c4)))
|
||||
|
||||
)
|
Loading…
Reference in New Issue