syndicate-rkt/syndicate/protocol/credit.rkt

76 lines
2.8 KiB
Racket

;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2010-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
#lang syndicate
(provide (all-defined-out))
(require "../functional-queue.rkt")
(define-logger syndicate/protocol/credit)
;; (credit* Any (U Number 'reset))
;; (credit Any ... (U Number 'reset))
;;
;; Send this message to issue `amount` units of credit (in the context
;; of credit-based flow control) to the given `context`.
;;
;; A `context` may identify any essentially asynchronous stream where
;; either the possibility of overwhelming a consumer exists, or the
;; need for occasionally changing the settings of a producer in an
;; atomic way exists. For example, reading HTTP headers proceeds
;; line-by-line until the body is reached, at which point it proceeds
;; byte-by-byte.
;;
;; The `amount` may either be a number or `'reset`, which should zero
;; out (discard) any available credit. In particular, it may be
;; `+inf.0`, effectively turning credit-based flow control off for the
;; named context.
;;
;; See also https://eighty-twenty.org/2011/05/15/origins-of-ack-and-flow-control.
;;
(message-struct credit* (context amount))
(define-match-expander credit
(syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)])
(syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)]))
(define (issue-credit! #:amount [amount 1] . context)
(send! (credit* context amount)))
(define (issue-unbounded-credit! . context)
(send! (credit* context +inf.0)))
(define (make-flow-controlled-sender . context)
(make-flow-controlled-sender* context))
(define (make-flow-controlled-sender* context)
(field [q (make-queue)]
[item-credit 0])
(when (log-level? syndicate/protocol/credit-logger 'debug)
(begin/dataflow
(log-syndicate/protocol/credit-debug
"context ~a, queue length ~a, credit ~a"
context
(queue-length (q))
(item-credit))))
(begin/dataflow
(when (and (positive? (item-credit))
(not (queue-empty? (q))))
(define-values (item new-q) (dequeue (q)))
(send! item)
(q new-q)
(item-credit (- (item-credit) 1))))
(on (message (credit* context $amount))
(item-credit (if (eq? amount 'reset) 0 (+ (item-credit) amount))))
(lambda (item) (q (enqueue (q) item))))
;; It's quite possible that credit-based flow control is not the right
;; approach for Syndicate. Using assertions that describe the content
;; of a stream more relationally ought to allow "replay" of
;; information in different contexts; though the trade-off is not only
;; reduced performance, but a need to garbage-collect
;; no-longer-interesting portions of the stream; that is,
;; *acknowledgements*. In a reliable-delivery context, it would appear
;; that at least one of acks or flow-control is required! (?!?)