syndicate-rkt/syndicate/drivers/stream.rkt

364 lines
15 KiB
Racket

#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide (all-from-out syndicate/schemas/gen/stream)
port-source
port-sink
make-source
make-sink
handle-connection
establish-connection
send-credit
send-lines-credit
send-bytes-credit
send-packet-credit
send-line
send-data
send-eof)
(require (for-syntax racket/base))
(require (only-in racket/port
read-bytes-line-evt
read-bytes-evt
read-bytes-avail!-evt
eof-evt))
(require racket/async-channel)
(require syndicate/functional-queue)
(require syndicate/service)
(require syndicate/pattern)
(require syndicate/driver-support)
(require syndicate/schemas/gen/stream)
(define-logger syndicate/drivers/stream)
(provide-service [ds]
;; No active components.
)
(define (port-source [port (current-input-port)]
#:initial-sink [initial-sink #f]
#:custodian [custodian #f]
#:name [name (list 'port-source (object-name port))])
(define active-sink initial-sink)
(define issue-credit (start-inbound-relay #:custodian custodian
(lambda () active-sink)
port))
(make-source #:name name
#:initial-sink initial-sink
#:on-connect (lambda (new-sink) (set! active-sink new-sink))
#:on-credit issue-credit))
(define (start-inbound-relay target-proc port
#:custodian custodian)
(define eof-received? #f)
(define control-ch (make-async-channel))
(linked-thread
#:name (list 'input-thread (object-name port))
#:custodian custodian
#:peer (object #:name 'inbound-relay-monitor
[#:asserted _
#:retracted
(close-input-port port)
(when (not eof-received?) (stop-current-facet))])
(lambda (facet)
(define (update-count remaining-count mode q)
(if (zero? remaining-count)
q
(undequeue (cons remaining-count mode) q)))
(define (eof-and-finish)
(log-syndicate/drivers/stream-debug "inbound eof for ~a" (object-name port))
(turn! facet (lambda () (send-eof (target-proc)))))
(let loop ((credits (make-queue)))
(sync (handle-evt control-ch
(match-lambda
[(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
[(list 'credit (CreditAmount-count n) mode)
(loop (match (unenqueue* credits)
[(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
[_ (enqueue credits (cons n mode))]))]
[(list 'credit (CreditAmount-unbounded) mode)
(loop (match (unenqueue* credits)
[(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
[_ (enqueue credits (cons +inf.0 mode))]))]))
(match (dequeue* credits)
[(list)
(handle-evt (eof-evt port)
(lambda _ignored
(eof-and-finish)))]
[(list (cons count (and mode (Mode-bytes))) q)
(define buffer (make-bytes (inexact->exact (min count 131072))))
(handle-evt (read-bytes-avail!-evt buffer port)
(match-lambda
[(? number? read-count)
(define bs (subbytes buffer 0 read-count))
(log-syndicate/drivers/stream-debug "inbound data ~v for ~a" bs (object-name port))
(turn! facet (lambda () (send-data (target-proc) bs)))
(loop (update-count (- count read-count) mode q))]
[(? eof-object?) (eof-and-finish)]))]
[(list (cons count (and mode (Mode-packet packet-size))) q)
(handle-evt (read-bytes-evt packet-size port)
(match-lambda
[(? bytes? packet) #:when (< (bytes-length packet) packet-size)
(log-syndicate/drivers/stream-debug
"short inbound packet (length ~a; expected ~a bytes) ~v for ~a"
(bytes-length packet) packet-size packet (object-name port))
(eof-and-finish)]
[(? bytes? packet)
(log-syndicate/drivers/stream-debug
"inbound packet (length ~a) ~v for ~a"
(bytes-length packet) packet (object-name port))
(turn! facet (lambda () (send-data (target-proc) packet mode)))
(loop (update-count (- count 1) mode q))]
[(? eof-object?) (eof-and-finish)]))]
[(list (cons count (and mode (Mode-lines line-mode))) q)
(handle-evt (read-bytes-line-evt port (match line-mode
[(LineMode-lf) 'linefeed]
[(LineMode-crlf) 'return-linefeed]))
(match-lambda
[(? bytes? line)
(log-syndicate/drivers/stream-debug "inbound line ~v for ~a" line (object-name port))
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
(loop (update-count (- count 1) mode q))]
[(? eof-object?) (eof-and-finish)]))])))))
(define (issue-credit amount mode)
(async-channel-put control-ch (list 'credit amount mode)))
issue-credit)
(define (port-sink [port (current-output-port)]
#:initial-source [initial-source #f]
#:initial-credit [initial-credit (CreditAmount-unbounded)]
#:initial-mode [initial-mode (Mode-bytes)]
#:name [name (list 'port-sink (object-name port))])
(define active-source initial-source)
(define relay (outbound-relay port))
(make-sink #:name name
#:initial-source initial-source
#:on-connect
(lambda (new-source)
(set! active-source new-source)
(when initial-credit (send-credit active-source initial-credit initial-mode)))
#:on-data
(lambda (data mode)
(relay data mode)
(match mode
[(Mode-bytes) (send-bytes-credit active-source (bytes-length data))]
[(Mode-lines lm) (send-lines-credit active-source 1 lm)]
[(Mode-packet n) (send-packet-credit active-source n)]
[(Mode-object _) (void)]))
#:on-eof
(lambda () (stop-current-facet))))
(define-syntax (EPIPE stx)
(local-require ffi/unsafe)
(define errno-value (cons (lookup-errno 'EPIPE) 'posix))
(syntax-case stx () [(_) #`'#,errno-value]))
(define (with-stop-current-facet-on-epipe operation thunk)
(with-handlers ([(lambda (e)
(and (exn:fail:network:errno? e)
(equal? (exn:fail:network:errno-errno e) (EPIPE))))
(lambda (e)
(log-syndicate/drivers/stream-debug "epipe while ~a" operation)
(stop-current-facet))])
(thunk)))
(define (outbound-relay o)
(define flush-pending #f)
(lambda (payload mode)
(log-syndicate/drivers/stream-debug "outbound data ~v on ~a" payload (object-name o))
(with-stop-current-facet-on-epipe 'writing
(lambda ()
(write-bytes payload o)
(match mode
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)]
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]
[_ (void)])))
(when (not flush-pending)
(set! flush-pending #t)
(facet-on-end-of-turn! this-facet
(lambda ()
(set! flush-pending #f)
(with-stop-current-facet-on-epipe 'flushing
(lambda () (flush-output o))))))))
(define (make-source #:initial-sink [initial-sink #f]
#:name [name (gensym 'source)]
#:on-connect [on-connect (lambda (new-sink) (void))]
#:on-disconnect [on-disconnect0 #f]
#:on-error [on-error0 #f]
#:on-credit [on-credit (lambda (amount mode) (void))])
(define sink #f)
(define handle #f)
(define (set-sink! new-sink)
(when (not (eq? sink new-sink))
(set! sink new-sink)
(when sink (on-connect sink))
(set! handle (turn-replace! this-turn sink handle
(if sink (->preserve (Sink-source self)) (void))))))
(define on-disconnect
(or on-disconnect0 (lambda ()
(log-syndicate/drivers/stream-debug "~a disconnected" self)
(stop-current-facet))))
(define on-error
(or on-error0 (lambda (message)
(log-syndicate/drivers/stream-debug "~a error: ~v" self message)
(stop-current-facet))))
(define self
(object #:name name
[#:asserted (Source-sink new-sink) (set-sink! new-sink)
#:retracted (when (eq? sink new-sink)
(set-sink! #f)
(on-disconnect))]
[#:asserted (StreamError message) (on-error message)]
[#:message (Source-credit amount mode) (on-credit amount mode)]))
(set-sink! initial-sink)
self)
(define (make-sink #:initial-source [initial-source #f]
#:name [name (gensym 'sink)]
#:on-connect [on-connect (lambda (new-source) (void))]
#:on-disconnect [on-disconnect0 #f]
#:on-error [on-error0 #f]
#:on-data on-data
#:on-eof [on-eof (lambda () (void))])
(define source #f)
(define handle #f)
(define (set-source! new-source)
(when (not (eq? new-source source))
(set! source new-source)
(when source (on-connect source))
(set! handle (turn-replace! this-turn source handle
(if source (->preserve (Source-sink self)) (void))))))
(define on-disconnect
(or on-disconnect0 (lambda ()
(log-syndicate/drivers/stream-debug "~a disconnected" self)
(stop-current-facet))))
(define on-error
(or on-error0 (lambda (message)
(log-syndicate/drivers/stream-debug "~a error: ~v" self message)
(stop-current-facet))))
(define self
(object #:name name
[#:asserted (Sink-source new-source) (set-source! new-source)
#:retracted (when (eq? source new-source)
(set-source! #f)
(on-disconnect))]
[#:asserted (StreamError message) (on-error message)]
[#:message (Sink-data payload mode) (on-data payload mode)]
[#:message (Sink-eof) (on-eof)]))
(set-source! initial-source)
self)
(define (handle-connection source sink
#:on-disconnect [on-disconnect #f]
#:on-error [on-error #f]
#:on-credit [on-credit void]
#:initial-credit [initial-credit (CreditAmount-unbounded)]
#:initial-mode [initial-mode (Mode-bytes)]
#:on-data on-data
#:on-eof [on-eof void])
(make-source #:initial-sink sink
#:name 'app-out
#:on-disconnect on-disconnect #:on-error on-error
#:on-credit on-credit)
(make-sink #:initial-source source
#:name 'app-in
#:on-disconnect on-disconnect #:on-error on-error
#:on-data on-data #:on-eof on-eof)
(when initial-credit (send-credit source initial-credit initial-mode)))
(define (establish-connection ds spec
#:name [name (gensym 'establish-connection)]
#:on-connect [on-connect (lambda (source sink) (void))]
#:on-rejected [on-rejected #f]
#:on-disconnect [on-disconnect #f]
#:on-error [on-error #f]
#:on-credit [on-credit void]
#:initial-credit [initial-credit (CreditAmount-unbounded)]
#:initial-mode [initial-mode (Mode-bytes)]
#:on-data on-data
#:on-eof [on-eof void])
(define connection-state 'pending)
(begin/dataflow (log-info "connection-state ~a" connection-state))
(define (transition new-state)
(when (not (equal? connection-state new-state))
(match* (connection-state new-state)
[('pending 'connected)
(when initial-credit (send-credit (peer-source) initial-credit initial-mode))
(on-connect (peer-source) (peer-sink))]
[(_ 'disconnected)
(on-disconnect)]
[('pending (list 'error m))
(on-rejected m)]
[(_ (list 'error m))
(on-error m)])))
(define-field peer-source #f)
(define-field peer-sink #f)
(begin/dataflow
(when (and (peer-source) (peer-sink))
(transition 'connected)))
(define source (make-source #:name (list 'source name)
#:on-connect peer-sink
#:on-disconnect (lambda () (transition 'disconnected))
#:on-error (lambda (m) (transition (list 'error m)))
#:on-credit on-credit))
(define sink (make-sink #:name (list 'sink name)
#:on-connect peer-source
#:on-disconnect (lambda () (transition 'disconnected))
#:on-error (lambda (m) (transition (list 'error m)))
#:on-data on-data
#:on-eof on-eof))
(at ds (assert (StreamConnection source sink spec))))
;;---------------------------------------------------------------------------
(define (send-credit source amount mode)
(send! source (Source-credit amount mode)))
(define (send-lines-credit source amount [mode (LineMode-lf)])
(send-credit source (CreditAmount-count amount) (Mode-lines mode)))
(define (send-bytes-credit source amount)
(send-credit source (CreditAmount-count amount) (Mode-bytes)))
(define (send-packet-credit source packet-size #:count [count 1])
(send-credit source (CreditAmount-count count) (Mode-packet packet-size)))
(define (->bytes data)
(if (bytes? data)
data
(string->bytes/utf-8 data)))
(define (send-line sink line [line-mode (LineMode-lf)])
(send! sink (Sink-data (->bytes line) (Mode-lines line-mode))))
(define (send-data sink data [mode (Mode-bytes)])
(send! sink (Sink-data (->bytes data) mode)))
(define (send-eof sink)
(send! sink (Sink-eof)))