Factor generic stream stuff out of tcp.rkt into stream.rkt
This commit is contained in:
parent
0ab3526cba
commit
68db114840
|
@ -7,8 +7,6 @@
|
|||
(require (only-in racket/port read-line-evt))
|
||||
(require (only-in racket/string string-trim))
|
||||
(require syndicate/drivers/tcp)
|
||||
(require syndicate/drivers/stream)
|
||||
(require syndicate/drivers/racket-event)
|
||||
|
||||
(define host "127.0.0.1")
|
||||
(define port 5999)
|
||||
|
@ -20,5 +18,4 @@
|
|||
(set! port (string->number port-number))])
|
||||
|
||||
(standard-actor-system (ds)
|
||||
(at ds
|
||||
(assert (StreamConnection (port-lines-source ds) (port-sink) (TcpRemote host port))))))
|
||||
(at ds (assert (StreamConnection (port-source) (port-sink) (TcpRemote host port))))))
|
||||
|
|
|
@ -12,7 +12,9 @@
|
|||
(define (linked-thread thread-proc
|
||||
#:name [name (gensym 'linked-thread)]
|
||||
#:peer [peer (ref (entity/stop-on-retract #:name (list name 'monitor)))]
|
||||
#:custodian [c (make-custodian)])
|
||||
#:custodian [c0 #f])
|
||||
(define c (or c0 (make-custodian)))
|
||||
|
||||
(define facet this-facet)
|
||||
(define actor this-actor)
|
||||
|
||||
|
|
|
@ -2,48 +2,406 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide port-lines-source
|
||||
port-sink)
|
||||
(provide (all-from-out syndicate/schemas/gen/stream)
|
||||
|
||||
(require (only-in racket/port read-line-evt))
|
||||
port-source
|
||||
port-sink
|
||||
|
||||
(require "tcp.rkt") ;; ugh, lots of tcp.rkt actually belongs in this file
|
||||
(require syndicate/drivers/racket-event)
|
||||
make-connection-handler
|
||||
make-source
|
||||
make-sink
|
||||
handle-connection
|
||||
establish-connection
|
||||
|
||||
send-credit
|
||||
send-lines-credit
|
||||
send-bytes-credit
|
||||
send-packet-credit
|
||||
send-line
|
||||
send-data
|
||||
send-eof)
|
||||
|
||||
(require (for-syntax racket/base))
|
||||
|
||||
(require (only-in racket/port
|
||||
read-bytes-line-evt
|
||||
read-bytes-evt
|
||||
read-bytes-avail!-evt
|
||||
eof-evt))
|
||||
(require racket/async-channel)
|
||||
|
||||
(require syndicate/functional-queue)
|
||||
(require syndicate/service)
|
||||
(require syndicate/pattern)
|
||||
(require syndicate/driver-support)
|
||||
(require syndicate/drivers/racket-event)
|
||||
|
||||
(define (port-lines-source ds [port (current-input-port)]
|
||||
#:initial-credit [initial-credit 0]
|
||||
#:name [name (list 'port-lines-source (object-name port))]
|
||||
#:line-mode [line-mode (LineMode-lf)])
|
||||
(at ds (assert (RequireService 'syndicate/drivers/racket-event)))
|
||||
(define-field credit initial-credit)
|
||||
(define-field sink #f)
|
||||
(require syndicate/schemas/gen/stream)
|
||||
|
||||
(define-logger syndicate/drivers/stream)
|
||||
|
||||
(provide-service [ds]
|
||||
(at ds
|
||||
(on (message #:when (and (sink) (positive? (credit))) (RacketEvent (read-line-evt port) $vs))
|
||||
(credit (- (credit) 1))
|
||||
(match (car vs)
|
||||
[(? eof-object?) (send-eof (sink))]
|
||||
[line (send-line (sink) line line-mode)])))
|
||||
(during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _)
|
||||
#:name (list 'stream-listener spec-pat)
|
||||
(match (pattern->constant spec-pat)
|
||||
[(? void?) (stop-current-facet)]
|
||||
[spec (at ds
|
||||
(during (StreamSpecListenable spec)
|
||||
(assert
|
||||
(StreamListener spec
|
||||
(make-connection-handler
|
||||
(lambda (source sink)
|
||||
(assert (StreamConnection source sink spec))))))))]))
|
||||
|
||||
(during/spawn (StreamConnection $app-source $app-sink $spec)
|
||||
#:name (list 'stream-connection spec)
|
||||
(at ds
|
||||
(during (StreamSpecConnectable spec)
|
||||
(assert (StreamConnect spec
|
||||
(object #:name 'connection-peer
|
||||
[(ConnectionHandler-connected sys-source sys-sink)
|
||||
(at sys-source (assert (Source-sink app-sink)))
|
||||
(at sys-sink (assert (Sink-source app-source)))]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(log-syndicate/drivers/stream-error
|
||||
"Connection to ~a rejected: ~a" spec message)
|
||||
(at app-source (assert (StreamError message)))
|
||||
(at app-sink (assert (StreamError message)))
|
||||
(stop-current-facet)]))))))
|
||||
|
||||
;; I translate interest in StreamListener with a particular spec-pattern into a facet
|
||||
;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern
|
||||
;; by asserting StreamSpecListenable with that spec.
|
||||
(during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _)
|
||||
(define listenable-asserter
|
||||
(object [bindings
|
||||
(define spec
|
||||
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
|
||||
(assert (StreamSpecListenable spec))]))
|
||||
(assert
|
||||
(Observe (:pattern
|
||||
(Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _))
|
||||
listenable-asserter)))
|
||||
|
||||
;; I translate interest in StreamConnect with a particular spec-pattern into a facet that
|
||||
;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by
|
||||
;; asserting StreamSpecConnectable with that spec.
|
||||
(during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _)
|
||||
(define connectable-asserter
|
||||
(object [bindings
|
||||
(define spec
|
||||
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
|
||||
(assert (StreamSpecConnectable spec))]))
|
||||
(assert
|
||||
(Observe (:pattern
|
||||
(Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _))
|
||||
connectable-asserter)))))
|
||||
|
||||
(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)])
|
||||
(object #:name name
|
||||
[(ConnectionHandler-connected source sink)
|
||||
(on-connected source sink)]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(error 'connection-handler "~a" message)]))
|
||||
|
||||
(define (port-source [port (current-input-port)]
|
||||
#:custodian [custodian #f]
|
||||
#:name [name (list 'port-source (object-name port))])
|
||||
(define active-sink #f)
|
||||
(define issue-credit (start-inbound-relay #:custodian custodian
|
||||
(lambda () active-sink)
|
||||
port))
|
||||
(make-source #:name name
|
||||
#:on-connect sink
|
||||
#:on-credit (lambda (amount mode)
|
||||
(if (equal? amount (CreditAmount-unbounded))
|
||||
(credit +inf.0)
|
||||
(match mode
|
||||
[(Mode-lines _)
|
||||
(credit (+ (credit) (CreditAmount-count-value amount)))]
|
||||
[_ (void)])))))
|
||||
#:on-connect (lambda (new-sink) (set! active-sink new-sink))
|
||||
#:on-credit issue-credit))
|
||||
|
||||
(define (start-inbound-relay target-proc port
|
||||
#:custodian custodian)
|
||||
(define eof-received? #f)
|
||||
(define control-ch (make-async-channel))
|
||||
|
||||
(linked-thread
|
||||
#:name (cons 'input-thread (object-name port))
|
||||
#:custodian custodian
|
||||
#:peer (object #:name 'inbound-relay-monitor
|
||||
[#:asserted _
|
||||
#:retracted
|
||||
(close-input-port port)
|
||||
(when (not eof-received?) (stop-current-facet))])
|
||||
(lambda (facet)
|
||||
(define (update-count remaining-count mode q)
|
||||
(if (zero? remaining-count)
|
||||
q
|
||||
(undequeue (cons remaining-count mode) q)))
|
||||
|
||||
(define (eof-and-finish)
|
||||
(log-syndicate/drivers/stream-debug "inbound eof for ~a" (object-name port))
|
||||
(turn! facet (lambda () (send-eof (target-proc)))))
|
||||
|
||||
(let loop ((credits (make-queue)))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
[(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
|
||||
[(list 'credit (CreditAmount-count n) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
|
||||
[_ (enqueue credits (cons n mode))]))]
|
||||
[(list 'credit (CreditAmount-unbounded) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
|
||||
[_ (enqueue credits (cons +inf.0 mode))]))]))
|
||||
(match (dequeue* credits)
|
||||
[(list)
|
||||
(handle-evt (eof-evt port)
|
||||
(lambda _ignored
|
||||
(eof-and-finish)))]
|
||||
[(list (cons count (and mode (Mode-bytes))) q)
|
||||
(define buffer (make-bytes (inexact->exact (min count 131072))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer port)
|
||||
(match-lambda
|
||||
[(? number? read-count)
|
||||
(define bs (subbytes buffer 0 read-count))
|
||||
(log-syndicate/drivers/stream-debug "inbound data ~v for ~a" bs (object-name port))
|
||||
(turn! facet (lambda () (send-data (target-proc) bs)))
|
||||
(loop (update-count (- count read-count) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
[(list (cons count (and mode (Mode-packet packet-size))) q)
|
||||
(handle-evt (read-bytes-evt packet-size port)
|
||||
(match-lambda
|
||||
[(? bytes? packet) #:when (< (bytes-length packet) packet-size)
|
||||
(log-syndicate/drivers/stream-debug
|
||||
"short inbound packet (length ~a; expected ~a bytes) ~v for ~a"
|
||||
(bytes-length packet) packet-size packet (object-name port))
|
||||
(eof-and-finish)]
|
||||
[(? bytes? packet)
|
||||
(log-syndicate/drivers/stream-debug
|
||||
"inbound packet (length ~a) ~v for ~a"
|
||||
(bytes-length packet) packet (object-name port))
|
||||
(turn! facet (lambda () (send-data (target-proc) packet mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
[(list (cons count (and mode (Mode-lines line-mode))) q)
|
||||
(handle-evt (read-bytes-line-evt port (match line-mode
|
||||
[(LineMode-lf) 'linefeed]
|
||||
[(LineMode-crlf) 'return-linefeed]))
|
||||
(match-lambda
|
||||
[(? bytes? line)
|
||||
(log-syndicate/drivers/stream-debug "inbound line ~v for ~a" line (object-name port))
|
||||
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))])))))
|
||||
(define (issue-credit amount mode)
|
||||
(async-channel-put control-ch (list 'credit amount mode)))
|
||||
issue-credit)
|
||||
|
||||
(define (port-sink [port (current-output-port)]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:name [name (list 'port-sink (object-name port))])
|
||||
(define active-source #f)
|
||||
(define relay (outbound-relay port))
|
||||
(make-sink #:name name
|
||||
#:on-connect (lambda (source) (send-credit source (CreditAmount-unbounded) (Mode-bytes)))
|
||||
#:on-eof (lambda () (close-output-port port))
|
||||
#:on-data (lambda (data mode)
|
||||
(when (bytes? data)
|
||||
(write-bytes data port)
|
||||
(match mode
|
||||
[(Mode-bytes) (void)]
|
||||
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" port)]
|
||||
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" port)])
|
||||
(flush-output port)))))
|
||||
#:on-connect
|
||||
(lambda (new-source)
|
||||
(set! active-source new-source)
|
||||
(when initial-credit (send-credit active-source initial-credit initial-mode)))
|
||||
#:on-data
|
||||
(lambda (data mode)
|
||||
(relay data mode)
|
||||
(match mode
|
||||
[(Mode-bytes) (send-bytes-credit active-source (bytes-length data))]
|
||||
[(Mode-lines lm) (send-lines-credit active-source 1 lm)]
|
||||
[(Mode-packet n) (send-packet-credit active-source n)]
|
||||
[(Mode-object _) (void)]))
|
||||
#:on-eof
|
||||
(lambda () (stop-current-facet))))
|
||||
|
||||
(define-syntax (EPIPE stx)
|
||||
(local-require ffi/unsafe)
|
||||
(define errno-value (cons (lookup-errno 'EPIPE) 'posix))
|
||||
(syntax-case stx () [(_) #`'#,errno-value]))
|
||||
|
||||
(define (with-stop-current-facet-on-epipe operation thunk)
|
||||
(with-handlers ([(lambda (e)
|
||||
(and (exn:fail:network:errno? e)
|
||||
(equal? (exn:fail:network:errno-errno e) (EPIPE))))
|
||||
(lambda (e)
|
||||
(log-syndicate/drivers/stream-debug "epipe while ~a" operation)
|
||||
(stop-current-facet))])
|
||||
(thunk)))
|
||||
|
||||
(define (outbound-relay o)
|
||||
(define flush-pending #f)
|
||||
(lambda (payload mode)
|
||||
(log-syndicate/drivers/stream-debug "outbound data ~v on ~a" payload (object-name o))
|
||||
(with-stop-current-facet-on-epipe 'writing
|
||||
(lambda ()
|
||||
(write-bytes payload o)
|
||||
(match mode
|
||||
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)]
|
||||
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]
|
||||
[_ (void)])))
|
||||
(when (not flush-pending)
|
||||
(set! flush-pending #t)
|
||||
(facet-on-end-of-turn! this-facet
|
||||
(lambda ()
|
||||
(set! flush-pending #f)
|
||||
(with-stop-current-facet-on-epipe 'flushing
|
||||
(lambda () (flush-output o))))))))
|
||||
|
||||
(define (make-source #:initial-sink [initial-sink #f]
|
||||
#:name [name (gensym 'source)]
|
||||
#:on-connect [on-connect (lambda (new-sink) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-credit [on-credit (lambda (amount mode) (void))])
|
||||
(define sink #f)
|
||||
(define handle #f)
|
||||
|
||||
(define (set-sink! new-sink)
|
||||
(when (not (eq? sink new-sink))
|
||||
(set! sink new-sink)
|
||||
(when sink (on-connect sink))
|
||||
(set! handle (turn-replace! this-turn sink handle
|
||||
(if sink (->preserve (Sink-source self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/stream-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/stream-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Source-sink new-sink) (set-sink! new-sink)
|
||||
#:retracted (when (eq? sink new-sink)
|
||||
(set-sink! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Source-credit amount mode) (on-credit amount mode)]))
|
||||
|
||||
(set-sink! initial-sink)
|
||||
self)
|
||||
|
||||
(define (make-sink #:initial-source [initial-source #f]
|
||||
#:name [name (gensym 'sink)]
|
||||
#:on-connect [on-connect (lambda (new-source) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof (lambda () (void))])
|
||||
(define source #f)
|
||||
(define handle #f)
|
||||
|
||||
(define (set-source! new-source)
|
||||
(when (not (eq? new-source source))
|
||||
(set! source new-source)
|
||||
(when source (on-connect source))
|
||||
(set! handle (turn-replace! this-turn source handle
|
||||
(if source (->preserve (Source-sink self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/stream-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/stream-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Sink-source new-source) (set-source! new-source)
|
||||
#:retracted (when (eq? source new-source)
|
||||
(set-source! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Sink-data payload mode) (on-data payload mode)]
|
||||
[#:message (Sink-eof) (on-eof)]))
|
||||
|
||||
(set-source! initial-source)
|
||||
self)
|
||||
|
||||
(define (handle-connection source sink
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void])
|
||||
(make-source #:initial-sink sink
|
||||
#:name 'app-out
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-credit on-credit)
|
||||
(make-sink #:initial-source source
|
||||
#:name 'app-in
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-data on-data #:on-eof on-eof)
|
||||
(when initial-credit (send-credit source initial-credit initial-mode)))
|
||||
|
||||
(define (establish-connection ds spec
|
||||
#:name [name (gensym 'establish-connection)]
|
||||
|
||||
#:on-connected [on-connected (lambda (source sink) (void))]
|
||||
#:on-rejected [on-rejected #f]
|
||||
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void])
|
||||
(define peer
|
||||
(object #:name name
|
||||
[#:asserted (ConnectionHandler-connected source sink)
|
||||
(handle-connection source sink
|
||||
#:on-disconnect on-disconnect
|
||||
#:on-error on-error
|
||||
#:on-credit on-credit
|
||||
#:initial-credit initial-credit
|
||||
#:initial-mode initial-mode
|
||||
#:on-data on-data
|
||||
#:on-eof on-eof)
|
||||
(stop-facet ringing-facet)
|
||||
(on-connected source sink)]
|
||||
[#:asserted (ConnectionHandler-rejected message)
|
||||
(stop-facet ringing-facet)
|
||||
((or on-rejected (lambda (_message) (stop-current-facet))) message)]))
|
||||
(define ringing-facet (react (at ds (assert (StreamConnect spec peer)))))
|
||||
(void))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(define (send-credit source amount mode)
|
||||
(send! source (Source-credit amount mode)))
|
||||
|
||||
(define (send-lines-credit source amount [mode (LineMode-lf)])
|
||||
(send-credit source (CreditAmount-count amount) (Mode-lines mode)))
|
||||
|
||||
(define (send-bytes-credit source amount)
|
||||
(send-credit source (CreditAmount-count amount) (Mode-bytes)))
|
||||
|
||||
(define (send-packet-credit source packet-size #:count [count 1])
|
||||
(send-credit source (CreditAmount-count count) (Mode-packet packet-size)))
|
||||
|
||||
(define (->bytes data)
|
||||
(if (bytes? data)
|
||||
data
|
||||
(string->bytes/utf-8 data)))
|
||||
|
||||
(define (send-line sink line [line-mode (LineMode-lf)])
|
||||
(send! sink (Sink-data (->bytes line) (Mode-lines line-mode))))
|
||||
|
||||
(define (send-data sink data [mode (Mode-bytes)])
|
||||
(send! sink (Sink-data (->bytes data) mode)))
|
||||
|
||||
(define (send-eof sink)
|
||||
(send! sink (Sink-eof)))
|
||||
|
|
|
@ -2,31 +2,16 @@
|
|||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(provide (all-from-out syndicate/schemas/gen/stream)
|
||||
(all-from-out syndicate/schemas/gen/tcp)
|
||||
|
||||
handle-connection
|
||||
make-source
|
||||
make-sink
|
||||
make-connection-handler
|
||||
establish-connection
|
||||
|
||||
send-credit
|
||||
send-lines-credit
|
||||
send-bytes-credit
|
||||
send-packet-credit
|
||||
send-line
|
||||
send-data
|
||||
send-eof)
|
||||
(provide (all-from-out syndicate/drivers/stream)
|
||||
(all-from-out syndicate/schemas/gen/tcp))
|
||||
|
||||
(require racket/async-channel)
|
||||
(require racket/tcp)
|
||||
(require racket/port)
|
||||
(require (only-in racket/exn exn->string))
|
||||
(require syndicate/driver-support)
|
||||
(require syndicate/functional-queue)
|
||||
(require syndicate/pattern)
|
||||
(require syndicate/schemas/gen/stream)
|
||||
(require syndicate/drivers/stream)
|
||||
(require syndicate/schemas/gen/tcp)
|
||||
(require syndicate/schemas/gen/dataspace-patterns)
|
||||
|
||||
|
@ -35,70 +20,15 @@
|
|||
(define-logger syndicate/drivers/tcp)
|
||||
|
||||
(provide-service [ds]
|
||||
(at ds
|
||||
(during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _)
|
||||
#:name (list 'simple-listener spec-pat)
|
||||
(match (pattern->constant spec-pat)
|
||||
[(? void?) (stop-current-facet)]
|
||||
[spec (at ds
|
||||
(during (StreamSpecListenable spec)
|
||||
(assert
|
||||
(StreamListener spec
|
||||
(make-connection-handler
|
||||
(lambda (source sink)
|
||||
(assert (StreamConnection source sink spec))))))))]))
|
||||
(with-services [syndicate/drivers/stream]
|
||||
(at ds
|
||||
(during/spawn (StreamListener (TcpLocal $host $port) $peer)
|
||||
#:name (TcpLocal host port)
|
||||
(run-listener ds peer host port))
|
||||
|
||||
(during/spawn (StreamConnection $app-source $app-sink $spec)
|
||||
#:name (list 'simple-connection spec)
|
||||
(at ds
|
||||
(during (StreamSpecConnectable spec)
|
||||
(assert (StreamConnect spec
|
||||
(object #:name 'connection-peer
|
||||
[(ConnectionHandler-connected sys-source sys-sink)
|
||||
(at sys-source (assert (Source-sink app-sink)))
|
||||
(at sys-sink (assert (Sink-source app-source)))]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(log-syndicate/drivers/tcp-error
|
||||
"Connection to ~a rejected: ~a" spec message)
|
||||
(at app-source (assert (StreamError message)))
|
||||
(at app-sink (assert (StreamError message)))
|
||||
(stop-current-facet)]))))))
|
||||
|
||||
;; I translate interest in StreamListener with a particular spec-pattern into a facet
|
||||
;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern
|
||||
;; by asserting StreamSpecListenable with that spec.
|
||||
(during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _)
|
||||
(define listenable-asserter
|
||||
(object [bindings
|
||||
(define spec
|
||||
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
|
||||
(assert (StreamSpecListenable spec))]))
|
||||
(assert
|
||||
(Observe (:pattern
|
||||
(Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _))
|
||||
listenable-asserter)))
|
||||
|
||||
;; I translate interest in StreamConnect with a particular spec-pattern into a facet that
|
||||
;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by
|
||||
;; asserting StreamSpecConnectable with that spec.
|
||||
(during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _)
|
||||
(define connectable-asserter
|
||||
(object [bindings
|
||||
(define spec
|
||||
(pattern->constant spec-pat (lambda (_name index) (list-ref bindings index))))
|
||||
(assert (StreamSpecConnectable spec))]))
|
||||
(assert
|
||||
(Observe (:pattern
|
||||
(Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _))
|
||||
connectable-asserter)))
|
||||
|
||||
(during/spawn (StreamListener (TcpLocal $host $port) $peer)
|
||||
#:name (TcpLocal host port)
|
||||
(run-listener ds peer host port))
|
||||
|
||||
(during/spawn (StreamConnect (TcpRemote $host $port) $peer)
|
||||
#:name (TcpRemote host port)
|
||||
(run-outbound ds peer host port))))
|
||||
(during/spawn (StreamConnect (TcpRemote $host $port) $peer)
|
||||
#:name (TcpRemote host port)
|
||||
(run-outbound ds peer host port)))))
|
||||
|
||||
(define (run-listener ds peer host port)
|
||||
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
|
||||
|
@ -142,35 +72,17 @@
|
|||
|
||||
(react (on-stop (facet-count (- (facet-count) 1))
|
||||
(close-input-port i))
|
||||
(define active-sink #f)
|
||||
(define issue-credit (start-inbound-relay custodian (lambda () active-sink) i))
|
||||
(set! source (make-source #:name 'socket-in
|
||||
#:on-connect (lambda (new-sink) (set! active-sink new-sink))
|
||||
#:on-credit issue-credit))
|
||||
(set! source (port-source i #:custodian custodian))
|
||||
(at ds (assert (TcpPeerInfo source local-end remote-end))))
|
||||
|
||||
(react (on-stop (facet-count (- (facet-count) 1))
|
||||
(close-output-port o))
|
||||
(define active-source #f)
|
||||
(define relay (outbound-relay o))
|
||||
(set! sink (make-sink #:name 'socket-out
|
||||
#:on-connect
|
||||
(lambda (new-source)
|
||||
(set! active-source new-source)
|
||||
(send-credit active-source (CreditAmount-unbounded) (Mode-bytes)))
|
||||
#:on-data
|
||||
(lambda (data mode)
|
||||
(relay data mode)
|
||||
(match mode
|
||||
[(Mode-bytes) (send-bytes-credit active-source (bytes-length data))]
|
||||
[(Mode-lines lm) (send-lines-credit active-source 1 lm)]
|
||||
[(Mode-packet n) (send-packet-credit active-source n)]
|
||||
[(Mode-object _) (void)]))
|
||||
#:on-eof (lambda () (stop-current-facet))))
|
||||
(set! sink (port-sink o))
|
||||
(at ds (assert (TcpPeerInfo sink local-end remote-end))))
|
||||
|
||||
(at peer (assert #:when (positive? (facet-count))
|
||||
(ConnectionHandler-connected source sink)))))
|
||||
(at peer
|
||||
(assert #:when (positive? (facet-count))
|
||||
(ConnectionHandler-connected source sink)))))
|
||||
|
||||
(define (with-connection-error-guard ds peer error-proc proc)
|
||||
((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
|
||||
|
@ -185,275 +97,3 @@
|
|||
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
||||
(tcp-connect host port)))
|
||||
(lambda () (spawn-connection ds connection-custodian i o peer)))))
|
||||
|
||||
(define (start-inbound-relay custodian target-proc i)
|
||||
(define eof-received? #f)
|
||||
(define control-ch (make-async-channel))
|
||||
(linked-thread
|
||||
#:name (cons 'input-thread (tcp-ends i))
|
||||
#:custodian custodian
|
||||
#:peer (object #:name 'inbound-relay-monitor
|
||||
[#:asserted _
|
||||
#:retracted
|
||||
(close-input-port i)
|
||||
(when (not eof-received?) (stop-current-facet))])
|
||||
(lambda (facet)
|
||||
(define (update-count remaining-count mode q)
|
||||
(if (zero? remaining-count)
|
||||
q
|
||||
(undequeue (cons remaining-count mode) q)))
|
||||
(define (eof-and-finish)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~a" (tcp-ends i))
|
||||
(turn! facet (lambda () (send-eof (target-proc)))))
|
||||
(let loop ((credits (make-queue)))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
[(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
|
||||
[(list 'credit (CreditAmount-count n) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
|
||||
[_ (enqueue credits (cons n mode))]))]
|
||||
[(list 'credit (CreditAmount-unbounded) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
|
||||
[_ (enqueue credits (cons +inf.0 mode))]))]))
|
||||
(match (dequeue* credits)
|
||||
[(list)
|
||||
(handle-evt (eof-evt i)
|
||||
(lambda _ignored
|
||||
(eof-and-finish)))]
|
||||
[(list (cons count (and mode (Mode-bytes))) q)
|
||||
(define buffer (make-bytes (inexact->exact (min count 131072))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer i)
|
||||
(match-lambda
|
||||
[(? number? read-count)
|
||||
(define bs (subbytes buffer 0 read-count))
|
||||
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~a" bs (tcp-ends i))
|
||||
(turn! facet (lambda () (send-data (target-proc) bs)))
|
||||
(loop (update-count (- count read-count) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
[(list (cons count (and mode (Mode-packet packet-size))) q)
|
||||
(handle-evt (read-bytes-evt packet-size i)
|
||||
(match-lambda
|
||||
[(? bytes? packet) #:when (< (bytes-length packet) packet-size)
|
||||
(log-syndicate/drivers/tcp-debug
|
||||
"short inbound packet (length ~a; expected ~a bytes) ~v for ~a"
|
||||
(bytes-length packet) packet-size packet (tcp-ends i))
|
||||
(eof-and-finish)]
|
||||
[(? bytes? packet)
|
||||
(log-syndicate/drivers/tcp-debug
|
||||
"inbound packet (length ~a) ~v for ~a"
|
||||
(bytes-length packet) packet (tcp-ends i))
|
||||
(turn! facet (lambda () (send-data (target-proc) packet mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
[(list (cons count (and mode (Mode-lines line-mode))) q)
|
||||
(handle-evt (read-bytes-line-evt i (match line-mode
|
||||
[(LineMode-lf) 'linefeed]
|
||||
[(LineMode-crlf) 'return-linefeed]))
|
||||
(match-lambda
|
||||
[(? bytes? line)
|
||||
(log-syndicate/drivers/tcp-debug "inbound line ~v for ~a" line (tcp-ends i))
|
||||
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?) (eof-and-finish)]))])))))
|
||||
(define (issue-credit amount mode)
|
||||
(async-channel-put control-ch (list 'credit amount mode)))
|
||||
issue-credit)
|
||||
|
||||
(define-syntax (EPIPE stx)
|
||||
(local-require ffi/unsafe)
|
||||
(define errno-value (cons (lookup-errno 'EPIPE) 'posix))
|
||||
(syntax-case stx ()
|
||||
[(_) #`'#,errno-value]))
|
||||
|
||||
(define (with-stop-current-facet-on-epipe operation thunk)
|
||||
(with-handlers ([(lambda (e)
|
||||
(and (exn:fail:network:errno? e)
|
||||
(equal? (exn:fail:network:errno-errno e) (EPIPE))))
|
||||
(lambda (e)
|
||||
(log-syndicate/drivers/tcp-debug "epipe while ~a" operation)
|
||||
(stop-current-facet))])
|
||||
(thunk)))
|
||||
|
||||
(define (outbound-relay o)
|
||||
(define flush-pending #f)
|
||||
(lambda (payload mode)
|
||||
(log-syndicate/drivers/tcp-debug "outbound data ~v on ~a" payload (tcp-ends o))
|
||||
(with-stop-current-facet-on-epipe 'writing
|
||||
(lambda ()
|
||||
(write-bytes payload o)
|
||||
(match mode
|
||||
[(Mode-bytes) (void)]
|
||||
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)]
|
||||
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)])))
|
||||
(when (not flush-pending)
|
||||
(set! flush-pending #t)
|
||||
(facet-on-end-of-turn! this-facet
|
||||
(lambda ()
|
||||
(set! flush-pending #f)
|
||||
(with-stop-current-facet-on-epipe 'flushing
|
||||
(lambda () (flush-output o))))))))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(define (handle-connection source sink
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void])
|
||||
(make-source #:initial-sink sink
|
||||
#:name 'app-out
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-credit on-credit)
|
||||
(make-sink #:initial-source source
|
||||
#:name 'app-in
|
||||
#:on-disconnect on-disconnect #:on-error on-error
|
||||
#:on-data on-data #:on-eof on-eof)
|
||||
(when initial-credit (send-credit source initial-credit initial-mode)))
|
||||
|
||||
(define (make-source
|
||||
#:initial-sink [initial-sink #f]
|
||||
#:name [name (gensym 'source)]
|
||||
#:on-connect [on-connect (lambda (new-sink) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-credit [on-credit (lambda (amount mode) (void))])
|
||||
(define sink #f)
|
||||
(define handle #f)
|
||||
(define (set-sink! new-sink)
|
||||
(when (not (eq? sink new-sink))
|
||||
(set! sink new-sink)
|
||||
(when sink (on-connect sink))
|
||||
(set! handle (turn-replace! this-turn sink handle
|
||||
(if sink (->preserve (Sink-source self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/tcp-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Source-sink new-sink) (set-sink! new-sink)
|
||||
#:retracted (when (eq? sink new-sink)
|
||||
(set-sink! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Source-credit amount mode) (on-credit amount mode)]))
|
||||
|
||||
(set-sink! initial-sink)
|
||||
self)
|
||||
|
||||
(define (make-sink
|
||||
#:initial-source [initial-source #f]
|
||||
#:name [name (gensym 'sink)]
|
||||
#:on-connect [on-connect (lambda (new-source) (void))]
|
||||
#:on-disconnect [on-disconnect0 #f]
|
||||
#:on-error [on-error0 #f]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof (lambda () (void))])
|
||||
(define source #f)
|
||||
(define handle #f)
|
||||
(define (set-source! new-source)
|
||||
(when (not (eq? new-source source))
|
||||
(set! source new-source)
|
||||
(when source (on-connect source))
|
||||
(set! handle (turn-replace! this-turn source handle
|
||||
(if source (->preserve (Source-sink self)) (void))))))
|
||||
|
||||
(define on-disconnect
|
||||
(or on-disconnect0 (lambda ()
|
||||
(log-syndicate/drivers/tcp-debug "~a disconnected" self)
|
||||
(stop-current-facet))))
|
||||
(define on-error
|
||||
(or on-error0 (lambda (message)
|
||||
(log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
|
||||
(stop-current-facet))))
|
||||
|
||||
(define self
|
||||
(object #:name name
|
||||
[#:asserted (Sink-source new-source) (set-source! new-source)
|
||||
#:retracted (when (eq? source new-source)
|
||||
(set-source! #f)
|
||||
(on-disconnect))]
|
||||
[#:asserted (StreamError message) (on-error message)]
|
||||
[#:message (Sink-data payload mode) (on-data payload mode)]
|
||||
[#:message (Sink-eof) (on-eof)]))
|
||||
|
||||
(set-source! initial-source)
|
||||
self)
|
||||
|
||||
(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)])
|
||||
(object #:name name
|
||||
[(ConnectionHandler-connected source sink)
|
||||
(on-connected source sink)]
|
||||
[(ConnectionHandler-rejected message)
|
||||
(error 'connection-handler "~a" message)]))
|
||||
|
||||
(define (establish-connection ds spec
|
||||
#:name [name (gensym 'establish-connection)]
|
||||
|
||||
#:on-connected [on-connected (lambda (source sink) (void))]
|
||||
#:on-rejected [on-rejected #f]
|
||||
|
||||
#:on-disconnect [on-disconnect #f]
|
||||
#:on-error [on-error #f]
|
||||
#:on-credit [on-credit void]
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void])
|
||||
(define peer
|
||||
(object #:name name
|
||||
[#:asserted (ConnectionHandler-connected source sink)
|
||||
(handle-connection source sink
|
||||
#:on-disconnect on-disconnect
|
||||
#:on-error on-error
|
||||
#:on-credit on-credit
|
||||
#:initial-credit initial-credit
|
||||
#:initial-mode initial-mode
|
||||
#:on-data on-data
|
||||
#:on-eof on-eof)
|
||||
(stop-facet ringing-facet)
|
||||
(on-connected source sink)]
|
||||
[#:asserted (ConnectionHandler-rejected message)
|
||||
(stop-facet ringing-facet)
|
||||
((or on-rejected (lambda (_message) (stop-current-facet))) message)]))
|
||||
(define ringing-facet (react (at ds (assert (StreamConnect spec peer)))))
|
||||
(void))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(define (send-credit source amount mode)
|
||||
(send! source (Source-credit amount mode)))
|
||||
|
||||
(define (send-lines-credit source amount [mode (LineMode-lf)])
|
||||
(send-credit source (CreditAmount-count amount) (Mode-lines mode)))
|
||||
|
||||
(define (send-bytes-credit source amount)
|
||||
(send-credit source (CreditAmount-count amount) (Mode-bytes)))
|
||||
|
||||
(define (send-packet-credit source packet-size #:count [count 1])
|
||||
(send-credit source (CreditAmount-count count) (Mode-packet packet-size)))
|
||||
|
||||
(define (->bytes data)
|
||||
(if (bytes? data)
|
||||
data
|
||||
(string->bytes/utf-8 data)))
|
||||
|
||||
(define (send-line sink line [line-mode (LineMode-lf)])
|
||||
(send! sink (Sink-data (->bytes line) (Mode-lines line-mode))))
|
||||
|
||||
(define (send-data sink data [mode (Mode-bytes)])
|
||||
(send! sink (Sink-data (->bytes data) mode)))
|
||||
|
||||
(define (send-eof sink)
|
||||
(send! sink (Sink-eof)))
|
||||
|
|
|
@ -56,7 +56,8 @@
|
|||
(syntax-rules ()
|
||||
[(_ [dataspace] body ...)
|
||||
(standard-actor-system/no-services [dataspace]
|
||||
(with-services [syndicate/drivers/tcp
|
||||
(with-services [syndicate/drivers/stream
|
||||
syndicate/drivers/tcp
|
||||
syndicate/drivers/timer]
|
||||
body ...))]))
|
||||
|
||||
|
|
Loading…
Reference in New Issue