#lang syndicate
;;; SPDX-License-Identifier: LGPL-3.0-or-later
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones

;; TCP driver: exposes listening and outbound TCP sockets through assertions
;; and messages on a dataspace.
;;
;; Convention used throughout this file: every `action` value is invoked with
;; the current turn as its first argument, followed by the parameters listed
;; in its formals, e.g. `(run-listener this-turn ds host port)`.

(provide (all-from-out syndicate/schemas/gen/tcp)
         spawn-tcp-driver
         accept-connection
         establish-connection
         send-data
         read-bytes-avail)

(require racket/tcp)
(require syndicate/driver-support)
(require syndicate/schemas/gen/tcp)
(require syndicate/schemas/gen/dataspace-patterns)

(define-logger syndicate/drivers/tcp)

;; spawn-tcp-driver: (action (ds))
;; Spawns the daemon actor named 'tcp-driver. At dataspace `ds` it reacts to:
;;  - observers of `(Connection _ (TcpInbound host port))` with literal host
;;    and port, by running a listener for that host/port;
;;  - `(Connection local-peer (TcpOutbound host port))` assertions, by running
;;    an outbound connection attempt on behalf of `local-peer`.
(define spawn-tcp-driver
  (action (ds)
    (spawn #:name 'tcp-driver
           #:daemon? #t
           (at ds
             (during/spawn
                 (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _)
               #:name (TcpInbound host port)
               (run-listener this-turn ds host port))
             (during/spawn (Connection $local-peer (TcpOutbound $host $port))
               #:name (TcpOutbound host port)
               (run-outbound this-turn ds local-peer host port))))))

