Flow-controlled TCP (except in netstack); flow-controlled server/federation messages; fixes federation startup glitching

This commit is contained in:
Tony Garnock-Jones 2019-05-12 13:07:38 +01:00
parent 12650e2058
commit ae97fb1d1e
13 changed files with 206 additions and 70 deletions

View File

@ -5,6 +5,8 @@
(require "../internal-protocol.rkt")
(require "../protocol.rkt")
(require imperative-syndicate/protocol/credit)
(require/activate imperative-syndicate/distributed/server)
(spawn #:name 'loopback-client-factory
@ -12,9 +14,9 @@
#:name address
(assert (server-poa address))
(on (message (message-server->poa address $p)) (send! (server-packet address p)))
(define !! (make-flow-controlled-sender message-poa->server address))
(on (asserted (observe (message-poa->server address _)))
(generic-client-session-facet address
(lambda (x)
(send! (message-poa->server address x))))))))
(lambda (x) (!! (message-poa->server address x))))))))

View File

@ -19,6 +19,7 @@
(retracted (server-transport-connected address)))
(during (tcp-accepted id)
(on-start (issue-unbounded-credit! tcp-in id))
(assert (server-transport-connected address))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p)))))
(on (message (tcp-in id $bs)) (accumulate! bs)))

View File

