Cosmetic
This commit is contained in:
parent
a890a7147b
commit
fe47abd540
|
@ -47,8 +47,7 @@
|
||||||
(define/query-set subscribers (subscription queue-id $who) who
|
(define/query-set subscribers (subscription queue-id $who) who
|
||||||
#:on-add (enq! waiters who))
|
#:on-add (enq! waiters who))
|
||||||
|
|
||||||
(on (message (delivery $who queue-id $body))
|
(on (message (delivery $who queue-id $body)) (enq! messages body))
|
||||||
(enq! messages body))
|
|
||||||
|
|
||||||
(begin/dataflow
|
(begin/dataflow
|
||||||
(when (and (not (queue-empty? (waiters)))
|
(when (and (not (queue-empty? (waiters)))
|
||||||
|
|
|
@ -46,9 +46,7 @@
|
||||||
|
|
||||||
(on (asserted (subscription queue-id $who)) (enq! waiters who))
|
(on (asserted (subscription queue-id $who)) (enq! waiters who))
|
||||||
(on (retracted (subscription queue-id $who)) (waiters (queue-remove who (waiters))))
|
(on (retracted (subscription queue-id $who)) (waiters (queue-remove who (waiters))))
|
||||||
|
(on (message (delivery $who queue-id $body)) (enq! messages body))
|
||||||
(on (message (delivery $who queue-id $body))
|
|
||||||
(enq! messages body))
|
|
||||||
|
|
||||||
(begin/dataflow
|
(begin/dataflow
|
||||||
(when (and (not (queue-empty? (waiters)))
|
(when (and (not (queue-empty? (waiters)))
|
||||||
|
|
|
@ -64,7 +64,7 @@
|
||||||
(not (queue-empty? (messages))))
|
(not (queue-empty? (messages))))
|
||||||
(define who (deq! waiters))
|
(define who (deq! waiters))
|
||||||
(define old-credit (hash-ref (credits) who 0))
|
(define old-credit (hash-ref (credits) who 0))
|
||||||
(when (positive? old-credit)
|
(when (positive? old-credit) ;; lazily remove entries from waiters
|
||||||
(define new-credit (- old-credit 1))
|
(define new-credit (- old-credit 1))
|
||||||
(credits (hash-set (credits) who new-credit))
|
(credits (hash-set (credits) who new-credit))
|
||||||
(when (positive? new-credit) (enq! waiters who))
|
(when (positive? new-credit) (enq! waiters who))
|
||||||
|
|
Loading…
Reference in New Issue