#lang syndicate ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones (provide (all-from-out syndicate/schemas/gen/tcp) spawn-tcp-driver accept-connection assert-control establish-connection send-credit send-lines-credit send-bytes-credit send-packet-credit send-line send-data send-eof) (require racket/async-channel) (require racket/tcp) (require racket/port) (require (only-in racket/exn exn->string)) (require syndicate/driver-support) (require syndicate/functional-queue) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) (require (for-syntax racket/base)) (define-logger syndicate/drivers/tcp) (define (spawn-tcp-driver ds) (spawn #:name 'tcp-driver #:daemon? #t (at ds (during/spawn (Observe (:pattern (Connection ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _) #:name (TcpLocal host port) (run-listener ds host port)) (during/spawn (Connection $local-peer (TcpRemote $host $port)) #:name (TcpRemote host port) (run-outbound ds local-peer host port))))) (define (run-listener ds host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (linked-thread #:name (list (TcpLocal host port) 'thread) (lambda (facet) (define listener (tcp-listen port 512 #t host)) (let loop () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) (turn! facet (lambda () (spawn-inbound ds connection-custodian i o (TcpLocal host port)))) (loop))))) (define (run-outbound ds local-peer host port) (define connection-custodian (make-custodian)) ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))]) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-connect host port))) (lambda () (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (at ds (assert (ConnectionPeer local-peer (TcpLocal (car name) (cadr name))))) (actor-add-exit-hook! this-actor (lambda () (close-input-port i) (close-output-port o))) (define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i)) (define relay (outbound-relay name o)) (at local-peer (assert (ActiveSocket-controller (object #:name (list name 'socket) [#:message (Socket-credit amount mode) (issue-credit amount mode)] [#:message (Socket-data data mode) (relay data mode)] [#:message (Socket-eof) (close-output-port o)])))))))) (define (spawn-inbound ds custodian i o spec) (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (spawn #:name name (actor-add-exit-hook! this-actor (lambda () (close-input-port i) (close-output-port o))) (define issue-credit #f) (define active-controller #f) (define relay (outbound-relay name o)) (define handle (object #:name (list name 'active-socket) [#:asserted (ActiveSocket-controller controller) (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) (when (not active-controller) (set! issue-credit (start-inbound-relay custodian name (lambda () active-controller) i))) (set! active-controller controller) #:retracted (when (eq? controller active-controller) (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) (stop-current-facet))] [#:asserted (ActiveSocket-close message) (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) (stop-current-facet)] [#:message (ActiveSocket-Socket (Socket-credit amount mode)) (if issue-credit (issue-credit amount mode) (log-syndicate/drivers/tcp-warning "Socket-credit ~v/~v ignored because no controller present" amount mode))] [#:message (ActiveSocket-Socket (Socket-data data mode)) (relay data mode)] [#:message (ActiveSocket-Socket (Socket-eof)) (close-output-port o)])) (at ds (assert (ConnectionPeer handle (TcpRemote (caddr name) (cadddr name)))) (assert (Connection handle spec))))) (define (start-inbound-relay custodian name target-proc i) (define eof-received? #f) (define control-ch (make-async-channel)) (linked-thread #:name (list name 'input-thread) #:custodian custodian #:peer (object #:name 'inbound-relay-monitor [#:asserted _ #:retracted (close-input-port i) (when (not eof-received?) (stop-current-facet))]) (lambda (facet) (define (update-count remaining-count mode q) (if (zero? remaining-count) q (undequeue (cons remaining-count mode) q))) (define (eof-and-finish) (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) (turn! facet (lambda () (send-eof (target-proc))))) (let loop ((credits (make-queue))) (sync (handle-evt control-ch (match-lambda [(list 'credit (CreditAmount-count 0) _mode) (loop credits)] [(list 'credit (CreditAmount-count n) mode) (loop (match (unenqueue* credits) [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))] [_ (enqueue credits (cons n mode))]))] [(list 'credit (CreditAmount-unbounded) mode) (loop (match (unenqueue* credits) [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))] [_ (enqueue credits (cons +inf.0 mode))]))])) (match (dequeue* credits) [(list) never-evt] [(list (cons count (and mode (Mode-bytes))) q) (define buffer (make-bytes (inexact->exact (min count 131072)))) (handle-evt (read-bytes-avail!-evt buffer i) (match-lambda [(? number? read-count) (define bs (subbytes buffer 0 read-count)) (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) (turn! facet (lambda () (send-data (target-proc) bs))) (loop (update-count (- count read-count) mode q))] [(? eof-object?) (eof-and-finish)]))] [(list (cons count (and mode (Mode-packet packet-size))) q) (handle-evt (read-bytes-evt packet-size i) (match-lambda [(? bytes? packet) #:when (< (bytes-length packet) packet-size) (log-syndicate/drivers/tcp-debug "short inbound packet (length ~a; expected ~a bytes) ~v for ~v" (bytes-length packet) packet-size packet name) (eof-and-finish)] [(? bytes? packet) (log-syndicate/drivers/tcp-debug "inbound packet (length ~a) ~v for ~v" (bytes-length packet) packet name) (turn! facet (lambda () (send-data (target-proc) packet mode))) (loop (update-count (- count 1) mode q))] [(? eof-object?) (eof-and-finish)]))] [(list (cons count (and mode (Mode-lines line-mode))) q) (handle-evt (read-bytes-line-evt i (match line-mode [(LineMode-lf) 'linefeed] [(LineMode-crlf) 'return-linefeed])) (match-lambda [(? bytes? line) (log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name) (turn! facet (lambda () (send-line (target-proc) line line-mode))) (loop (update-count (- count 1) mode q))] [(? eof-object?) (eof-and-finish)]))]))))) (define (issue-credit amount mode) (async-channel-put control-ch (list 'credit amount mode))) issue-credit) (define-syntax (EPIPE stx) (local-require ffi/unsafe) (define errno-value (cons (lookup-errno 'EPIPE) 'posix)) (syntax-case stx () [(_) #`'#,errno-value])) (define (with-stop-current-facet-on-epipe operation thunk) (with-handlers ([(lambda (e) (and (exn:fail:network:errno? e) (equal? (exn:fail:network:errno-errno e) (EPIPE)))) (lambda (e) (log-syndicate/drivers/tcp-debug "epipe while ~a" operation) (stop-current-facet))]) (thunk))) (define (outbound-relay name o) (define flush-pending #f) (lambda (payload mode) (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) (with-stop-current-facet-on-epipe 'writing (lambda () (write-bytes payload o) (match mode [(Mode-bytes) (void)] [(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)] [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]))) (when (not flush-pending) (set! flush-pending #t) (facet-on-end-of-turn! this-facet (lambda () (set! flush-pending #f) (with-stop-current-facet-on-epipe 'flushing (lambda () (flush-output o)))))))) (define (accept-connection conn #:initial-credit [initial-credit (CreditAmount-unbounded)] #:initial-mode [initial-mode (Mode-bytes)] #:on-data on-data #:on-eof [on-eof void] #:on-credit [on-credit void]) (when initial-credit (send-credit conn initial-credit initial-mode)) (assert-control conn #:on-data on-data #:on-eof on-eof #:on-credit on-credit)) (define (assert-control conn #:on-data on-data #:on-eof [on-eof void] #:on-credit [on-credit void]) (at conn (assert (ActiveSocket-controller (object #:name 'inbound-socket-controller [#:message (Socket-credit amount mode) (on-credit amount mode)] [#:message (Socket-data data mode) (on-data data mode)] [#:message (Socket-eof) (on-eof)]))))) (define (establish-connection ds spec #:initial-credit [initial-credit (CreditAmount-unbounded)] #:initial-mode [initial-mode (Mode-bytes)] #:on-connected on-connected #:on-data on-data #:on-eof [on-eof void] #:on-credit [on-credit void] #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] #:on-rejected [on-rejected (lambda () (stop-current-facet))]) (define s (object #:name 'outbound-socket [#:asserted (ActiveSocket-controller peer) (on-connected peer) (when initial-credit (send-credit peer initial-credit initial-mode)) #:retracted (on-disconnected)] [#:asserted (ActiveSocket-close message) (on-rejected message)] [#:message (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)] [#:message (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)] [#:message (ActiveSocket-Socket (Socket-eof)) (on-eof)])) (at ds (assert (Connection s spec)))) (define (send-credit conn amount mode) (send! conn (Socket-credit amount mode))) (define (send-lines-credit conn amount [mode (LineMode-lf)]) (send-credit conn (CreditAmount-count amount) (Mode-lines mode))) (define (send-bytes-credit conn amount) (send-credit conn (CreditAmount-count amount) (Mode-bytes))) (define (send-packet-credit conn packet-size #:count [count 1]) (send-credit conn (CreditAmount-count count) (Mode-packet packet-size))) (define (->bytes data) (if (bytes? data) data (string->bytes/utf-8 data))) (define (send-line conn line [line-mode (LineMode-lf)]) (send! conn (Socket-data (->bytes line) (Mode-lines line-mode)))) (define (send-data conn data [mode (Mode-bytes)]) (send! conn (Socket-data (->bytes data) mode))) (define (send-eof conn) (send! conn (Socket-eof)))