#lang syndicate ;;; SPDX-License-Identifier: LGPL-3.0-or-later ;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones (require racket/tcp) (require syndicate/driver-support) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) (define-logger syndicate/drivers/tcp) (define spawn-tcp-driver (action (ds) (spawn #:name 'tcp-driver #:daemon? #t (at ds (during/spawn (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _) #:name (TcpInbound host port) (run-listener this-turn ds host port)))))) (define run-listener (action (ds host port) (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) (linked-thread #:name (list (TcpInbound host port) 'thread) this-turn (ref (entity #:name 'listen-monitor #:retract (action (_handle) (stop-current-facet)))) (lambda () (define listener (tcp-listen port 512 #t host)) (let loop () (define connection-custodian (make-custodian)) (define-values (i o) (parameterize ((current-custodian connection-custodian)) (tcp-accept listener))) (turn-freshen this-turn (action () (spawn-connection this-turn ds connection-custodian i o (TcpInbound host port)))) (loop)))))) (define spawn-connection (action (ds custodian i o spec) (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (spawn #:name name (on-stop (close-input-port i) (close-output-port o)) (define active-controller #f) (define active-controller-handle #f) (define flush-pending #f) (define start-thread (action () (linked-thread #:name (list name 'input-thread) #:custodian custodian this-turn (ref (entity #:name (list name 'socket-monitor) #:retract (action (_handle) (stop-current-facet)))) (lambda () (let loop () (define bs (read-bytes-avail i)) (when (bytes? bs) (log-syndicate/drivers/tcp-info "inbound data ~v for ~v" bs this-actor) (turn-freshen this-turn (action () (send! active-controller (Socket bs)))) (loop))))))) (define active-socket (ref (entity #:name (list name 'active-socket) #:assert (action (m handle) (match (parse-ActiveSocket m) [(ActiveSocket-controller controller) (log-syndicate/drivers/tcp-info "~v controller for ~v" controller this-actor) (when (not active-controller) (start-thread this-turn)) (set! active-controller controller) (set! active-controller-handle handle)] [(ActiveSocket-close _) (log-syndicate/drivers/tcp-info "closing ~v" this-actor) (stop-current-facet)])) #:retract (action (handle) (log-syndicate/drivers/tcp-info "peer withdrawn ~v" this-actor) (when (equal? handle active-controller-handle) (stop-current-facet))) #:message (action (m) (match (parse-ActiveSocket m) [(ActiveSocket-Socket (Socket payload)) (log-syndicate/drivers/tcp-info "outbound data ~v for ~v" payload this-actor) (write-bytes payload o) (when (not flush-pending) (set! flush-pending #t) (facet-on-end-of-turn! this-facet (action () (set! flush-pending #f) (flush-output o))))]))))) (at ds (assert (Connection active-socket spec)))))) (define (read-bytes-avail input-port #:limit [limit 65536]) (define buffer (make-bytes limit)) (match (read-bytes-avail! buffer input-port) [(? number? count) (subbytes buffer 0 count)] [other other])) (module+ main (require syndicate/drivers/timer) (actor-system/dataspace (ds) (spawn-timer-driver this-turn ds) (spawn-tcp-driver this-turn ds) (spawn (at ds (during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) (on-start (log-info "Starting service ~a" this-facet)) (on-stop (log-info "Stopping service ~a" this-facet)) (at conn (assert (ActiveSocket-controller (ref (entity #:message (action (m) (match (parse-Socket m) [(Socket #"bye\n") (log-info "Bye!") (stop-current-facet)] [(Socket data) (log-info "Echoing ~v" data) (send! conn (Socket data))]))))))))))))