From 322aa5b478c461778c26b687c8c66eaf6dc4607f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 5 May 2019 15:54:28 +0100 Subject: [PATCH] Move federation protocol a step closer to client/server protocol --- imperative/broker/federation.rkt | 30 +++++++++++++++--------------- imperative/broker/tf.rkt | 20 +++++++++++--------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/imperative/broker/federation.rkt b/imperative/broker/federation.rkt index 6452a19..0f7bebd 100644 --- a/imperative/broker/federation.rkt +++ b/imperative/broker/federation.rkt @@ -1,7 +1,7 @@ #lang imperative-syndicate -(provide (struct-out Subscribe) - (struct-out Unsubscribe) +(provide (struct-out Assert) + (struct-out Clear) (struct-out Add) (struct-out Del) (struct-out Msg) @@ -36,14 +36,14 @@ ;; BIDIRECTIONAL, travelling in both directions along edges linking ;; peer nodes. -;; Actions - like Asserts, but only for `(observe $spec)` assertions. -(message-struct Subscribe (subscription-id spec)) -(message-struct Unsubscribe (subscription-id)) +;; Actions - like the client/server protocol, but lacking Message +(message-struct Assert (endpoint-name assertion)) +(message-struct Clear (endpoint-name)) ;; Events -(message-struct Add (subscription-id captures)) -(message-struct Del (subscription-id captures)) -(message-struct Msg (subscription-id captures)) +(message-struct Add (endpoint-name captures)) +(message-struct Del (endpoint-name captures)) +(message-struct Msg (endpoint-name captures)) ;; Connection protocol (assertion-struct router-connection (node-id connection-id)) @@ -108,9 +108,9 @@ (match (hash-count new-holders) [0 (for [(peer (in-set (peers)))] (when (not (equal? peer connid)) - (send! (router-outbound peer (Unsubscribe localid)))))] + (send! (router-outbound peer (Clear localid)))))] [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid - (send! (router-outbound peer (Unsubscribe localid))))] + (send! (router-outbound peer (Clear localid))))] [_ (void)])))) (define (adjust-matches localid connid captures delta expected-outcome ctor) @@ -142,7 +142,7 @@ nodeid connid (conn-matches)))) (on-start (for ([(spec localid) (in-hash (specs))]) - (send! (router-outbound connid (Subscribe localid spec))))) + (send! (router-outbound connid (Assert localid (observe spec)))))) (on-stop (for ([item (in-bag (conn-matches))]) (match-define (cons localid captures) item) @@ -150,7 +150,7 @@ (for ([localid (in-hash-values (conn-subs))]) (unsubscribe! localid connid))) - (on (message (router-inbound connid (Subscribe $subid $spec))) + (on (message (router-inbound connid (Assert $subid (observe $spec)))) (define known? (hash-has-key? (specs) spec)) (define localid (if known? (hash-ref (specs) spec) (make-localid))) (define sub @@ -181,10 +181,10 @@ [(not known?) (for [(peer (in-set (peers)))] (when (not (equal? peer connid)) - (send! (router-outbound peer (Subscribe localid spec)))))] + (send! (router-outbound peer (Assert localid (observe spec))))))] [(= (hash-count holders) 1) (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ connid - (send! (router-outbound peer (Subscribe localid spec))))] + (send! (router-outbound peer (Assert localid (observe spec)))))] [else (void)]) @@ -195,7 +195,7 @@ ])) - (on (message (router-inbound connid (Unsubscribe $subid))) + (on (message (router-inbound connid (Clear $subid))) (match (hash-ref (conn-subs) subid #f) [#f (log-syndicate/federation-error "Mention of nonexistent subscription ID ~v from connection ~v. Ignoring." diff --git a/imperative/broker/tf.rkt b/imperative/broker/tf.rkt index f2d48d8..ac8f4c5 100644 --- a/imperative/broker/tf.rkt +++ b/imperative/broker/tf.rkt @@ -26,10 +26,12 @@ (during (router-connection node name) (on-start - (send! (router-inbound name (Subscribe (gensym (format "~a-P-" name)) (present C)))) - (send! (router-inbound name (Subscribe (gensym (format "~a-S-" name)) (says C C))))) + (send! + (router-inbound name (Assert (gensym (format "~a-P-" name)) (observe (present C))))) + (send! + (router-inbound name (Assert (gensym (format "~a-S-" name)) (observe (says C C)))))) - (on (message (router-outbound name (Subscribe $x (says C C)))) + (on (message (router-outbound name (Assert $x (observe (says C C))))) (sleep 2) ;; We won't see our own one of these, because routers expect us to have done ;; local delivery ourselves. OHHH I am starting to get some insight into what is @@ -38,10 +40,10 @@ ;; you're in ~leaf mode, you will! (send! (router-inbound name (Msg x (list name "Hello world!"))))) - (on (message (router-outbound name (Subscribe $x (present C)))) + (on (message (router-outbound name (Assert $x (observe (present C))))) (react (field [present? #t]) - (stop-when (message (router-outbound name (Unsubscribe x)))) + (stop-when (message (router-outbound name (Clear x)))) (begin/dataflow ;; We won't see our own one of these either! For the same reasons as ;; explained above. @@ -103,7 +105,7 @@ (assert (from-broker node what))) (during (router-connection node name) - (on (message (router-outbound name (Subscribe $subid $spec))) + (on (message (router-outbound name (Assert $subid (observe $spec)))) (react (let ((! (lambda (ctor) (lambda (cs) (send! (router-inbound name (ctor subid cs))))))) @@ -112,12 +114,12 @@ #:on-remove (! Del) #:on-message (! Msg))) (assert (from-broker node (observe spec))) - (stop-when (message (router-outbound name (Unsubscribe subid)))))) + (stop-when (message (router-outbound name (Clear subid)))))) (during (observe ($ pat (from-broker node $spec))) (define ep (gensym 'ep)) - (on-start (send! (router-inbound name (Subscribe ep spec)))) - (on-stop (send! (router-inbound name (Unsubscribe ep)))) + (on-start (send! (router-inbound name (Assert ep (observe spec))))) + (on-stop (send! (router-inbound name (Clear ep)))) (assert (from-broker node (observe spec))) ;; more self-signalling (on (message (router-outbound name (Add ep $captures))) (react (assert (instantiate-term->value pat captures))