#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones

;; TCP driver: exposes TCP listeners and outbound TCP connections as
;; stream-protocol assertions (StreamConnection, StreamListenerReady,
;; StreamListenerError, StreamError, TcpPeerInfo) in a dataspace.

(provide (all-from-out syndicate/drivers/stream)
         (all-from-out syndicate/schemas/tcp))

(require racket/async-channel)
(require racket/tcp)
(require (only-in racket/exn exn->string))

(require syndicate/driver-support)
(require syndicate/functional-queue)
(require syndicate/pattern)
(require syndicate/drivers/stream)
(require syndicate/schemas/tcp)
(require syndicate/schemas/dataspacePatterns)

(require (for-syntax racket/base))

(define-logger syndicate/drivers/tcp)

;; Service entry point. Within dataspace `ds`:
;;
;;  - while some party observes StreamConnection assertions whose spec is a
;;    `TcpLocal` with literal host and port, a listener for that host/port is
;;    run via `run-listener`;
;;
;;  - while a StreamConnection with a `TcpRemote` spec is asserted, an outbound
;;    connection to that host/port is attempted via `run-outbound`, using the
;;    `source` and `sink` carried by the assertion as the peer endpoints.
(provide-service [ds]
  (with-services [syndicate/drivers/stream]
    (at ds
      (during/spawn
          (Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(Pattern-lit $host)
                                                               ,(Pattern-lit $port))))
                   _)
        #:name (TcpLocal host port)
        (run-listener ds host port))

      (during/spawn (StreamConnection $source $sink (TcpRemote $host $port))
        #:name (TcpRemote host port)
        (run-outbound ds source sink host port)))))

