diff --git a/syndicate/distributed/buffer.rkt b/syndicate/distributed/buffer.rkt new file mode 100644 index 0000000..9b7fb67 --- /dev/null +++ b/syndicate/distributed/buffer.rkt @@ -0,0 +1,14 @@ +#lang imperative-syndicate + +(provide make-buffer) + +(define (make-buffer) + (field [pending '()]) + (define (push item) + (pending (cons item (pending)))) + (define (drain handler) + (begin/dataflow + (when (pair? (pending)) + (for [(item (in-list (reverse (pending))))] (handler item)) + (pending '())))) + (values push drain)) diff --git a/syndicate/distributed/client.rkt b/syndicate/distributed/client.rkt index a6ca427..23cf308 100644 --- a/syndicate/distributed/client.rkt +++ b/syndicate/distributed/client.rkt @@ -5,6 +5,7 @@ (require "wire-protocol.rkt") (require "internal-protocol.rkt") (require "protocol.rkt") +(require "turn.rkt") (require imperative-syndicate/term) (define-logger syndicate/distributed) @@ -14,6 +15,8 @@ (during (observe (from-server $a _)) (assert (server-connection a))) (during (observe (server-connected $a)) (assert (server-connection a)))) +(struct sub (spec [captures #:mutable]) #:transparent) + (define (generic-client-session-facet address scope w) (on-start (log-syndicate/distributed-info "Connected to ~v" address)) (on-stop (log-syndicate/distributed-info "Disconnected from ~v" address)) @@ -25,7 +28,7 @@ (log-syndicate/distributed-debug "C OUT ~v ~v" address p) (w* p))))) - (on-start (w (Connect scope))) + (define turn (turn-recorder (lambda (items) (w (Turn items))))) (define next-ep (let ((counter 0)) @@ -33,13 +36,38 @@ (begin0 counter (set! counter (+ counter 1)))))) - (during (to-server address $a) - (define ep (next-ep)) - (on-start (w (Assert ep a))) - (on-stop (w (Clear ep)))) + (define pubs (hash)) + (define subs (hash)) + (define matches (hash)) + + (on-start (w (Connect scope))) + (on-stop (for* [(s (in-hash-values matches)) (a (in-hash-values (sub-captures s)))] (retract! a))) + + (define (instantiate s vs) + (instantiate-term->value (from-server address (sub-spec s)) vs)) + + (on (asserted (to-server address $a)) + (define ep (next-ep)) + (extend-turn! turn (Assert ep a)) + (set! pubs (hash-set pubs a ep))) + + (on (retracted (to-server address $a)) + (define ep (hash-ref pubs a)) + (extend-turn! turn (Clear ep)) + (set! pubs (hash-remove pubs a))) (on (message (to-server address $a)) - (w (Message a))) + (extend-turn! turn (Message a))) + + (on (asserted (observe (from-server address $spec))) + (define ep (next-ep)) + (extend-turn! turn (Assert ep (observe spec))) + (set! subs (hash-set subs spec ep)) + (set! matches (hash-set matches ep (sub spec (hash))))) + + (on (retracted (observe (from-server address $spec))) + (extend-turn! turn (Clear (hash-ref subs spec))) + (set! subs (hash-remove subs spec))) (on (message (server-packet address (Ping))) (w (Pong))) @@ -48,12 +76,20 @@ (log-syndicate/distributed-error "Error from ~a: ~v" address detail) (stop-current-facet)) - (during (observe ($ pat (from-server address $spec))) - (define ep (next-ep)) - (on-start (w (Assert ep (observe spec)))) - (on-stop (w (Clear ep))) - (on (message (server-packet address (Add ep $vs))) - (react (assert (instantiate-term->value pat vs)) - (stop-when (message (server-packet address (Del ep vs)))))) - (on (message (server-packet address (Msg ep $vs))) - (send! (instantiate-term->value pat vs))))) + (on (message (server-packet address (Turn $items))) + (for [(item (in-list items))] + (match item + [(Add ep vs) (let* ((s (hash-ref matches ep)) + (a (instantiate s vs))) + (set-sub-captures! s (hash-set (sub-captures s) vs a)) + (assert! a))] + [(Del ep vs) (let* ((s (hash-ref matches ep)) + (a (hash-ref (sub-captures s) vs))) + (retract! a) + (set-sub-captures! s (hash-remove (sub-captures s) vs)))] + [(Msg ep vs) (let* ((s (hash-ref matches ep))) + (send! (instantiate s vs)))] + [(End ep) (let* ((s (hash-ref matches ep #f))) + (when s + (for [(a (in-hash-values (sub-captures s)))] (retract! a)) + (set! matches (hash-remove matches ep))))])))) diff --git a/syndicate/distributed/federation.rkt b/syndicate/distributed/federation.rkt index 3fc60da..8b7d015 100644 --- a/syndicate/distributed/federation.rkt +++ b/syndicate/distributed/federation.rkt @@ -4,6 +4,8 @@ (require "wire-protocol.rkt") (require "internal-protocol.rkt") (require "protocol.rkt") +(require "buffer.rkt") +(require "turn.rkt") (require imperative-syndicate/term) (require imperative-syndicate/reassert) @@ -100,33 +102,27 @@ ;; We have to buffer in both directions, because at startup there's latency ;; between asserting a federated-link record and it being ready to receive ;; message-poa->server records. - (field [pending-in '()] - [pending-out '()]) + (define-values (push-in drain-in) (make-buffer)) + (define-values (push-out drain-out) (make-buffer)) (on (message (from-server peer-addr (message-server->poa session-id $p))) - (pending-in (cons p (pending-in)))) - + (push-in p)) (on (message (server-envelope management-scope (message-server->poa session-id $p))) - (pending-out (cons p (pending-out)))) + (push-out p)) + (define (wrap p) (message-poa->server session-id p)) (during (server-envelope management-scope (observe (message-poa->server session-id _))) (during (from-server peer-addr (observe (message-poa->server session-id _))) - (begin/dataflow - (when (pair? (pending-in)) - (for [(p (reverse (pending-in)))] - (send! (server-proposal management-scope - (message-poa->server session-id p)))) - (pending-in '()))) - (begin/dataflow - (when (pair? (pending-out)) - (for [(p (reverse (pending-out)))] - (send! (to-server peer-addr (message-poa->server session-id p)))) - (pending-out '()))))))))) + (drain-in (lambda (p) (send! (server-proposal management-scope (wrap p))))) + (drain-out (lambda (p) (send! (to-server peer-addr (wrap p))))))))))) ;;--------------------------------------------------------------------------- ;; Local links. (spawn #:name 'federated-local-link-factory + + (struct sub (spec [captures #:mutable]) #:transparent) + (during (federation-management-scope $management-scope) (during (server-envelope management-scope (federated-link _ $scope)) (during/spawn (server-active scope) @@ -138,93 +134,121 @@ (define (!! m) (send! (server-proposal management-scope (message-poa->server session-id m)))) - (on (message (server-envelope management-scope - (message-server->poa session-id - (Assert $subid (observe $spec))))) - (react - (define ((! ctor) cs) (!! (ctor subid cs))) - (add-observer-endpoint! (lambda () (server-proposal scope spec)) - #:on-add (! Add) - #:on-remove (! Del) - #:on-message (! Msg)) - (assert (server-envelope scope (observe spec))) - (stop-when (message - (server-envelope management-scope - (message-server->poa session-id (Clear subid))))))) + (define turn (turn-recorder (lambda (items) (!! (Turn items))))) - (during (observe ($ pat (server-envelope scope $spec))) - (define ep (gensym 'ep)) - (on-start (!! (Assert ep (observe spec)))) - (on-stop (!! (Clear ep))) - (on (message (server-envelope management-scope - (message-server->poa session-id (Add ep $captures)))) - (react (assert (instantiate-term->value pat captures)) - (stop-when (message - (server-envelope management-scope - (message-server->poa session-id - (Del ep captures))))))) - (on (message (server-envelope management-scope - (message-server->poa session-id (Msg ep $captures)))) - (send! (instantiate-term->value pat captures)))))))) + (define remote-endpoints (hash)) + (define local-endpoints (hash)) + (define local-matches (hash)) + + (define (instantiate s vs) + (instantiate-term->value (server-envelope scope (sub-spec s)) vs)) + + (on (asserted (observe (server-envelope scope $spec))) + (define ep (gensym 'ep)) + (extend-turn! turn (Assert ep (observe spec))) + (set! local-endpoints (hash-set local-endpoints spec ep)) + (set! local-matches (hash-set local-matches ep (sub spec (hash))))) + + (on (retracted (observe (server-envelope scope $spec))) + (define ep (hash-ref local-endpoints spec)) + (extend-turn! turn (Clear ep)) + (set! local-endpoints (hash-remove local-endpoints spec))) + + (on (message (server-envelope management-scope + (message-server->poa session-id (Turn $items)))) + (for [(item (in-list items))] + (match item + [(Assert subid (observe spec)) + (when (hash-has-key? remote-endpoints subid) + (error 'local-link "Duplicate endpoint" subid)) + (react + (define ep-facet (current-facet)) + (set! remote-endpoints (hash-set remote-endpoints subid ep-facet)) + (on-stop (set! remote-endpoints (hash-remove remote-endpoints subid))) + (assert (server-envelope scope (observe spec))) + (define ((! ctor) cs) (extend-turn! turn (ctor subid cs))) + (add-observer-endpoint! (lambda () (server-proposal scope spec)) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg)))] + [(Clear subid) + (stop-facet (hash-ref remote-endpoints subid) + (extend-turn! turn (End subid)))] + [(Add ep vs) (let* ((s (hash-ref local-matches ep)) + (a (instantiate s vs))) + (set-sub-captures! s (hash-set (sub-captures s) vs a)) + (assert! a))] + [(Del ep vs) (let* ((s (hash-ref local-matches ep)) + (a (hash-ref (sub-captures s) vs))) + (retract! a) + (set-sub-captures! s (hash-remove (sub-captures s) vs)))] + [(Msg ep vs) (let* ((s (hash-ref local-matches ep))) + (send! (instantiate s vs)))] + [(End ep) (let* ((s (hash-ref local-matches ep #f))) + (when s + (for [(a (in-hash-values (sub-captures s)))] (retract! a)) + (set! local-matches (hash-remove local-matches ep))))]))))))) ;;--------------------------------------------------------------------------- ;; Federated scopes. -;; Internal state -(struct subscription (id ;; LocalID - spec ;; Assertion - holders ;; (Hash LinkID SubscriptionID) - matches ;; (Hash (Listof Assertion) (Set LinkID)) - ) - #:transparent) - (spawn #:name 'federated-scope-factory + + (struct subscription (id ;; LocalID + spec ;; Assertion + holders ;; (Hash LinkID SubscriptionID) + matches ;; (Hash (Listof Assertion) (Set LinkID)) + ) + #:transparent) + (during (federation-management-scope $management-scope) - - (define (send-to-link! linkid m) - (send! (server-proposal management-scope (message-server->poa linkid m)))) - (during/spawn (server-envelope management-scope (federated-link _ $scope)) #:name (list 'federated-scope management-scope scope) ;; Generates a fresh local ID naming a subscription propagated to our peers. (define make-localid (let ((next 0)) (lambda () (begin0 next (set! next (+ next 1)))))) - (field [peers (set)] ;; (Set LinkID) + (field [turns (hash)] ;; (Map LinkID Turn) [specs (hash)] ;; (Hash Spec LocalID) [subs (hasheq)] ;; (Hash LocalID Subscription) ) - (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "~a peers:" scope) - (for [(peer (in-set (peers)))] - (log-syndicate/federation-debug " link ~v" peer)) - (log-syndicate/federation-debug "-")) - (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope) - (for [((spec local) (in-hash (specs)))] - (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) - (log-syndicate/federation-debug "-")) - (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope) - (for [((local sub) (in-hash (subs)))] - (match-define (subscription _id spec holders matches) sub) - (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) - (when (not (hash-empty? holders)) - (log-syndicate/federation-debug " holders:") - (for [((link ep) (in-hash holders))] - (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) - (when (not (hash-empty? matches)) - (log-syndicate/federation-debug " matches:") - (for [((captures holders) (in-hash matches))] - (log-syndicate/federation-debug " captures ~v held by ~a" - captures holders)))) - (log-syndicate/federation-debug "-"))) + (define (send-to-link! peer p) + (extend-turn! (hash-ref (turns) peer) p)) - (define (call-with-sub localid linkid f) + (when (log-level? syndicate/federation-logger 'debug) + (begin/dataflow + (log-syndicate/federation-debug "~a turns:" scope) + (for [((peer turn) (in-hash (turns)))] + (log-syndicate/federation-debug " link ~v -> ~v" peer (turn 'debug))) + (log-syndicate/federation-debug "-")) + (begin/dataflow + (log-syndicate/federation-debug "~a specs:" scope) + (for [((spec local) (in-hash (specs)))] + (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) + (log-syndicate/federation-debug "-")) + (begin/dataflow + (log-syndicate/federation-debug "~a subs:" scope) + (for [((local sub) (in-hash (subs)))] + (match-define (subscription _id spec holders matches) sub) + (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) + (when (not (hash-empty? holders)) + (log-syndicate/federation-debug " holders:") + (for [((link ep) (in-hash holders))] + (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) + (when (not (hash-empty? matches)) + (log-syndicate/federation-debug " matches:") + (for [((captures holders) (in-hash matches))] + (log-syndicate/federation-debug " captures ~v held by ~a" + captures holders)))) + (log-syndicate/federation-debug "-"))) + + (define (call-with-sub localid linkid f #:not-found-ok? [not-found-ok? #t]) (match (hash-ref (subs) localid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent local ID ~v from link ~v. Ignoring." - localid - linkid)] + [#f (when (not not-found-ok?) + (log-syndicate/federation-error + "Mention of nonexistent local ID ~v from link ~v. Ignoring." + localid linkid))] [sub (f sub)])) (define (store-sub! sub) @@ -236,6 +260,7 @@ (define (unsubscribe! localid linkid) (call-with-sub + #:not-found-ok? #f localid linkid (lambda (sub) (define new-holders (hash-remove (subscription-holders sub) linkid)) @@ -248,9 +273,9 @@ ;; - if =0, we retract the subscription from all peers except linkid (match (hash-count new-holders) - [0 (for [(peer (in-set (peers)))] + [0 (for [((peer turn) (in-hash (turns)))] (when (not (equal? peer linkid)) - (send-to-link! peer (Clear localid))))] + (extend-turn! turn (Clear localid))))] [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ linkid (send-to-link! peer (Clear localid)))] [_ (void)])))) @@ -278,15 +303,35 @@ (during (server-envelope management-scope (federated-link $linkid scope)) - (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) - (peers (set-add (peers) linkid))) - (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) - (peers (set-remove (peers) linkid))) + (define turn (turn-recorder + (lambda (items) + (send! (server-proposal management-scope + (message-server->poa linkid (Turn items))))))) (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) [link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion))) ) + (define (err! detail) + (send! (server-proposal management-scope (message-server->poa linkid (Err detail)))) + (reset-turn! turn) + (stop-current-facet)) + + (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) + (turns (hash-set (turns) linkid turn)) + (for ([(spec localid) (in-hash (specs))]) + (extend-turn! turn (Assert localid (observe spec)))) + (commit-turn! turn)) + + (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) + (turns (hash-remove (turns) linkid)) + (for [((localid matches) (in-hash (link-matches)))] + (for [(captures (in-set matches))] + (remove-match! localid captures linkid))) + (for ([localid (in-hash-values (link-subs))]) + (unsubscribe! localid linkid)) + (commit-turn! turn)) + (when (log-level? syndicate/federation-logger 'debug) (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid) (for [((sub local) (in-hash (link-subs)))] @@ -299,136 +344,111 @@ local captures))) (log-syndicate/federation-debug "-"))) - (define (err! detail) - (send-to-link! linkid (Err detail)) - (stop-current-facet)) - - (on-start (for ([(spec localid) (in-hash (specs))]) - (send-to-link! linkid (Assert localid (observe spec))))) - - (on-stop (for [((localid matches) (in-hash (link-matches)))] - (for [(captures (in-set matches))] - (remove-match! localid captures linkid))) - (for ([localid (in-hash-values (link-subs))]) - (unsubscribe! localid linkid))) - (on (message (server-envelope management-scope - (message-poa->server linkid - (Assert $subid (observe $spec))))) - (define known? (hash-has-key? (specs) spec)) - (define localid (if known? (hash-ref (specs) spec) (make-localid))) - (define sub - (hash-ref (subs) localid (lambda () (subscription localid spec (hash) (hash))))) - (define holders (subscription-holders sub)) - (cond - [(hash-has-key? holders linkid) - (log-syndicate/federation-error - "Duplicate subscription ~a, ID ~a, from link ~a. Ignoring." - spec - subid - linkid)] - [else - (link-subs (hash-set (link-subs) subid localid)) - (when (not known?) (specs (hash-set (specs) spec localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders (hash-set holders linkid subid)]))) + (message-poa->server linkid (Turn $items)))) + (for [(item (in-list items))] + (match item + [(Assert subid (observe spec)) + (define known? (hash-has-key? (specs) spec)) + (define localid (if known? (hash-ref (specs) spec) (make-localid))) + (define sub (hash-ref (subs) localid (lambda () (subscription localid + spec + (hash) + (hash))))) + (define holders (subscription-holders sub)) + (cond + [(hash-has-key? holders linkid) + (log-syndicate/federation-error + "Duplicate subscription ~a, ID ~a, from link ~a." + spec subid linkid) + (err! 'duplicate-subscription)] + [else + (link-subs (hash-set (link-subs) subid localid)) + (when (not known?) (specs (hash-set (specs) spec localid))) + (subs (hash-set (subs) + localid + (struct-copy subscription sub + [holders (hash-set holders linkid subid)]))) - ;; If not known, then relay the subscription to all peers except `linkid`. - ;; - ;; If known, then one or more links that aren't this one have previously - ;; subscribed with this spec. If exactly one other link has previously - ;; subscribed, the only subscription that needs sent is to that peer; - ;; otherwise, no subscriptions at all need sent, since everyone has already - ;; been informed of this subscription. + ;; If not known, then relay the subscription to all peers except `linkid`. + ;; + ;; If known, then one or more links that aren't this one have previously + ;; subscribed with this spec. If exactly one other link has previously + ;; subscribed, the only subscription that needs sent is to that peer; + ;; otherwise, no subscriptions at all need sent, since everyone has already + ;; been informed of this subscription. - (cond - [(not known?) - (for [(peer (in-set (peers)))] - (when (not (equal? peer linkid)) - (send-to-link! peer (Assert localid (observe spec)))))] - [(= (hash-count holders) 1) - (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid - (send-to-link! peer (Assert localid (observe spec))))] - [else - (void)]) + (cond + [(not known?) + (for [((peer peer-turn) (in-hash (turns)))] + (when (not (equal? peer linkid)) + (extend-turn! peer-turn (Assert localid (observe spec)))))] + [(= (hash-count holders) 1) + (for [(peer (in-hash-keys holders))] ;; there will only be one, ≠ linkid + (send-to-link! peer (Assert localid (observe spec))))] + [else + (void)]) - ;; Once subscription relaying has taken place, send up matches to the active - ;; link. - (for [((captures match-holders) (in-hash (subscription-matches sub)))] - ;; Compute the number of times someone OTHER THAN this link has asserted - ;; a match to this spec. If it's nonzero, we need to hear about it: - (when (not (set-empty? (set-remove match-holders linkid))) - (send-to-link! linkid (Add subid captures)))) + ;; Once subscription relaying has taken place, send up matches to the active + ;; link. + (for [((captures match-holders) (in-hash (subscription-matches sub)))] + ;; Compute the number of times someone OTHER THAN this link has asserted + ;; a match to this spec. If it's nonzero, we need to hear about it: + (when (not (set-empty? (set-remove match-holders linkid))) + (extend-turn! turn (Add subid captures)))) - ])) - - (on (message (server-envelope management-scope - (message-poa->server linkid (Clear $subid)))) - (match (hash-ref (link-subs) subid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent subscription ID ~v from link ~v. Ignoring." - subid - linkid)] - [localid - (link-subs (hash-remove (link-subs) subid)) - (unsubscribe! localid linkid)])) - - ;; OPTIMISATION. Logically, the remote peer receiving a `Clear` should retract any - ;; matching `Add`s that it had previously sent. However we don't want to incur the - ;; cost of the calculation and the network traffic. Happily, we have all the - ;; information we need locally! So instead we make use of the multicast (!) - ;; facility Syndicate offers and snoop on outbound messages. - (on (message (server-envelope management-scope - (message-server->poa linkid (Clear $localid)))) - (for [(captures (in-set (hash-ref (link-matches) localid set)))] - (log-syndicate/federation-debug "Retracting on clear ~v ~v ~v" - linkid localid captures) - (remove-match! localid captures linkid)) - (link-matches (hash-remove (link-matches) localid))) - - (on (message (server-envelope management-scope - (message-poa->server linkid (Add $localid $captures)))) - (define matches (hash-ref (link-matches) localid set)) - (if (set-member? matches captures) - (err! 'duplicate-capture) - (begin - (link-matches (hash-set (link-matches) localid (set-add matches captures))) - (call-with-sub - localid linkid - (lambda (sub) - (define old-matches (subscription-matches sub)) - (define old-match-holders (hash-ref old-matches captures set)) - (define new-match-holders (set-add old-match-holders linkid)) - (define new-matches (hash-set old-matches captures new-match-holders)) - (store-sub! (struct-copy subscription sub [matches new-matches])) - (match (set-count old-match-holders) - [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] - (when (not (equal? peer linkid)) - (send-to-link! peer (Add peer-subid captures))))] - [1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid - (define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f)) - (when maybe-peer-subid ;; the other holder may not itself subscribe! - (send-to-link! peer (Add maybe-peer-subid captures))))] - [_ (void)])))))) - - (on (message (server-envelope management-scope - (message-poa->server linkid (Del $localid $captures)))) - (define matches (hash-ref (link-matches) localid set)) - (if (not (set-member? matches captures)) - (err! 'nonexistent-capture) - (let ((new-matches (set-remove matches captures))) - (link-matches (if (set-empty? new-matches) - (hash-remove (link-matches) localid) - (hash-set (link-matches) localid new-matches))) - (remove-match! localid captures linkid)))) - - (on (message (server-envelope management-scope - (message-poa->server linkid (Msg $localid $captures)))) - (call-with-sub - localid linkid - (lambda (sub) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer linkid)) - (send-to-link! peer (Msg peer-subid captures))))))) - - )))) + ])] + [(Clear subid) + (match (hash-ref (link-subs) subid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent subscription ID ~v from link ~v." + subid linkid) + (err! 'nonexistent-subscription)] + [localid + (link-subs (hash-remove (link-subs) subid)) + (unsubscribe! localid linkid)])] + [(End localid) + (for [(captures (in-set (hash-ref (link-matches) localid set)))] + (remove-match! localid captures linkid)) + (link-matches (hash-remove (link-matches) localid))] + [(Add localid captures) + (define matches (hash-ref (link-matches) localid set)) + (cond + [(set-member? matches captures) + (err! 'duplicate-capture)] + [else + (link-matches (hash-set (link-matches) localid (set-add matches captures))) + (call-with-sub + localid linkid + (lambda (sub) + (define old-matches (subscription-matches sub)) + (define old-match-holders (hash-ref old-matches captures set)) + (define new-match-holders (set-add old-match-holders linkid)) + (define new-matches (hash-set old-matches captures new-match-holders)) + (store-sub! (struct-copy subscription sub [matches new-matches])) + (match (set-count old-match-holders) + [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] + (when (not (equal? peer linkid)) + (send-to-link! peer (Add peer-subid captures))))] + [1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid + (define peer-subid (hash-ref (subscription-holders sub) peer #f)) + (when peer-subid ;; the other holder may not itself subscribe! + (send-to-link! peer (Add peer-subid captures))))] + [_ (void)])))])] + [(Del localid captures) + (define matches (hash-ref (link-matches) localid set)) + (if (not (set-member? matches captures)) + (err! 'nonexistent-capture) + (let ((new-matches (set-remove matches captures))) + (link-matches (if (set-empty? new-matches) + (hash-remove (link-matches) localid) + (hash-set (link-matches) localid new-matches))) + (remove-match! localid captures linkid)))] + [(Msg localid captures) + (call-with-sub + localid linkid + (lambda (sub) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer linkid)) + (send-to-link! peer (Msg peer-subid captures))))))] + ))))))) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index b5e3d81..56e6d59 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -2,7 +2,7 @@ (require "wire-protocol.rkt") (require "internal-protocol.rkt") -(require racket/set) +(require "turn.rkt") (spawn #:name 'server-factory @@ -11,59 +11,67 @@ ;; decided is (locally) suggested, it is true that everything ;; suggested is decided (in this implementation at least), ;; and the following clauses reflect this: - (during (server-proposal $scope $assertion) - (assert (server-envelope scope assertion))) + (on (asserted (server-proposal $scope $assertion)) + (assert! (server-envelope scope assertion))) + (on (retracted (server-proposal $scope $assertion)) + (retract! (server-envelope scope assertion))) (on (message (server-proposal $scope $body)) (send! (server-envelope scope body))) - (during (observe (server-envelope $scope $spec)) - (assert (server-proposal scope (observe spec)))) + (on (asserted (observe (server-envelope $scope $spec))) + (assert! (server-proposal scope (observe spec)))) + (on (retracted (observe (server-envelope $scope $spec))) + (retract! (server-proposal scope (observe spec)))) (during/spawn (server-poa $id) + (define root-facet (current-facet)) (on-start (match (let-event [(message (message-poa->server id $p))] p) - [(Connect scope) (react (connected id scope))] - [_ (send-error! id 'connection-not-setup)])))) + [(Connect scope) (react (connected id scope root-facet))] + [_ (send! (message-server->poa id (Err 'connection-not-setup)))])))) -(define (send-error! id detail) - (send! (message-server->poa id (Err detail)))) +(define (connected id scope root-facet) + (define endpoints (hash)) + + (define turn (turn-recorder (lambda (items) (send! (message-server->poa id (Turn items)))))) -(define (connected id scope) - (define endpoints (set)) (assert (server-active scope)) + + (define (send-error! detail) + (send! (message-server->poa id (Err detail))) + (reset-turn! turn) + (stop-facet root-facet)) + (on (message (message-poa->server id $p)) (match p - [(Assert ep a) #:when (not (set-member? endpoints ep)) - (set! endpoints (set-add endpoints ep)) - (react - (on-stop (set! endpoints (set-remove endpoints ep))) + [(Turn items) + (for [(item (in-list items))] + (match item + [(Assert ep a) + (if (hash-has-key? endpoints ep) + (send-error! 'duplicate-endpoint) + (react + (define ep-facet (current-facet)) + (set! endpoints (hash-set endpoints ep ep-facet)) + (on-stop (set! endpoints (hash-remove endpoints ep))) - (field [assertion a]) + (assert (server-proposal scope a)) - (assert (server-proposal scope (assertion))) - - (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs))))))) - (add-observer-endpoint! (lambda () - (let ((a (assertion))) - (when (observe? a) - (server-envelope scope (observe-specification a))))) - #:on-add (! Add) - #:on-remove (! Del) - #:on-message (! Msg))) - - (on (message (message-poa->server id (Assert ep $new-a))) - (assertion new-a)) - - (stop-when (message (message-poa->server id (Clear ep)))))] - [(Clear ep) #:when (set-member? endpoints ep) - (void)] ;; handled by stop-when clause in facet established by Assert handler - [(Message body) - (send! (server-proposal scope body))] - [other - (unhandled-message id other)]))) - -(define (unhandled-message id p) - (match p - [(Connect _) (send-error! id 'duplicate-connection-setup)] - [(Ping) (send! (message-server->poa id (Pong)))] - [(Pong) (void)] - [_ (send-error! id 'invalid-message)])) + (when (observe? a) + (define ((! ctor) cs) (extend-turn! turn (ctor ep cs))) + (add-observer-endpoint! + (lambda () (server-envelope scope (observe-specification a))) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg)))))] + [(Clear ep) + (match (hash-ref endpoints ep #f) + [#f (send-error! 'unknown-endpoint)] + [ep-facet (stop-facet ep-facet (extend-turn! turn (End ep)))])] + [(Message body) + (send! (server-proposal scope body))]))] + [(Ping) + (send! (message-server->poa id (Pong)))] + [(Pong) + (void)] + [_ + (send-error! 'invalid-message)]))) diff --git a/syndicate/distributed/turn.rkt b/syndicate/distributed/turn.rkt new file mode 100644 index 0000000..3b9bfec --- /dev/null +++ b/syndicate/distributed/turn.rkt @@ -0,0 +1,34 @@ +#lang imperative-syndicate + +(provide turn-recorder + extend-turn! + commit-turn! + reset-turn!) + +(require (submod "../dataspace.rkt" priorities)) + +(define (extend-turn! t item) (t 'extend item)) +(define (commit-turn! t) (t 'commit)) +(define (reset-turn! t) (t 'reset)) + +(define (turn-recorder on-commit) + (field [commit-needed #f]) + (define items '()) + (define t + (match-lambda* + [(list 'extend item) + (set! items (cons item items)) + (commit-needed #t)] + [(list 'commit) + (when (commit-needed) + (on-commit (reverse items)) + (reset-turn! t))] + [(list 'reset) + (set! items '()) + (commit-needed #f)] + [(list 'debug) + (reverse items)])) + (begin/dataflow + #:priority *idle-priority* + (commit-turn! t)) + t) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index e33d707..4d32adb 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -9,6 +9,9 @@ ;; Enrolment (message-struct Connect (scope)) ;; Client --> Server +;; Transactions +(message-struct Turn (items)) ;; Bidirectional + ;; Actions; Client --> Server (and Peer --> Peer, except for Message) (message-struct Assert (endpoint-name assertion)) (message-struct Clear (endpoint-name)) @@ -18,6 +21,7 @@ (message-struct Add (endpoint-name captures)) (message-struct Del (endpoint-name captures)) (message-struct Msg (endpoint-name captures)) +(message-struct End (endpoint-name)) (message-struct Err (detail)) ;; Transport-related; Bidirectional