#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones

;;; TCP driver: exposes TCP listeners and outbound TCP connections through
;;; the assertions and messages defined in syndicate/schemas/gen/tcp.

(provide (all-from-out syndicate/schemas/gen/tcp)
         spawn-tcp-driver
         accept-connection
         establish-connection
         send-data
         read-bytes-avail)

(require racket/tcp)
(require (only-in racket/exn exn->string))

(require syndicate/driver-support)
(require syndicate/schemas/gen/tcp)
(require syndicate/schemas/gen/dataspace-patterns)

(define-logger syndicate/drivers/tcp)

;; spawn-tcp-driver : dataspace -> (spawns an actor)
;;
;; Spawns the daemon actor named 'tcp-driver. At `ds` it reacts to two things:
;;  - interest (an `Observe`) in `Connection` assertions whose spec is a
;;    `TcpInbound` with literal host and port: runs a listener for that
;;    host/port pair via `run-listener`;
;;  - a `Connection` assertion whose spec is a `TcpOutbound`: opens an outbound
;;    connection on behalf of `local-peer` via `run-outbound`.
(define (spawn-tcp-driver ds)
  (spawn #:name 'tcp-driver
         #:daemon? #t
         (at ds
           (during/spawn
               (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _)
             #:name (TcpInbound host port)
             (run-listener ds host port))
           (during/spawn
               (Connection $local-peer (TcpOutbound $host $port))
             #:name (TcpOutbound host port)
             (run-outbound ds local-peer host port)))))