;; run-listener : ds host port -> void
;;
;; Listens on `host`:`port` in a linked thread and spawns one connection actor
;; (see `spawn-connection`) per accepted socket.
;;
;;  - If `tcp-listen` raises `exn:fail:network`, the failure text is logged as
;;    a warning and `(StreamListenerError spec message)` is asserted at `ds`.
;;  - Otherwise `(StreamListenerReady spec)` is asserted at `ds` and the thread
;;    loops forever accepting connections.
;;
;; Only the `tcp-listen` call runs under the error guard: the accept loop is
;; returned as a thunk and therefore executes after the guard's handler has
;; been uninstalled (see `with-connection-error-guard`).
(define (run-listener ds host port)
  (define spec (TcpLocal host port))
  (on-start (log-syndicate/drivers/tcp-info "+listener on ~v" spec))
  (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v" spec))
  (linked-thread
   #:name (list 'listen-thread host port)
   (lambda (facet)
     (with-connection-error-guard
       ds
       ;; Failure path: report back into the actor via a turn on `facet`.
       (lambda (message)
         (turn! facet (lambda ()
                        (log-syndicate/drivers/tcp-warning "~a" message)
                        (at ds (assert (StreamListenerError spec message))))))
       (lambda ()
         ;; Backlog of 512 pending connections; address reuse enabled.
         (define listener (tcp-listen port 512 #t host))
         (lambda ()
           (turn! facet (lambda () (at ds (assert (StreamListenerReady spec)))))
           (let loop ()
             ;; Each accepted socket gets its own custodian, which is handed
             ;; on to `spawn-connection`.
             (define connection-custodian (make-custodian))
             (define-values (i o) (parameterize ((current-custodian connection-custodian))
                                    (tcp-accept listener)))
             (turn! facet (lambda () (spawn-connection ds connection-custodian i o spec #f #f)))
             (loop))))))))

;; tcp-ends : tcp-port -> (list TcpLocal TcpRemote)
;;
;; Returns the local and remote endpoints of the TCP port `p`, including port
;; numbers, wrapped as `TcpLocal` and `TcpRemote` records.
(define (tcp-ends p)
  (call-with-values (lambda () (tcp-addresses p #t))
                    (lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp)))))

;; spawn-connection : ds custodian input-port output-port spec peer-source peer-sink -> void
;;
;; Spawns an actor that relays one established TCP socket.
;;
;;  - `i` / `o` are the socket's input and output ports; `custodian` is passed
;;    through to `port-source`.
;;  - `spec` is the `TcpLocal` (accepted) or `TcpRemote` (outbound) record the
;;    socket was created for.
;;  - `peer-source` / `peer-sink` are the initial peer endpoints, or #f when
;;    none are known yet (the listener passes #f for both).
;;
;; The actor runs two facets, one wrapping `i` as a stream source and one
;; wrapping `o` as a stream sink. Each asserts a `TcpPeerInfo` record at `ds`
;; and, on stop, closes its port and decrements `facet-count`. Both ports are
;; also closed by an actor exit hook.
;;
;; For `TcpLocal` specs only, `(StreamConnection source sink spec)` is asserted
;; at `ds` for as long as `facet-count` is positive. For `TcpRemote` specs no
;; StreamConnection is asserted here; in this file the outbound path is itself
;; triggered by such an assertion.
(define (spawn-connection ds custodian i o spec peer-source peer-sink)
  (match-define (list (and local-end (TcpLocal local-host local-port))
                      (and remote-end (TcpRemote remote-host remote-port)))
    (tcp-ends i))
  (define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port))
  (log-syndicate/drivers/tcp-info "TCP socket ~a for ~a established" name spec)
  (spawn #:name name
         (actor-add-exit-hook! this-actor (lambda ()
                                            (close-input-port i)
                                            (close-output-port o)))
         ;; Number of the two facets below that have not yet stopped.
         (define-field facet-count 2)
         ;; Filled in by the facets below.
         ;; NOTE(review): the StreamConnection assertion at the end reads these
         ;; variables; this presumably relies on the `react` bodies having
         ;; already run at that point -- confirm against `react` semantics.
         (define source #f)
         (define sink #f)
         ;; Inbound half: bytes read from the socket.
         (react (on-stop (facet-count (- (facet-count) 1))
                         (close-input-port i))
                (set! source (port-source i #:initial-sink peer-sink #:custodian custodian))
                (at ds (assert (TcpPeerInfo source local-end remote-end))))
         ;; Outbound half: bytes written to the socket.
         (react (on-stop (facet-count (- (facet-count) 1))
                         (close-output-port o))
                (set! sink (port-sink o #:initial-source peer-source))
                (at ds (assert (TcpPeerInfo sink local-end remote-end))))
         (when (TcpLocal? spec)
           (at ds (assert #:when (positive? (facet-count))
                          (StreamConnection source sink spec))))))

;; with-connection-error-guard : ds (string -> any) (-> (-> any)) -> any
;;
;; Calls `proc` with a handler for `exn:fail:network` installed. `proc` must
;; return a thunk; that thunk is then invoked *outside* the handler's dynamic
;; extent, so only the work done directly by `proc` is guarded.
;;
;; If `proc` raises `exn:fail:network`, `error-proc` is called (also outside
;; the handler) with the exception rendered by `exn->string`.
;;
;; `ds` is accepted but not used by this procedure.
(define (with-connection-error-guard ds error-proc proc)
  ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
     (proc))))

;; run-outbound : ds source sink host port -> void
;;
;; Attempts an outbound TCP connection to `host`:`port`.
;;
;;  - On success, spawns a connection actor (see `spawn-connection`) with
;;    `source` and `sink` as the initial peer endpoints. The socket is created
;;    under a fresh custodian that is handed to that actor.
;;  - If `tcp-connect` raises `exn:fail:network`, the failure text is logged as
;;    a warning and `(StreamError message)` is asserted at both `source` and
;;    `sink`.
(define (run-outbound ds source sink host port)
  (with-connection-error-guard
    ds
    (lambda (message)
      (log-syndicate/drivers/tcp-warning "~a" message)
      (at source (assert (StreamError message)))
      (at sink (assert (StreamError message))))
    (lambda ()
      (define connection-custodian (make-custodian))
      (define-values (i o) (parameterize ((current-custodian connection-custodian))
                             (tcp-connect host port)))
      (lambda ()
        (spawn-connection ds connection-custodian i o (TcpRemote host port) source sink)))))