#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones

(provide (all-from-out syndicate/schemas/gen/stream)
         (all-from-out syndicate/schemas/gen/tcp)

         handle-connection
         make-source
         make-sink
         make-connection-handler
         establish-connection

         send-credit
         send-lines-credit
         send-bytes-credit
         send-packet-credit

         send-line
         send-data
         send-eof)

(require racket/async-channel)
(require racket/tcp)
(require racket/port)
(require (only-in racket/exn exn->string))

(require syndicate/driver-support)
(require syndicate/functional-queue)
(require syndicate/pattern)
(require syndicate/schemas/gen/stream)
(require syndicate/schemas/gen/tcp)
(require syndicate/schemas/gen/dataspace-patterns)

(require (for-syntax racket/base))

(define-logger syndicate/drivers/tcp)

;;---------------------------------------------------------------------------
;; Service entry point. Everything below reacts to assertions in dataspace `ds`.

(provide-service [ds]
  (at ds
    ;; Interest in StreamConnection with a constant spec becomes a StreamListener
    ;; for that spec (once the spec is known to be listenable); each accepted
    ;; connection is published as a StreamConnection assertion.
    (during/spawn (Observe (:pattern (StreamConnection ,_ ,_ ,$spec-pat)) _)
      #:name (list 'simple-listener spec-pat)
      (match (pattern->constant spec-pat)
        ;; A void result is treated as "no constant spec available": nothing to listen on.
        [(? void?) (stop-current-facet)]
        [spec
         (at ds
           (during (StreamSpecListenable spec)
             (assert (StreamListener
                      spec
                      (make-connection-handler
                       (lambda (source sink)
                         (assert (StreamConnection source sink spec))))))))]))

    ;; An asserted StreamConnection becomes an outbound StreamConnect (once the
    ;; spec is known to be connectable); on success the application's source/sink
    ;; are cross-wired with the system's source/sink.
    (during/spawn (StreamConnection $app-source $app-sink $spec)
      #:name (list 'simple-connection spec)
      (at ds
        (during (StreamSpecConnectable spec)
          (assert (StreamConnect
                   spec
                   (object #:name 'connection-peer
                           [(ConnectionHandler-connected sys-source sys-sink)
                            (at sys-source (assert (Source-sink app-sink)))
                            (at sys-sink (assert (Sink-source app-source)))]
                           [(ConnectionHandler-rejected message)
                            (log-syndicate/drivers/tcp-error "Connection to ~a rejected: ~a" spec message)
                            (at app-source (assert (StreamError message)))
                            (at app-sink (assert (StreamError message)))
                            (stop-current-facet)]))))))

    ;; I translate interest in StreamListener with a particular spec-pattern into a facet
    ;; that reacts to interest in StreamSpecListenable with a spec matching the spec-pattern
    ;; by asserting StreamSpecListenable with that spec.
    (during (Observe (:pattern (StreamListener ,$spec-pat ,_)) _)
      (define listenable-asserter
        (object [bindings
                 (define spec (pattern->constant spec-pat
                                                 (lambda (_name index)
                                                   (list-ref bindings index))))
                 (assert (StreamSpecListenable spec))]))
      (assert (Observe (:pattern (Observe (:pattern (StreamSpecListenable ,,(:pattern (DLit ,spec-pat)))) _))
                       listenable-asserter)))

    ;; I translate interest in StreamConnect with a particular spec-pattern into a facet that
    ;; reacts to interest in StreamSpecConnectable with a spec matching the spec-pattern by
    ;; asserting StreamSpecConnectable with that spec.
    (during (Observe (:pattern (StreamConnect ,$spec-pat ,_)) _)
      (define connectable-asserter
        (object [bindings
                 (define spec (pattern->constant spec-pat
                                                 (lambda (_name index)
                                                   (list-ref bindings index))))
                 (assert (StreamSpecConnectable spec))]))
      (assert (Observe (:pattern (Observe (:pattern (StreamSpecConnectable ,,(:pattern (DLit ,spec-pat)))) _))
                       connectable-asserter)))

    ;; Concrete TCP implementations of listening and connecting.
    (during/spawn (StreamListener (TcpLocal $host $port) $peer)
      #:name (TcpLocal host port)
      (run-listener ds peer host port))

    (during/spawn (StreamConnect (TcpRemote $host $port) $peer)
      #:name (TcpRemote host port)
      (run-outbound ds peer host port))))