;; run-listener: (action (ds host port))
;; Logs listener start/stop and starts a linked thread that opens a TCP
;; listener (backlog 512, address reuse enabled) and accepts connections
;; forever. Each accepted connection gets a fresh custodian and is handed to
;; `spawn-inbound` in a fresh turn.
(define run-listener
  (action (ds host port)
    (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port))
    (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port))
    (linked-thread
     #:name (list (TcpInbound host port) 'thread)
     this-turn
     ;; When the monitored assertion is retracted, stop the listener facet.
     (ref (entity #:name 'listen-monitor
                  #:retract (action (_handle) (stop-current-facet))))
     (lambda ()
       (define listener (tcp-listen port 512 #t host))
       (let loop ()
         (define connection-custodian (make-custodian))
         ;; The accepted ports are created under the per-connection custodian.
         (define-values (i o)
           (parameterize ((current-custodian connection-custodian))
             (tcp-accept listener)))
         (turn-freshen this-turn
                       (action ()
                         (spawn-inbound this-turn ds connection-custodian i o
                                        (TcpInbound host port))))
         (loop))))))

;; run-outbound: (action (ds local-peer host port))
;; Attempts `tcp-connect` to host/port. The `with-handlers` form evaluates to
;; a thunk in both outcomes, and that thunk is called immediately, outside the
;; handler's dynamic extent:
;;  - on `exn:fail:network?`, the thunk asserts `(ActiveSocket-close e)` at
;;    `local-peer`;
;;  - on success, the thunk arranges for the ports to be closed on stop,
;;    starts the inbound relay towards `local-peer`, and asserts an
;;    `ActiveSocket-controller` whose messages are written to the socket.
(define run-outbound
  (action (ds local-peer host port)
    (define connection-custodian (make-custodian))
    ((with-handlers ([exn:fail:network?
                      (lambda (e)
                        (lambda ()
                          ;; NOTE(review): the raw exception object is used as
                          ;; the close reason; confirm this is what the
                          ;; ActiveSocket schema expects.
                          (at local-peer (assert (ActiveSocket-close e)))))])
       (define-values (i o)
         (parameterize ((current-custodian connection-custodian))
           (tcp-connect host port)))
       (lambda ()
         (define name (call-with-values (lambda () (tcp-addresses i #t)) list))
         (on-stop (close-input-port i)
                  (close-output-port o))
         (start-inbound-relay this-turn connection-custodian name local-peer i)
         (at local-peer
           (assert (ActiveSocket-controller
                    (ref (entity #:name (list name 'socket)
                                 #:message (outbound-relay name o)))))))))))

;; spawn-inbound: (action (ds custodian i o spec))
;; Spawns an actor for one accepted connection, named after the socket's
;; addresses. It asserts `(Connection <entity> spec)` at `ds`; the entity:
;;  - on an asserted `ActiveSocket-controller`, records the controller and its
;;    handle, starting the inbound relay only if no controller was recorded
;;    before (so the relay targets the first controller seen);
;;  - on an asserted `ActiveSocket-close`, stops the facet;
;;  - on retraction of the recorded controller's handle, stops the facet;
;;  - on messages, writes the payload to the socket via `outbound-relay`.
;; Both ports are closed when the facet stops.
(define spawn-inbound
  (action (ds custodian i o spec)
    (define name (call-with-values (lambda () (tcp-addresses i #t)) list))
    (spawn #:name name
           (on-stop (close-input-port i)
                    (close-output-port o))
           (define active-controller #f)
           (define active-controller-handle #f)
           (at ds
             (assert
              (Connection
               (ref (entity
                     #:name (list name 'active-socket)
                     #:assert
                     (action (m handle)
                       (match (parse-ActiveSocket m)
                         [(ActiveSocket-controller controller)
                          (log-syndicate/drivers/tcp-info "~v controller for ~v"
                                                          controller this-actor)
                          (when (not active-controller)
                            (start-inbound-relay this-turn custodian name controller i))
                          (set! active-controller controller)
                          (set! active-controller-handle handle)]
                         [(ActiveSocket-close reason)
                          (log-syndicate/drivers/tcp-info "closing ~v reason ~v"
                                                          this-actor reason)
                          (stop-current-facet)]))
                     #:retract
                     (action (handle)
                       (log-syndicate/drivers/tcp-info "peer withdrawn ~v" this-actor)
                       (when (equal? handle active-controller-handle)
                         (stop-current-facet)))
                     #:message (outbound-relay name o)))
               spec))))))

;; start-inbound-relay: (action (custodian name target i))
;; Starts a linked thread (under `custodian`) that repeatedly reads whatever
;; bytes are available from input port `i` and sends each chunk to `target`
;; as a `(Socket bs)` message in a fresh turn. The loop ends as soon as the
;; read yields something other than a byte string (e.g. end-of-file).
(define start-inbound-relay
  (action (custodian name target i)
    (linked-thread
     #:name (list name 'input-thread)
     #:custodian custodian
     this-turn
     (ref (entity #:name (list name 'socket-monitor)
                  #:retract (action (_handle) (stop-current-facet))))
     (lambda ()
       (let loop ()
         (define bs (read-bytes-avail i))
         (when (bytes? bs)
           (log-syndicate/drivers/tcp-info "inbound data ~v for ~v" bs name)
           (turn-freshen this-turn (action () (send! target (Socket bs))))
           (loop)))))))

;; outbound-relay: name output-port -> (action (m))
;; Returns a message handler that writes the payload of each `Socket` message
;; to output port `o`. Flushing is deferred to the end of the turn, and the
;; `flush-pending` flag ensures at most one flush is scheduled at a time even
;; if several messages arrive in the same turn.
(define (outbound-relay name o)
  (define flush-pending #f)
  (action (m)
    (match (parse-ActiveSocket m)
      [(ActiveSocket-Socket (Socket payload))
       (log-syndicate/drivers/tcp-info "outbound data ~v for ~v" payload name)
       (write-bytes payload o)
       (when (not flush-pending)
         (set! flush-pending #t)
         (facet-on-end-of-turn! this-facet
                                (action ()
                                  (set! flush-pending #f)
                                  (flush-output o))))])))

;; read-bytes-avail: input-port [#:limit exact-nonnegative-integer] -> any
;; Reads up to `limit` bytes (default 65536) from `input-port` using
;; `read-bytes-avail!`. When that call reports a byte count, returns a fresh
;; byte string holding exactly the bytes read; any other result (such as the
;; end-of-file object) is returned unchanged.
(define (read-bytes-avail input-port #:limit [limit 65536])
  (define buffer (make-bytes limit))
  (match (read-bytes-avail! buffer input-port)
    [(? number? count) (subbytes buffer 0 count)]
    [other other]))

;; accept-connection: (action (conn #:on-data on-data-action))
;; Asserts an `ActiveSocket-controller` at `conn`. Each `Socket` message sent
;; to that controller is unpacked and its data passed to `on-data-action`
;; (called with the current turn and the data).
(define accept-connection
  (action (conn #:on-data on-data-action)
    (at conn
      (assert (ActiveSocket-controller
               (ref (entity #:name 'inbound-socket-controller
                            #:message
                            (action (m)
                              (match-define (Socket data) (parse-Socket m))
                              (on-data-action this-turn data)))))))))

;; establish-connection:
;;   (action (ds spec
;;            #:on-connected on-connected-action      ; called with (turn peer)
;;            #:on-data on-data-action                ; called with (turn data)
;;            #:on-disconnected on-disconnected-action ; called with (turn)
;;            #:on-rejected on-rejected-action))      ; called with (turn reason)
;; Asserts `(Connection <entity> spec)` at `ds` and dispatches what the entity
;; receives to the supplied callbacks:
;;  - asserted `ActiveSocket-controller` -> on-connected-action with the peer;
;;  - asserted `ActiveSocket-close`      -> on-rejected-action with the reason;
;;  - any retraction                     -> on-disconnected-action;
;;  - `Socket` message                   -> on-data-action with the data.
;; Both optional callbacks default to stopping the current facet.
(define establish-connection
  (action (ds spec
              #:on-connected on-connected-action
              #:on-data on-data-action
              #:on-disconnected [on-disconnected-action (action () (stop-current-facet))]
              ;; The rejection callback is always invoked with the close
              ;; reason, so the default must accept (and ignore) one argument;
              ;; a zero-argument default would fail with an arity error.
              #:on-rejected [on-rejected-action (action (_reason) (stop-current-facet))])
    (at ds
      (assert
       (Connection
        (ref (entity
              #:name 'outbound-socket
              #:assert
              (action (m _handle)
                (match (parse-ActiveSocket m)
                  [(ActiveSocket-controller peer) (on-connected-action this-turn peer)]
                  [(ActiveSocket-close e) (on-rejected-action this-turn e)]))
              #:retract
              (action (_handle) (on-disconnected-action this-turn))
              #:message
              (action (m)
                (match (parse-ActiveSocket m)
                  [(ActiveSocket-Socket (Socket data)) (on-data-action this-turn data)]))))
        spec)))))

;; send-data: (action (conn data))
;; Sends `data` to `conn` wrapped in a `Socket` message.
(define send-data
  (action (conn data)
    (send! conn (Socket data))))

;; Command-line echo demo: `--server` (default) echoes received data back
;; until it receives #"bye\n"; `--client` sends an incrementing counter each
;; time it receives data.
(module+ main
  (require syndicate/drivers/timer)
  (require racket/cmdline)

  ;; Echo server: accepts connections on host/port, echoing every chunk back
  ;; to the sender; the exact chunk #"bye\n" stops the connection's facet.
  (define run-echo-server
    (action (ds host port)
      (spawn
       (at ds
         (during/spawn (Connection $conn (TcpInbound host port))
           (accept-connection
            this-turn conn
            #:on-data (action (data)
                        (match data
                          [#"bye\n" (stop-current-facet)]
                          [_ (send-data this-turn conn data)]))))))))

  ;; Echo client: connects to host/port. Whenever `sink` (the connected peer)
  ;; or `counter` changes and `sink` is set, sends the counter as a text line.
  ;; Each received chunk is logged and increments the counter.
  (define run-echo-client
    (action (ds host port)
      (spawn
       (define-field sink #f)
       (define-field counter 0)
       (begin/dataflow
         (when (sink)
           (send! (sink) (Socket (string->bytes/utf-8 (format "~a\n" (counter)))))))
       (establish-connection
        this-turn ds (TcpOutbound host port)
        #:on-connected (action (peer) (sink peer))
        #:on-rejected (action (reason) (log-error "Connection failed: ~v" reason))
        #:on-disconnected (action () (log-info "Disconnected"))
        #:on-data (action (data)
                    (log-info "Got: ~v" data)
                    (counter (+ (counter) 1)))))))

  ;; Converts the --port argument to an integer in 0..65535, reporting a
  ;; user-level error for anything else. Without this check, `string->number`
  ;; silently yields #f (or a non-integer) for bad input and the failure only
  ;; surfaces later as a contract violation inside the TCP primitives.
  (define (parse-port-argument text)
    (define n (string->number text))
    (unless (and (exact-integer? n) (<= 0 n 65535))
      (raise-user-error 'tcp "invalid port number: ~a" text))
    n)

  (define host #f)
  (define port 5999)
  (define mode 'server)

  (command-line
   #:once-each
   [("--host" "-H") hostname "Set host/interface to connect to/listen on"
                    (set! host hostname)]
   [("--port" "-p") port-number "Set port number to connect to/listen on"
                    (set! port (parse-port-argument port-number))]
   #:once-any
   ["--server" "Server mode" (set! mode 'server)]
   ["--client" "Client mode" (set! mode 'client)])

  (actor-system/dataspace (ds)
    (spawn-timer-driver this-turn ds)
    (spawn-tcp-driver this-turn ds)
    (match mode
      ['server (run-echo-server this-turn ds (or host "0.0.0.0") port)]
      ['client (run-echo-client this-turn ds (or host "127.0.0.1") port)])))