From 6546e335ef5071211208d7997fad8cac07a60f5e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 11 Jun 2021 15:29:12 +0200 Subject: [PATCH] Refactor/repair tcp.prs and tcp.rkt --- syndicate-examples/tcp-client.rkt | 19 ++-- syndicate-examples/tcp-echo-server.rkt | 2 +- syndicate-examples/tcp-relay-server.rkt | 28 ++++++ syndicate/drivers/tcp.rkt | 111 +++++++++++++----------- syndicate/schemas/tcp.prs | 12 ++- 5 files changed, 105 insertions(+), 67 deletions(-) create mode 100644 syndicate-examples/tcp-relay-server.rkt diff --git a/syndicate-examples/tcp-client.rkt b/syndicate-examples/tcp-client.rkt index 39d472a..7178422 100644 --- a/syndicate-examples/tcp-client.rkt +++ b/syndicate-examples/tcp-client.rkt @@ -5,6 +5,7 @@ (module+ main (require racket/cmdline) (require (only-in racket/port read-line-evt)) + (require (only-in racket/string string-trim)) (require syndicate/drivers/tcp) (require syndicate/drivers/racket-event) @@ -23,15 +24,21 @@ (spawn (establish-connection ds (TcpOutbound host port) + #:initial-mode (Mode-lines (LineMode-lf)) #:on-connected (lambda (peer) (at ds (when (message (RacketEvent (read-line-evt (current-input-port)) $vs)) (match (car vs) - [(? eof-object?) - (log-info "EOF on stdin.") - (stop-current-facet)] - [line - (send-data peer (string-append line "\n"))])))) + [(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))] + [line (send-line peer line)])))) #:on-rejected (lambda (message) (stop-current-facet (log-error "~a" message))) #:on-disconnected (lambda () (stop-current-facet (log-info "Disconnected"))) - #:on-data display)))) + #:on-data (lambda (line _mode) + ;; \e7 DECSC, save cursor position + ;; \n\e[A Force a new line if at end of screen, then back up; effect of \r + ;; \e[L Insert a line here, pushing the current line's contents down one + ;; ~a Placeholder for the incoming line + ;; \e8 DECRC, Restore cursor - it will be one line too high + ;; \e[B Correct for the one line to high + (printf "\e7\n\e[A\e[L~a\e8\e[B" line) + (flush-output)))))) diff --git a/syndicate-examples/tcp-echo-server.rkt b/syndicate-examples/tcp-echo-server.rkt index 8e4096c..ed14293 100644 --- a/syndicate-examples/tcp-echo-server.rkt +++ b/syndicate-examples/tcp-echo-server.rkt @@ -20,4 +20,4 @@ (spawn (at ds (during/spawn (Connection $conn (TcpInbound host port)) - (accept-connection conn #:on-data (lambda (data) (send-data conn data)))))))) + (accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode)))))))) diff --git a/syndicate-examples/tcp-relay-server.rkt b/syndicate-examples/tcp-relay-server.rkt new file mode 100644 index 0000000..151e573 --- /dev/null +++ b/syndicate-examples/tcp-relay-server.rkt @@ -0,0 +1,28 @@ +#lang syndicate +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones + +(module+ main + (require racket/cmdline) + (require syndicate/drivers/tcp) + + (define host "0.0.0.0") + (define port 5999) + + (command-line #:once-each + [("--host" "-H") hostname "Set hostname to listen on" + (set! host hostname)] + [("--port" "-p") port-number "Set port number to listen on" + (set! port (string->number port-number))]) + + (message-struct Line (text)) + + (actor-system/dataspace (ds) + (spawn-tcp-driver ds) + (spawn + (at ds + (during/spawn (Connection $conn (TcpInbound host port)) + (accept-connection conn + #:initial-mode (Mode-lines (LineMode-lf)) + #:on-data (lambda (data mode) (send! ds (Line data)))) + (at ds (when (message (Line $data)) (send-line conn data)))))))) diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index 2b441dc..6bb9a79 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -7,6 +7,8 @@ accept-connection establish-connection send-credit + send-lines-credit + send-bytes-credit send-line send-data send-eof) @@ -55,14 +57,6 @@ (lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port)))) (loop))))) -(define (mode->line-terminator mode) - (match mode - [(LineMode-lf) "\n"] - [(LineMode-crlf) "\r\n"])) - -(define (render-line line mode) - (string->bytes/utf-8 (string-append line (mode->line-terminator mode)))) - (define (run-outbound ds local-peer host port) (define connection-custodian (make-custodian)) ((with-handlers ([exn:fail:network? @@ -79,9 +73,8 @@ (at local-peer (assert (ActiveSocket-controller (object #:name (list name 'socket) - [#:asserted (Socket-Credit c) (issue-credit c)] - [#:asserted (Socket-line line mode) (relay (render-line line mode))] - [#:asserted (Socket-data data) (relay data)] + [#:asserted (Socket-credit amount mode) (issue-credit amount mode)] + [#:asserted (Socket-data data mode) (relay data mode)] [#:asserted (Socket-eof) (close-output-port o)])))))))) (define (spawn-inbound ds custodian i o spec) @@ -111,15 +104,13 @@ [#:asserted (ActiveSocket-close message) (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) (stop-current-facet)] - [#:asserted (ActiveSocket-Socket (Socket-Credit c)) + [#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) (if issue-credit - (issue-credit c) + (issue-credit amount mode) (log-syndicate/drivers/tcp-warning - "Socket-Credit ~v ignored because no controller present" c))] - [#:asserted (ActiveSocket-Socket (Socket-line line mode)) - (relay (render-line line mode))] - [#:asserted (ActiveSocket-Socket (Socket-data data)) - (relay data)] + "Socket-credit ~v/~v ignored because no controller present" amount mode))] + [#:asserted (ActiveSocket-Socket (Socket-data data mode)) + (relay data mode)] [#:asserted (ActiveSocket-Socket (Socket-eof)) (close-output-port o)]) spec))))) @@ -140,22 +131,25 @@ (if (zero? remaining-count) q (undequeue (cons remaining-count mode) q))) + (define (eof-and-finish) + (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) + (turn! facet (lambda () (send-eof (target-proc))))) (let loop ((credits (make-queue))) (sync (handle-evt control-ch (match-lambda - [(Credit (CreditAmount-count 0) _mode) (loop credits)] - [(Credit (CreditAmount-count n) mode) + [(list 'credit (CreditAmount-count 0) _mode) (loop credits)] + [(list 'credit (CreditAmount-count n) mode) (loop (match (unenqueue* credits) [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))] [_ (enqueue credits (cons n mode))]))] - [(Credit (CreditAmount-unbounded) mode) + [(list 'credit (CreditAmount-unbounded) mode) (loop (match (unenqueue* credits) [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))] [_ (enqueue credits (cons +inf.0 mode))]))])) (match (dequeue* credits) [(list) never-evt] - [(list (cons count (and mode (CreditMode-bytes))) q) + [(list (cons count (and mode (Mode-bytes))) q) (define buffer (make-bytes (inexact->exact (min count 131072)))) (handle-evt (read-bytes-avail!-evt buffer i) (match-lambda @@ -164,23 +158,19 @@ (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) (turn! facet (lambda () (send-data (target-proc) bs))) (loop (update-count (- count read-count) mode q))] - [(? eof-object?) - (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) - (turn! facet (lambda () (send-eof (target-proc))))]))] - [(list (cons count (and mode (CreditMode-lines line-mode))) q) - (handle-evt (read-line-evt i (match line-mode - [(LineMode-lf) 'linefeed] - [(LineMode-crlf) 'return-linefeed])) + [(? eof-object?) (eof-and-finish)]))] + [(list (cons count (and mode (Mode-lines line-mode))) q) + (handle-evt (read-bytes-line-evt i (match line-mode + [(LineMode-lf) 'linefeed] + [(LineMode-crlf) 'return-linefeed])) (match-lambda - [(? string? line) + [(? bytes? line) (log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name) (turn! facet (lambda () (send-line (target-proc) line line-mode))) (loop (update-count (- count 1) mode q))] - [(? eof-object?) - (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) - (turn! facet (lambda () (send-eof (target-proc))))]))]))))) - (define (issue-credit c) - (async-channel-put control-ch c)) + [(? eof-object?) (eof-and-finish)]))]))))) + (define (issue-credit amount mode) + (async-channel-put control-ch (list 'credit amount mode))) issue-credit) (define-syntax (EPIPE stx) @@ -200,9 +190,15 @@ (define (outbound-relay name o) (define flush-pending #f) - (lambda (payload) + (lambda (payload mode) (log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name) - (with-stop-current-facet-on-epipe 'writing (lambda () (write-bytes payload o))) + (with-stop-current-facet-on-epipe 'writing + (lambda () + (write-bytes payload o) + (match mode + [(Mode-bytes) (void)] + [(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)] + [(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)]))) (when (not flush-pending) (set! flush-pending #t) (facet-on-end-of-turn! this-facet @@ -213,26 +209,25 @@ (define (accept-connection conn #:initial-credit [initial-credit (CreditAmount-unbounded)] - #:initial-mode [initial-mode (CreditMode-bytes)] + #:initial-mode [initial-mode (Mode-bytes)] #:on-data on-data #:on-eof [on-eof void] - #:on-credit [on-credit (lambda (c) (void))]) + #:on-credit [on-credit void]) (at conn (assert (ActiveSocket-controller (object #:name 'inbound-socket-controller - [#:asserted (Socket-Credit c) (on-credit c)] - [#:asserted (Socket-line line mode) (on-data line mode)] - [#:asserted (Socket-data data) (on-data data)] + [#:asserted (Socket-credit amount mode) (on-credit amount mode)] + [#:asserted (Socket-data data mode) (on-data data mode)] [#:asserted (Socket-eof) (on-eof)])))) (when initial-credit (send-credit conn initial-credit initial-mode))) (define (establish-connection ds spec #:initial-credit [initial-credit (CreditAmount-unbounded)] - #:initial-mode [initial-mode (CreditMode-bytes)] + #:initial-mode [initial-mode (Mode-bytes)] #:on-connected on-connected #:on-data on-data #:on-eof [on-eof void] - #:on-credit [on-credit (lambda (c) (void))] + #:on-credit [on-credit void] #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] #:on-rejected [on-rejected (lambda () (stop-current-facet))]) (define s @@ -243,20 +238,30 @@ #:retracted (on-disconnected)] [#:asserted (ActiveSocket-close message) (on-rejected message)] - [#:asserted (ActiveSocket-Socket (Socket-Credit c)) (on-credit c)] - [#:asserted (ActiveSocket-Socket (Socket-line line mode)) (on-data line mode)] - [#:asserted (ActiveSocket-Socket (Socket-data data)) (on-data data)] + [#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)] + [#:asserted (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)] [#:asserted (ActiveSocket-Socket (Socket-eof)) (on-eof)])) (at ds (assert (Connection s spec)))) -(define (send-credit conn amount unit) - (send! conn (Socket-Credit (Credit amount unit)))) +(define (send-credit conn amount mode) + (send! conn (Socket-credit amount mode))) -(define (send-line conn line [mode (LineMode-crlf)]) - (send! conn (Socket-line line mode))) +(define (send-lines-credit conn amount [mode (LineMode-lf)]) + (send-credit conn (CreditAmount-count amount) (Mode-lines mode))) -(define (send-data conn data) - (send! conn (Socket-data (if (bytes? data) data (string->bytes/utf-8 data))))) +(define (send-bytes-credit conn amount) + (send-credit conn (CreditAmount-count amount) (Mode-bytes))) + +(define (->bytes data) + (if (bytes? data) + data + (string->bytes/utf-8 data))) + +(define (send-line conn line [line-mode (LineMode-lf)]) + (send! conn (Socket-data (->bytes line) (Mode-lines line-mode)))) + +(define (send-data conn data [mode (Mode-bytes)]) + (send! conn (Socket-data (->bytes data) mode))) (define (send-eof conn) (send! conn (Socket-eof))) diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index c0bbd24..ad854f8 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -14,14 +14,12 @@ ActiveSocket = . Socket = - / Credit - / - / + / + / / . -LineMode = =lf / =crlf . - -Credit = . -CreditMode = =bytes / @lines LineMode . CreditAmount = @count int / @unbounded =unbounded . + +Mode = =bytes / @lines LineMode . +LineMode = =lf / =crlf .