107 lines
4.4 KiB
Racket
107 lines
4.4 KiB
Racket
#lang syndicate
|
|
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
|
;;; SPDX-FileCopyrightText: Copyright © 2021-2022 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
(provide (all-from-out syndicate/drivers/stream)
|
|
(all-from-out syndicate/schemas/tcp))
|
|
|
|
(require racket/async-channel)
|
|
(require racket/tcp)
|
|
(require (only-in racket/exn exn->string))
|
|
(require syndicate/driver-support)
|
|
(require syndicate/functional-queue)
|
|
(require syndicate/pattern)
|
|
(require syndicate/drivers/stream)
|
|
(require syndicate/schemas/tcp)
|
|
(require syndicate/schemas/dataspacePatterns)
|
|
|
|
(require (for-syntax racket/base))
|
|
|
|
(define-logger syndicate/drivers/tcp)
|
|
|
|
(provide-service [ds]
|
|
(with-services [syndicate/drivers/stream]
|
|
(at ds
|
|
(during/spawn
|
|
(Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(DLit $host) ,(DLit $port)))) _)
|
|
#:name (TcpLocal host port)
|
|
(run-listener ds host port))
|
|
|
|
(during/spawn (StreamConnection $source $sink (TcpRemote $host $port))
|
|
#:name (TcpRemote host port)
|
|
(run-outbound ds source sink host port)))))
|
|
|
|
(define (run-listener ds host port)
|
|
(define spec (TcpLocal host port))
|
|
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v" spec))
|
|
(on-stop (log-syndicate/drivers/tcp-info "-listener on ~v" spec))
|
|
(linked-thread
|
|
#:name (list 'listen-thread host port)
|
|
(lambda (facet)
|
|
(with-connection-error-guard ds
|
|
(lambda (message)
|
|
(turn! facet (lambda ()
|
|
(log-syndicate/drivers/tcp-warning "~a" message)
|
|
(at ds (assert (StreamListenerError spec message))))))
|
|
(lambda ()
|
|
(define listener (tcp-listen port 512 #t host))
|
|
(lambda ()
|
|
(turn! facet (lambda ()
|
|
(at ds (assert (StreamListenerReady spec)))))
|
|
(let loop ()
|
|
(define connection-custodian (make-custodian))
|
|
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
|
(tcp-accept listener)))
|
|
(turn! facet (lambda () (spawn-connection ds connection-custodian i o spec #f #f)))
|
|
(loop))))))))
|
|
|
|
(define (tcp-ends p)
|
|
(call-with-values (lambda () (tcp-addresses p #t))
|
|
(lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp)))))
|
|
|
|
(define (spawn-connection ds custodian i o spec peer-source peer-sink)
|
|
(match-define (and ends (list (and local-end (TcpLocal local-host local-port))
|
|
(and remote-end (TcpRemote remote-host remote-port))))
|
|
(tcp-ends i))
|
|
(define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port))
|
|
(log-syndicate/drivers/tcp-info "TCP socket ~a for ~a established" name spec)
|
|
(spawn #:name name
|
|
(actor-add-exit-hook! this-actor (lambda ()
|
|
(close-input-port i)
|
|
(close-output-port o)))
|
|
|
|
(define-field facet-count 2)
|
|
(define source #f)
|
|
(define sink #f)
|
|
|
|
(react (on-stop (facet-count (- (facet-count) 1))
|
|
(close-input-port i))
|
|
(set! source (port-source i #:initial-sink peer-sink #:custodian custodian))
|
|
(at ds (assert (TcpPeerInfo source local-end remote-end))))
|
|
|
|
(react (on-stop (facet-count (- (facet-count) 1))
|
|
(close-output-port o))
|
|
(set! sink (port-sink o #:initial-source peer-source))
|
|
(at ds (assert (TcpPeerInfo sink local-end remote-end))))
|
|
|
|
(when (TcpLocal? spec)
|
|
(at ds
|
|
(assert #:when (positive? (facet-count)) (StreamConnection source sink spec))))))
|
|
|
|
(define (with-connection-error-guard ds error-proc proc)
|
|
((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
|
|
(proc))))
|
|
|
|
(define (run-outbound ds source sink host port)
|
|
(with-connection-error-guard ds
|
|
(lambda (message)
|
|
(log-syndicate/drivers/tcp-warning "~a" message)
|
|
(at source (assert (StreamError message)))
|
|
(at sink (assert (StreamError message))))
|
|
(lambda ()
|
|
(define connection-custodian (make-custodian))
|
|
(define-values (i o) (parameterize ((current-custodian connection-custodian))
|
|
(tcp-connect host port)))
|
|
(lambda ()
|
|
(spawn-connection ds connection-custodian i o (TcpRemote host port) source sink)))))
|