Refactor/repair tcp.prs and tcp.rkt
This commit is contained in:
parent
d304d1e87d
commit
6546e335ef
|
@ -5,6 +5,7 @@
|
|||
(module+ main
|
||||
(require racket/cmdline)
|
||||
(require (only-in racket/port read-line-evt))
|
||||
(require (only-in racket/string string-trim))
|
||||
(require syndicate/drivers/tcp)
|
||||
(require syndicate/drivers/racket-event)
|
||||
|
||||
|
@ -23,15 +24,21 @@
|
|||
(spawn
|
||||
(establish-connection
|
||||
ds (TcpOutbound host port)
|
||||
#:initial-mode (Mode-lines (LineMode-lf))
|
||||
#:on-connected (lambda (peer)
|
||||
(at ds
|
||||
(when (message (RacketEvent (read-line-evt (current-input-port)) $vs))
|
||||
(match (car vs)
|
||||
[(? eof-object?)
|
||||
(log-info "EOF on stdin.")
|
||||
(stop-current-facet)]
|
||||
[line
|
||||
(send-data peer (string-append line "\n"))]))))
|
||||
[(? eof-object?) (stop-current-facet (log-info "EOF on stdin."))]
|
||||
[line (send-line peer line)]))))
|
||||
#:on-rejected (lambda (message) (stop-current-facet (log-error "~a" message)))
|
||||
#:on-disconnected (lambda () (stop-current-facet (log-info "Disconnected")))
|
||||
#:on-data display))))
|
||||
#:on-data (lambda (line _mode)
|
||||
;; \e7 DECSC, save cursor position
|
||||
;; \n\e[A Force a new line if at end of screen, then back up; effect of \r
|
||||
;; \e[L Insert a line here, pushing the current line's contents down one
|
||||
;; ~a Placeholder for the incoming line
|
||||
;; \e8 DECRC, Restore cursor - it will be one line too high
|
||||
;; \e[B Correct for the one line to high
|
||||
(printf "\e7\n\e[A\e[L~a\e8\e[B" line)
|
||||
(flush-output))))))
|
||||
|
|
|
@ -20,4 +20,4 @@
|
|||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpInbound host port))
|
||||
(accept-connection conn #:on-data (lambda (data) (send-data conn data))))))))
|
||||
(accept-connection conn #:on-data (lambda (data mode) (send-data conn data mode))))))))
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
#lang syndicate
|
||||
;;; SPDX-License-Identifier: LGPL-3.0-or-later
|
||||
;;; SPDX-FileCopyrightText: Copyright © 2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
(module+ main
|
||||
(require racket/cmdline)
|
||||
(require syndicate/drivers/tcp)
|
||||
|
||||
(define host "0.0.0.0")
|
||||
(define port 5999)
|
||||
|
||||
(command-line #:once-each
|
||||
[("--host" "-H") hostname "Set hostname to listen on"
|
||||
(set! host hostname)]
|
||||
[("--port" "-p") port-number "Set port number to listen on"
|
||||
(set! port (string->number port-number))])
|
||||
|
||||
(message-struct Line (text))
|
||||
|
||||
(actor-system/dataspace (ds)
|
||||
(spawn-tcp-driver ds)
|
||||
(spawn
|
||||
(at ds
|
||||
(during/spawn (Connection $conn (TcpInbound host port))
|
||||
(accept-connection conn
|
||||
#:initial-mode (Mode-lines (LineMode-lf))
|
||||
#:on-data (lambda (data mode) (send! ds (Line data))))
|
||||
(at ds (when (message (Line $data)) (send-line conn data))))))))
|
|
@ -7,6 +7,8 @@
|
|||
accept-connection
|
||||
establish-connection
|
||||
send-credit
|
||||
send-lines-credit
|
||||
send-bytes-credit
|
||||
send-line
|
||||
send-data
|
||||
send-eof)
|
||||
|
@ -55,14 +57,6 @@
|
|||
(lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port))))
|
||||
(loop)))))
|
||||
|
||||
(define (mode->line-terminator mode)
|
||||
(match mode
|
||||
[(LineMode-lf) "\n"]
|
||||
[(LineMode-crlf) "\r\n"]))
|
||||
|
||||
(define (render-line line mode)
|
||||
(string->bytes/utf-8 (string-append line (mode->line-terminator mode))))
|
||||
|
||||
(define (run-outbound ds local-peer host port)
|
||||
(define connection-custodian (make-custodian))
|
||||
((with-handlers ([exn:fail:network?
|
||||
|
@ -79,9 +73,8 @@
|
|||
(at local-peer
|
||||
(assert (ActiveSocket-controller
|
||||
(object #:name (list name 'socket)
|
||||
[#:asserted (Socket-Credit c) (issue-credit c)]
|
||||
[#:asserted (Socket-line line mode) (relay (render-line line mode))]
|
||||
[#:asserted (Socket-data data) (relay data)]
|
||||
[#:asserted (Socket-credit amount mode) (issue-credit amount mode)]
|
||||
[#:asserted (Socket-data data mode) (relay data mode)]
|
||||
[#:asserted (Socket-eof) (close-output-port o)]))))))))
|
||||
|
||||
(define (spawn-inbound ds custodian i o spec)
|
||||
|
@ -111,15 +104,13 @@
|
|||
[#:asserted (ActiveSocket-close message)
|
||||
(log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message)
|
||||
(stop-current-facet)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-Credit c))
|
||||
[#:asserted (ActiveSocket-Socket (Socket-credit amount mode))
|
||||
(if issue-credit
|
||||
(issue-credit c)
|
||||
(issue-credit amount mode)
|
||||
(log-syndicate/drivers/tcp-warning
|
||||
"Socket-Credit ~v ignored because no controller present" c))]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-line line mode))
|
||||
(relay (render-line line mode))]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-data data))
|
||||
(relay data)]
|
||||
"Socket-credit ~v/~v ignored because no controller present" amount mode))]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-data data mode))
|
||||
(relay data mode)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-eof))
|
||||
(close-output-port o)])
|
||||
spec)))))
|
||||
|
@ -140,22 +131,25 @@
|
|||
(if (zero? remaining-count)
|
||||
q
|
||||
(undequeue (cons remaining-count mode) q)))
|
||||
(define (eof-and-finish)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||
(turn! facet (lambda () (send-eof (target-proc)))))
|
||||
(let loop ((credits (make-queue)))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
[(Credit (CreditAmount-count 0) _mode) (loop credits)]
|
||||
[(Credit (CreditAmount-count n) mode)
|
||||
[(list 'credit (CreditAmount-count 0) _mode) (loop credits)]
|
||||
[(list 'credit (CreditAmount-count n) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))]
|
||||
[_ (enqueue credits (cons n mode))]))]
|
||||
[(Credit (CreditAmount-unbounded) mode)
|
||||
[(list 'credit (CreditAmount-unbounded) mode)
|
||||
(loop (match (unenqueue* credits)
|
||||
[(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))]
|
||||
[_ (enqueue credits (cons +inf.0 mode))]))]))
|
||||
(match (dequeue* credits)
|
||||
[(list)
|
||||
never-evt]
|
||||
[(list (cons count (and mode (CreditMode-bytes))) q)
|
||||
[(list (cons count (and mode (Mode-bytes))) q)
|
||||
(define buffer (make-bytes (inexact->exact (min count 131072))))
|
||||
(handle-evt (read-bytes-avail!-evt buffer i)
|
||||
(match-lambda
|
||||
|
@ -164,23 +158,19 @@
|
|||
(log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name)
|
||||
(turn! facet (lambda () (send-data (target-proc) bs)))
|
||||
(loop (update-count (- count read-count) mode q))]
|
||||
[(? eof-object?)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||
(turn! facet (lambda () (send-eof (target-proc))))]))]
|
||||
[(list (cons count (and mode (CreditMode-lines line-mode))) q)
|
||||
(handle-evt (read-line-evt i (match line-mode
|
||||
[(LineMode-lf) 'linefeed]
|
||||
[(LineMode-crlf) 'return-linefeed]))
|
||||
[(? eof-object?) (eof-and-finish)]))]
|
||||
[(list (cons count (and mode (Mode-lines line-mode))) q)
|
||||
(handle-evt (read-bytes-line-evt i (match line-mode
|
||||
[(LineMode-lf) 'linefeed]
|
||||
[(LineMode-crlf) 'return-linefeed]))
|
||||
(match-lambda
|
||||
[(? string? line)
|
||||
[(? bytes? line)
|
||||
(log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name)
|
||||
(turn! facet (lambda () (send-line (target-proc) line line-mode)))
|
||||
(loop (update-count (- count 1) mode q))]
|
||||
[(? eof-object?)
|
||||
(log-syndicate/drivers/tcp-debug "inbound eof for ~v" name)
|
||||
(turn! facet (lambda () (send-eof (target-proc))))]))])))))
|
||||
(define (issue-credit c)
|
||||
(async-channel-put control-ch c))
|
||||
[(? eof-object?) (eof-and-finish)]))])))))
|
||||
(define (issue-credit amount mode)
|
||||
(async-channel-put control-ch (list 'credit amount mode)))
|
||||
issue-credit)
|
||||
|
||||
(define-syntax (EPIPE stx)
|
||||
|
@ -200,9 +190,15 @@
|
|||
|
||||
(define (outbound-relay name o)
|
||||
(define flush-pending #f)
|
||||
(lambda (payload)
|
||||
(lambda (payload mode)
|
||||
(log-syndicate/drivers/tcp-debug "outbound data ~v for ~v" payload name)
|
||||
(with-stop-current-facet-on-epipe 'writing (lambda () (write-bytes payload o)))
|
||||
(with-stop-current-facet-on-epipe 'writing
|
||||
(lambda ()
|
||||
(write-bytes payload o)
|
||||
(match mode
|
||||
[(Mode-bytes) (void)]
|
||||
[(Mode-lines (LineMode-lf)) (write-bytes #"\n" o)]
|
||||
[(Mode-lines (LineMode-crlf)) (write-bytes #"\r\n" o)])))
|
||||
(when (not flush-pending)
|
||||
(set! flush-pending #t)
|
||||
(facet-on-end-of-turn! this-facet
|
||||
|
@ -213,26 +209,25 @@
|
|||
|
||||
(define (accept-connection conn
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (CreditMode-bytes)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void]
|
||||
#:on-credit [on-credit (lambda (c) (void))])
|
||||
#:on-credit [on-credit void])
|
||||
(at conn
|
||||
(assert (ActiveSocket-controller
|
||||
(object #:name 'inbound-socket-controller
|
||||
[#:asserted (Socket-Credit c) (on-credit c)]
|
||||
[#:asserted (Socket-line line mode) (on-data line mode)]
|
||||
[#:asserted (Socket-data data) (on-data data)]
|
||||
[#:asserted (Socket-credit amount mode) (on-credit amount mode)]
|
||||
[#:asserted (Socket-data data mode) (on-data data mode)]
|
||||
[#:asserted (Socket-eof) (on-eof)]))))
|
||||
(when initial-credit (send-credit conn initial-credit initial-mode)))
|
||||
|
||||
(define (establish-connection ds spec
|
||||
#:initial-credit [initial-credit (CreditAmount-unbounded)]
|
||||
#:initial-mode [initial-mode (CreditMode-bytes)]
|
||||
#:initial-mode [initial-mode (Mode-bytes)]
|
||||
#:on-connected on-connected
|
||||
#:on-data on-data
|
||||
#:on-eof [on-eof void]
|
||||
#:on-credit [on-credit (lambda (c) (void))]
|
||||
#:on-credit [on-credit void]
|
||||
#:on-disconnected [on-disconnected (lambda () (stop-current-facet))]
|
||||
#:on-rejected [on-rejected (lambda () (stop-current-facet))])
|
||||
(define s
|
||||
|
@ -243,20 +238,30 @@
|
|||
#:retracted
|
||||
(on-disconnected)]
|
||||
[#:asserted (ActiveSocket-close message) (on-rejected message)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-Credit c)) (on-credit c)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-line line mode)) (on-data line mode)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-data data)) (on-data data)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-credit amount mode)) (on-credit amount mode)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-data data mode)) (on-data data mode)]
|
||||
[#:asserted (ActiveSocket-Socket (Socket-eof)) (on-eof)]))
|
||||
(at ds (assert (Connection s spec))))
|
||||
|
||||
(define (send-credit conn amount unit)
|
||||
(send! conn (Socket-Credit (Credit amount unit))))
|
||||
(define (send-credit conn amount mode)
|
||||
(send! conn (Socket-credit amount mode)))
|
||||
|
||||
(define (send-line conn line [mode (LineMode-crlf)])
|
||||
(send! conn (Socket-line line mode)))
|
||||
(define (send-lines-credit conn amount [mode (LineMode-lf)])
|
||||
(send-credit conn (CreditAmount-count amount) (Mode-lines mode)))
|
||||
|
||||
(define (send-data conn data)
|
||||
(send! conn (Socket-data (if (bytes? data) data (string->bytes/utf-8 data)))))
|
||||
(define (send-bytes-credit conn amount)
|
||||
(send-credit conn (CreditAmount-count amount) (Mode-bytes)))
|
||||
|
||||
(define (->bytes data)
|
||||
(if (bytes? data)
|
||||
data
|
||||
(string->bytes/utf-8 data)))
|
||||
|
||||
(define (send-line conn line [line-mode (LineMode-lf)])
|
||||
(send! conn (Socket-data (->bytes line) (Mode-lines line-mode))))
|
||||
|
||||
(define (send-data conn data [mode (Mode-bytes)])
|
||||
(send! conn (Socket-data (->bytes data) mode)))
|
||||
|
||||
(define (send-eof conn)
|
||||
(send! conn (Socket-eof)))
|
||||
|
|
|
@ -14,14 +14,12 @@ ActiveSocket =
|
|||
.
|
||||
|
||||
Socket =
|
||||
/ Credit
|
||||
/ <line @text string @mode LineMode>
|
||||
/ <data @payload bytes>
|
||||
/ <credit @amount CreditAmount @mode Mode>
|
||||
/ <data @payload bytes @mode Mode>
|
||||
/ <eof>
|
||||
.
|
||||
|
||||
LineMode = =lf / =crlf .
|
||||
|
||||
Credit = <credit @amount CreditAmount @unit CreditMode>.
|
||||
CreditMode = =bytes / @lines LineMode .
|
||||
CreditAmount = @count int / @unbounded =unbounded .
|
||||
|
||||
Mode = =bytes / @lines LineMode .
|
||||
LineMode = =lf / =crlf .
|
||||
|
|
Loading…
Reference in New Issue