diff --git a/syndicate-examples/tcp-client-naive.rkt b/syndicate-examples/tcp-client-naive.rkt index 056ddb9..2d298c1 100644 --- a/syndicate-examples/tcp-client-naive.rkt +++ b/syndicate-examples/tcp-client-naive.rkt @@ -7,8 +7,6 @@ (require (only-in racket/port read-line-evt)) (require (only-in racket/string string-trim)) (require syndicate/drivers/tcp) - (require syndicate/drivers/stream) - (require syndicate/drivers/racket-event) (define host "127.0.0.1") (define port 5999) @@ -20,5 +18,4 @@ (set! port (string->number port-number))]) (standard-actor-system (ds) - (at ds - (assert (StreamConnection (port-lines-source ds) (port-sink) (TcpRemote host port)))))) + (at ds (assert (StreamConnection (port-source) (port-sink) (TcpRemote host port)))))) diff --git a/syndicate/driver-support.rkt b/syndicate/driver-support.rkt index 9f365e1..972c11d 100644 --- a/syndicate/driver-support.rkt +++ b/syndicate/driver-support.rkt @@ -12,7 +12,9 @@ (define (linked-thread thread-proc #:name [name (gensym 'linked-thread)] #:peer [peer (ref (entity/stop-on-retract #:name (list name 'monitor)))] - #:custodian [c (make-custodian)]) + #:custodian [c0 #f]) + (define c (or c0 (make-custodian))) + (define facet this-facet) (define actor this-actor) diff --git a/syndicate/drivers/stream.rkt b/syndicate/drivers/stream.rkt index 594734c..2ba37e6 100644 --- a/syndicate/drivers/stream.rkt +++ b/syndicate/drivers/stream.rkt @@ -2,48 +2,406 @@ ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones -(provide port-lines-source - port-sink) +(provide (all-from-out syndicate/schemas/gen/stream) -(require (only-in racket/port read-line-evt)) + port-source + port-sink -(require "tcp.rkt") ;; ugh, lots of tcp.rkt actually belongs in this file -(require syndicate/drivers/racket-event) + make-connection-handler + make-source + make-sink + handle-connection + establish-connection + + send-credit + send-lines-credit + send-bytes-credit + send-packet-credit + send-line + send-data + send-eof) + +(require (for-syntax racket/base)) + +(require (only-in racket/port + read-bytes-line-evt + read-bytes-evt + read-bytes-avail!-evt + eof-evt)) +(require racket/async-channel) + +(require syndicate/functional-queue) (require syndicate/service) +(require syndicate/pattern) +(require syndicate/driver-support) +(require syndicate/drivers/racket-event) -(define (port-lines-source ds [port (current-input-port)] - #:initial-credit [initial-credit 0] - #:name [name (list 'port-lines-source (object-name port))] - #:line-mode [line-mode (LineMode-lf)]) - (at ds (assert (RequireService 'syndicate/drivers/racket-event))) - (define-field credit initial-credit) - (define-field sink #f) +(require syndicate/schemas/gen/stream) + +(define-logger syndicate/drivers/stream) + +(provide-service [ds] (at ds - (on (message #:when (and (sink) (positive? (credit))) (RacketEvent (read-line-evt port) $vs)) - (credit (- (credit) 1)) - (match (car vs) - [(? eof-object?) (send-eof (sink))] - [line (send-line (sink) line line-mode)]))) + (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _) + #:name (list 'stream-listener spec-pat) + (match (pattern->constant spec-pat) + [(? void?) (stop-current-facet)] + [spec (at ds + (during (StreamSpecListenable spec) + (assert + (StreamListener spec + (make-connection-handler + (lambda (source sink) + (assert (StreamConnection source sink spec))))))))])) + + (during/spawn (StreamConnection $app-source $app-sink $spec) + #:name (list 'stream-connection spec) + (at ds + (during (StreamSpecConnectable spec) + (assert (StreamConnect spec + (object #:name 'connection-peer + [(ConnectionHandler-connected sys-source sys-sink) + (at sys-source (assert (Source-sink app-sink))) + (at sys-sink (assert (Sink-source app-source)))] + [(ConnectionHandler-rejected message) + (log-syndicate/drivers/stream-error + "Connection to ~a rejected: ~a" spec message) + (at app-source (assert (StreamError message))) + (at app-sink (assert (StreamError message))) + (stop-current-facet)])))))) + + ;; I translate interest in StreamListener with a particular spec-pattern into a facet + ;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern + ;; by asserting StreamSpecListenable with that spec. + (during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _) + (define listenable-asserter + (object [bindings + (define spec + (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) + (assert (StreamSpecListenable spec))])) + (assert + (Observe (:pattern + (Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _)) + listenable-asserter))) + + ;; I translate interest in StreamConnect with a particular spec-pattern into a facet that + ;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by + ;; asserting StreamSpecConnectable with that spec. + (during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _) + (define connectable-asserter + (object [bindings + (define spec + (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) + (assert (StreamSpecConnectable spec))])) + (assert + (Observe (:pattern + (Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _)) + connectable-asserter))))) + +(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)]) + (object #:name name + [(ConnectionHandler-connected source sink) + (on-connected source sink)] + [(ConnectionHandler-rejected message) + (error 'connection-handler "~a" message)])) + +(define (port-source [port (current-input-port)] + #:custodian [custodian #f] + #:name [name (list 'port-source (object-name port))]) + (define active-sink #f) + (define issue-credit (start-inbound-relay #:custodian custodian + (lambda () active-sink) + port)) (make-source #:name name - #:on-connect sink - #:on-credit (lambda (amount mode) - (if (equal? amount (CreditAmount-unbounded)) - (credit +inf.0) - (match mode - [(Mode-lines _) - (credit (+ (credit) (CreditAmount-count-value amount)))] - [_ (void)]))))) + #:on-connect (lambda (new-sink) (set! active-sink new-sink)) + #:on-credit issue-credit)) + +(define (start-inbound-relay target-proc port + #:custodian custodian) + (define eof-received? #f) + (define control-ch (make-async-channel)) + + (linked-thread + #:name (cons 'input-thread (object-name port)) + #:custodian custodian + #:peer (object #:name 'inbound-relay-monitor + [#:asserted _ + #:retracted + (close-input-port port) + (when (not eof-received?) (stop-current-facet))]) + (lambda (facet) + (define (update-count remaining-count mode q) + (if (zero? remaining-count) + q + (undequeue (cons remaining-count mode) q))) + + (define (eof-and-finish) + (log-syndicate/drivers/stream-debug "inbound eof for ~a" (object-name port)) + (turn! facet (lambda () (send-eof (target-proc))))) + + (let loop ((credits (make-queue))) + (sync (handle-evt control-ch + (match-lambda + [(list 'credit (CreditAmount-count 0) _mode) (loop credits)] + [(list 'credit (CreditAmount-count n) mode) + (loop (match (unenqueue* credits) + [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))] + [_ (enqueue credits (cons n mode))]))] + [(list 'credit (CreditAmount-unbounded) mode) + (loop (match (unenqueue* credits) + [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))] + [_ (enqueue credits (cons +inf.0 mode))]))])) + (match (dequeue* credits) + [(list) + (handle-evt (eof-evt port) + (lambda _ignored + (eof-and-finish)))] + [(list (cons count (and mode (Mode-bytes))) q) + (define buffer (make-bytes (inexact->exact (min count 131072)))) + (handle-evt (read-bytes-avail!-evt buffer port) + (match-lambda + [(? number? read-count) + (define bs (subbytes buffer 0 read-count)) + (log-syndicate/drivers/stream-debug "inbound data ~v for ~a" bs (object-name port)) + (turn! facet (lambda () (send-data (target-proc) bs))) + (loop (update-count (- count read-count) mode q))] + [(? eof-object?) (eof-and-finish)]))] + [(list (cons count (and mode (Mode-packet packet-size))) q) + (handle-evt (read-bytes-evt packet-size port) + (match-lambda + [(? bytes? packet) #:when (< (bytes-length packet) packet-size) + (log-syndicate/drivers/stream-debug + "short inbound packet (length ~a; expected ~a bytes) ~v for ~a" + (bytes-length packet) packet-size packet (object-name port)) + (eof-and-finish)] + [(? bytes? packet) + (log-syndicate/drivers/stream-debug + "inbound packet (length ~a) ~v for ~a" + (bytes-length packet) packet (object-name port)) + (turn! facet (lambda () (send-data (target-proc) packet mode))) + (loop (update-count (- count 1) mode q))] + [(? eof-object?) (eof-and-finish)]))] + [(list (cons count (and mode (Mode-lines line-mode))) q) + (handle-evt (read-bytes-line-evt port (match line-mode + [(LineMode-lf) 'linefeed] + [(LineMode-crlf) 'return-linefeed])) + (match-lambda + [(? bytes? line) + (log-syndicate/drivers/stream-debug "inbound line ~v for ~a" line (object-name port)) + (turn! facet (lambda () (send-line (target-proc) line line-mode))) + (loop (update-count (- count 1) mode q))] + [(? eof-object?) (eof-and-finish)]))]))))) + (define (issue-credit amount mode) + (async-channel-put control-ch (list 'credit amount mode))) + issue-credit) (define (port-sink [port (current-output-port)] + #:initial-credit [initial-credit (CreditAmount-unbounded)] + #:initial-mode [initial-mode (Mode-bytes)] #:name [name (list 'port-sink (object-name port))]) + (define active-source #f) + (define relay (outbound-relay port)) (make-sink #:name name - #:on-connect (lambda (source) (send-credit source (CreditAmount-unbounded) (Mode-bytes))) - #:on-eof (lambda () (close-output-port port)) - #:on-data (lambda (data mode) - (when (bytes? data) - (write-bytes data port) - (match mode - [(Mode-bytes) (void)] - [(Mode-lines (LineMode-lf)) (write-bytes #"\n" port)] - [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" port)]) - (flush-output port))))) + #:on-connect + (lambda (new-source) + (set! active-source new-source) + (when initial-credit (send-credit active-source initial-credit initial-mode))) + #:on-data + (lambda (data mode) + (relay data mode) + (match mode + [(Mode-bytes) (send-bytes-credit active-source (bytes-length data))] + [(Mode-lines lm) (send-lines-credit active-source 1 lm)] + [(Mode-packet n) (send-packet-credit active-source n)] + [(Mode-object _) (void)])) + #:on-eof + (lambda () (stop-current-facet)))) + +(define-syntax (EPIPE stx) + (local-require ffi/unsafe) + (define errno-value (cons (lookup-errno 'EPIPE) 'posix)) + (syntax-case stx () [(_) #`'#,errno-value])) + +(define (with-stop-current-facet-on-epipe operation thunk) + (with-handlers ([(lambda (e) + (and (exn:fail:network:errno? e) + (equal? (exn:fail:network:errno-errno e) (EPIPE)))) + (lambda (e) + (log-syndicate/drivers/stream-debug "epipe while ~a" operation) + (stop-current-facet))]) + (thunk))) + +(define (outbound-relay o) + (define flush-pending #f) + (lambda (payload mode) + (log-syndicate/drivers/stream-debug "outbound data ~v on ~a" payload (object-name o)) + (with-stop-current-facet-on-epipe 'writing + (lambda () + (write-bytes payload o) + (match mode + [(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)] + [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)] + [_ (void)]))) + (when (not flush-pending) + (set! flush-pending #t) + (facet-on-end-of-turn! this-facet + (lambda () + (set! flush-pending #f) + (with-stop-current-facet-on-epipe 'flushing + (lambda () (flush-output o)))))))) + +(define (make-source #:initial-sink [initial-sink #f] + #:name [name (gensym 'source)] + #:on-connect [on-connect (lambda (new-sink) (void))] + #:on-disconnect [on-disconnect0 #f] + #:on-error [on-error0 #f] + #:on-credit [on-credit (lambda (amount mode) (void))]) + (define sink #f) + (define handle #f) + + (define (set-sink! new-sink) + (when (not (eq? sink new-sink)) + (set! sink new-sink) + (when sink (on-connect sink)) + (set! handle (turn-replace! this-turn sink handle + (if sink (->preserve (Sink-source self)) (void)))))) + + (define on-disconnect + (or on-disconnect0 (lambda () + (log-syndicate/drivers/stream-debug "~a disconnected" self) + (stop-current-facet)))) + + (define on-error + (or on-error0 (lambda (message) + (log-syndicate/drivers/stream-debug "~a error: ~v" self message) + (stop-current-facet)))) + + (define self + (object #:name name + [#:asserted (Source-sink new-sink) (set-sink! new-sink) + #:retracted (when (eq? sink new-sink) + (set-sink! #f) + (on-disconnect))] + [#:asserted (StreamError message) (on-error message)] + [#:message (Source-credit amount mode) (on-credit amount mode)])) + + (set-sink! initial-sink) + self) + +(define (make-sink #:initial-source [initial-source #f] + #:name [name (gensym 'sink)] + #:on-connect [on-connect (lambda (new-source) (void))] + #:on-disconnect [on-disconnect0 #f] + #:on-error [on-error0 #f] + #:on-data on-data + #:on-eof [on-eof (lambda () (void))]) + (define source #f) + (define handle #f) + + (define (set-source! new-source) + (when (not (eq? new-source source)) + (set! source new-source) + (when source (on-connect source)) + (set! handle (turn-replace! this-turn source handle + (if source (->preserve (Source-sink self)) (void)))))) + + (define on-disconnect + (or on-disconnect0 (lambda () + (log-syndicate/drivers/stream-debug "~a disconnected" self) + (stop-current-facet)))) + + (define on-error + (or on-error0 (lambda (message) + (log-syndicate/drivers/stream-debug "~a error: ~v" self message) + (stop-current-facet)))) + + (define self + (object #:name name + [#:asserted (Sink-source new-source) (set-source! new-source) + #:retracted (when (eq? source new-source) + (set-source! #f) + (on-disconnect))] + [#:asserted (StreamError message) (on-error message)] + [#:message (Sink-data payload mode) (on-data payload mode)] + [#:message (Sink-eof) (on-eof)])) + + (set-source! initial-source) + self) + +(define (handle-connection source sink + #:on-disconnect [on-disconnect #f] + #:on-error [on-error #f] + #:on-credit [on-credit void] + #:initial-credit [initial-credit (CreditAmount-unbounded)] + #:initial-mode [initial-mode (Mode-bytes)] + #:on-data on-data + #:on-eof [on-eof void]) + (make-source #:initial-sink sink + #:name 'app-out + #:on-disconnect on-disconnect #:on-error on-error + #:on-credit on-credit) + (make-sink #:initial-source source + #:name 'app-in + #:on-disconnect on-disconnect #:on-error on-error + #:on-data on-data #:on-eof on-eof) + (when initial-credit (send-credit source initial-credit initial-mode))) + +(define (establish-connection ds spec + #:name [name (gensym 'establish-connection)] + + #:on-connected [on-connected (lambda (source sink) (void))] + #:on-rejected [on-rejected #f] + + #:on-disconnect [on-disconnect #f] + #:on-error [on-error #f] + #:on-credit [on-credit void] + #:initial-credit [initial-credit (CreditAmount-unbounded)] + #:initial-mode [initial-mode (Mode-bytes)] + #:on-data on-data + #:on-eof [on-eof void]) + (define peer + (object #:name name + [#:asserted (ConnectionHandler-connected source sink) + (handle-connection source sink + #:on-disconnect on-disconnect + #:on-error on-error + #:on-credit on-credit + #:initial-credit initial-credit + #:initial-mode initial-mode + #:on-data on-data + #:on-eof on-eof) + (stop-facet ringing-facet) + (on-connected source sink)] + [#:asserted (ConnectionHandler-rejected message) + (stop-facet ringing-facet) + ((or on-rejected (lambda (_message) (stop-current-facet))) message)])) + (define ringing-facet (react (at ds (assert (StreamConnect spec peer))))) + (void)) + +;;--------------------------------------------------------------------------- + +(define (send-credit source amount mode) + (send! source (Source-credit amount mode))) + +(define (send-lines-credit source amount [mode (LineMode-lf)]) + (send-credit source (CreditAmount-count amount) (Mode-lines mode))) + +(define (send-bytes-credit source amount) + (send-credit source (CreditAmount-count amount) (Mode-bytes))) + +(define (send-packet-credit source packet-size #:count [count 1]) + (send-credit source (CreditAmount-count count) (Mode-packet packet-size))) + +(define (->bytes data) + (if (bytes? data) + data + (string->bytes/utf-8 data))) + +(define (send-line sink line [line-mode (LineMode-lf)]) + (send! sink (Sink-data (->bytes line) (Mode-lines line-mode)))) + +(define (send-data sink data [mode (Mode-bytes)]) + (send! sink (Sink-data (->bytes data) mode))) + +(define (send-eof sink) + (send! sink (Sink-eof))) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 80327f4..3b6f779 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -2,31 +2,16 @@ ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones -(provide (all-from-out syndicate/schemas/gen/stream) - (all-from-out syndicate/schemas/gen/tcp) - - handle-connection - make-source - make-sink - make-connection-handler - establish-connection - - send-credit - send-lines-credit - send-bytes-credit - send-packet-credit - send-line - send-data - send-eof) +(provide (all-from-out syndicate/drivers/stream) + (all-from-out syndicate/schemas/gen/tcp)) (require racket/async-channel) (require racket/tcp) -(require racket/port) (require (only-in racket/exn exn->string)) (require syndicate/driver-support) (require syndicate/functional-queue) (require syndicate/pattern) -(require syndicate/schemas/gen/stream) +(require syndicate/drivers/stream) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) @@ -35,70 +20,15 @@ (define-logger syndicate/drivers/tcp) (provide-service [ds] - (at ds - (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _) - #:name (list 'simple-listener spec-pat) - (match (pattern->constant spec-pat) - [(? void?) (stop-current-facet)] - [spec (at ds - (during (StreamSpecListenable spec) - (assert - (StreamListener spec - (make-connection-handler - (lambda (source sink) - (assert (StreamConnection source sink spec))))))))])) + (with-services [syndicate/drivers/stream] + (at ds + (during/spawn (StreamListener (TcpLocal $host $port) $peer) + #:name (TcpLocal host port) + (run-listener ds peer host port)) - (during/spawn (StreamConnection $app-source $app-sink $spec) - #:name (list 'simple-connection spec) - (at ds - (during (StreamSpecConnectable spec) - (assert (StreamConnect spec - (object #:name 'connection-peer - [(ConnectionHandler-connected sys-source sys-sink) - (at sys-source (assert (Source-sink app-sink))) - (at sys-sink (assert (Sink-source app-source)))] - [(ConnectionHandler-rejected message) - (log-syndicate/drivers/tcp-error - "Connection to ~a rejected: ~a" spec message) - (at app-source (assert (StreamError message))) - (at app-sink (assert (StreamError message))) - (stop-current-facet)])))))) - - ;; I translate interest in StreamListener with a particular spec-pattern into a facet - ;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern - ;; by asserting StreamSpecListenable with that spec. - (during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _) - (define listenable-asserter - (object [bindings - (define spec - (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) - (assert (StreamSpecListenable spec))])) - (assert - (Observe (:pattern - (Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _)) - listenable-asserter))) - - ;; I translate interest in StreamConnect with a particular spec-pattern into a facet that - ;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by - ;; asserting StreamSpecConnectable with that spec. - (during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _) - (define connectable-asserter - (object [bindings - (define spec - (pattern->constant spec-pat (lambda (_name index) (list-ref bindings index)))) - (assert (StreamSpecConnectable spec))])) - (assert - (Observe (:pattern - (Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _)) - connectable-asserter))) - - (during/spawn (StreamListener (TcpLocal $host $port) $peer) - #:name (TcpLocal host port) - (run-listener ds peer host port)) - - (during/spawn (StreamConnect (TcpRemote $host $port) $peer) - #:name (TcpRemote host port) - (run-outbound ds peer host port)))) + (during/spawn (StreamConnect (TcpRemote $host $port) $peer) + #:name (TcpRemote host port) + (run-outbound ds peer host port))))) (define (run-listener ds peer host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) @@ -142,35 +72,17 @@ (react (on-stop (facet-count (- (facet-count) 1)) (close-input-port i)) - (define active-sink #f) - (define issue-credit (start-inbound-relay custodian (lambda () active-sink) i)) - (set! source (make-source #:name 'socket-in - #:on-connect (lambda (new-sink) (set! active-sink new-sink)) - #:on-credit issue-credit)) + (set! source (port-source i #:custodian custodian)) (at ds (assert (TcpPeerInfo source local-end remote-end)))) (react (on-stop (facet-count (- (facet-count) 1)) (close-output-port o)) - (define active-source #f) - (define relay (outbound-relay o)) - (set! sink (make-sink #:name 'socket-out - #:on-connect - (lambda (new-source) - (set! active-source new-source) - (send-credit active-source (CreditAmount-unbounded) (Mode-bytes))) - #:on-data - (lambda (data mode) - (relay data mode) - (match mode - [(Mode-bytes) (send-bytes-credit active-source (bytes-length data))] - [(Mode-lines lm) (send-lines-credit active-source 1 lm)] - [(Mode-packet n) (send-packet-credit active-source n)] - [(Mode-object _) (void)])) - #:on-eof (lambda () (stop-current-facet)))) + (set! sink (port-sink o)) (at ds (assert (TcpPeerInfo sink local-end remote-end)))) - (at peer (assert #:when (positive? (facet-count)) - (ConnectionHandler-connected source sink))))) + (at peer + (assert #:when (positive? (facet-count)) + (ConnectionHandler-connected source sink))))) (define (with-connection-error-guard ds peer error-proc proc) ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))]) @@ -185,275 +97,3 @@ (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-connect host port))) (lambda () (spawn-connection ds connection-custodian i o peer))))) - -(define (start-inbound-relay custodian target-proc i) - (define eof-received? #f) - (define control-ch (make-async-channel)) - (linked-thread - #:name (cons 'input-thread (tcp-ends i)) - #:custodian custodian - #:peer (object #:name 'inbound-relay-monitor - [#:asserted _ - #:retracted - (close-input-port i) - (when (not eof-received?) (stop-current-facet))]) - (lambda (facet) - (define (update-count remaining-count mode q) - (if (zero? remaining-count) - q - (undequeue (cons remaining-count mode) q))) - (define (eof-and-finish) - (log-syndicate/drivers/tcp-debug "inbound eof for ~a" (tcp-ends i)) - (turn! facet (lambda () (send-eof (target-proc))))) - (let loop ((credits (make-queue))) - (sync (handle-evt control-ch - (match-lambda - [(list 'credit (CreditAmount-count 0) _mode) (loop credits)] - [(list 'credit (CreditAmount-count n) mode) - (loop (match (unenqueue* credits) - [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))] - [_ (enqueue credits (cons n mode))]))] - [(list 'credit (CreditAmount-unbounded) mode) - (loop (match (unenqueue* credits) - [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))] - [_ (enqueue credits (cons +inf.0 mode))]))])) - (match (dequeue* credits) - [(list) - (handle-evt (eof-evt i) - (lambda _ignored - (eof-and-finish)))] - [(list (cons count (and mode (Mode-bytes))) q) - (define buffer (make-bytes (inexact->exact (min count 131072)))) - (handle-evt (read-bytes-avail!-evt buffer i) - (match-lambda - [(? number? read-count) - (define bs (subbytes buffer 0 read-count)) - (log-syndicate/drivers/tcp-debug "inbound data ~v for ~a" bs (tcp-ends i)) - (turn! facet (lambda () (send-data (target-proc) bs))) - (loop (update-count (- count read-count) mode q))] - [(? eof-object?) (eof-and-finish)]))] - [(list (cons count (and mode (Mode-packet packet-size))) q) - (handle-evt (read-bytes-evt packet-size i) - (match-lambda - [(? bytes? packet) #:when (< (bytes-length packet) packet-size) - (log-syndicate/drivers/tcp-debug - "short inbound packet (length ~a; expected ~a bytes) ~v for ~a" - (bytes-length packet) packet-size packet (tcp-ends i)) - (eof-and-finish)] - [(? bytes? packet) - (log-syndicate/drivers/tcp-debug - "inbound packet (length ~a) ~v for ~a" - (bytes-length packet) packet (tcp-ends i)) - (turn! facet (lambda () (send-data (target-proc) packet mode))) - (loop (update-count (- count 1) mode q))] - [(? eof-object?) (eof-and-finish)]))] - [(list (cons count (and mode (Mode-lines line-mode))) q) - (handle-evt (read-bytes-line-evt i (match line-mode - [(LineMode-lf) 'linefeed] - [(LineMode-crlf) 'return-linefeed])) - (match-lambda - [(? bytes? line) - (log-syndicate/drivers/tcp-debug "inbound line ~v for ~a" line (tcp-ends i)) - (turn! facet (lambda () (send-line (target-proc) line line-mode))) - (loop (update-count (- count 1) mode q))] - [(? eof-object?) (eof-and-finish)]))]))))) - (define (issue-credit amount mode) - (async-channel-put control-ch (list 'credit amount mode))) - issue-credit) - -(define-syntax (EPIPE stx) - (local-require ffi/unsafe) - (define errno-value (cons (lookup-errno 'EPIPE) 'posix)) - (syntax-case stx () - [(_) #`'#,errno-value])) - -(define (with-stop-current-facet-on-epipe operation thunk) - (with-handlers ([(lambda (e) - (and (exn:fail:network:errno? e) - (equal? (exn:fail:network:errno-errno e) (EPIPE)))) - (lambda (e) - (log-syndicate/drivers/tcp-debug "epipe while ~a" operation) - (stop-current-facet))]) - (thunk))) - -(define (outbound-relay o) - (define flush-pending #f) - (lambda (payload mode) - (log-syndicate/drivers/tcp-debug "outbound data ~v on ~a" payload (tcp-ends o)) - (with-stop-current-facet-on-epipe 'writing - (lambda () - (write-bytes payload o) - (match mode - [(Mode-bytes) (void)] - [(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)] - [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]))) - (when (not flush-pending) - (set! flush-pending #t) - (facet-on-end-of-turn! this-facet - (lambda () - (set! flush-pending #f) - (with-stop-current-facet-on-epipe 'flushing - (lambda () (flush-output o)))))))) - -;;--------------------------------------------------------------------------- - -(define (handle-connection source sink - #:on-disconnect [on-disconnect #f] - #:on-error [on-error #f] - #:on-credit [on-credit void] - #:initial-credit [initial-credit (CreditAmount-unbounded)] - #:initial-mode [initial-mode (Mode-bytes)] - #:on-data on-data - #:on-eof [on-eof void]) - (make-source #:initial-sink sink - #:name 'app-out - #:on-disconnect on-disconnect #:on-error on-error - #:on-credit on-credit) - (make-sink #:initial-source source - #:name 'app-in - #:on-disconnect on-disconnect #:on-error on-error - #:on-data on-data #:on-eof on-eof) - (when initial-credit (send-credit source initial-credit initial-mode))) - -(define (make-source - #:initial-sink [initial-sink #f] - #:name [name (gensym 'source)] - #:on-connect [on-connect (lambda (new-sink) (void))] - #:on-disconnect [on-disconnect0 #f] - #:on-error [on-error0 #f] - #:on-credit [on-credit (lambda (amount mode) (void))]) - (define sink #f) - (define handle #f) - (define (set-sink! new-sink) - (when (not (eq? sink new-sink)) - (set! sink new-sink) - (when sink (on-connect sink)) - (set! handle (turn-replace! this-turn sink handle - (if sink (->preserve (Sink-source self)) (void)))))) - - (define on-disconnect - (or on-disconnect0 (lambda () - (log-syndicate/drivers/tcp-debug "~a disconnected" self) - (stop-current-facet)))) - (define on-error - (or on-error0 (lambda (message) - (log-syndicate/drivers/tcp-debug "~a error: ~v" self message) - (stop-current-facet)))) - - (define self - (object #:name name - [#:asserted (Source-sink new-sink) (set-sink! new-sink) - #:retracted (when (eq? sink new-sink) - (set-sink! #f) - (on-disconnect))] - [#:asserted (StreamError message) (on-error message)] - [#:message (Source-credit amount mode) (on-credit amount mode)])) - - (set-sink! initial-sink) - self) - -(define (make-sink - #:initial-source [initial-source #f] - #:name [name (gensym 'sink)] - #:on-connect [on-connect (lambda (new-source) (void))] - #:on-disconnect [on-disconnect0 #f] - #:on-error [on-error0 #f] - #:on-data on-data - #:on-eof [on-eof (lambda () (void))]) - (define source #f) - (define handle #f) - (define (set-source! new-source) - (when (not (eq? new-source source)) - (set! source new-source) - (when source (on-connect source)) - (set! handle (turn-replace! this-turn source handle - (if source (->preserve (Source-sink self)) (void)))))) - - (define on-disconnect - (or on-disconnect0 (lambda () - (log-syndicate/drivers/tcp-debug "~a disconnected" self) - (stop-current-facet)))) - (define on-error - (or on-error0 (lambda (message) - (log-syndicate/drivers/tcp-debug "~a error: ~v" self message) - (stop-current-facet)))) - - (define self - (object #:name name - [#:asserted (Sink-source new-source) (set-source! new-source) - #:retracted (when (eq? source new-source) - (set-source! #f) - (on-disconnect))] - [#:asserted (StreamError message) (on-error message)] - [#:message (Sink-data payload mode) (on-data payload mode)] - [#:message (Sink-eof) (on-eof)])) - - (set-source! initial-source) - self) - -(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)]) - (object #:name name - [(ConnectionHandler-connected source sink) - (on-connected source sink)] - [(ConnectionHandler-rejected message) - (error 'connection-handler "~a" message)])) - -(define (establish-connection ds spec - #:name [name (gensym 'establish-connection)] - - #:on-connected [on-connected (lambda (source sink) (void))] - #:on-rejected [on-rejected #f] - - #:on-disconnect [on-disconnect #f] - #:on-error [on-error #f] - #:on-credit [on-credit void] - #:initial-credit [initial-credit (CreditAmount-unbounded)] - #:initial-mode [initial-mode (Mode-bytes)] - #:on-data on-data - #:on-eof [on-eof void]) - (define peer - (object #:name name - [#:asserted (ConnectionHandler-connected source sink) - (handle-connection source sink - #:on-disconnect on-disconnect - #:on-error on-error - #:on-credit on-credit - #:initial-credit initial-credit - #:initial-mode initial-mode - #:on-data on-data - #:on-eof on-eof) - (stop-facet ringing-facet) - (on-connected source sink)] - [#:asserted (ConnectionHandler-rejected message) - (stop-facet ringing-facet) - ((or on-rejected (lambda (_message) (stop-current-facet))) message)])) - (define ringing-facet (react (at ds (assert (StreamConnect spec peer))))) - (void)) - -;;--------------------------------------------------------------------------- - -(define (send-credit source amount mode) - (send! source (Source-credit amount mode))) - -(define (send-lines-credit source amount [mode (LineMode-lf)]) - (send-credit source (CreditAmount-count amount) (Mode-lines mode))) - -(define (send-bytes-credit source amount) - (send-credit source (CreditAmount-count amount) (Mode-bytes))) - -(define (send-packet-credit source packet-size #:count [count 1]) - (send-credit source (CreditAmount-count count) (Mode-packet packet-size))) - -(define (->bytes data) - (if (bytes? data) - data - (string->bytes/utf-8 data))) - -(define (send-line sink line [line-mode (LineMode-lf)]) - (send! sink (Sink-data (->bytes line) (Mode-lines line-mode)))) - -(define (send-data sink data [mode (Mode-bytes)]) - (send! sink (Sink-data (->bytes data) mode))) - -(define (send-eof sink) - (send! sink (Sink-eof))) diff --git a/syndicate/service.rkt b/syndicate/service.rkt index ce9e6da..d0099eb 100644 --- a/syndicate/service.rkt +++ b/syndicate/service.rkt @@ -56,7 +56,8 @@ (syntax-rules () [(_ [dataspace] body ...) (standard-actor-system/no-services [dataspace] - (with-services [syndicate/drivers/tcp + (with-services [syndicate/drivers/stream + syndicate/drivers/tcp syndicate/drivers/timer] body ...))]))