@ -77,13 +77,15 @@
(asserted (tcp-rejected link _)))
(during (tcp-accepted link)
(on-start (issue-unbounded-credit! tcp-in link))
(assert (federated-uplink-connected link)) ;; out to local requester
(define session-id (gensym 'uplink))
(assert (federated-link session-id local-scope)) ;; in to federated scope
(define !! (make-flow-controlled-sender message-poa->server session-id))
(define accumulate!
(packet-accumulator (lambda (p) (send! (message-poa->server session-id p)))))
(packet-accumulator (lambda (p) (!! (message-poa->server session-id p)))))
(on (message (tcp-in link $bs)) (accumulate! bs))
(on (message (message-server->poa session-id $p))
@ -102,9 +104,11 @@
(define session-id (gensym 'local-link))
(assert (federated-link session-id scope))
(define !! (make-flow-controlled-sender message-poa->server session-id))
(on (message (message-server->poa session-id (Assert $subid (observe $spec))))
(define ((! ctor) cs) (send! (message-poa->server session-id (ctor subid cs))))
(define ((! ctor) cs) (!! (message-poa->server session-id (ctor subid cs))))
(add-observer-endpoint! (lambda () (server-proposal scope spec))
#:on-add (! Add)
#:on-remove (! Del)
@ -114,8 +118,8 @@
(during (observe ($ pat (server-envelope scope $spec)))
(define ep (gensym 'ep))
(on-start (send! (message-poa->server session-id (Assert ep (observe spec)))))
(on-stop (send! (message-poa->server session-id (Clear ep))))
(on-start (!! (message-poa->server session-id (Assert ep (observe spec)))))
(on-stop (!! (message-poa->server session-id (Clear ep))))
(assert (server-envelope scope (observe spec)))
(on (message (message-server->poa session-id (Add ep $captures)))
(react (assert (instantiate-term->value pat captures))
@ -129,7 +133,7 @@
;; Internal state
(struct subscription (id ;; LocalID
spec ;; Assertion
holders ;; (Hash ConnectionID SubscriptionID)
holders ;; (Hash LinkID SubscriptionID)
matches ;; (Bag (Listof Assertion))
@ -147,9 +151,28 @@
(when (log-level? syndicate/federation-logger 'debug)
(begin/dataflow (log-syndicate/federation-debug "::: ~a peers ~v" scope (peers)))
(begin/dataflow (log-syndicate/federation-debug "::: ~a specs ~v" scope (specs)))
(begin/dataflow (log-syndicate/federation-debug "::: ~a subs ~v" scope (subs))))
(begin/dataflow (log-syndicate/federation-debug "~a peers:" scope)
(for [(peer (in-set (peers)))]
(log-syndicate/federation-debug " link ~v" peer))
(log-syndicate/federation-debug "-"))
(begin/dataflow (log-syndicate/federation-debug "~a specs:" scope)
(for [((spec local) (in-hash (specs)))]
(log-syndicate/federation-debug " spec ~v -> local ~a" spec local))
(log-syndicate/federation-debug "-"))
(begin/dataflow (log-syndicate/federation-debug "~a subs:" scope)
(for [((local sub) (in-hash (subs)))]
(match-define (subscription _id spec holders matches) sub)
(log-syndicate/federation-debug " local ~a -> sub spec ~v" local spec)
(when (not (hash-empty? holders))
(log-syndicate/federation-debug " holders:")
(for [((link ep) (in-hash holders))]
(log-syndicate/federation-debug " link ~a -> ep ~a" link ep)))
(when (not (bag-empty? matches))
(log-syndicate/federation-debug " matches:")
(for [((captures count) (in-bag/count matches))]
(log-syndicate/federation-debug " captures ~v count ~a"
captures count))))
(log-syndicate/federation-debug "-")))
(define (call-with-sub localid linkid f)
(match (hash-ref (subs) localid #f)
@ -198,18 +221,26 @@
(during (federated-link $linkid scope)
(on-start (peers (set-add (peers) linkid)))
(on-stop (peers (set-remove (peers) linkid)))
(on-start (log-syndicate/federation-debug "+PEER ~a link ~a" scope linkid)
(peers (set-add (peers) linkid)))
(on-stop (log-syndicate/federation-debug "-PEER ~a link ~a" scope linkid)
(peers (set-remove (peers) linkid)))
(field [link-subs (hash)] ;; (Hash SubscriptionID LocalID)
[link-matches (bag)] ;; (Bag (Cons LocalID (Listof Assertion)))
(when (log-level? syndicate/federation-logger 'debug)
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-subs ~v"
scope linkid (link-subs)))
(begin/dataflow (log-syndicate/federation-debug "::: ~a ~a link-matches ~v"
scope linkid (link-matches))))
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-subs:" scope linkid)
(for [((sub local) (in-hash (link-subs)))]
(log-syndicate/federation-debug " sub ~a -> local ~a" sub local))
(log-syndicate/federation-debug "-"))
(begin/dataflow (log-syndicate/federation-debug "~a ~a link-matches:" scope linkid)
(for [((item count) (in-bag/count (link-matches)))]
(match-define (cons local captures) item)
(log-syndicate/federation-debug " local ~a captures ~v count ~a"
local captures count))
(log-syndicate/federation-debug "-")))
(on-start (for ([(spec localid) (in-hash (specs))])
(send! (message-server->poa linkid (Assert localid (observe spec))))))
@ -220,6 +251,8 @@
(for ([localid (in-hash-values (link-subs))])
(unsubscribe! localid linkid)))
(on-start (issue-unbounded-credit! message-poa->server linkid))
(on (message (message-poa->server linkid (Assert $subid (observe $spec))))
(define known? (hash-has-key? (specs) spec))
(define localid (if known? (hash-ref (specs) spec) (make-localid)))

View File

@ -4,18 +4,23 @@
(require "internal-protocol.rkt")
(require racket/set)
(require imperative-syndicate/protocol/credit)
(spawn #:name 'server-factory
;; Previously, we just had server-envelope. Now, we have both
;; server-envelope and server-proposal. While not everything
;; decided is (locally) suggested, it is true that everything
;; suggested is decided (in this implementation at least),
;; and the following clause reflects this:
;; and the following clauses reflect this:
(during (server-proposal $scope $assertion)
(assert (server-envelope scope assertion)))
(on (message (server-proposal $scope $body))
(send! (server-envelope scope body)))
(during/spawn (server-poa $id)
(issue-credit! message-poa->server id)
(let-event [(message (message-poa->server id $p))]
(match p
[(Connect scope) (react (connected id scope))]
@ -28,6 +33,7 @@
(define (connected id scope)
(define endpoints (set))
(assert (server-active scope))
(on-start (issue-unbounded-credit! message-poa->server id))
(on (message (message-poa->server id $p))
(match p
[(Assert ep a) #:when (not (set-member? endpoints ep))
@ -55,7 +61,7 @@
[(Clear ep) #:when (set-member? endpoints ep)
(void)] ;; handled by stop-when clause in facet established by Assert handler
[(Message body)
(send! (server-envelope scope body))]
(send! (server-proposal scope body))]
(unhandled-message id other)])))

View File

@ -13,7 +13,9 @@
(define (server-facet/tcp id)
(assert (tcp-accepted id))
(assert (server-poa id))
(define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
(on-start (issue-unbounded-credit! tcp-in id))
(define !! (make-flow-controlled-sender message-poa->server id))
(define accumulate! (packet-accumulator (lambda (p) (!! (message-poa->server id p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(on (message (message-server->poa id $p))
@ -26,4 +28,5 @@
(spawn #:name 'tcp-server-listener
(during/spawn (tcp-connection $id (tcp-listener port))
#:name `(server-poa ,id)
(on-start (issue-credit! (tcp-listener port)))
(server-facet/tcp id))))

View File

@ -7,6 +7,8 @@
(require "../wire-protocol.rkt")
(require "../internal-protocol.rkt")
(require imperative-syndicate/protocol/credit)
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/distributed/server)
@ -21,11 +23,12 @@
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (message-server->poa id (Ping))))
(define !! (make-flow-controlled-sender message-poa->server id))
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
(send! (message-poa->server id packet)))
(!! (message-poa->server id packet)))
(on (message (message-server->poa id $p))
(send! (websocket-out id (encode p)))))

View File

@ -5,6 +5,7 @@
(require (prefix-in preserves: preserves))
(require bitsyntax)
(require (only-in net/rfc6455 ws-idle-timeout))
(require imperative-syndicate/protocol/credit)
;; Enrolment
(message-struct Connect (scope)) ;; Client --> Server

View File

@ -1,18 +1,8 @@
#lang imperative-syndicate
;; TCP/IP driver interface.
;; A nice refinement would be to introduce something like a
;; `(tcp-error id _)` assertion, for when something goes wrong
;; listening or connecting. At present, for example, if connecting to
;; some other host that isn't listening, the driver pretends the
;; connection is open for an infinitesimal instant before closing.
;; This would be nicer if it never signalled "open" at all, instead
;; asserting something like `tcp-error` until interest in the
;; connection goes away.
;; TODO: This protocol is overly simplified.
;; a) no flow control
;; b) no facility for separate shutdown of inbound/outbound streams
;; a) no facility for separate shutdown of inbound/outbound streams
(provide (struct-out tcp-connection)
(struct-out tcp-connection-peer)
@ -23,13 +13,15 @@
(struct-out tcp-in-line)
(struct-out tcp-address)
(struct-out tcp-listener))
(struct-out tcp-listener)
(all-from-out imperative-syndicate/protocol/credit))
(define-logger syndicate/tcp)
(require racket/exn)
(require (prefix-in tcp: racket/tcp))
(require (only-in racket/port read-bytes-avail!-evt))
(require (only-in racket/port read-bytes-avail!-evt read-bytes-line-evt))
(require racket/unit)
(require net/tcp-sig)
@ -37,6 +29,8 @@
(require syndicate/support/bytes)
(require imperative-syndicate/protocol/credit)
;; Protocol messages
@ -73,19 +67,7 @@
[(list e) (assert (tcp-rejected id e))]
[(list cin cout)
(assert (tcp-accepted id))
(define unblock! (run-connection id cin cout))
(during/spawn (observe (tcp-in-line $id _))
#:name (list 'drivers/tcp 'line-reader id)
(field [buffer #""])
(on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs)))
(define newline-pos (bytes-index (buffer) (char->integer #\newline)))
(when newline-pos
(define line (subbytes (buffer) 0 newline-pos))
(buffer (subbytes (buffer) (+ newline-pos 1)))
(send! (tcp-in-line id line))))))
(run-connection id cin cout)])))
;; Listener
@ -96,12 +78,13 @@
(define control-ch (make-channel))
(thread (lambda ()
(let loop ((blocked? #t))
(let loop ((credit 1)) ;; NB. not zero initially!
(sync (handle-evt control-ch
['unblock (loop #f)]
[(list 'credit 'reset) (loop 0)]
[(list 'credit (? number? amount)) (loop (+ credit amount))]
['quit (void)]))
(if blocked?
(if (zero? credit)
(handle-evt (tcp:tcp-accept-evt listener)
(lambda (cin+cout)
@ -115,21 +98,22 @@
(tcp-address remote-hostname remote-port)
(loop blocked?))))))
(loop (- credit 1)))))))
(tcp:tcp-close listener)
(signal-background-activity! -1)))
(signal-background-activity! +1)
(on-start (channel-put control-ch 'unblock))
(on-stop (channel-put control-ch 'quit))
(on (message (credit* (list server-addr) $amount))
(channel-put control-ch (list 'credit amount)))
(on (message (inbound (raw-tcp-accepted server-addr $remote-addr $cin $cout)))
(define id (seal (list port remote-addr)))
(spawn #:name (list 'drivers/tcp 'inbound id)
(assert (tcp-connection id server-addr))
(assert (tcp-connection-peer id remote-addr))
(define unblock! (run-connection id cin cout))
(on (asserted (tcp-accepted id)) (unblock!))
(run-connection id cin cout)
(stop-when (asserted (tcp-rejected id _)))
(stop-when (retracted (tcp-accepted id))))))
@ -151,10 +135,25 @@
(on-stop (shutdown-connection!))
(on (asserted (observe (credit* (list tcp-out id) _)))
(send! (credit tcp-out id +inf.0)))
(on (message (credit* (list tcp-in id) $amount))
(when control-ch (channel-put control-ch (list 'credit amount))))
(field [mode 'bytes])
(begin/dataflow (when control-ch (channel-put control-ch (mode))))
(on (message (inbound (tcp-in id $eof-or-bs)))
(if (eof-object? eof-or-bs)
(send! (tcp-in id eof-or-bs))))
(send! (match (mode)
['bytes (tcp-in id eof-or-bs)]
['lines (tcp-in-line id eof-or-bs)]))))
(during (observe (tcp-in-line id _))
(on-start (mode 'lines))
(on-stop (mode 'bytes)))
(define-syntax-rule (trap-exns body ...)
(with-handlers ([(lambda (e) (not (exn:break? e)))
@ -168,26 +167,30 @@
(if (string? bs)
(write-string bs cout)
(write-bytes bs cout))
(flush-output cout)))
(let ((unblocked? #f))
(lambda ()
(when (not unblocked?)
(set! unblocked? #t)
(when control-ch (channel-put control-ch 'unblock))))))
(flush-output cout))))
(define (connection-thread control-ch id cin)
(let loop ((blocked? #t))
(let loop ((credit 0) (mode 'bytes))
(sync (handle-evt control-ch
['unblock (loop #f)]
[(list 'credit 'reset) (loop 0 mode)]
[(list 'credit (? number? amount)) (loop (+ credit amount) mode)]
['lines (loop credit 'lines)]
['bytes (loop credit 'bytes)]
['quit (void)]))
(if blocked?
(if (zero? credit)
(handle-evt (read-bytes-avail-evt 32768 cin)
(handle-evt (match mode
['bytes (read-bytes-avail-evt (inexact->exact (truncate (min credit 32768))) cin)]
['lines (read-bytes-line-evt cin 'any)])
(lambda (eof-or-bs)
(ground-send! (inbound (tcp-in id eof-or-bs)))
(loop (or blocked? (eof-object? eof-or-bs))))))))
(loop (if (eof-object? eof-or-bs)
(- credit (match mode
['bytes (bytes-length eof-or-bs)]
['lines 1])))
(close-input-port cin)
(signal-background-activity! -1))

View File

@ -16,13 +16,16 @@
(printf "*** ~a\n" (exn-message reason)))
(during (tcp-accepted id)
(on-start (printf "*** Connected.\n"))
(on-start (printf "*** Connected.\n")
(issue-credit! tcp-in id))
(on (retracted (tcp-accepted id)) (printf "*** Remote EOF.\n"))
;; ^ Not on-stop, because the facet is stopped by local EOF too!
(on (message (tcp-in id $bs))
(on (message (tcp-in-line id $bs))
(write-bytes bs)
(issue-credit! tcp-in id))
(define stdin-evt (read-bytes-line-evt (current-input-port) 'any))
(on (message (inbound (external-event stdin-evt (list $line))))

View File

@ -11,13 +11,16 @@
(during/spawn (inbound (tcp-connection $id (tcp-listener 5999)))
#:name (list 'chat-connection id)
(assert (outbound (tcp-accepted id)))
(on-start (send! (outbound (credit (tcp-listener 5999) 1)))
(send! (outbound (credit tcp-in id 1))))
(let ((me (gensym 'user)))
(assert (present me))
(on (message (inbound (tcp-in-line id $bs)))
(match bs
[#"/quit" (stop-current-facet)]
[#"/stop-server" (quit-dataspace!)]
[_ (send! (speak me (bytes->string/utf-8 bs)))])))
[_ (send! (speak me (bytes->string/utf-8 bs)))
(send! (outbound (credit tcp-in id 1)))])))
(during (present $user)
(on-start (send! (outbound (tcp-out id (string->bytes/utf-8 (~a user " arrived\n"))))))
(on-stop (send! (outbound (tcp-out id (string->bytes/utf-8 (~a user " left\n"))))))

View File

@ -13,9 +13,12 @@
(during/spawn (tcp-connection $id (tcp-listener 5999))
#:name (list 'chat-connection id)
(assert (tcp-accepted id))
(on-start (issue-credit! (tcp-listener 5999))
(issue-credit! tcp-in id))
(let ((me (gensym 'user)))
(assert (present me))
(on (message (tcp-in-line id $bs))
(issue-credit! tcp-in id)
(match bs
[#"/quit" (stop-current-facet)]
[#"/stop-server" (send! (stop-server))]

View File

@ -145,6 +145,7 @@
(define m (parse-irc-message (bytes->string/utf-8 bs)))
(log-info "~a -> ~v" this-conn m)
(send! (ircd-action this-conn m))
(issue-credit! tcp-in this-conn)
(match m
[(irc-message _ "PING" _ _) (void)] ;; RFC says servers don't reply to PINGs
[(or (irc-message _ "PASS" (list P) _)
@ -227,6 +228,8 @@
(during/spawn (tcp-connection $this-conn server-handle)
#:name `(ircd-connection ,this-conn)
(define connection-root-facet (current-facet))
(on-start (issue-credit! server-handle)
(issue-credit! tcp-in this-conn))
(during (tcp-connection-peer this-conn (tcp-address $peer-host _))
(assert (tcp-accepted this-conn))
(ircd-connection-facet connection-root-facet this-conn peer-host)))))

View File

@ -0,0 +1,72 @@
#lang imperative-syndicate
(provide (all-defined-out))
(require syndicate/functional-queue)
(define-logger syndicate/protocol/credit)
;; (credit* Any (U Number 'reset))
;; (credit Any ... (U Number 'reset))
;; Send this message to issue `amount` units of credit (in the context
;; of credit-based flow control) to the given `context`.
;; A `context` may identify any essentially asynchronous stream where
;; either the possibility of overwhelming a consumer exists, or the
;; need for occasionally changing the settings of a producer in an
;; atomic way exists. For example, reading HTTP headers proceeds
;; line-by-line until the body is reached, at which point it proceeds
;; byte-by-byte.
;; The `amount` may either be a number or `'reset`, which should zero
;; out (discard) any available credit. In particular, it may be
;; `+inf.0`, effectively turning credit-based flow control off for the
;; named context.
;; See also https://eighty-twenty.org/2011/05/15/origins-of-ack-and-flow-control.
(message-struct credit* (context amount))
(define-match-expander credit
(syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)])
(syntax-rules () [(_ context ... amount) (credit* (list context ...) amount)]))
(define (issue-credit! #:amount [amount 1] . context)
(send! (credit* context amount)))
(define (issue-unbounded-credit! . context)
(send! (credit* context +inf.0)))
(define (make-flow-controlled-sender . context)
(make-flow-controlled-sender* context))
(define (make-flow-controlled-sender* context)
(field [q (make-queue)]
[item-credit 0])
(when (log-level? syndicate/protocol/credit-logger 'debug)
"context ~a, queue length ~a, credit ~a"
(queue-length (q))
(when (and (positive? (item-credit))
(not (queue-empty? (q))))
(define-values (item new-q) (dequeue (q)))
(send! item)
(q new-q)
(item-credit (- (item-credit) 1))))
(on (message (credit* context $amount))
(item-credit (if (eq? amount 'reset) 0 (+ (item-credit) amount))))
(lambda (item) (q (enqueue (q) item))))
;; It's quite possible that credit-based flow control is not the right
;; approach for Syndicate. Using assertions that describe the content
;; of a stream more relationally ought to allow "replay" of
;; information in different contexts; though the trade-off is not only
;; reduced performance, but a need to garbage-collect
;; no-longer-interesting portions of the stream; that is,
;; *acknowledgements*. In a reliable-delivery context, it would appear
;; that at least one of acks or flow-control is required! (?!?)