diff --git a/syndicate/distributed/client/loopback.rkt b/syndicate/distributed/client/loopback.rkt index e1b6a84..91f80e5 100644 --- a/syndicate/distributed/client/loopback.rkt +++ b/syndicate/distributed/client/loopback.rkt @@ -5,6 +5,8 @@ (require "../internal-protocol.rkt") (require "../protocol.rkt") +(require imperative-syndicate/protocol/credit) + (require/activate imperative-syndicate/distributed/server) (spawn #:name 'loopback-client-factory @@ -12,9 +14,9 @@ #:name address (assert (server-poa address)) (on (message (message-server->poa address $p)) (send! (server-packet address p))) + (define !! (make-flow-controlled-sender message-poa->server address)) (on (asserted (observe (message-poa->server address _))) (react (generic-client-session-facet address scope - (lambda (x) - (send! (message-poa->server address x)))))))) + (lambda (x) (!! (message-poa->server address x)))))))) diff --git a/syndicate/distributed/client/tcp.rkt b/syndicate/distributed/client/tcp.rkt index d9b0e6d..3b1bc3f 100644 --- a/syndicate/distributed/client/tcp.rkt +++ b/syndicate/distributed/client/tcp.rkt @@ -19,6 +19,7 @@ (retracted (server-transport-connected address))) (during (tcp-accepted id) + (on-start (issue-unbounded-credit! tcp-in id)) (assert (server-transport-connected address)) (define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p))))) (on (message (tcp-in id $bs)) (accumulate! bs))) diff --git a/syndicate/distributed/federation.rkt b/syndicate/distributed/federation.rkt index 76c6d07..db63a04 100644 --- a/syndicate/distributed/federation.rkt +++ b/syndicate/distributed/federation.rkt @@ -77,13 +77,15 @@ (asserted (tcp-rejected link _))) (during (tcp-accepted link) + (on-start (issue-unbounded-credit! tcp-in link)) (assert (federated-uplink-connected link)) ;; out to local requester (define session-id (gensym 'uplink)) (assert (federated-link session-id local-scope)) ;; in to federated scope + (define !! (make-flow-controlled-sender message-poa->server session-id)) (define accumulate! - (packet-accumulator (lambda (p) (send! (message-poa->server session-id p))))) + (packet-accumulator (lambda (p) (!! (message-poa->server session-id p))))) (on (message (tcp-in link $bs)) (accumulate! bs)) (on (message (message-server->poa session-id $p)) @@ -102,9 +104,11 @@ (define session-id (gensym 'local-link)) (assert (federated-link session-id scope)) + (define !! (make-flow-controlled-sender message-poa->server session-id)) + (on (message (message-server->poa session-id (Assert $subid (observe $spec)))) (react - (define ((! ctor) cs) (send! (message-poa->server session-id (ctor subid cs)))) + (define ((! ctor) cs) (!! (message-poa->server session-id (ctor subid cs)))) (add-observer-endpoint! (lambda () (server-proposal scope spec)) #:on-add (! Add) #:on-remove (! Del) @@ -114,8 +118,8 @@ (during (observe ($ pat (server-envelope scope $spec))) (define ep (gensym 'ep)) - (on-start (send! (message-poa->server session-id (Assert ep (observe spec))))) - (on-stop (send! (message-poa->server session-id (Clear ep)))) + (on-start (!! (message-poa->server session-id (Assert ep (observe spec))))) + (on-stop (!! (message-poa->server session-id (Clear ep)))) (assert (server-envelope scope (observe spec))) (on (message (message-server->poa session-id (Add ep $captures))) (react (assert (instantiate-term->value pat captures)) @@ -129,7 +133,7 @@ ;; Internal state (struct subscription (id ;; LocalID spec ;; Assertion - holders ;; (Hash ConnectionID SubscriptionID) + holders ;; (Hash LinkID SubscriptionID) matches ;; (Bag (Listof Assertion)) ) #:transparent) @@ -147,9 +151,28 @@ ) (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" scope (peers))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" scope (specs))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" scope (subs)))) + (begin/dataflow (log-syndicate/federation-debug "~a peers:" scope) + (for [(peer (in-set (peers)))] + (log-syndicate/federation-debug " link ~v" peer)) + (log-syndicate/federation-debug "-")) + (begin/dataflow (log-syndicate/federation-debug "~a specs:" scope) + (for [((spec local) (in-hash (specs)))] + (log-syndicate/federation-debug " spec ~v -> local ~a" spec local)) + (log-syndicate/federation-debug "-")) + (begin/dataflow (log-syndicate/federation-debug "~a subs:" scope) + (for [((local sub) (in-hash (subs)))] + (match-define (subscription _id spec holders matches) sub) + (log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec) + (when (not (hash-empty? holders)) + (log-syndicate/federation-debug " holders:") + (for [((link ep) (in-hash holders))] + (log-syndicate/federation-debug " link ~a -> ep ~a" link ep))) + (when (not (bag-empty? matches)) + (log-syndicate/federation-debug " matches:") + (for [((captures count) (in-bag/count matches))] + (log-syndicate/federation-debug " captures ~v count ~a" + captures count)))) + (log-syndicate/federation-debug "-"))) (define (call-with-sub localid linkid f) (match (hash-ref (subs) localid #f) @@ -198,18 +221,26 @@ (during (federated-link $linkid scope) - (on-start (peers (set-add (peers) linkid))) - (on-stop (peers (set-remove (peers) linkid))) + (on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid) + (peers (set-add (peers) linkid))) + (on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid) + (peers (set-remove (peers) linkid))) (field [link-subs (hash)] ;; (Hash SubscriptionID LocalID) [link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion))) ) (when (log-level? syndicate/federation-logger 'debug) - (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-subs ~v" - scope linkid (link-subs))) - (begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-matches ~v" - scope linkid (link-matches)))) + (begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid) + (for [((sub local) (in-hash (link-subs)))] + (log-syndicate/federation-debug " sub ~a -> local ~a" sub local)) + (log-syndicate/federation-debug "-")) + (begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid) + (for [((item count) (in-bag/count (link-matches)))] + (match-define (cons local captures) item) + (log-syndicate/federation-debug " local ~a captures ~v count ~a" + local captures count)) + (log-syndicate/federation-debug "-"))) (on-start (for ([(spec localid) (in-hash (specs))]) (send! (message-server->poa linkid (Assert localid (observe spec)))))) @@ -220,6 +251,8 @@ (for ([localid (in-hash-values (link-subs))]) (unsubscribe! localid linkid))) + (on-start (issue-unbounded-credit! message-poa->server linkid)) + (on (message (message-poa->server linkid (Assert $subid (observe $spec)))) (define known? (hash-has-key? (specs) spec)) (define localid (if known? (hash-ref (specs) spec) (make-localid))) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index 7360cbe..796efa5 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -4,18 +4,23 @@ (require "internal-protocol.rkt") (require racket/set) +(require imperative-syndicate/protocol/credit) + (spawn #:name 'server-factory ;; Previously, we just had server-envelope. Now, we have both ;; server-envelope and server-proposal. While not everything ;; decided is (locally) suggested, it is true that everything ;; suggested is decided (in this implementation at least), - ;; and the following clause reflects this: + ;; and the following clauses reflect this: (during (server-proposal $scope $assertion) (assert (server-envelope scope assertion))) + (on (message (server-proposal $scope $body)) + (send! (server-envelope scope body))) (during/spawn (server-poa $id) (on-start + (issue-credit! message-poa->server id) (let-event [(message (message-poa->server id $p))] (match p [(Connect scope) (react (connected id scope))] @@ -28,6 +33,7 @@ (define (connected id scope) (define endpoints (set)) (assert (server-active scope)) + (on-start (issue-unbounded-credit! message-poa->server id)) (on (message (message-poa->server id $p)) (match p [(Assert ep a) #:when (not (set-member? endpoints ep)) @@ -55,7 +61,7 @@ [(Clear ep) #:when (set-member? endpoints ep) (void)] ;; handled by stop-when clause in facet established by Assert handler [(Message body) - (send! (server-envelope scope body))] + (send! (server-proposal scope body))] [other (unhandled-message id other)]))) diff --git a/syndicate/distributed/server/tcp.rkt b/syndicate/distributed/server/tcp.rkt index 24bbaf5..1459ec1 100644 --- a/syndicate/distributed/server/tcp.rkt +++ b/syndicate/distributed/server/tcp.rkt @@ -13,7 +13,9 @@ (define (server-facet/tcp id) (assert (tcp-accepted id)) (assert (server-poa id)) - (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p))))) + (on-start (issue-unbounded-credit! tcp-in id)) + (define !! (make-flow-controlled-sender message-poa->server id)) + (define accumulate! (packet-accumulator (lambda (p) (!! (message-poa->server id p))))) (on (message (tcp-in id $bs)) (accumulate! bs)) (on (message (message-server->poa id $p)) @@ -26,4 +28,5 @@ (spawn #:name 'tcp-server-listener (during/spawn (tcp-connection $id (tcp-listener port)) #:name `(server-poa ,id) + (on-start (issue-credit! (tcp-listener port))) (server-facet/tcp id)))) diff --git a/syndicate/distributed/server/websocket.rkt b/syndicate/distributed/server/websocket.rkt index a750622..812cf86 100644 --- a/syndicate/distributed/server/websocket.rkt +++ b/syndicate/distributed/server/websocket.rkt @@ -7,6 +7,8 @@ (require "../wire-protocol.rkt") (require "../internal-protocol.rkt") +(require imperative-syndicate/protocol/credit) + (require/activate imperative-syndicate/drivers/web) (require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/distributed/server) @@ -21,11 +23,12 @@ (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) (send! (message-server->poa id (Ping)))) + (define !! (make-flow-controlled-sender message-poa->server id)) (on (message (websocket-in id $body)) (define-values (packet remainder) (decode body)) (when (not (equal? remainder #"")) (error 'server-facet/websocket "Multiple packets in a single websocket message")) - (send! (message-poa->server id packet))) + (!! (message-poa->server id packet))) (on (message (message-server->poa id $p)) (send! (websocket-out id (encode p))))) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index 306de9f..0b2e9a7 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -5,6 +5,7 @@ (require (prefix-in preserves: preserves)) (require bitsyntax) (require (only-in net/rfc6455 ws-idle-timeout)) +(require imperative-syndicate/protocol/credit) ;; Enrolment (message-struct Connect (scope)) ;; Client --> Server diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 1589b7b..f921e34 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -1,18 +1,8 @@ #lang imperative-syndicate ;; TCP/IP driver interface. ;; -;; A nice refinement would be to introduce something like a -;; `(tcp-error id _)` assertion, for when something goes wrong -;; listening or connecting. At present, for example, if connecting to -;; some other host that isn't listening, the driver pretends the -;; connection is open for an infinitesimal instant before closing. -;; This would be nicer if it never signalled "open" at all, instead -;; asserting something like `tcp-error` until interest in the -;; connection goes away. -;; ;; TODO: This protocol is overly simplified. -;; a) no flow control -;; b) no facility for separate shutdown of inbound/outbound streams +;; a) no facility for separate shutdown of inbound/outbound streams (provide (struct-out tcp-connection) (struct-out tcp-connection-peer) @@ -23,13 +13,15 @@ (struct-out tcp-in-line) (struct-out tcp-address) - (struct-out tcp-listener)) + (struct-out tcp-listener) + + (all-from-out imperative-syndicate/protocol/credit)) (define-logger syndicate/tcp) (require racket/exn) (require (prefix-in tcp: racket/tcp)) -(require (only-in racket/port read-bytes-avail!-evt)) +(require (only-in racket/port read-bytes-avail!-evt read-bytes-line-evt)) (require racket/unit) (require net/tcp-sig) @@ -37,6 +29,8 @@ (require syndicate/support/bytes) +(require imperative-syndicate/protocol/credit) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Protocol messages @@ -73,19 +67,7 @@ [(list e) (assert (tcp-rejected id e))] [(list cin cout) (assert (tcp-accepted id)) - (define unblock! (run-connection id cin cout)) - (unblock!)])) - - (during/spawn (observe (tcp-in-line $id _)) - #:name (list 'drivers/tcp 'line-reader id) - (field [buffer #""]) - (on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs))) - (begin/dataflow - (define newline-pos (bytes-index (buffer) (char->integer #\newline))) - (when newline-pos - (define line (subbytes (buffer) 0 newline-pos)) - (buffer (subbytes (buffer) (+ newline-pos 1))) - (send! (tcp-in-line id line)))))) + (run-connection id cin cout)]))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Listener @@ -96,12 +78,13 @@ (define control-ch (make-channel)) (thread (lambda () - (let loop ((blocked? #t)) + (let loop ((credit 1)) ;; NB. not zero initially! (sync (handle-evt control-ch (match-lambda - ['unblock (loop #f)] + [(list 'credit 'reset) (loop 0)] + [(list 'credit (? number? amount)) (loop (+ credit amount))] ['quit (void)])) - (if blocked? + (if (zero? credit) never-evt (handle-evt (tcp:tcp-accept-evt listener) (lambda (cin+cout) @@ -115,21 +98,22 @@ (tcp-address remote-hostname remote-port) cin cout))) - (loop blocked?)))))) + (loop (- credit 1))))))) (tcp:tcp-close listener) (signal-background-activity! -1))) (signal-background-activity! +1) - (on-start (channel-put control-ch 'unblock)) (on-stop (channel-put control-ch 'quit)) + (on (message (credit* (list server-addr) $amount)) + (channel-put control-ch (list 'credit amount))) + (on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout))) (define id (seal (list port remote-addr))) (spawn #:name (list 'drivers/tcp 'inbound id) (assert (tcp-connection id server-addr)) (assert (tcp-connection-peer id remote-addr)) - (define unblock! (run-connection id cin cout)) - (on (asserted (tcp-accepted id)) (unblock!)) + (run-connection id cin cout) (stop-when (asserted (tcp-rejected id _))) (stop-when (retracted (tcp-accepted id)))))) @@ -151,10 +135,25 @@ (on-stop (shutdown-connection!)) + (on (asserted (observe (credit* (list tcp-out id) _))) + (send! (credit tcp-out id +inf.0))) + + (on (message (credit* (list tcp-in id) $amount)) + (when control-ch (channel-put control-ch (list 'credit amount)))) + + (field [mode 'bytes]) + (begin/dataflow (when control-ch (channel-put control-ch (mode)))) + (on (message (inbound (tcp-in id $eof-or-bs))) (if (eof-object? eof-or-bs) (stop-current-facet) - (send! (tcp-in id eof-or-bs)))) + (send! (match (mode) + ['bytes (tcp-in id eof-or-bs)] + ['lines (tcp-in-line id eof-or-bs)])))) + + (during (observe (tcp-in-line id _)) + (on-start (mode 'lines)) + (on-stop (mode 'bytes))) (define-syntax-rule (trap-exns body ...) (with-handlers ([(lambda (e) (not (exn:break? e))) @@ -168,26 +167,30 @@ (if (string? bs) (write-string bs cout) (write-bytes bs cout)) - (flush-output cout))) - - (let ((unblocked? #f)) - (lambda () - (when (not unblocked?) - (set! unblocked? #t) - (when control-ch (channel-put control-ch 'unblock)))))) + (flush-output cout)))) (define (connection-thread control-ch id cin) - (let loop ((blocked? #t)) + (let loop ((credit 0) (mode 'bytes)) (sync (handle-evt control-ch (match-lambda - ['unblock (loop #f)] + [(list 'credit 'reset) (loop 0 mode)] + [(list 'credit (? number? amount)) (loop (+ credit amount) mode)] + ['lines (loop credit 'lines)] + ['bytes (loop credit 'bytes)] ['quit (void)])) - (if blocked? + (if (zero? credit) never-evt - (handle-evt (read-bytes-avail-evt 32768 cin) + (handle-evt (match mode + ['bytes (read-bytes-avail-evt (inexact->exact (truncate (min credit 32768))) cin)] + ['lines (read-bytes-line-evt cin 'any)]) (lambda (eof-or-bs) (ground-send! (inbound (tcp-in id eof-or-bs))) - (loop (or blocked? (eof-object? eof-or-bs)))))))) + (loop (if (eof-object? eof-or-bs) + 0 + (- credit (match mode + ['bytes (bytes-length eof-or-bs)] + ['lines 1]))) + mode)))))) (close-input-port cin) (signal-background-activity! -1)) diff --git a/syndicate/examples/chat-client.rkt b/syndicate/examples/chat-client.rkt index 54b687e..5096fa8 100644 --- a/syndicate/examples/chat-client.rkt +++ b/syndicate/examples/chat-client.rkt @@ -16,13 +16,16 @@ (printf "*** ~a\n" (exn-message reason))) (during (tcp-accepted id) - (on-start (printf "*** Connected.\n")) + (on-start (printf "*** Connected.\n") + (issue-credit! tcp-in id)) (on (retracted (tcp-accepted id)) (printf "*** Remote EOF.\n")) ;; ^ Not on-stop, because the facet is stopped by local EOF too! - (on (message (tcp-in id $bs)) + (on (message (tcp-in-line id $bs)) (write-bytes bs) - (flush-output)) + (newline) + (flush-output) + (issue-credit! tcp-in id)) (define stdin-evt (read-bytes-line-evt (current-input-port) 'any)) (on (message (inbound (external-event stdin-evt (list $line)))) diff --git a/syndicate/examples/chat-server-nested-dataspace.rkt b/syndicate/examples/chat-server-nested-dataspace.rkt index 48aec7b..b8d4d8a 100644 --- a/syndicate/examples/chat-server-nested-dataspace.rkt +++ b/syndicate/examples/chat-server-nested-dataspace.rkt @@ -11,13 +11,16 @@ (during/spawn (inbound (tcp-connection $id (tcp-listener 5999))) #:name (list 'chat-connection id) (assert (outbound (tcp-accepted id))) + (on-start (send! (outbound (credit (tcp-listener 5999) 1))) + (send! (outbound (credit tcp-in id 1)))) (let ((me (gensym 'user))) (assert (present me)) (on (message (inbound (tcp-in-line id $bs))) (match bs [#"/quit" (stop-current-facet)] [#"/stop-server" (quit-dataspace!)] - [_ (send! (speak me (bytes->string/utf-8 bs)))]))) + [_ (send! (speak me (bytes->string/utf-8 bs))) + (send! (outbound (credit tcp-in id 1)))]))) (during (present $user) (on-start (send! (outbound (tcp-out id (string->bytes/utf-8 (~a user " arrived\n")))))) (on-stop (send! (outbound (tcp-out id (string->bytes/utf-8 (~a user " left\n")))))) diff --git a/syndicate/examples/chat-server.rkt b/syndicate/examples/chat-server.rkt index e374dfe..6ce3f0c 100644 --- a/syndicate/examples/chat-server.rkt +++ b/syndicate/examples/chat-server.rkt @@ -13,9 +13,12 @@ (during/spawn (tcp-connection $id (tcp-listener 5999)) #:name (list 'chat-connection id) (assert (tcp-accepted id)) + (on-start (issue-credit! (tcp-listener 5999)) + (issue-credit! tcp-in id)) (let ((me (gensym 'user))) (assert (present me)) (on (message (tcp-in-line id $bs)) + (issue-credit! tcp-in id) (match bs [#"/quit" (stop-current-facet)] [#"/stop-server" (send! (stop-server))] diff --git a/syndicate/examples/ircd/session.rkt b/syndicate/examples/ircd/session.rkt index 8d37947..57198de 100644 --- a/syndicate/examples/ircd/session.rkt +++ b/syndicate/examples/ircd/session.rkt @@ -145,6 +145,7 @@ (define m (parse-irc-message (bytes->string/utf-8 bs))) (log-info "~a -> ~v" this-conn m) (send! (ircd-action this-conn m)) + (issue-credit! tcp-in this-conn) (match m [(irc-message _ "PING" _ _) (void)] ;; RFC says servers don't reply to PINGs [(or (irc-message _ "PASS" (list P) _) @@ -227,6 +228,8 @@ (during/spawn (tcp-connection $this-conn server-handle) #:name `(ircd-connection ,this-conn) (define connection-root-facet (current-facet)) + (on-start (issue-credit! server-handle) + (issue-credit! tcp-in this-conn)) (during (tcp-connection-peer this-conn (tcp-address $peer-host _)) (assert (tcp-accepted this-conn)) (ircd-connection-facet connection-root-facet this-conn peer-host))))) diff --git a/syndicate/protocol/credit.rkt b/syndicate/protocol/credit.rkt new file mode 100644 index 0000000..a25f828 --- /dev/null +++ b/syndicate/protocol/credit.rkt @@ -0,0 +1,72 @@ +#lang imperative-syndicate + +(provide (all-defined-out)) + +(require syndicate/functional-queue) + +(define-logger syndicate/protocol/credit) + +;; (credit* Any (U Number 'reset)) +;; (credit Any ... (U Number 'reset)) +;; +;; Send this message to issue `amount` units of credit (in the context +;; of credit-based flow control) to the given `context`. +;; +;; A `context` may identify any essentially asynchronous stream where +;; either the possibility of overwhelming a consumer exists, or the +;; need for occasionally changing the settings of a producer in an +;; atomic way exists. For example, reading HTTP headers proceeds +;; line-by-line until the body is reached, at which point it proceeds +;; byte-by-byte. +;; +;; The `amount` may either be a number or `'reset`, which should zero +;; out (discard) any available credit. In particular, it may be +;; `+inf.0`, effectively turning credit-based flow control off for the +;; named context. +;; +;; See also https://eighty-twenty.org/2011/05/15/origins-of-ack-and-flow-control. +;; +(message-struct credit* (context amount)) + +(define-match-expander credit + (syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)]) + (syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)])) + +(define (issue-credit! #:amount [amount 1] . context) + (send! (credit* context amount))) + +(define (issue-unbounded-credit! . context) + (send! (credit* context +inf.0))) + +(define (make-flow-controlled-sender . context) + (make-flow-controlled-sender* context)) + +(define (make-flow-controlled-sender* context) + (field [q (make-queue)] + [item-credit 0]) + (when (log-level? syndicate/protocol/credit-logger 'debug) + (begin/dataflow + (log-syndicate/protocol/credit-debug + "context ~a, queue length ~a, credit ~a" + context + (queue-length (q)) + (item-credit)))) + (begin/dataflow + (when (and (positive? (item-credit)) + (not (queue-empty? (q)))) + (define-values (item new-q) (dequeue (q))) + (send! item) + (q new-q) + (item-credit (- (item-credit) 1)))) + (on (message (credit* context $amount)) + (item-credit (if (eq? amount 'reset) 0 (+ (item-credit) amount)))) + (lambda (item) (q (enqueue (q) item)))) + +;; It's quite possible that credit-based flow control is not the right +;; approach for Syndicate. Using assertions that describe the content +;; of a stream more relationally ought to allow "replay" of +;; information in different contexts; though the trade-off is not only +;; reduced performance, but a need to garbage-collect +;; no-longer-interesting portions of the stream; that is, +;; *acknowledgements*. In a reliable-delivery context, it would appear +;; that at least one of acks or flow-control is required! (?!?)