#lang syndicate ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones (provide (all-from-out syndicate/schemas/gen/tcp) spawn-tcp-driver accept-connection establish-connection send-data read-bytes-avail) (require racket/tcp) (require (only-in racket/exn exn->string)) (require syndicate/driver-support) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) (define-logger syndicate/drivers/tcp) (define (spawn-tcp-driver ds) (spawn #:name 'tcp-driver #:daemon? #t (at ds (during/spawn (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _) #:name (TcpInbound host port) (run-listener ds host port)) (during/spawn (Connection $local-peer (TcpOutbound $host $port)) #:name (TcpOutbound host port) (run-outbound ds local-peer host port))))) (define (run-listener ds host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (linked-thread #:name (list (TcpInbound host port) 'thread) (lambda (facet) (define listener (tcp-listen port 512 #t host)) (let loop () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) (turn! facet (lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port)))) (loop))))) (define (run-outbound ds local-peer host port) (define connection-custodian (make-custodian)) ((with-handlers ([exn:fail:network? (lambda (e) (lambda () (at local-peer (assert (ActiveSocket-close (exn->string e))))))]) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-connect host port))) (lambda () (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (on-stop (close-input-port i) (close-output-port o)) (start-inbound-relay connection-custodian name local-peer i) (at local-peer (assert (ActiveSocket-controller (ref (entity #:name (list name 'socket) #:message (outbound-relay name o)))))))))) (define (spawn-inbound ds custodian i o spec) (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (spawn #:name name (on-stop (close-input-port i) (close-output-port o)) (define active-controller #f) (define active-controller-handle #f) (at ds (assert (Connection (ref (entity #:name (list name 'active-socket) #:assert (lambda (m handle) (match (parse-ActiveSocket m) [(ActiveSocket-controller controller) (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) (when (not active-controller) (start-inbound-relay custodian name controller i)) (set! active-controller controller) (set! active-controller-handle handle)] [(ActiveSocket-close message) (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) (stop-current-facet)])) #:retract (lambda (handle) (when (equal? handle active-controller-handle) (log-syndicate/drivers/tcp-debug "peer withdrawn ~v" this-actor) (stop-current-facet))) #:message (outbound-relay name o))) spec))))) (define (start-inbound-relay custodian name target i) (linked-thread #:name (list name 'input-thread) #:custodian custodian (lambda (facet) (let loop () (define bs (read-bytes-avail i)) (when (bytes? bs) (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) (turn! facet (lambda () (send-data target bs))) (loop)))))) (define (outbound-relay name o) (define flush-pending #f) (lambda (m) (match (parse-ActiveSocket m) [(ActiveSocket-Socket (Socket payload)) (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) (write-bytes payload o) (when (not flush-pending) (set! flush-pending #t) (facet-on-end-of-turn! this-facet (lambda () (set! flush-pending #f) (flush-output o))))]))) (define (read-bytes-avail input-port #:limit [limit 65536]) (define buffer (make-bytes limit)) (match (read-bytes-avail! buffer input-port) [(? number? count) (subbytes buffer 0 count)] [other other])) (define (accept-connection conn #:on-data on-data) (at conn (assert (ActiveSocket-controller (ref (entity #:name 'inbound-socket-controller #:message (lambda (m) (match-define (Socket data) (parse-Socket m)) (on-data data)))))))) (define (establish-connection ds spec #:on-connected on-connected #:on-data on-data #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] #:on-rejected [on-rejected (lambda () (stop-current-facet))]) (define s (entity #:name 'outbound-socket #:assert (lambda (m _handle) (match (parse-ActiveSocket m) [(ActiveSocket-controller peer) (on-connected peer)] [(ActiveSocket-close message) (on-rejected message)])) #:retract (lambda (_handle) (on-disconnected)) #:message (lambda (m) (match (parse-ActiveSocket m) [(ActiveSocket-Socket (Socket data)) (on-data data)])))) (at ds (assert (Connection (ref s) spec)))) (define (send-data conn data) (send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data))))) (module+ main (require syndicate/drivers/timer) (require racket/cmdline) (define (run-echo-server ds host port) (spawn (at ds (during/spawn (Connection $conn (TcpInbound host port)) (accept-connection conn #:on-data (match-lambda [#"bye\n" (send-data conn #"Bye then!\n") (stop-current-facet)] [data (send-data conn data)])))))) (define (run-echo-client ds host port) (spawn (define-field sink #f) (define-field counter 0) (begin/dataflow (when (sink) (send! (sink) (Socket (string->bytes/utf-8 (format "~a\n" (counter))))))) (establish-connection ds (TcpOutbound host port) #:on-connected (lambda (peer) (sink peer)) #:on-rejected (lambda (reason) (log-error "Connection failed: ~v" reason)) #:on-disconnected (lambda () (log-info "Disconnected")) #:on-data (lambda (data) (log-info "Got: ~v" data) (counter (+ (counter) 1)))))) (define host #f) (define port 5999) (define mode 'server) (command-line #:once-each [("--host" "-H") hostname "Set host/interface to connect to/listen on" (set! host hostname)] [("--port" "-p") port-number "Set port number to connect to/listen on" (set! port (string->number port-number))] #:once-any ["--server" "Server mode" (set! mode 'server)] ["--client" "Client mode" (set! mode 'client)]) (actor-system/dataspace (ds) (spawn-timer-driver ds) (spawn-tcp-driver ds) (match mode ['server (run-echo-server ds (or host "0.0.0.0") port)] ['client (run-echo-client ds (or host "127.0.0.1") port)])))