syndicate-rkt/syndicate/drivers/tcp.rkt

172 lines
6.5 KiB
Racket

#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
(provide (all-from-out syndicate/schemas/gen/tcp)
spawn-tcp-driver
accept-connection
establish-connection
send-data
read-bytes-avail)
(require racket/tcp)
(require (only-in racket/exn exn->string))
(require syndicate/driver-support)
(require syndicate/schemas/gen/tcp)
(require syndicate/schemas/gen/dataspace-patterns)
(require (for-syntax racket/base))
(define-logger syndicate/drivers/tcp)
(define (spawn-tcp-driver ds)
(spawn
#:name 'tcp-driver
#:daemon? #t
(at ds
(during/spawn
(Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _)
#:name (TcpInbound host port)
(run-listener ds host port))
(during/spawn
(Connection $local-peer (TcpOutbound $host $port))
#:name (TcpOutbound host port)
(run-outbound ds local-peer host port)))))
(define (run-listener ds host port)
(on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
(on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
(linked-thread
#:name (list (TcpInbound host port) 'thread)
(lambda (facet)
(define listener (tcp-listen port 512 #t host))
(let loop ()
(define connection-custodian (make-custodian))
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-accept listener)))
(turn! facet
(lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port))))
(loop)))))
(define (run-outbound ds local-peer host port)
(define connection-custodian (make-custodian))
((with-handlers ([exn:fail:network?
(lambda (e)
(lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))])
(define-values (i o) (parameterize ((current-custodian connection-custodian))
(tcp-connect host port)))
(lambda ()
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
(on-stop (close-input-port i)
(close-output-port o))
(start-inbound-relay connection-custodian name local-peer i)
(define relay (outbound-relay name o))
(at local-peer
(assert (ActiveSocket-controller
(object #:name (list name 'socket)
[#:asserted (Socket data) (relay data)]))))))))
(define (spawn-inbound ds custodian i o spec)
(define name (call-with-values (lambda () (tcp-addresses i #t)) list))
(spawn
#:name name
(on-stop (close-input-port i)
(close-output-port o))
(define active-controller #f)
(define relay (outbound-relay name o))
(at ds
(assert (Connection
(object
#:name (list name 'active-socket)
[#:asserted (ActiveSocket-controller controller)
(log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
(when (not active-controller)
(start-inbound-relay custodian name controller i))
(set! active-controller controller)
#:retracted
(when (eq? controller active-controller)
(log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
(stop-current-facet))]
[#:asserted (ActiveSocket-close message)
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
(stop-current-facet)]
[#:asserted (ActiveSocket-Socket (Socket data))
(relay data)])
spec)))))
(define (start-inbound-relay custodian name target i)
(linked-thread
#:name (list name 'input-thread)
#:custodian custodian
(lambda (facet)
(let loop ()
(define bs (read-bytes-avail i))
(when (bytes? bs)
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
(turn! facet (lambda () (send-data target bs)))
(loop))))))
(define-syntax (EPIPE stx)
(local-require ffi/unsafe)
(define errno-value (cons (lookup-errno 'EPIPE) 'posix))
(syntax-case stx ()
[(_) #`'#,errno-value]))
(define (with-stop-current-facet-on-epipe operation thunk)
(with-handlers ([(lambda (e)
(and (exn:fail:network:errno? e)
(equal? (exn:fail:network:errno-errno e) (EPIPE))))
(lambda (e)
(log-syndicate/drivers/tcp-debug "epipe while ~a" operation)
(stop-current-facet))])
(thunk)))
(define (outbound-relay name o)
(define flush-pending #f)
(lambda (payload)
(log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name)
(with-stop-current-facet-on-epipe 'writing (lambda () (write-bytes payload o)))
(when (not flush-pending)
(set! flush-pending #t)
(facet-on-end-of-turn! this-facet
(lambda ()
(set! flush-pending #f)
(with-stop-current-facet-on-epipe 'flushing
(lambda () (flush-output o))))))))
(define (read-bytes-avail input-port #:limit [limit 65536])
(define buffer (make-bytes limit))
(match (read-bytes-avail! buffer input-port)
[(? number? count) (subbytes buffer 0 count)]
[other other]))
(define (accept-connection conn #:on-data on-data)
(at conn
(assert (ActiveSocket-controller
(object #:name 'inbound-socket-controller
[#:asserted (Socket data) (on-data data)])))))
(define (establish-connection ds spec
#:on-connected on-connected
#:on-data on-data
#:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
#:on-rejected [on-rejected (lambda () (stop-current-facet))])
(define s
(object #:name 'outbound-socket
[#:asserted (ActiveSocket-controller peer)
(on-connected peer)
#:retracted
(on-disconnected)]
[#:asserted (ActiveSocket-close message)
(on-rejected message)]
[#:asserted (ActiveSocket-Socket (Socket data))
(on-data data)]))
(at ds (assert (Connection s spec))))
(define (send-data conn data)
(send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data)))))