Update federation to match latest paper-journal work: proper propagation of Add/Del
This commit is contained in:
parent
104b87cd56
commit
b5bae7f8f6
|
@ -175,7 +175,7 @@
|
|||
(struct subscription (id ;; LocalID
|
||||
spec ;; Assertion
|
||||
holders ;; (Hash LinkID SubscriptionID)
|
||||
matches ;; (Bag (Listof Assertion))
|
||||
matches ;; (Hash (Listof Assertion) (Set LinkID))
|
||||
)
|
||||
#:transparent)
|
||||
|
||||
|
@ -213,11 +213,11 @@
|
|||
(log-syndicate/federation-debug " holders:")
|
||||
(for [((link ep) (in-hash holders))]
|
||||
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
|
||||
(when (not (bag-empty? matches))
|
||||
(when (not (hash-empty? matches))
|
||||
(log-syndicate/federation-debug " matches:")
|
||||
(for [((captures count) (in-bag/count matches))]
|
||||
(log-syndicate/federation-debug " captures ~v count ~a"
|
||||
captures count))))
|
||||
(for [((captures holders) (in-hash matches))]
|
||||
(log-syndicate/federation-debug " captures ~v held by ~a"
|
||||
captures holders))))
|
||||
(log-syndicate/federation-debug "-")))
|
||||
|
||||
(define (call-with-sub localid linkid f)
|
||||
|
@ -228,16 +228,19 @@
|
|||
linkid)]
|
||||
[sub (f sub)]))
|
||||
|
||||
(define (store-sub! sub)
|
||||
(match-define (subscription localid spec holders matches) sub)
|
||||
(if (and (hash-empty? holders) (hash-empty? matches))
|
||||
(begin (specs (hash-remove (specs) spec))
|
||||
(subs (hash-remove (subs) localid)))
|
||||
(subs (hash-set (subs) localid sub))))
|
||||
|
||||
(define (unsubscribe! localid linkid)
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define new-holders (hash-remove (subscription-holders sub) linkid))
|
||||
(if (hash-empty? new-holders)
|
||||
(begin (specs (hash-remove (specs) (subscription-spec sub)))
|
||||
(subs (hash-remove (subs) localid)))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub
|
||||
[holders new-holders]))))
|
||||
(store-sub! (struct-copy subscription sub [holders new-holders]))
|
||||
|
||||
;; The messages we send depend on (hash-count new-holders):
|
||||
;; - if >1, there are enough other active subscribers that we don't need to send
|
||||
|
@ -253,17 +256,24 @@
|
|||
(send-to-link! peer (Clear localid)))]
|
||||
[_ (void)]))))
|
||||
|
||||
(define (adjust-matches localid linkid captures delta expected-outcome ctor)
|
||||
(define (remove-match! localid captures linkid)
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define-values (new-matches outcome)
|
||||
(bag-change (subscription-matches sub) captures delta #:clamp? #t))
|
||||
(subs (hash-set (subs) localid (struct-copy subscription sub [matches new-matches])))
|
||||
(when (eq? outcome expected-outcome)
|
||||
(for ([(peer peer-subid) (in-hash (subscription-holders sub))])
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (ctor peer-subid captures))))))))
|
||||
(define old-matches (subscription-matches sub))
|
||||
(define old-match-holders (hash-ref old-matches captures set))
|
||||
(define new-match-holders (set-remove old-match-holders linkid))
|
||||
(define new-matches (hash-set old-matches captures new-match-holders))
|
||||
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
||||
(match (set-count new-match-holders)
|
||||
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Del peer-subid captures))))]
|
||||
[1 (for [(peer (in-set new-match-holders))] ;; only one, ≠ linkid
|
||||
(define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f))
|
||||
(when maybe-peer-subid
|
||||
(send-to-link! peer (Del maybe-peer-subid captures))))]
|
||||
[_ (void)]))))
|
||||
|
||||
(during (server-envelope management-scope (federated-link $linkid scope))
|
||||
|
||||
|
@ -273,7 +283,7 @@
|
|||
(peers (set-remove (peers) linkid)))
|
||||
|
||||
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
|
||||
[link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
|
||||
[link-matches (hash)] ;; (Hash LocalID (Set (Listof Assertion)))
|
||||
)
|
||||
|
||||
(when (log-level? syndicate/federation-logger 'debug)
|
||||
|
@ -282,18 +292,22 @@
|
|||
(log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
|
||||
(log-syndicate/federation-debug "-"))
|
||||
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
|
||||
(for [((item count) (in-bag/count (link-matches)))]
|
||||
(match-define (cons local captures) item)
|
||||
(log-syndicate/federation-debug " local ~a captures ~v count ~a"
|
||||
local captures count))
|
||||
(for [((local matches) (in-hash (link-matches)))]
|
||||
(for [(captures (in-set matches))]
|
||||
(log-syndicate/federation-debug " local ~a captures ~v"
|
||||
local captures)))
|
||||
(log-syndicate/federation-debug "-")))
|
||||
|
||||
(define (err! detail)
|
||||
(send-to-link! linkid (Err detail))
|
||||
(stop-current-facet))
|
||||
|
||||
(on-start (for ([(spec localid) (in-hash (specs))])
|
||||
(send-to-link! linkid (Assert localid (observe spec)))))
|
||||
|
||||
(on-stop (for ([item (in-bag (link-matches))])
|
||||
(match-define (cons localid captures) item)
|
||||
(adjust-matches localid linkid captures -1 'present->absent Del))
|
||||
(on-stop (for [((localid matches) (in-hash (link-matches)))]
|
||||
(for [(captures (in-set matches))]
|
||||
(remove-match! localid captures linkid)))
|
||||
(for ([localid (in-hash-values (link-subs))])
|
||||
(unsubscribe! localid linkid)))
|
||||
|
||||
|
@ -339,8 +353,11 @@
|
|||
|
||||
;; Once subscription relaying has taken place, send up matches to the active
|
||||
;; link.
|
||||
(for [(captures (in-bag (subscription-matches sub)))]
|
||||
(send-to-link! linkid (Add subid captures)))
|
||||
(for [((captures match-holders) (in-hash (subscription-matches sub)))]
|
||||
;; Compute the number of times someone OTHER THAN this link has asserted
|
||||
;; a match to this spec. If it's nonzero, we need to hear about it:
|
||||
(when (not (set-empty? (set-remove match-holders linkid)))
|
||||
(send-to-link! linkid (Add subid captures))))
|
||||
|
||||
]))
|
||||
|
||||
|
@ -355,20 +372,54 @@
|
|||
(link-subs (hash-remove (link-subs) subid))
|
||||
(unsubscribe! localid linkid)]))
|
||||
|
||||
(define (relay-add-or-del localid captures delta expected-outcome ctor)
|
||||
(define-values (new-link-matches link-outcome)
|
||||
(bag-change (link-matches) (cons localid captures) delta #:clamp? #t))
|
||||
(link-matches new-link-matches)
|
||||
(when (eq? link-outcome expected-outcome)
|
||||
(adjust-matches localid linkid captures delta expected-outcome ctor)))
|
||||
;; OPTIMISATION. Logically, the remote peer receiving a `Clear` should retract any
|
||||
;; matching `Add`s that it had previously sent. However we don't want to incur the
|
||||
;; cost of the calculation and the network traffic. Happily, we have all the
|
||||
;; information we need locally! So instead we make use of the multicast (!)
|
||||
;; facility Syndicate offers and snoop on outbound messages.
|
||||
(on (message (server-envelope management-scope
|
||||
(message-server->poa linkid (Clear $localid))))
|
||||
(for [(captures (in-set (hash-ref (link-matches) localid set)))]
|
||||
(log-syndicate/federation-debug "Retracting on clear ~v ~v ~v"
|
||||
linkid localid captures)
|
||||
(remove-match! localid captures linkid))
|
||||
(link-matches (hash-remove (link-matches) localid)))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Add $localid $captures))))
|
||||
(relay-add-or-del localid captures +1 'absent->present Add))
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(if (set-member? matches captures)
|
||||
(err! 'duplicate-capture)
|
||||
(begin
|
||||
(link-matches (hash-set (link-matches) localid (set-add matches captures)))
|
||||
(call-with-sub
|
||||
localid linkid
|
||||
(lambda (sub)
|
||||
(define old-matches (subscription-matches sub))
|
||||
(define old-match-holders (hash-ref old-matches captures set))
|
||||
(define new-match-holders (set-add old-match-holders linkid))
|
||||
(define new-matches (hash-set old-matches captures new-match-holders))
|
||||
(store-sub! (struct-copy subscription sub [matches new-matches]))
|
||||
(match (set-count old-match-holders)
|
||||
[0 (for [((peer peer-subid) (in-hash (subscription-holders sub)))]
|
||||
(when (not (equal? peer linkid))
|
||||
(send-to-link! peer (Add peer-subid captures))))]
|
||||
[1 (for [(peer (in-set old-match-holders))] ;; only one, ≠ linkid
|
||||
(define maybe-peer-subid (hash-ref (subscription-holders sub) peer #f))
|
||||
(when maybe-peer-subid ;; the other holder may not itself subscribe!
|
||||
(send-to-link! peer (Add maybe-peer-subid captures))))]
|
||||
[_ (void)]))))))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Del $localid $captures))))
|
||||
(relay-add-or-del localid captures -1 'present->absent Del))
|
||||
(define matches (hash-ref (link-matches) localid set))
|
||||
(if (not (set-member? matches captures))
|
||||
(err! 'nonexistent-capture)
|
||||
(let ((new-matches (set-remove matches captures)))
|
||||
(link-matches (if (set-empty? new-matches)
|
||||
(hash-remove (link-matches) localid)
|
||||
(hash-set (link-matches) localid new-matches)))
|
||||
(remove-match! localid captures linkid))))
|
||||
|
||||
(on (message (server-envelope management-scope
|
||||
(message-poa->server linkid (Msg $localid $captures))))
|
||||
|
|
Loading…
Reference in New Issue