From 336811c51e24077673a7c51c9fb426021025df4a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 10 Jun 2021 10:00:43 +0200 Subject: [PATCH] Beginnings of a TCP driver --- syndicate/drivers/tcp.rkt | 135 ++++++++++++++++++++++++++++++++++++++ syndicate/schemas/tcp.prs | 17 +++++ 2 files changed, 152 insertions(+) create mode 100644 syndicate/drivers/tcp.rkt create mode 100644 syndicate/schemas/tcp.prs diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt new file mode 100644 index 0000000..3f5b045 --- /dev/null +++ b/syndicate/drivers/tcp.rkt @@ -0,0 +1,135 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(require racket/tcp) +(require syndicate/driver-support) +(require syndicate/schemas/gen/tcp) +(require syndicate/schemas/gen/dataspace-patterns) + +(define-logger syndicate/drivers/tcp) + +(define spawn-tcp-driver + (action (ds) + (spawn + #:name 'tcp-driver + #:daemon? #t + + (at ds + (during/spawn + (Observe (:pattern (Connection ,_ (TcpInbound ,(DLit $host) ,(DLit $port)))) _) + #:name (TcpInbound host port) + (run-listener this-turn ds host port)))))) + +(define run-listener + (action (ds host port) + (on-start (log-syndicate/drivers/tcp-info "+listener on ~v ~v" host port)) + (on-stop (log-syndicate/drivers/tcp-info "-listener on ~v ~v" host port)) + (linked-thread + #:name (list (TcpInbound host port) 'thread) + this-turn + (ref (entity #:name 'listen-monitor #:retract (action (_handle) (stop-current-facet)))) + (lambda () + (define listener (tcp-listen port 512 #t host)) + (let loop () + (define connection-custodian (make-custodian)) + (define-values (i o) (parameterize ((current-custodian connection-custodian)) + (tcp-accept listener))) + (turn-freshen this-turn (action () (spawn-connection this-turn + ds + connection-custodian + i + o + (TcpInbound host port)))) + (loop)))))) + +(define spawn-connection + (action (ds custodian i o spec) + (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) + (spawn + #:name name + + (on-stop (close-input-port i) + (close-output-port o)) + + (define active-controller #f) + (define active-controller-handle #f) + (define flush-pending #f) + + (define start-thread + (action () + (linked-thread + #:name (list name 'input-thread) + #:custodian custodian + this-turn + (ref (entity #:name (list name 'socket-monitor) + #:retract (action (_handle) (stop-current-facet)))) + (lambda () + (let loop () + (define bs (read-bytes-avail i)) + (when (bytes? bs) + (log-syndicate/drivers/tcp-info "inbound data ~v for ~v" bs this-actor) + (turn-freshen this-turn (action () (send! active-controller (Socket bs)))) + (loop))))))) + + (define active-socket + (ref (entity #:name (list name 'active-socket) + #:assert + (action (m handle) + (match (parse-ActiveSocket m) + [(ActiveSocket-controller controller) + (log-syndicate/drivers/tcp-info "~v controller for ~v" controller this-actor) + (when (not active-controller) (start-thread this-turn)) + (set! active-controller controller) + (set! active-controller-handle handle)] + [(ActiveSocket-close _) + (log-syndicate/drivers/tcp-info "closing ~v" this-actor) + (stop-current-facet)])) + #:retract + (action (handle) + (log-syndicate/drivers/tcp-info "peer withdrawn ~v" this-actor) + (when (equal? handle active-controller-handle) + (stop-current-facet))) + #:message + (action (m) + (match (parse-ActiveSocket m) + [(ActiveSocket-Socket (Socket payload)) + (log-syndicate/drivers/tcp-info "outbound data ~v for ~v" payload this-actor) + (write-bytes payload o) + (when (not flush-pending) + (set! flush-pending #t) + (facet-on-end-of-turn! this-facet + (action () + (set! flush-pending #f) + (flush-output o))))]))))) + + (at ds + (assert (Connection active-socket spec)))))) + +(define (read-bytes-avail input-port #:limit [limit 65536]) + (define buffer (make-bytes limit)) + (match (read-bytes-avail! buffer input-port) + [(? number? count) (subbytes buffer 0 count)] + [other other])) + +(module+ main + (require syndicate/drivers/timer) + (actor-system/dataspace (ds) + (spawn-timer-driver this-turn ds) + (spawn-tcp-driver this-turn ds) + (spawn + (at ds + (during/spawn (Connection $conn (TcpInbound "0.0.0.0" 5999)) + (on-start (log-info "Starting service ~a" this-facet)) + (on-stop (log-info "Stopping service ~a" this-facet)) + (at conn + (assert (ActiveSocket-controller + (ref (entity #:message + (action (m) + (match (parse-Socket m) + [(Socket #"bye\n") + (log-info "Bye!") + (stop-current-facet)] + [(Socket data) + (log-info "Echoing ~v" data) + (send! conn (Socket data))])))))))))))) diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs new file mode 100644 index 0000000..952c4f3 --- /dev/null +++ b/syndicate/schemas/tcp.prs @@ -0,0 +1,17 @@ +version 1 . +embeddedType EntityRef.Ref . + +Connection = . +ConnectionPeer = . + +TcpOutbound = . +TcpInbound = . + +ActiveSocket = + / + / + / Socket +. + +; TODO: +Socket = .