diff --git a/imperative/distributed/federation-prototype/federation.rkt b/imperative/distributed/federation-prototype/federation.rkt deleted file mode 100644 index e80ef07..0000000 --- a/imperative/distributed/federation-prototype/federation.rkt +++ /dev/null @@ -1,216 +0,0 @@ -#lang imperative-syndicate - -(provide (struct-out router-connection) - (struct-out router-inbound) - (struct-out router-outbound)) - -(require "../wire-protocol.rkt") -(require imperative-syndicate/bag) -(require racket/set) - -;; Node IDs must be dataspace-unique. -;; -;; Connection IDs must be dataspace-unique. They are properly -;; node-unique, but for brevity assertions omit the node ID in most -;; places. -;; -;; Subscription IDs must be connection-unique AND must correspond -;; one-to-one with a specific subscription spec. That is, a -;; subscription ID is merely shorthand for its spec, and two -;; subscription IDs within a connection must be `equal?` exactly when -;; their corresponding specs are `equal?`. -;; -;; Local IDs must be node-unique. They are used as subscription IDs in -;; outbound messages. -;; -;; Nodes maintain a bidirectional mapping between subscription IDs -;; (scoped within their connection ID) and local IDs. One local ID may -;; map to multiple subscription IDs - this is the place where -;; aggregation pops up. - -;; Unlike the server protocol, both Actions and Events are -;; BIDIRECTIONAL, travelling in both directions along edges linking -;; peer nodes. - -;; Connection protocol -(assertion-struct router-connection (node-id connection-id)) -(message-struct router-inbound (connection-id body)) -(message-struct router-outbound (connection-id body)) - -;; Internal state -(struct subscription (id ;; LocalID - spec ;; Assertion - holders ;; (Hash ConnectionID SubscriptionID) - matches ;; (Bag (Listof Assertion)) - ) - #:transparent) - -(define-logger syndicate/federation) - -(spawn #:name 'router - (during/spawn (observe (router-connection $nodeid _)) - #:name (list 'router nodeid) - - ;; Generates a fresh local ID naming a subscription - ;; propagated to our peers. - (define make-localid - (let ((next 0)) - (lambda () (begin0 next (set! next (+ next 1)))))) - - (field [peers (set)] ;; (Set ConnID) - [specs (hash)] ;; (Hash Spec LocalID) - [subs (hasheq)] ;; (Hash LocalID Subscription) - ) - - (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" nodeid (peers))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs)))) - - (define (call-with-sub localid connid f) - (match (hash-ref (subs) localid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent local ID ~v from connection ~v. Ignoring." - localid - connid)] - [sub (f sub)])) - - (define (unsubscribe! localid connid) - (call-with-sub - localid connid - (lambda (sub) - (define new-holders (hash-remove (subscription-holders sub) connid)) - (if (hash-empty? new-holders) - (begin (specs (hash-remove (specs) (subscription-spec sub))) - (subs (hash-remove (subs) localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders new-holders])))) - - ;; The messages we send depend on (hash-count new-holders): - ;; - if >1, there are enough other active subscribers that we don't need to send - ;; any messages. - ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid) - ;; - if =0, we retract the subscription from all peers except connid - - (match (hash-count new-holders) - [0 (for [(peer (in-set (peers)))] - (when (not (equal? peer connid)) - (send! (router-outbound peer (Clear localid)))))] - [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid - (send! (router-outbound peer (Clear localid))))] - [_ (void)])))) - - (define (adjust-matches localid connid captures delta expected-outcome ctor) - (call-with-sub - localid connid - (lambda (sub) - (define-values (new-matches outcome) - (bag-change (subscription-matches sub) captures delta #:clamp? #t)) - (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) - (when (eq? outcome expected-outcome) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer connid)) - (send! (router-outbound peer (ctor peer-subid captures))))))))) - - (during (observe (router-connection nodeid $connid)) - (assert (router-connection nodeid connid)) - - (on-start (peers (set-add (peers) connid))) - (on-stop (peers (set-remove (peers) connid))) - - (field [conn-subs (hash)] ;; (Hash SubscriptionID LocalID) - [conn-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) - ) - - (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-subs ~v" - nodeid connid (conn-subs))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a conn-matches ~v" - nodeid connid (conn-matches)))) - - (on-start (for ([(spec localid) (in-hash (specs))]) - (send! (router-outbound connid (Assert localid (observe spec)))))) - - (on-stop (for ([item (in-bag (conn-matches))]) - (match-define (cons localid captures) item) - (adjust-matches localid connid captures -1 'present->absent Del)) - (for ([localid (in-hash-values (conn-subs))]) - (unsubscribe! localid connid))) - - (on (message (router-inbound connid (Assert $subid (observe $spec)))) - (define known? (hash-has-key? (specs) spec)) - (define localid (if known? (hash-ref (specs) spec) (make-localid))) - (define sub - (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (bag))))) - (define holders (subscription-holders sub)) - (cond - [(hash-has-key? holders connid) - (log-syndicate/federation-error - "Duplicate subscription ~a, ID ~a, from connection ~a. Ignoring." - spec - subid - connid)] - [else - (conn-subs (hash-set (conn-subs) subid localid)) - (when (not known?) (specs (hash-set (specs) spec localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders (hash-set holders connid subid)]))) - - ;; If not known, then relay the subscription to all peers except `connid`. - ;; - ;; If known, then one or more connections that aren't this one have - ;; previously subscribed with this spec. If exactly one other connection has - ;; previously subscribed, the only subscription that needs sent is to that - ;; peer; otherwise, no subscriptions at all need sent, since everyone has - ;; already been informed of this subscription. - - (cond - [(not known?) - (for [(peer (in-set (peers)))] - (when (not (equal? peer connid)) - (send! (router-outbound peer (Assert localid (observe spec))))))] - [(= (hash-count holders) 1) - (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid - (send! (router-outbound peer (Assert localid (observe spec)))))] - [else - (void)]) - - ;; Once subscription relaying has taken place, send up matches to the active - ;; connection. - (for [(captures (in-bag (subscription-matches sub)))] - (send! (router-outbound connid (Add subid captures)))) - - ])) - - (on (message (router-inbound connid (Clear $subid))) - (match (hash-ref (conn-subs) subid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent subscription ID ~v from connection ~v. Ignoring." - subid - connid)] - [localid - (conn-subs (hash-remove (conn-subs) subid)) - (unsubscribe! localid connid)])) - - (define (relay-add-or-del localid captures delta expected-outcome ctor) - (define-values (new-conn-matches conn-outcome) - (bag-change (conn-matches) (cons localid captures) delta #:clamp? #t)) - (conn-matches new-conn-matches) - (when (eq? conn-outcome expected-outcome) - (adjust-matches localid connid captures delta expected-outcome ctor))) - - (on (message (router-inbound connid (Add $localid $captures))) - (relay-add-or-del localid captures +1 'absent->present Add)) - - (on (message (router-inbound connid (Del $localid $captures))) - (relay-add-or-del localid captures -1 'present->absent Del)) - - (on (message (router-inbound connid (Msg $localid $captures))) - (call-with-sub - localid connid - (lambda (sub) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer connid)) - (send! (router-outbound peer (Msg peer-subid captures)))))))) - - ))) diff --git a/imperative/distributed/federation-prototype/tf.rkt b/imperative/distributed/federation-prototype/tf.rkt deleted file mode 100644 index 1ecd967..0000000 --- a/imperative/distributed/federation-prototype/tf.rkt +++ /dev/null @@ -1,191 +0,0 @@ -#lang imperative-syndicate - -(require imperative-syndicate/pattern) - -(require/activate imperative-syndicate/drivers/timer) -(require/activate "../wire-protocol.rkt") -(require/activate "federation.rkt") - -(assertion-struct present (who)) -(message-struct says (who what)) - -(message-struct change-presence (who what)) -(message-struct terminate (who)) - -(spawn #:name 'monitor - (on (message (router-outbound $name $body)) - (log-info "-> ~a : ~v" name body)) - (on (message (router-inbound $name $body)) - (log-info " ~a ->: ~v" name body))) - -(define C (capture (discard))) - -(define (leaf name node) - (spawn #:name (list 'leaf name) - (stop-when (message (terminate name))) - - (during (router-connection node name) - - (on-start - (send! - (router-inbound name (Assert (gensym (format "~a-P-" name)) (observe (present C))))) - (send! - (router-inbound name (Assert (gensym (format "~a-S-" name)) (observe (says C C)))))) - - (on (message (router-outbound name (Assert $x (observe (says C C))))) - (sleep 2) - ;; We won't see our own one of these, because routers expect us to have done - ;; local delivery ourselves. OHHH I am starting to get some insight into what is - ;; underneath the way multicast lets you choose whether or not to see your own - ;; transmissions! If you're in ~relay-node mode, you won't want to see them; if - ;; you're in ~leaf mode, you will! - (send! (router-inbound name (Msg x (list name "Hello world!"))))) - - (on (message (router-outbound name (Assert $x (observe (present C))))) - (react - (field [present? #t]) - (stop-when (message (router-outbound name (Clear x)))) - (begin/dataflow - ;; We won't see our own one of these either! For the same reasons as - ;; explained above. - (if (present?) - (send! (router-inbound name (Add x (list name)))) - (send! (router-inbound name (Del x (list name)))))) - (on (message (change-presence name $new-presence)) - (present? new-presence)))) - - ))) - -(assertion-struct to-server (node assertion)) -(assertion-struct from-server (node assertion)) - -(define (leaf2 name node) - (local-require imperative-syndicate/term) - (spawn #:name (list 'leaf2 name) - - ;;---------------------------------------- - - (stop-when (message (terminate name))) - - (field [present? #t]) - (assert #:when (present?) (to-server node (present name))) - (on (message (change-presence name $new-presence)) - (present? new-presence)) - - ;; TODO: Doing it this way, with the implementation in `leaf` - ;; above, causes missing "absent" messages because `leaf` - ;; processes don't respond to the specific `presence` - ;; interests generated by the way `during` is implemented, - ;; only to general ones. - ;; - ;; NB: There's a semantic difference here! In the - ;; commented-out version immediately below, we care about the - ;; start and stop events of the *facet*, so will get "absent" - ;; messages upon clean termination. In the other version, - ;; with separate asserted/retracted handlers, we will *not* - ;; get "absent" messages on clean termination, because the - ;; assertion is still there even as the actor terminates! - ;; - ;; (during (from-server node (present $who)) - ;; (on-start (log-info "~a: ~a present" name who)) - ;; (on-stop (log-info "~a: ~a absent" name who))) - - (on (asserted (from-server node (present $who))) (log-info "~a: ~a present" name who)) - (on (retracted (from-server node (present $who))) (log-info "~a: ~a absent" name who)) - - (on (asserted (from-server node (observe (present _)))) - (log-info "~a: someone cares about presence!" name)) - - (on (message (from-server node (says $who $what))) - (log-info "~a: ~a says ~v" name who what)) - - ;;---------------------------------------- - - (during (to-server node $what) - ;; This takes care of the self-signalling discussed above. - (assert (from-server node what))) - - (during (router-connection node name) - (on (message (router-outbound name (Assert $subid (observe $spec)))) - (react - (let ((! (lambda (ctor) - (lambda (cs) (send! (router-inbound name (ctor subid cs))))))) - (add-observer-endpoint! (lambda () (to-server node spec)) - #:on-add (! Add) - #:on-remove (! Del) - #:on-message (! Msg))) - (assert (from-server node (observe spec))) - (stop-when (message (router-outbound name (Clear subid)))))) - - (during (observe ($ pat (from-server node $spec))) - (define ep (gensym 'ep)) - (on-start (send! (router-inbound name (Assert ep (observe spec))))) - (on-stop (send! (router-inbound name (Clear ep)))) - (assert (from-server node (observe spec))) ;; more self-signalling - (on (message (router-outbound name (Add ep $captures))) - (react (assert (instantiate-term->value pat captures)) - (stop-when (message (router-outbound name (Del ep captures)))))) - (on (message (router-outbound name (Msg ep $captures))) - (send! (instantiate-term->value pat captures)))) - - ))) - -(define (relay node1 node2) - (spawn #:name (list 'relay node1 node2) - (define node1-connid (string->symbol (format "~a(~a)" node1 node2))) - (define node2-connid (string->symbol (format "~a(~a)" node2 node1))) - (field [pending1 '()] - [pending2 '()]) - - (stop-when (message (terminate (list 'relay node1 node2)))) - - (during (router-connection node1 node1-connid) - (on (message (router-outbound node1-connid $body)) - (pending1 (cons body (pending1))))) - - (during (router-connection node2 node2-connid) - (on (message (router-outbound node2-connid $body)) - (pending2 (cons body (pending2))))) - - (during (router-connection node1 node1-connid) - (during (router-connection node2 node2-connid) - (begin/dataflow - (when (pair? (pending1)) - (for [(body (reverse (pending1)))] - (send! (router-inbound node2-connid body))) - (pending1 '()))) - (begin/dataflow - (when (pair? (pending2)) - (for [(body (reverse (pending2)))] - (send! (router-inbound node1-connid body))) - (pending2 '()))))))) - -(spawn* (define-syntax-rule (pause n action) - (begin (sleep n) - (log-info "\n********** ~v" 'action) - action)) - - (pause 0 (begin - (leaf 'c1 'n1) - (leaf2 'c2 'n1) - (leaf 'c3 'n2) - (leaf 'c4 'n2) - )) - - (pause 0.5 (relay 'n1 'n2)) - (pause 0.25 (leaf2 'c5 'n3)) - (pause 0.25 (relay 'n2 'n3)) - (pause 0.5 'delivery-of-the-says-messages) ;; the newline is important here - - (pause 1 (send! (change-presence 'c1 #f))) - (pause 0.2 (send! (change-presence 'c1 #t))) - - (pause 0.4 (send! (terminate (list 'relay 'n1 'n2)))) - (pause 0.4 (relay 'n1 'n2)) - - (pause 0.2 (send! (terminate 'c1))) - (pause 0.2 (send! (terminate 'c3))) - (pause 0.2 (send! (terminate 'c2))) - (pause 0.2 (send! (terminate 'c4))) - - )