diff --git a/imperative/distributed/federation.rkt b/imperative/distributed/federation.rkt index 9a43484..c2085ac 100644 --- a/imperative/distributed/federation.rkt +++ b/imperative/distributed/federation.rkt @@ -175,7 +175,7 @@ (struct subscription (id ;; LocalID spec ;; Assertion holders ;; (Hash LinkID SubscriptionID) - matches ;; (Bag (Listof Assertion)) + matches ;; (Hash (Listof Assertion) (Set LinkID)) ) #:transparent) @@ -213,11 +213,11 @@ (log-syndicate/federation-debug " holders:") (for [((link ep) (in-hash holders))] (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) - (when (not (bag-empty? matches)) + (when (not (hash-empty? matches)) (log-syndicate/federation-debug " matches:") - (for [((captures count) (in-bag/count matches))] - (log-syndicate/federation-debug " captures ~v count ~a" - captures count)))) + (for [((captures holders) (in-hash matches))] + (log-syndicate/federation-debug " captures ~v held by ~a" + captures holders)))) (log-syndicate/federation-debug "-"))) (define (call-with-sub localid linkid f) @@ -228,16 +228,19 @@ linkid)] [sub (f sub)])) + (define (store-sub! sub) + (match-define (subscription localid spec holders matches) sub) + (if (and (hash-empty? holders) (hash-empty? matches)) + (begin (specs (hash-remove (specs) spec)) + (subs (hash-remove (subs) localid))) + (subs (hash-set (subs) localid sub)))) + (define (unsubscribe! localid linkid) (call-with-sub localid linkid (lambda (sub) (define new-holders (hash-remove (subscription-holders sub) linkid)) - (if (hash-empty? new-holders) - (begin (specs (hash-remove (specs) (subscription-spec sub))) - (subs (hash-remove (subs) localid))) - (subs (hash-set (subs) localid (struct-copy subscription sub - [holders new-holders])))) + (store-sub! (struct-copy subscription sub [holders new-holders])) ;; The messages we send depend on (hash-count new-holders): ;; - if >1, there are enough other active subscribers that we don't need to send @@ -253,17 +256,24 @@ (send-to-link! peer (Clear localid)))] [_ (void)])))) - (define (adjust-matches localid linkid captures delta expected-outcome ctor) + (define (remove-match! localid captures linkid) (call-with-sub localid linkid (lambda (sub) - (define-values (new-matches outcome) - (bag-change (subscription-matches sub) captures delta #:clamp? #t)) - (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) - (when (eq? outcome expected-outcome) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer linkid)) - (send-to-link! peer (ctor peer-subid captures)))))))) + (define old-matches (subscription-matches sub)) + (define old-match-holders (hash-ref old-matches captures set)) + (define new-match-holders (set-remove old-match-holders linkid)) + (define new-matches (hash-set old-matches captures new-match-holders)) + (store-sub! (struct-copy subscription sub [matches new-matches])) + (match (set-count new-match-holders) + [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] + (when (not (equal? peer linkid)) + (send-to-link! peer (Del peer-subid captures))))] + [1 (for [(peer (in-set new-match-holders))] ;; only one, ≠ linkid + (define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f)) + (when maybe-peer-subid + (send-to-link! peer (Del maybe-peer-subid captures))))] + [_ (void)])))) (during (server-envelope management-scope (federated-link $linkid scope)) @@ -273,7 +283,7 @@ (peers (set-remove (peers) linkid))) (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) - [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) + [link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion))) ) (when (log-level? syndicate/federation-logger 'debug) @@ -282,18 +292,22 @@ (log-syndicate/federation-debug " sub ~a -> local ~a" sub local)) (log-syndicate/federation-debug "-")) (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid) - (for [((item count) (in-bag/count (link-matches)))] - (match-define (cons local captures) item) - (log-syndicate/federation-debug " local ~a captures ~v count ~a" - local captures count)) + (for [((local matches) (in-hash (link-matches)))] + (for [(captures (in-set matches))] + (log-syndicate/federation-debug " local ~a captures ~v" + local captures))) (log-syndicate/federation-debug "-"))) + (define (err! detail) + (send-to-link! linkid (Err detail)) + (stop-current-facet)) + (on-start (for ([(spec localid) (in-hash (specs))]) (send-to-link! linkid (Assert localid (observe spec))))) - (on-stop (for ([item (in-bag (link-matches))]) - (match-define (cons localid captures) item) - (adjust-matches localid linkid captures -1 'present->absent Del)) + (on-stop (for [((localid matches) (in-hash (link-matches)))] + (for [(captures (in-set matches))] + (remove-match! localid captures linkid))) (for ([localid (in-hash-values (link-subs))]) (unsubscribe! localid linkid))) @@ -339,8 +353,11 @@ ;; Once subscription relaying has taken place, send up matches to the active ;; link. - (for [(captures (in-bag (subscription-matches sub)))] - (send-to-link! linkid (Add subid captures))) + (for [((captures match-holders) (in-hash (subscription-matches sub)))] + ;; Compute the number of times someone OTHER THAN this link has asserted + ;; a match to this spec. If it's nonzero, we need to hear about it: + (when (not (set-empty? (set-remove match-holders linkid))) + (send-to-link! linkid (Add subid captures)))) ])) @@ -355,20 +372,54 @@ (link-subs (hash-remove (link-subs) subid)) (unsubscribe! localid linkid)])) - (define (relay-add-or-del localid captures delta expected-outcome ctor) - (define-values (new-link-matches link-outcome) - (bag-change (link-matches) (cons localid captures) delta #:clamp? #t)) - (link-matches new-link-matches) - (when (eq? link-outcome expected-outcome) - (adjust-matches localid linkid captures delta expected-outcome ctor))) + ;; OPTIMISATION. Logically, the remote peer receiving a `Clear` should retract any + ;; matching `Add`s that it had previously sent. However we don't want to incur the + ;; cost of the calculation and the network traffic. Happily, we have all the + ;; information we need locally! So instead we make use of the multicast (!) + ;; facility Syndicate offers and snoop on outbound messages. + (on (message (server-envelope management-scope + (message-server->poa linkid (Clear $localid)))) + (for [(captures (in-set (hash-ref (link-matches) localid set)))] + (log-syndicate/federation-debug "Retracting on clear ~v ~v ~v" + linkid localid captures) + (remove-match! localid captures linkid)) + (link-matches (hash-remove (link-matches) localid))) (on (message (server-envelope management-scope (message-poa->server linkid (Add $localid $captures)))) - (relay-add-or-del localid captures +1 'absent->present Add)) + (define matches (hash-ref (link-matches) localid set)) + (if (set-member? matches captures) + (err! 'duplicate-capture) + (begin + (link-matches (hash-set (link-matches) localid (set-add matches captures))) + (call-with-sub + localid linkid + (lambda (sub) + (define old-matches (subscription-matches sub)) + (define old-match-holders (hash-ref old-matches captures set)) + (define new-match-holders (set-add old-match-holders linkid)) + (define new-matches (hash-set old-matches captures new-match-holders)) + (store-sub! (struct-copy subscription sub [matches new-matches])) + (match (set-count old-match-holders) + [0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))] + (when (not (equal? peer linkid)) + (send-to-link! peer (Add peer-subid captures))))] + [1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid + (define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f)) + (when maybe-peer-subid ;; the other holder may not itself subscribe! + (send-to-link! peer (Add maybe-peer-subid captures))))] + [_ (void)])))))) (on (message (server-envelope management-scope (message-poa->server linkid (Del $localid $captures)))) - (relay-add-or-del localid captures -1 'present->absent Del)) + (define matches (hash-ref (link-matches) localid set)) + (if (not (set-member? matches captures)) + (err! 'nonexistent-capture) + (let ((new-matches (set-remove matches captures))) + (link-matches (if (set-empty? new-matches) + (hash-remove (link-matches) localid) + (hash-set (link-matches) localid new-matches))) + (remove-match! localid captures linkid)))) (on (message (server-envelope management-scope (message-poa->server linkid (Msg $localid $captures))))