diff --git a/syndicate/drivers/tcp.rkt b/syndicate/drivers/tcp.rkt index a3d5e7f..2b441dc 100644 --- a/syndicate/drivers/tcp.rkt +++ b/syndicate/drivers/tcp.rkt @@ -6,13 +6,17 @@ spawn-tcp-driver accept-connection establish-connection + send-credit + send-line send-data + send-eof) - read-bytes-avail) - +(require racket/async-channel) (require racket/tcp) +(require racket/port) (require (only-in racket/exn exn->string)) (require syndicate/driver-support) +(require syndicate/functional-queue) (require syndicate/schemas/gen/tcp) (require syndicate/schemas/gen/dataspace-patterns) @@ -51,6 +55,14 @@ (lambda () (spawn-inbound ds connection-custodian i o (TcpInbound host port)))) (loop))))) +(define (mode->line-terminator mode) + (match mode + [(LineMode-lf) "\n"] + [(LineMode-crlf) "\r\n"])) + +(define (render-line line mode) + (string->bytes/utf-8 (string-append line (mode->line-terminator mode)))) + (define (run-outbound ds local-peer host port) (define connection-custodian (make-custodian)) ((with-handlers ([exn:fail:network? @@ -62,12 +74,15 @@ (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) (on-stop (close-input-port i) (close-output-port o)) - (start-inbound-relay connection-custodian name local-peer i) + (define issue-credit (start-inbound-relay connection-custodian name (lambda () local-peer) i)) (define relay (outbound-relay name o)) (at local-peer (assert (ActiveSocket-controller (object #:name (list name 'socket) - [#:asserted (Socket data) (relay data)])))))))) + [#:asserted (Socket-Credit c) (issue-credit c)] + [#:asserted (Socket-line line mode) (relay (render-line line mode))] + [#:asserted (Socket-data data) (relay data)] + [#:asserted (Socket-eof) (close-output-port o)])))))))) (define (spawn-inbound ds custodian i o spec) (define name (call-with-values (lambda () (tcp-addresses i #t)) list)) @@ -76,6 +91,7 @@ (on-stop (close-input-port i) (close-output-port o)) + (define issue-credit #f) (define active-controller #f) (define relay (outbound-relay name o)) (at ds @@ -85,7 +101,8 @@ [#:asserted (ActiveSocket-controller controller) (log-syndicate/drivers/tcp-debug "~v controller for ~v" controller this-actor) (when (not active-controller) - (start-inbound-relay custodian name controller i)) + (set! issue-credit + (start-inbound-relay custodian name (lambda () active-controller) i))) (set! active-controller controller) #:retracted (when (eq? controller active-controller) @@ -94,21 +111,77 @@ [#:asserted (ActiveSocket-close message) (log-syndicate/drivers/tcp-debug "closing ~v:\n~a" this-actor message) (stop-current-facet)] - [#:asserted (ActiveSocket-Socket (Socket data)) - (relay data)]) + [#:asserted (ActiveSocket-Socket (Socket-Credit c)) + (if issue-credit + (issue-credit c) + (log-syndicate/drivers/tcp-warning + "Socket-Credit ~v ignored because no controller present" c))] + [#:asserted (ActiveSocket-Socket (Socket-line line mode)) + (relay (render-line line mode))] + [#:asserted (ActiveSocket-Socket (Socket-data data)) + (relay data)] + [#:asserted (ActiveSocket-Socket (Socket-eof)) + (close-output-port o)]) spec))))) -(define (start-inbound-relay custodian name target i) +(define (start-inbound-relay custodian name target-proc i) + (define eof-received? #f) + (define control-ch (make-async-channel)) (linked-thread #:name (list name 'input-thread) #:custodian custodian + #:peer (object #:name 'inbound-relay-monitor + [#:asserted _ + #:retracted + (close-input-port i) + (when (not eof-received?) (stop-current-facet))]) (lambda (facet) - (let loop () - (define bs (read-bytes-avail i)) - (when (bytes? bs) - (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) - (turn! facet (lambda () (send-data target bs))) - (loop)))))) + (define (update-count remaining-count mode q) + (if (zero? remaining-count) + q + (undequeue (cons remaining-count mode) q))) + (let loop ((credits (make-queue))) + (sync (handle-evt control-ch + (match-lambda + [(Credit (CreditAmount-count 0) _mode) (loop credits)] + [(Credit (CreditAmount-count n) mode) + (loop (match (unenqueue* credits) + [(list q (cons c (== mode))) (enqueue q (cons (+ c n) mode))] + [_ (enqueue credits (cons n mode))]))] + [(Credit (CreditAmount-unbounded) mode) + (loop (match (unenqueue* credits) + [(list q (cons _ (== mode))) (enqueue q (cons +inf.0 mode))] + [_ (enqueue credits (cons +inf.0 mode))]))])) + (match (dequeue* credits) + [(list) + never-evt] + [(list (cons count (and mode (CreditMode-bytes))) q) + (define buffer (make-bytes (inexact->exact (min count 131072)))) + (handle-evt (read-bytes-avail!-evt buffer i) + (match-lambda + [(? number? read-count) + (define bs (subbytes buffer 0 read-count)) + (log-syndicate/drivers/tcp-debug "inbound data ~v for ~v" bs name) + (turn! facet (lambda () (send-data (target-proc) bs))) + (loop (update-count (- count read-count) mode q))] + [(? eof-object?) + (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) + (turn! facet (lambda () (send-eof (target-proc))))]))] + [(list (cons count (and mode (CreditMode-lines line-mode))) q) + (handle-evt (read-line-evt i (match line-mode + [(LineMode-lf) 'linefeed] + [(LineMode-crlf) 'return-linefeed])) + (match-lambda + [(? string? line) + (log-syndicate/drivers/tcp-debug "inbound line ~v for ~v" line name) + (turn! facet (lambda () (send-line (target-proc) line line-mode))) + (loop (update-count (- count 1) mode q))] + [(? eof-object?) + (log-syndicate/drivers/tcp-debug "inbound eof for ~v" name) + (turn! facet (lambda () (send-eof (target-proc))))]))]))))) + (define (issue-credit c) + (async-channel-put control-ch c)) + issue-credit) (define-syntax (EPIPE stx) (local-require ffi/unsafe) @@ -138,34 +211,52 @@ (with-stop-current-facet-on-epipe 'flushing (lambda () (flush-output o)))))))) -(define (read-bytes-avail input-port #:limit [limit 65536]) - (define buffer (make-bytes limit)) - (match (read-bytes-avail! buffer input-port) - [(? number? count) (subbytes buffer 0 count)] - [other other])) - -(define (accept-connection conn #:on-data on-data) +(define (accept-connection conn + #:initial-credit [initial-credit (CreditAmount-unbounded)] + #:initial-mode [initial-mode (CreditMode-bytes)] + #:on-data on-data + #:on-eof [on-eof void] + #:on-credit [on-credit (lambda (c) (void))]) (at conn (assert (ActiveSocket-controller (object #:name 'inbound-socket-controller - [#:asserted (Socket data) (on-data data)]))))) + [#:asserted (Socket-Credit c) (on-credit c)] + [#:asserted (Socket-line line mode) (on-data line mode)] + [#:asserted (Socket-data data) (on-data data)] + [#:asserted (Socket-eof) (on-eof)])))) + (when initial-credit (send-credit conn initial-credit initial-mode))) (define (establish-connection ds spec + #:initial-credit [initial-credit (CreditAmount-unbounded)] + #:initial-mode [initial-mode (CreditMode-bytes)] #:on-connected on-connected #:on-data on-data + #:on-eof [on-eof void] + #:on-credit [on-credit (lambda (c) (void))] #:on-disconnected [on-disconnected (lambda () (stop-current-facet))] #:on-rejected [on-rejected (lambda () (stop-current-facet))]) (define s (object #:name 'outbound-socket [#:asserted (ActiveSocket-controller peer) (on-connected peer) + (when initial-credit (send-credit peer initial-credit initial-mode)) #:retracted (on-disconnected)] - [#:asserted (ActiveSocket-close message) - (on-rejected message)] - [#:asserted (ActiveSocket-Socket (Socket data)) - (on-data data)])) + [#:asserted (ActiveSocket-close message) (on-rejected message)] + [#:asserted (ActiveSocket-Socket (Socket-Credit c)) (on-credit c)] + [#:asserted (ActiveSocket-Socket (Socket-line line mode)) (on-data line mode)] + [#:asserted (ActiveSocket-Socket (Socket-data data)) (on-data data)] + [#:asserted (ActiveSocket-Socket (Socket-eof)) (on-eof)])) (at ds (assert (Connection s spec)))) +(define (send-credit conn amount unit) + (send! conn (Socket-Credit (Credit amount unit)))) + +(define (send-line conn line [mode (LineMode-crlf)]) + (send! conn (Socket-line line mode))) + (define (send-data conn data) - (send! conn (Socket (if (bytes? data) data (string->bytes/utf-8 data))))) + (send! conn (Socket-data (if (bytes? data) data (string->bytes/utf-8 data))))) + +(define (send-eof conn) + (send! conn (Socket-eof))) diff --git a/syndicate/functional-queue.rkt b/syndicate/functional-queue.rkt new file mode 100644 index 0000000..563497c --- /dev/null +++ b/syndicate/functional-queue.rkt @@ -0,0 +1,130 @@ +#lang racket/base +;;; SPDX-License-Identifier: LGPL-3.0-or-later +;;; SPDX-FileCopyrightText: Copyright © 2011-2021 Tony Garnock-Jones + +(provide make-queue + queue? + enqueue + enqueue-all + undequeue + queue-prepare-for-dequeue + dequeue + dequeue* + queue-prepare-for-unenqueue + unenqueue + unenqueue* + list->queue + queue->list + queue-length + queue-empty? + queue-append + queue-append-list + queue-extract + queue-filter + queue-remove + queue-partition) + +(require (only-in racket/list partition)) + +(struct queue (head tail) #:transparent) + +(define (make-queue) + (queue '() '())) + +(define (enqueue q v) + (queue (queue-head q) + (cons v (queue-tail q)))) + +(define (enqueue-all q v) + (queue (queue-head q) + (append (reverse v) (queue-tail q)))) + +(define (undequeue v q) + (queue (cons v (queue-head q)) + (queue-tail q))) + +(define (queue-prepare-for-dequeue q) + (if (null? (queue-head q)) + (queue (reverse (queue-tail q)) '()) + q)) + +(define (dequeue q) + (let* ((q1 (queue-prepare-for-dequeue q)) + (h (queue-head q1))) + (if (pair? h) + (values (car h) (queue (cdr h) (queue-tail q1))) + (values)))) + +(define (dequeue* q) + (call-with-values (lambda () (dequeue q)) list)) + +(define (queue-prepare-for-unenqueue q) + (if (null? (queue-tail q)) + (queue '() (reverse (queue-head q))) + q)) + +(define (unenqueue q) + (let* ((q1 (queue-prepare-for-unenqueue q)) + (t (queue-tail q1))) + (if (pair? t) + (values (queue (queue-head q1) (cdr t)) (car t)) + (values)))) + +(define (unenqueue* q) + (call-with-values (lambda () (unenqueue q)) list)) + +(define (list->queue xs) + (queue xs '())) + +(define (queue->list q) + (append (queue-head q) (reverse (queue-tail q)))) + +(define (queue-length q) + (+ (length (queue-head q)) + (length (queue-tail q)))) + +(define (queue-empty? q) + (and (null? (queue-head q)) + (null? (queue-tail q)))) + +(define (queue-append q1 q2) + (queue (append (queue-head q1) + (reverse (queue-tail q1)) + (queue-head q2)) + (queue-tail q2))) + +(define (queue-append-list q1 xs) + (queue (queue-head q1) + (append (reverse xs) (queue-tail q1)))) + +(define (queue-extract q predicate [default-value #f]) + (let search-head ((head (queue-head q)) + (rejected-head-rev '())) + (cond + ((null? head) (let search-tail ((tail (reverse (queue-tail q))) + (rejected-tail-rev '())) + (cond + ((null? tail) (values default-value q)) + ((predicate (car tail)) (values (car tail) + (queue (queue-head q) + (append (reverse (cdr tail)) + rejected-tail-rev)))) + (else (search-tail (cdr tail) (cons (car tail) rejected-tail-rev)))))) + ((predicate (car head)) (values (car head) + (queue (append (reverse rejected-head-rev) + (cdr head)) + (queue-tail q)))) + (else (search-head (cdr head) (cons (car head) rejected-head-rev)))))) + +(define (queue-filter pred q) + (queue (filter pred (queue-head q)) + (filter pred (queue-tail q)))) + +(define (queue-remove item q) + (list->queue (remove item (queue->list q)))) + +(define (queue-partition pred q) + (define-values (head-t head-f) (partition pred (queue-head q))) + (define-values (tail-t tail-f) (partition pred (queue-tail q))) + (values (queue head-t tail-t) + (queue head-f tail-f))) diff --git a/syndicate/schemas/tcp.prs b/syndicate/schemas/tcp.prs index d41eaed..c0bbd24 100644 --- a/syndicate/schemas/tcp.prs +++ b/syndicate/schemas/tcp.prs @@ -13,5 +13,15 @@ ActiveSocket = / Socket . -; TODO: -Socket = . +Socket = + / Credit + / + / + / +. + +LineMode = =lf / =crlf . + +Credit = . +CreditMode = =bytes / @lines LineMode . +CreditAmount = @count int / @unbounded =unbounded . diff --git a/syndicate/syntax.rkt b/syndicate/syntax.rkt index 64f07a8..7a6d818 100644 --- a/syndicate/syntax.rkt +++ b/syndicate/syntax.rkt @@ -13,6 +13,7 @@ ref react + let-event define-field stop-facet stop-current-facet @@ -52,6 +53,8 @@ (require "pattern.rkt") (require "syntax-classes.rkt") +(define-logger syndicate/object) ;; used by the (object) macro + (define-syntax this-turn (make-set!-transformer (lambda (stx) @@ -79,11 +82,12 @@ (define-syntax (object stx) (syntax-parse stx - [(_ name: handler ...) + [(_ name-stx: handler ...) #`(let ((state (make-hash))) + (define name name-stx.N) (define (handler-function assertion) - (-object-clauses assertion [] [handler ...])) - (ref (entity #:name name.N + (-object-clauses name assertion [] [handler ...])) + (ref (entity #:name name #:assert (lambda (m h) (-object-assert state handler-function m h)) #:retract (lambda (h) (-object-retract state h)) #:message (lambda (m) (-object-message handler-function m)))))])) @@ -105,18 +109,22 @@ (define-syntax (-object-clauses stx) (syntax-parse stx - [(_ input [completed ...] []) + [(_ name input [completed ...] []) #'(match input completed ... - [_ #f])] + [_ + (log-syndicate/object-debug "Unhandled assertion ~v in ~v" input name) + #f])] - [(_ input [completed ...] [ [#:spawn pat body ...] more ... ]) - #'(-object-clauses input + [(_ name input [completed ...] [ [#:spawn pat body ...] more ... ]) + #'(-object-clauses name + input [completed ...] [ [#:during pat (spawn/link body ...)] more ... ])] - [(_ input [completed ...] [ [#:asserted pat body+ ... #:retracted body- ...] more ... ]) - #`(-object-clauses input + [(_ name input [completed ...] [ [#:asserted pat body+ ... #:retracted body- ...] more ... ]) + #`(-object-clauses name + input [ completed ... [(-object-pattern pat) body+ ... #,(if (null? (syntax->list #'(body- ...))) @@ -124,13 +132,15 @@ #`(lambda () body- ...))] ] [more ...])] - [(_ input [completed ...] [ [#:asserted pat body+ ...] more ... ]) - #'(-object-clauses input + [(_ name input [completed ...] [ [#:asserted pat body+ ...] more ... ]) + #'(-object-clauses name + input [completed ...] [ [#:asserted pat body+ ... #:retracted] more ... ])] - [(_ input [completed ...] [ [pat body ...] more ... ]) - #'(-object-clauses input + [(_ name input [completed ...] [ [pat body ...] more ... ]) + #'(-object-clauses name + input [completed ...] [ [#:asserted pat (define f (react (facet-prevent-inert-check! this-facet) body ...)) @@ -150,6 +160,15 @@ (define-syntax-rule (react setup-expr ...) (turn-facet! this-turn (lambda () setup-expr ...))) +(define-syntax (let-event stx) + (syntax-parse stx + [(_ [] body ...) + #'(begin body ...)] + [(_ [#:do expr e ...] body ...) + #'(begin expr (let-event [e ...] body ...))] + [(_ [e0 e ...] body ...) + #'(react (stop-when e0 (let-event [e ...] body ...)))])) + (define-syntax-rule (define-field id initial-value) (define id (turn-field! this-turn 'id initial-value)))