diff --git a/syndicate/broker/federation2.rkt b/syndicate/broker/federation2.rkt index 39d7a7f..26e9743 100644 --- a/syndicate/broker/federation2.rkt +++ b/syndicate/broker/federation2.rkt @@ -80,48 +80,50 @@ (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" nodeid (specs))) (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" nodeid (subs)))) - (define-syntax with-localid->sub - (syntax-rules (->) - ((_ (localid connid -> sub) body ...) - (match (hash-ref (subs) localid #f) - [#f (log-syndicate/federation-error - "Mention of nonexistent local ID ~v from connection ~v. Ignoring." - localid - connid)] - [sub body ...])))) + (define (call-with-sub localid connid f) + (match (hash-ref (subs) localid #f) + [#f (log-syndicate/federation-error + "Mention of nonexistent local ID ~v from connection ~v. Ignoring." + localid + connid)] + [sub (f sub)])) (define (unsubscribe! localid connid) - (with-localid->sub [localid connid -> sub] - (define new-holders (hash-remove (subscription-holders sub) connid)) - (specs (hash-remove (specs) (subscription-spec sub))) - (subs (if (hash-empty? new-holders) - (hash-remove (subs) localid) - (hash-set (subs) localid (struct-copy subscription sub - [holders new-holders])))) + (call-with-sub + localid connid + (lambda (sub) + (define new-holders (hash-remove (subscription-holders sub) connid)) + (specs (hash-remove (specs) (subscription-spec sub))) + (subs (if (hash-empty? new-holders) + (hash-remove (subs) localid) + (hash-set (subs) localid (struct-copy subscription sub + [holders new-holders])))) - ;; The messages we send depend on (hash-count new-holders): - ;; - if >1, there are enough other active subscribers that we don't need to send - ;; any messages. - ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid) - ;; - if =0, we retract the subscription from all peers except connid + ;; The messages we send depend on (hash-count new-holders): + ;; - if >1, there are enough other active subscribers that we don't need to send + ;; any messages. + ;; - if =1, we retract the subscription from that peer (INVARIANT: will not be connid) + ;; - if =0, we retract the subscription from all peers except connid - (match (hash-count new-holders) - [0 (for [(peer (in-set (peers)))] - (when (not (equal? peer connid)) - (send! (router-outbound peer (Unsubscribe localid)))))] - [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid - (send! (router-outbound peer (Unsubscribe localid))))] - [_ (void)]))) + (match (hash-count new-holders) + [0 (for [(peer (in-set (peers)))] + (when (not (equal? peer connid)) + (send! (router-outbound peer (Unsubscribe localid)))))] + [1 (for [(peer (in-hash-keys new-holders))] ;; there will only be one, ≠ connid + (send! (router-outbound peer (Unsubscribe localid))))] + [_ (void)])))) (define (adjust-matches localid connid captures delta expected-outcome ctor) - (with-localid->sub [localid connid -> sub] - (define-values (new-matches outcome) - (bag-change (subscription-matches sub) captures delta #:clamp? #t)) - (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) - (when (eq? outcome expected-outcome) - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer connid)) - (send! (router-outbound peer (ctor peer-subid captures)))))))) + (call-with-sub + localid connid + (lambda (sub) + (define-values (new-matches outcome) + (bag-change (subscription-matches sub) captures delta #:clamp? #t)) + (subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches]))) + (when (eq? outcome expected-outcome) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer connid)) + (send! (router-outbound peer (ctor peer-subid captures))))))))) (during (observe (router-connection nodeid $connid)) (assert (router-connection nodeid connid)) @@ -215,9 +217,11 @@ (relay-add-or-del localid captures -1 'present->absent Del)) (on (message (router-inbound connid (Msg $localid $captures))) - (with-localid->sub [localid connid -> sub] - (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) - (when (not (equal? peer connid)) - (send! (router-outbound peer (Msg peer-subid captures))))))) + (call-with-sub + localid connid + (lambda (sub) + (for ([(peer peer-subid) (in-hash (subscription-holders sub))]) + (when (not (equal? peer connid)) + (send! (router-outbound peer (Msg peer-subid captures)))))))) )))