syndicate-rkt/syndicate/drivers/tcp.rkt

107 lines
4.4 KiB
Racket
Raw Permalink Normal View History

2021-06-10 08:00:43 +00:00
#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
2024-03-10 11:43:06 +00:00
;;; SPDX-FileCopyrightText: Copyright © 2021-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
2021-06-10 08:00:43 +00:00
(provide (all-from-out syndicate/drivers/stream)
2021-07-01 07:40:52 +00:00
(all-from-out syndicate/schemas/tcp))
2021-06-10 08:42:59 +00:00
(require racket/async-channel)
2021-06-10 08:00:43 +00:00
(require racket/tcp)
(require (only-in racket/exn exn->string))
2021-06-10 08:00:43 +00:00
(require syndicate/driver-support)
(require syndicate/functional-queue)
(require syndicate/pattern)
(require syndicate/drivers/stream)
2021-07-01 07:40:52 +00:00
(require syndicate/schemas/tcp)
(require syndicate/schemas/dataspacePatterns)
2021-06-10 08:00:43 +00:00
2021-06-10 14:46:20 +00:00
(require (for-syntax racket/base))
2021-06-10 08:00:43 +00:00
(define-logger syndicate/drivers/tcp)
2021-06-17 12:57:06 +00:00
(provide-service [ds]
(with-services [syndicate/drivers/stream]
(at ds
2021-06-18 11:48:12 +00:00
(during/spawn
2024-04-09 12:00:33 +00:00
(Observe (:pattern (StreamConnection ,_ ,_ (TcpLocal ,(Pattern-lit $host) ,(Pattern-lit $port)))) _)
#:name (TcpLocal host port)
2021-06-18 11:48:12 +00:00
(run-listener ds host port))
2021-06-17 12:57:06 +00:00
2021-06-18 11:48:12 +00:00
(during/spawn (StreamConnection $source $sink (TcpRemote $host $port))
#:name (TcpRemote host port)
2021-06-18 11:48:12 +00:00
(run-outbound ds source sink host port)))))
2021-06-10 09:42:07 +00:00
2021-06-18 11:48:12 +00:00
(define (run-listener ds host port)
(define spec (TcpLocal host port))
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v" spec))
(on-stop (log-syndicate/drivers/tcp-info "-listener on ~v" spec))
2021-06-10 09:42:07 +00:00
(linked-thread
#:name (list 'listen-thread host port)
2021-06-10 09:42:07 +00:00
(lambda (facet)
2021-06-18 11:48:12 +00:00
(with-connection-error-guard ds
(lambda (message)
(turn! facet (lambda ()
2021-06-18 11:48:12 +00:00
(log-syndicate/drivers/tcp-warning "~a" message)
(at ds (assert (StreamListenerError spec message))))))
(lambda ()
(define listener (tcp-listen port 512 #t host))
(lambda ()
2021-06-18 11:48:12 +00:00
(turn! facet (lambda ()
(at ds (assert (StreamListenerReady spec)))))
(let loop ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener)))
2021-06-18 11:48:12 +00:00
(turn! facet (lambda () (spawn-connection ds connection-custodian i o spec #f #f)))
(loop))))))))
2021-06-10 09:42:07 +00:00
(define (tcp-ends p)
(call-with-values (lambda () (tcp-addresses p #t))
(lambda (lh lp rh rp) (list (TcpLocal lh lp) (TcpRemote rh rp)))))
2021-06-18 11:48:12 +00:00
(define (spawn-connection ds custodian i o spec peer-source peer-sink)
(match-define (and ends (list (and local-end (TcpLocal local-host local-port))
(and remote-end (TcpRemote remote-host remote-port))))
(tcp-ends i))
(define name (format "[~a:~a::~a:~a]" local-host local-port remote-host remote-port))
2021-06-18 11:48:12 +00:00
(log-syndicate/drivers/tcp-info "TCP socket ~a for ~a established" name spec)
(spawn #:name name
(actor-add-exit-hook! this-actor (lambda ()
(close-input-port i)
(close-output-port o)))
(define-field facet-count 2)
(define source #f)
(define sink #f)
(react (on-stop (facet-count (- (facet-count) 1))
(close-input-port i))
2021-06-18 11:48:12 +00:00
(set! source (port-source i #:initial-sink peer-sink #:custodian custodian))
(at ds (assert (TcpPeerInfo source local-end remote-end))))
(react (on-stop (facet-count (- (facet-count) 1))
(close-output-port o))
2021-06-18 11:48:12 +00:00
(set! sink (port-sink o #:initial-source peer-source))
(at ds (assert (TcpPeerInfo sink local-end remote-end))))
2021-06-18 11:48:12 +00:00
(when (TcpLocal? spec)
(at ds
(assert #:when (positive? (facet-count)) (StreamConnection source sink spec))))))
2021-06-18 11:48:12 +00:00
(define (with-connection-error-guard ds error-proc proc)
((with-handlers ([exn:fail:network? (lambda (e) (lambda () (error-proc (exn->string e))))])
(proc))))
2021-06-18 11:48:12 +00:00
(define (run-outbound ds source sink host port)
(with-connection-error-guard ds
(lambda (message)
2021-06-18 11:48:12 +00:00
(log-syndicate/drivers/tcp-warning "~a" message)
(at source (assert (StreamError message)))
(at sink (assert (StreamError message))))
(lambda ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-connect host port)))
2021-06-18 11:48:12 +00:00
(lambda ()
(spawn-connection ds connection-custodian i o (TcpRemote host port) source sink)))))