;; run-listener : dataspace string port-number -> void
;;
;; Logs listener start/stop, then runs a linked thread that opens a TCP
;; listener on `host`:`port` (backlog 512, address reuse enabled) and accepts
;; connections forever. Each accepted connection's ports are created under a
;; fresh custodian, which is handed to `spawn-inbound` together with the ports.
(define (run-listener ds host port)
  (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
  (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
  (linked-thread
   #:name (list (TcpInbound host port) 'thread)
   (lambda (facet)
     (define listener (tcp-listen port 512 #t host))
     (let loop ()
       ;; One custodian per connection, so a connection's resources can be
       ;; managed independently of the listener.
       (define connection-custodian (make-custodian))
       (define-values (i o) (parameterize ((current-custodian connection-custodian))
                              (tcp-accept listener)))
       ;; Actor-side work has to happen inside a turn on the owning facet.
       (turn! facet (lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port))))
       (loop)))))

;; run-outbound : dataspace entity-ref string port-number -> void
;;
;; Attempts `tcp-connect` to `host`:`port` under a fresh custodian.
;;  - On `exn:fail:network?`, asserts `ActiveSocket-close` (carrying the
;;    rendered exception) at `local-peer`.
;;  - On success, arranges for the ports to be closed when the facet stops,
;;    relays inbound bytes to `local-peer`, and asserts an
;;    `ActiveSocket-controller` at `local-peer` whose `Socket` assertions are
;;    written to the connection.
;;
;; The `with-handlers` expression evaluates to a thunk in both cases, and the
;; thunk is called immediately afterwards. Only the connect attempt itself is
;; therefore covered by the exception handler; the follow-up actor code is not.
;;
;; NOTE(review): `connection-custodian` is not shut down here when the connect
;; fails; confirm whether anything else reclaims it.
(define (run-outbound ds local-peer host port)
  (define connection-custodian (make-custodian))
  ((with-handlers ([exn:fail:network?
                    (lambda (e)
                      (lambda ()
                        (at local-peer
                          (assert (ActiveSocket-close (exn->string e))))))])
     (define-values (i o) (parameterize ((current-custodian connection-custodian))
                            (tcp-connect host port)))
     (lambda ()
       ;; `name` is the list of values returned by `tcp-addresses` (with port
       ;; numbers included); it is used only for naming and logging.
       (define name (call-with-values (lambda () (tcp-addresses i #t)) list))
       (on-stop (close-input-port i)
                (close-output-port o))
       (start-inbound-relay connection-custodian name local-peer i)
       (define relay (outbound-relay name o))
       (at local-peer
         (assert (ActiveSocket-controller
                  (object #:name (list name 'socket)
                          [#:asserted (Socket data) (relay data)]))))))))

;; spawn-inbound : dataspace custodian input-port output-port spec -> (spawns an actor)
;;
;; Spawns an actor for one accepted connection. The actor closes both ports
;; when it stops and asserts `(Connection <object> spec)` at `ds`, where the
;; object handles:
;;  - `ActiveSocket-controller`: remembers the controller; the first one seen
;;    also becomes the target of the inbound byte relay. When the currently
;;    remembered controller is retracted, the facet stops.
;;  - `ActiveSocket-close`: logs the message and stops the facet.
;;  - `ActiveSocket-Socket`: writes the carried data to the connection.
;;
;; NOTE(review): a second controller replaces `active-controller`, but the
;; inbound relay keeps targeting the first one. Confirm this is intended.
(define (spawn-inbound ds custodian i o spec)
  (define name (call-with-values (lambda () (tcp-addresses i #t)) list))
  (spawn
   #:name name
   (on-stop (close-input-port i)
            (close-output-port o))
   (define active-controller #f)
   (define relay (outbound-relay name o))
   (at ds
     (assert
      (Connection
       (object #:name (list name 'active-socket)
               [#:asserted (ActiveSocket-controller controller)
                (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor)
                ;; Start reading from the socket only once somebody is there
                ;; to receive the data.
                (when (not active-controller) (start-inbound-relay custodian name controller i))
                (set! active-controller controller)
                #:retracted
                (when (eq? controller active-controller)
                  (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor)
                  (stop-current-facet))]
               [#:asserted (ActiveSocket-close message)
                (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
                (stop-current-facet)]
               [#:asserted (ActiveSocket-Socket (Socket data))
                (relay data)])
       spec)))))

;; start-inbound-relay : custodian any entity-ref input-port -> void
;;
;; Runs a linked thread (under `custodian`) that repeatedly reads whatever
;; bytes are available from `i` and forwards each chunk to `target` with
;; `send-data`, inside a turn on the owning facet. The loop ends as soon as
;; the read yields something other than a byte string (for example, EOF).
(define (start-inbound-relay custodian name target i)
  (linked-thread
   #:name (list name 'input-thread)
   #:custodian custodian
   (lambda (facet)
     (let loop ()
       (define bs (read-bytes-avail i))
       (when (bytes? bs)
         (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
         (turn! facet (lambda () (send-data target bs)))
         (loop))))))

;; outbound-relay : any output-port -> (bytes -> void)
;;
;; Returns a procedure that writes a payload to `o`. The first write after a
;; flush registers one end-of-turn callback that flushes `o`; later writes
;; before that callback runs share it, because `flush-pending` is still set.
(define (outbound-relay name o)
  (define flush-pending #f)
  (lambda (payload)
    (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name)
    (write-bytes payload o)
    (when (not flush-pending)
      (set! flush-pending #t)
      (facet-on-end-of-turn! this-facet
                             (lambda ()
                               (set! flush-pending #f)
                               (flush-output o))))))

;; read-bytes-avail : input-port [#:limit exact-nonnegative-integer] -> (or/c bytes? any/c)
;;
;; Reads up to `limit` bytes (default 65536) from `input-port` using
;; `read-bytes-avail!`. When that call reports a byte count, returns a fresh
;; byte string holding exactly the bytes read; any other result (such as the
;; EOF object) is returned unchanged.
(define (read-bytes-avail input-port #:limit [limit 65536])
  (define buffer (make-bytes limit))
  (match (read-bytes-avail! buffer input-port)
    [(? number? count) (subbytes buffer 0 count)]
    [other other]))

;; accept-connection : entity-ref #:on-data (bytes -> any) -> void
;;
;; Asserts an `ActiveSocket-controller` at `conn`. The controller object calls
;; `on-data` with the data carried by each `Socket` assertion it receives.
(define (accept-connection conn #:on-data on-data)
  (at conn
    (assert (ActiveSocket-controller
             (object #:name 'inbound-socket-controller
                     [#:asserted (Socket data) (on-data data)])))))

;; establish-connection : dataspace spec
;;                        #:on-connected (peer -> any)
;;                        #:on-data (data -> any)
;;                        [#:on-disconnected (-> any)]
;;                        [#:on-rejected (message -> any)]
;;                        -> void
;;
;; Asserts `(Connection s spec)` at `ds`, where `s` is an object that calls:
;;  - `on-connected` with the peer when an `ActiveSocket-controller` is asserted;
;;  - `on-disconnected` (no arguments) when that controller is retracted;
;;  - `on-rejected` with the message when an `ActiveSocket-close` is asserted;
;;  - `on-data` with the data of each `ActiveSocket-Socket` assertion.
;;
;; Both optional callbacks default to stopping the current facet. The default
;; `on-rejected` accepts (and ignores) the rejection message, because the
;; handler is always invoked with exactly one argument.
(define (establish-connection ds spec
                              #:on-connected on-connected
                              #:on-data on-data
                              #:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
                              #:on-rejected [on-rejected (lambda (_message) (stop-current-facet))])
  (define s
    (object #:name 'outbound-socket
            [#:asserted (ActiveSocket-controller peer)
             (on-connected peer)
             #:retracted
             (on-disconnected)]
            [#:asserted (ActiveSocket-close message)
             (on-rejected message)]
            [#:asserted (ActiveSocket-Socket (Socket data))
             (on-data data)]))
  (at ds (assert (Connection s spec))))

;; send-data : entity-ref (or/c bytes? string?) -> void
;;
;; Sends a `Socket` message to `conn`. Byte strings are sent as they are;
;; any other value is passed through `string->bytes/utf-8` first.
(define (send-data conn data)
  (send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data)))))