;;---------------------------------------------------------------------------
;; Listener and connection plumbing

;; Opens a TCP listener on host:port in a linked thread and spawns one connection
;; actor per accepted socket. A network failure is reported both as a
;; TcpListenError in `ds` and as ConnectionHandler-rejected at `peer`.
(define (run-listener ds peer host port)
  (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
  (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
  (linked-thread
   #:name (list 'listen-thread host port)
   (lambda (facet)
     (with-connection-error-guard
      ds peer
      (lambda (message)
        (turn! facet
               (lambda ()
                 (at ds (assert (TcpListenError (TcpLocal host port) message)))
                 (at peer (assert (ConnectionHandler-rejected message))))))
      (lambda ()
        ;; 512 is the accept backlog; #t enables address reuse.
        (define listener (tcp-listen port 512 #t host))
        ;; The accept loop is returned as a thunk so that it runs *outside* the
        ;; exception handler installed by with-connection-error-guard.
        (lambda ()
          (let loop ()
            ;; Each connection gets its own custodian, which owns the socket ports.
            (define connection-custodian (make-custodian))
            (define-values (i o) (parameterize ((current-custodian connection-custodian))
                                   (tcp-accept listener)))
            (turn! facet (lambda () (spawn-connection ds connection-custodian i o peer)))
            (loop))))))))

;; Returns (list (TcpLocal host port) (TcpRemote host port)) for TCP port `p`.
(define (tcp-ends p)
  (call-with-values (lambda () (tcp-addresses p #t))
                    (lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp)))))

;; Spawns the actor representing one established TCP connection over ports `i`/`o`.
;; The actor has two facets, one for the inbound half (a source) and one for the
;; outbound half (a sink); `peer` is told ConnectionHandler-connected for as long
;; as at least one of the two is still running.
(define (spawn-connection ds custodian i o peer)
  (match-define (and ends (list (and local-end (TcpLocal local-host local-port))
                                (and remote-end (TcpRemote remote-host remote-port))))
    (tcp-ends i))
  (define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port))
  (log-syndicate/drivers/tcp-info "Connection ~a established" name)
  (spawn #:name name
    ;; Make sure both ports are closed however the actor ends.
    (actor-add-exit-hook! this-actor (lambda ()
                                       (close-input-port i)
                                       (close-output-port o)))
    ;; Number of live half-connection facets (inbound + outbound).
    (define-field facet-count 2)
    (define source #f)
    (define sink #f)

    ;; Inbound half: socket -> whichever sink is currently attached to `source`.
    (react
      (on-stop (facet-count (- (facet-count) 1))
               (close-input-port i))
      (define active-sink #f)
      (define issue-credit (start-inbound-relay custodian (lambda () active-sink) i))
      (set! source (make-source #:name 'socket-in
                                #:on-connect (lambda (new-sink) (set! active-sink new-sink))
                                #:on-credit issue-credit))
      (at ds (assert (TcpPeerInfo source local-end remote-end))))

    ;; Outbound half: data delivered to `sink` is written to the socket.
    (react
      (on-stop (facet-count (- (facet-count) 1))
               (close-output-port o))
      (define active-source #f)
      (define relay (outbound-relay o))
      (set! sink (make-sink #:name 'socket-out
                            #:on-connect (lambda (new-source)
                                           (set! active-source new-source)
                                           (send-credit active-source (CreditAmount-unbounded) (Mode-bytes)))
                            #:on-data (lambda (data mode)
                                        (relay data mode)
                                        ;; Replenish the credit that the just-written item consumed.
                                        (match mode
                                          [(Mode-bytes) (send-bytes-credit active-source (bytes-length data))]
                                          [(Mode-lines lm) (send-lines-credit active-source 1 lm)]
                                          [(Mode-packet n) (send-packet-credit active-source n)]
                                          [(Mode-object _) (void)]))
                            #:on-eof (lambda () (stop-current-facet))))
      (at ds (assert (TcpPeerInfo sink local-end remote-end))))

    (at peer (assert #:when (positive? (facet-count)) (ConnectionHandler-connected source sink)))))

;; Calls `proc` with an exn:fail:network? handler installed. `proc` must return a
;; thunk (the continuation to run on success); on a network error the handler
;; instead yields a thunk that calls `error-proc` with the exception rendered as
;; a string. Whichever thunk results is then invoked outside the handler.
;; `ds` and `peer` are accepted but not used by this procedure.
(define (with-connection-error-guard ds peer error-proc proc)
  ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
     (proc))))

;; Connects to host:port and spawns a connection actor for the resulting socket;
;; a network failure is reported to `peer` as ConnectionHandler-rejected.
(define (run-outbound ds peer host port)
  (with-connection-error-guard
   ds peer
   (lambda (message) (at peer (assert (ConnectionHandler-rejected message))))
   (lambda ()
     (define connection-custodian (make-custodian))
     (define-values (i o) (parameterize ((current-custodian connection-custodian))
                            (tcp-connect host port)))
     (lambda () (spawn-connection ds connection-custodian i o peer)))))

;; Starts a linked thread that reads from input port `i` according to the credit
;; it has been granted, forwarding what it reads to the sink returned by
;; `(target-proc)`. Returns a procedure `(issue-credit amount mode)` that grants
;; more credit to the thread.
;;
;; Credit is kept as a queue of (count . mode) pairs; the head of the queue
;; decides how the next read is performed (bytes, fixed-size packet, or line).
;; With no credit outstanding the thread only watches for EOF.
(define (start-inbound-relay custodian target-proc i)
  ;; NOTE(review): nothing in this file ever sets eof-received? to #t, so the
  ;; guard in the monitor below is always true. Confirm whether eof-and-finish
  ;; was meant to set it before changing this.
  (define eof-received? #f)
  (define control-ch (make-async-channel))
  (linked-thread
   #:name (cons 'input-thread (tcp-ends i))
   #:custodian custodian
   #:peer (object #:name 'inbound-relay-monitor
                  [#:asserted _
                   #:retracted
                   (close-input-port i)
                   (when (not eof-received?)
                     (stop-current-facet))])
   (lambda (facet)
     ;; Puts the remaining credit back at the head of the queue, unless it is used up.
     (define (update-count remaining-count mode q)
       (if (zero? remaining-count)
           q
           (undequeue (cons remaining-count mode) q)))
     ;; Reports EOF to the current sink. Does not loop, so the thread body returns.
     (define (eof-and-finish)
       (log-syndicate/drivers/tcp-debug "inbound eof for ~a" (tcp-ends i))
       (turn! facet (lambda () (send-eof (target-proc)))))
     (let loop ((credits (make-queue)))
       (sync
        ;; Incoming credit grants. A grant in the same mode as the most recently
        ;; queued entry is merged into that entry; otherwise a new entry is queued.
        (handle-evt control-ch
                    (match-lambda
                      [(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
                      [(list 'credit (CreditAmount-count n) mode)
                       (loop (match (unenqueue* credits)
                               [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
                               [_ (enqueue credits (cons n mode))]))]
                      [(list 'credit (CreditAmount-unbounded) mode)
                       ;; Unbounded credit is represented as +inf.0.
                       (loop (match (unenqueue* credits)
                               [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
                               [_ (enqueue credits (cons +inf.0 mode))]))]))
        ;; Reading, driven by the credit entry at the head of the queue.
        (match (dequeue* credits)
          [(list)
           (handle-evt (eof-evt i) (lambda _ignored (eof-and-finish)))]
          [(list (cons count (and mode (Mode-bytes))) q)
           ;; Read at most 131072 bytes at a time; `min` with +inf.0 yields an
           ;; inexact number, hence inexact->exact.
           (define buffer (make-bytes (inexact->exact (min count 131072))))
           (handle-evt (read-bytes-avail!-evt buffer i)
                       (match-lambda
                         [(? number? read-count)
                          (define bs (subbytes buffer 0 read-count))
                          (log-syndicate/drivers/tcp-debug "inbound data ~v for ~a" bs (tcp-ends i))
                          (turn! facet (lambda () (send-data (target-proc) bs)))
                          (loop (update-count (- count read-count) mode q))]
                         [(? eof-object?) (eof-and-finish)]))]
          [(list (cons count (and mode (Mode-packet packet-size))) q)
           (handle-evt (read-bytes-evt packet-size i)
                       (match-lambda
                         ;; A short read means the stream ended mid-packet; treat it as EOF.
                         [(? bytes? packet) #:when (< (bytes-length packet) packet-size)
                          (log-syndicate/drivers/tcp-debug
                           "short inbound packet (length ~a; expected ~a bytes) ~v for ~a"
                           (bytes-length packet) packet-size packet (tcp-ends i))
                          (eof-and-finish)]
                         [(? bytes? packet)
                          (log-syndicate/drivers/tcp-debug
                           "inbound packet (length ~a) ~v for ~a"
                           (bytes-length packet) packet (tcp-ends i))
                          (turn! facet (lambda () (send-data (target-proc) packet mode)))
                          (loop (update-count (- count 1) mode q))]
                         [(? eof-object?) (eof-and-finish)]))]
          [(list (cons count (and mode (Mode-lines line-mode))) q)
           (handle-evt (read-bytes-line-evt i (match line-mode
                                                [(LineMode-lf) 'linefeed]
                                                [(LineMode-crlf) 'return-linefeed]))
                       (match-lambda
                         [(? bytes? line)
                          (log-syndicate/drivers/tcp-debug "inbound line ~v for ~a" line (tcp-ends i))
                          (turn! facet (lambda () (send-line (target-proc) line line-mode)))
                          (loop (update-count (- count 1) mode q))]
                         [(? eof-object?) (eof-and-finish)]))])))))

  (define (issue-credit amount mode)
    (async-channel-put control-ch (list 'credit amount mode)))

  issue-credit)

;; (EPIPE) expands, at compile time, to the quoted pair (errno . posix) for the
;; platform's EPIPE, in the shape compared against exn:fail:network:errno-errno below.
(define-syntax (EPIPE stx)
  (local-require ffi/unsafe)
  (define errno-value (cons (lookup-errno 'EPIPE) 'posix))
  (syntax-case stx ()
    [(_) #`'#,errno-value]))

;; Runs `thunk`; if it raises a network error whose errno is EPIPE, logs the
;; failure (`operation` names what was being attempted) and stops the current
;; facet. Any other exception propagates.
(define (with-stop-current-facet-on-epipe operation thunk)
  (with-handlers ([(lambda (e)
                     (and (exn:fail:network:errno? e)
                          (equal? (exn:fail:network:errno-errno e) (EPIPE))))
                   (lambda (e)
                     (log-syndicate/drivers/tcp-debug "epipe while ~a" operation)
                     (stop-current-facet))])
    (thunk)))

;; Returns a procedure `(relay payload mode)` that writes `payload` to output
;; port `o`, followed by the line terminator implied by `mode` when there is one.
;; The port is flushed at most once per turn, at end of turn.
(define (outbound-relay o)
  (define flush-pending #f)
  (lambda (payload mode)
    (log-syndicate/drivers/tcp-debug "outbound data ~v on ~a" payload (tcp-ends o))
    (with-stop-current-facet-on-epipe 'writing
      (lambda ()
        (write-bytes payload o)
        (match mode
          [(Mode-bytes) (void)]
          [(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)]
          [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]
          ;; Packets carry no framing on the wire: the inbound relay reads them
          ;; as fixed-size chunks, so the payload alone is the whole packet.
          ;; Without this clause an outbound packet raised a match failure
          ;; even though the socket sink's on-data handler accepts Mode-packet.
          [(Mode-packet _) (void)])))
    ;; NOTE(review): Mode-object is still not handled here; the payload type for
    ;; that mode is not visible in this file, so it is left alone.
    (when (not flush-pending)
      (set! flush-pending #t)
      (facet-on-end-of-turn! this-facet
                             (lambda ()
                               (set! flush-pending #f)
                               (with-stop-current-facet-on-epipe 'flushing
                                 (lambda () (flush-output o))))))))

;;---------------------------------------------------------------------------
;; Application-facing helpers

;; Attaches application callbacks to an established connection: creates a source
;; wired to `sink` (for outgoing data and incoming credit) and a sink wired to
;; `source` (for incoming data and EOF), then, unless `initial-credit` is #f,
;; grants `initial-credit` in `initial-mode` to `source`.
(define (handle-connection source sink
                           #:on-disconnect [on-disconnect #f]
                           #:on-error [on-error #f]
                           #:on-credit [on-credit void]
                           #:initial-credit [initial-credit (CreditAmount-unbounded)]
                           #:initial-mode [initial-mode (Mode-bytes)]
                           #:on-data on-data
                           #:on-eof [on-eof void])
  (make-source #:initial-sink sink
               #:name 'app-out
               #:on-disconnect on-disconnect
               #:on-error on-error
               #:on-credit on-credit)
  (make-sink #:initial-source source
             #:name 'app-in
             #:on-disconnect on-disconnect
             #:on-error on-error
             #:on-data on-data
             #:on-eof on-eof)
  (when initial-credit
    (send-credit source initial-credit initial-mode)))

;; Creates and returns a source object.
;;  - A Source-sink assertion attaches a sink: `on-connect` is called with it and
;;    a Sink-source assertion naming this source is placed at that sink.
;;  - Retraction of the current sink's Source-sink detaches it and calls
;;    `on-disconnect`.
;;  - A StreamError assertion calls `on-error` with the message.
;;  - A Source-credit message calls `on-credit` with amount and mode.
;; The default disconnect and error handlers log and stop the current facet.
(define (make-source #:initial-sink [initial-sink #f]
                     #:name [name (gensym 'source)]
                     #:on-connect [on-connect (lambda (new-sink) (void))]
                     #:on-disconnect [on-disconnect0 #f]
                     #:on-error [on-error0 #f]
                     #:on-credit [on-credit (lambda (amount mode) (void))])
  (define sink #f)
  (define handle #f)
  (define (set-sink! new-sink)
    (when (not (eq? sink new-sink))
      (set! sink new-sink)
      (when sink (on-connect sink))
      ;; Replace the previous Sink-source assertion (if any) with one at the new
      ;; sink; with no sink, the replacement value is void.
      (set! handle (turn-replace! this-turn sink handle
                                  (if sink (->preserve (Sink-source self)) (void))))))
  (define on-disconnect
    (or on-disconnect0
        (lambda ()
          (log-syndicate/drivers/tcp-debug "~a disconnected" self)
          (stop-current-facet))))
  (define on-error
    (or on-error0
        (lambda (message)
          (log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
          (stop-current-facet))))
  (define self
    (object #:name name
            [#:asserted (Source-sink new-sink)
             (set-sink! new-sink)
             #:retracted
             ;; Ignore retractions from a sink that has already been superseded.
             (when (eq? sink new-sink)
               (set-sink! #f)
               (on-disconnect))]
            [#:asserted (StreamError message) (on-error message)]
            [#:message (Source-credit amount mode) (on-credit amount mode)]))
  (set-sink! initial-sink)
  self)

;; Creates and returns a sink object; the mirror image of make-source.
;;  - A Sink-source assertion attaches a source: `on-connect` is called with it
;;    and a Source-sink assertion naming this sink is placed at that source.
;;  - Retraction of the current source's Sink-source detaches it and calls
;;    `on-disconnect`.
;;  - A StreamError assertion calls `on-error` with the message.
;;  - A Sink-data message calls `on-data` with payload and mode.
;;  - A Sink-eof message calls `on-eof`.
;; The default disconnect and error handlers log and stop the current facet.
(define (make-sink #:initial-source [initial-source #f]
                   #:name [name (gensym 'sink)]
                   #:on-connect [on-connect (lambda (new-source) (void))]
                   #:on-disconnect [on-disconnect0 #f]
                   #:on-error [on-error0 #f]
                   #:on-data on-data
                   #:on-eof [on-eof (lambda () (void))])
  (define source #f)
  (define handle #f)
  (define (set-source! new-source)
    (when (not (eq? new-source source))
      (set! source new-source)
      (when source (on-connect source))
      ;; Replace the previous Source-sink assertion (if any) with one at the new
      ;; source; with no source, the replacement value is void.
      (set! handle (turn-replace! this-turn source handle
                                  (if source (->preserve (Source-sink self)) (void))))))
  (define on-disconnect
    (or on-disconnect0
        (lambda ()
          (log-syndicate/drivers/tcp-debug "~a disconnected" self)
          (stop-current-facet))))
  (define on-error
    (or on-error0
        (lambda (message)
          (log-syndicate/drivers/tcp-debug "~a error: ~v" self message)
          (stop-current-facet))))
  (define self
    (object #:name name
            [#:asserted (Sink-source new-source)
             (set-source! new-source)
             #:retracted
             ;; Ignore retractions from a source that has already been superseded.
             (when (eq? source new-source)
               (set-source! #f)
               (on-disconnect))]
            [#:asserted (StreamError message) (on-error message)]
            [#:message (Sink-data payload mode) (on-data payload mode)]
            [#:message (Sink-eof) (on-eof)]))
  (set-source! initial-source)
  self)

;; Returns an object suitable as the peer of a StreamListener: it calls
;; `(on-connected source sink)` for each ConnectionHandler-connected and raises
;; an error carrying the message for ConnectionHandler-rejected.
(define (make-connection-handler on-connected #:name [name (gensym 'connection-handler)])
  (object #:name name
          [(ConnectionHandler-connected source sink) (on-connected source sink)]
          [(ConnectionHandler-rejected message) (error 'connection-handler "~a" message)]))

;; Requests an outbound connection to `spec` by asserting StreamConnect in `ds`.
;; On success, the connection is wired to the given callbacks via
;; handle-connection and `on-connected` is called with the source and sink.
;; On rejection, `on-rejected` is called with the message (by default, the
;; current facet is stopped). Either way the facet holding the StreamConnect
;; assertion is stopped once an answer arrives. Returns void.
(define (establish-connection ds spec
                              #:name [name (gensym 'establish-connection)]
                              #:on-connected [on-connected (lambda (source sink) (void))]
                              #:on-rejected [on-rejected #f]
                              #:on-disconnect [on-disconnect #f]
                              #:on-error [on-error #f]
                              #:on-credit [on-credit void]
                              #:initial-credit [initial-credit (CreditAmount-unbounded)]
                              #:initial-mode [initial-mode (Mode-bytes)]
                              #:on-data on-data
                              #:on-eof [on-eof void])
  (define peer
    (object #:name name
            [#:asserted (ConnectionHandler-connected source sink)
             (handle-connection source sink
                                #:on-disconnect on-disconnect
                                #:on-error on-error
                                #:on-credit on-credit
                                #:initial-credit initial-credit
                                #:initial-mode initial-mode
                                #:on-data on-data
                                #:on-eof on-eof)
             (stop-facet ringing-facet)
             (on-connected source sink)]
            [#:asserted (ConnectionHandler-rejected message)
             (stop-facet ringing-facet)
             ((or on-rejected (lambda (_message) (stop-current-facet))) message)]))
  ;; The facet that keeps the connection request asserted until it is answered.
  (define ringing-facet
    (react (at ds (assert (StreamConnect spec peer)))))
  (void))

;;---------------------------------------------------------------------------
;; Message-sending conveniences

;; Sends a Source-credit message granting `amount` of credit in `mode` to `source`.
(define (send-credit source amount mode)
  (send! source (Source-credit amount mode)))

;; Grants `amount` lines of credit; `mode` is the line mode (LF by default).
(define (send-lines-credit source amount [mode (LineMode-lf)])
  (send-credit source (CreditAmount-count amount) (Mode-lines mode)))

;; Grants `amount` bytes of credit.
(define (send-bytes-credit source amount)
  (send-credit source (CreditAmount-count amount) (Mode-bytes)))

;; Grants credit for `count` packets (default 1) of `packet-size` bytes each.
(define (send-packet-credit source packet-size #:count [count 1])
  (send-credit source (CreditAmount-count count) (Mode-packet packet-size)))

;; Returns `data` unchanged if it is a byte string; otherwise encodes it as
;; UTF-8 with string->bytes/utf-8 (so non-bytes input must be a string).
(define (->bytes data)
  (if (bytes? data)
      data
      (string->bytes/utf-8 data)))

;; Sends `line` (bytes or string) to `sink` as one line in `line-mode` (LF by default).
(define (send-line sink line [line-mode (LineMode-lf)])
  (send! sink (Sink-data (->bytes line) (Mode-lines line-mode))))

;; Sends `data` (bytes or string) to `sink` in `mode` (bytes mode by default).
(define (send-data sink data [mode (Mode-bytes)])
  (send! sink (Sink-data (->bytes data) mode)))

;; Signals end-of-stream to `sink`.
(define (send-eof sink)
  (send! sink (Sink-eof)))