From cf92ae14c5688a3175c05aded1d6c745ddd70322 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 7 May 2019 12:56:22 +0100 Subject: [PATCH] Select client/server scope on connect; stub out federation/peering; protocol error and disconnection support --- syndicate/distributed/client.rkt | 20 ++++- syndicate/distributed/client/loopback.rkt | 9 ++- syndicate/distributed/client/tcp.rkt | 45 +++++++++--- syndicate/distributed/main.rkt | 8 +- syndicate/distributed/server.rkt | 85 +++++++++++++--------- syndicate/distributed/server/tcp.rkt | 12 +-- syndicate/distributed/server/websocket.rkt | 11 ++- syndicate/distributed/wire-protocol.rkt | 18 ++++- syndicate/examples/server-chat-client.rkt | 2 +- 9 files changed, 143 insertions(+), 67 deletions(-) diff --git a/syndicate/distributed/client.rkt b/syndicate/distributed/client.rkt index 59aa372..692026f 100644 --- a/syndicate/distributed/client.rkt +++ b/syndicate/distributed/client.rkt @@ -1,6 +1,8 @@ #lang imperative-syndicate -(provide generic-client-session-facet) +(provide generic-client-session-facet + (struct-out server-packet) + (struct-out server-transport-connected)) (require "wire-protocol.rkt") (require "protocol.rkt") @@ -13,11 +15,21 @@ (during (observe (from-server $a _)) (assert (server-connection a))) (during (observe (server-connected $a)) (assert (server-connection a)))) -(define (generic-client-session-facet address w) +;; Received packets from server are relayed via one of these. +(message-struct server-packet (address packet)) +;; Like `server-connected`, but for reflecting `tcp-accepted` to the +;; client end of a client-server connection without reordering wrt +;; `server-packet` messages. Implementation-facing, where +;; `server-connected` is part of the API. +(assertion-struct server-transport-connected (address)) + +(define (generic-client-session-facet address scope w) (on-start (log-syndicate/distributed-info "Connected to ~v" address)) (on-stop (log-syndicate/distributed-info "Disconnected from ~v" address)) (assert (server-connected address)) + (on-start (w (Connect scope))) + (define next-ep (let ((counter 0)) (lambda () @@ -35,6 +47,10 @@ (on (message (server-packet address (Ping))) (w (Pong))) + (on (message (server-packet address (Err $detail))) + (log-syndicate/distributed-error "Error from ~a: ~v" address detail) + (stop-current-facet)) + (during (observe ($ pat (from-server address $spec))) (define ep (next-ep)) (on-start (w (Assert ep (observe spec)))) diff --git a/syndicate/distributed/client/loopback.rkt b/syndicate/distributed/client/loopback.rkt index 021e7fc..f42f3eb 100644 --- a/syndicate/distributed/client/loopback.rkt +++ b/syndicate/distributed/client/loopback.rkt @@ -13,6 +13,11 @@ (spawn #:name 'loopback-client-factory (during/spawn (server-connection ($ address (server-loopback-connection $scope))) #:name address - (assert (server-poa address scope)) + (assert (server-poa address)) (on (message (message-server->poa address $p)) (send! (server-packet address p))) - (generic-client-session-facet address (lambda (x) (send! (message-poa->server address x)))))) + (on (asserted (observe (message-poa->server address _))) + (react + (generic-client-session-facet address + scope + (lambda (x) + (send! (message-poa->server address x)))))))) diff --git a/syndicate/distributed/client/tcp.rkt b/syndicate/distributed/client/tcp.rkt index 6f96fd9..a9c3ee9 100644 --- a/syndicate/distributed/client/tcp.rkt +++ b/syndicate/distributed/client/tcp.rkt @@ -10,18 +10,43 @@ (require/activate imperative-syndicate/drivers/tcp) -(assertion-struct server-tcp-connection (host port)) +(assertion-struct server-tcp-connection (host port scope)) -(define standard-localhost-server/tcp (server-tcp-connection "localhost" 8001)) +(define (standard-localhost-server/tcp [scope "broker"]) + (server-tcp-connection "localhost" 8001 scope)) (spawn #:name 'tcp-client-factory - (during/spawn (server-connection ($ address (server-tcp-connection $host $port))) + (during/spawn (server-connection ($ address (server-tcp-connection $host $port $scope))) #:name address (define id (list (gensym 'client) host port)) - (reassert-on (tcp-connection id (tcp-address host port)) - (retracted (tcp-accepted id)) - (asserted (tcp-rejected id _))) - (during (tcp-accepted id) - (define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p))))) - (on (message (tcp-in id $bs)) (accumulate! bs)) - (generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x)))))))) + (let boot-connection () + (define root-facet (current-facet)) + + (reassert-on (tcp-connection id (tcp-address host port)) + (retracted (tcp-accepted id)) + (asserted (tcp-rejected id _)) + (retracted (server-transport-connected address))) + + (during (tcp-accepted id) + (assert (server-transport-connected address)) + (define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p))))) + (on (message (tcp-in id $bs)) (accumulate! bs))) + + (during (server-transport-connected address) + ;; If we run generic-client-session-facet in the `tcp-accepted` handler above, + ;; then unfortunately disconnection of the TCP socket on error overtakes the error + ;; report itself, terminating the generic-client-session-facet before it has a + ;; chance to handle the error report. + ;; + ;; Could timing errors like that be something a type system could help us with? + ;; The conversation in `server-packet`s is sort-of "nested" inside the + ;; conversation in `tcp-in`s; a single facet reacting to both conversations (in + ;; this instance, to `server-packets` in an implicit frame, but explicitly to the + ;; frame of the `tcp-in`s, namely `tcp-accepted`) is probably an error. Or rather, + ;; any situation where pending "inner conversation" business could be obliterated + ;; by discarding a facet based on "outer conversation" framing is probably an + ;; error. + ;; + (generic-client-session-facet address + scope + (lambda (x) (send! (tcp-out id (encode x))))))))) diff --git a/syndicate/distributed/main.rkt b/syndicate/distributed/main.rkt index f28e703..17d3b7b 100644 --- a/syndicate/distributed/main.rkt +++ b/syndicate/distributed/main.rkt @@ -42,10 +42,10 @@ (when (log-level? syndicate/distributed-logger 'debug) (spawn #:name 'server-debug - (on (asserted (server-poa $id $scope)) - (log-syndicate/distributed-debug "+ ~v ~v" id scope)) - (on (retracted (server-poa $id $scope)) - (log-syndicate/distributed-debug "- ~v ~v" id scope)) + (on (asserted (server-poa $id)) + (log-syndicate/distributed-debug "+ ~v" id)) + (on (retracted (server-poa $id)) + (log-syndicate/distributed-debug "- ~v" id)) (on (message (message-poa->server $id $p)) (log-syndicate/distributed-debug "IN ~v ~v" id p)) (on (message (message-server->poa $id $p)) diff --git a/syndicate/distributed/server.rkt b/syndicate/distributed/server.rkt index a178a3d..a44888b 100644 --- a/syndicate/distributed/server.rkt +++ b/syndicate/distributed/server.rkt @@ -10,7 +10,7 @@ (require racket/set) ;; Internal connection protocol -(assertion-struct server-poa (connection-id scope)) ;; "Point of Attachment" +(assertion-struct server-poa (connection-id)) ;; "Point of Attachment" (assertion-struct message-poa->server (connection-id body)) (assertion-struct message-server->poa (connection-id body)) @@ -21,44 +21,63 @@ (spawn #:name 'server-factory - (during/spawn (server-poa _ _) - ;; Previously, we just had server-envelope. Now, we have both - ;; server-envelope and server-proposal. While not everything - ;; decided is (locally) suggested, it is true that everything - ;; suggested is decided (in this implementation at least), - ;; and the following clause reflects this: - (during (server-proposal $scope $assertion) - (assert (server-envelope scope assertion)))) + ;; Previously, we just had server-envelope. Now, we have both + ;; server-envelope and server-proposal. While not everything + ;; decided is (locally) suggested, it is true that everything + ;; suggested is decided (in this implementation at least), + ;; and the following clause reflects this: + (during (server-proposal $scope $assertion) + (assert (server-envelope scope assertion))) - (during/spawn (server-poa $id $scope) - (define endpoints (set)) + (during/spawn (server-poa $id) + (on (message (message-poa->server id $p)) + (match p + [(Connect scope) (stop-current-facet (react (connected id scope)))] + [(Peer scope) (stop-current-facet (react (peering id scope)))] + [_ (send-error! id 'connection-not-setup)])))) - (on (message (message-poa->server id (Assert $ep $a))) - (when (not (set-member? endpoints ep)) - (set! endpoints (set-add endpoints ep)) - (react - (on-stop (set! endpoints (set-remove endpoints ep))) +(define (send-error! id detail) + (send! (message-server->poa id (Err detail)))) - (field [assertion a]) +(define (unhandled-message id p) + (match p + [(Connect _) (send-error! id 'duplicate-connection-setup)] + [(Peer _) (send-error! id 'duplicate-connection-setup)] + [(Ping) (send! (message-server->poa id (Pong)))] + [_ (send-error! id 'invalid-message)])) - (assert (server-proposal scope (assertion))) +(define (connected id scope) + (define endpoints (set)) + (on (message (message-poa->server id $p)) + (match p + [(Assert ep a) #:when (not (set-member? endpoints ep)) + (set! endpoints (set-add endpoints ep)) + (react + (on-stop (set! endpoints (set-remove endpoints ep))) - (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs))))))) - (add-observer-endpoint! (lambda () - (let ((a (assertion))) - (when (observe? a) - (server-envelope scope (observe-specification a))))) - #:on-add (! Add) - #:on-remove (! Del) - #:on-message (! Msg))) + (field [assertion a]) - (on (message (message-poa->server id (Assert ep $new-a))) - (assertion new-a)) + (assert (server-proposal scope (assertion))) - (stop-when (message (message-poa->server id (Clear ep))))))) + (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs))))))) + (add-observer-endpoint! (lambda () + (let ((a (assertion))) + (when (observe? a) + (server-envelope scope (observe-specification a))))) + #:on-add (! Add) + #:on-remove (! Del) + #:on-message (! Msg))) - (on (message (message-poa->server id (Message $body))) - (send! (server-envelope scope body))) + (on (message (message-poa->server id (Assert ep $new-a))) + (assertion new-a)) - (on (message (message-poa->server id (Ping))) - (send! (message-server->poa id (Pong)))))) + (stop-when (message (message-poa->server id (Clear ep)))))] + [(Clear ep) #:when (set-member? endpoints ep) + (void)] ;; handled by stop-when clause in facet established by Assert handler + [(Message body) + (send! (server-envelope scope body))] + [other + (unhandled-message id other)]))) + +(define (peering id scope) + (error 'peering "Not yet implemented")) diff --git a/syndicate/distributed/server/tcp.rkt b/syndicate/distributed/server/tcp.rkt index 2c1ab71..9cc2272 100644 --- a/syndicate/distributed/server/tcp.rkt +++ b/syndicate/distributed/server/tcp.rkt @@ -9,20 +9,20 @@ (require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/distributed/server) -(define (server-facet/tcp id scope) +(define (server-facet/tcp id) (assert (tcp-accepted id)) - (assert (server-poa id scope)) + (assert (server-poa id)) (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p))))) (on (message (tcp-in id $bs)) (accumulate! bs)) (on (message (message-server->poa id $p)) - (send! (tcp-out id (encode p))))) + (send! (tcp-out id (encode p))) + (when (Err? p) (stop-current-facet)))) (define default-tcp-server-port 8001) (define (spawn-tcp-server! [port default-tcp-server-port]) (spawn #:name 'tcp-server-listener - (define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup (during/spawn (tcp-connection $id (tcp-listener port)) - #:name `(server-poa ,tcp-scope ,id) - (server-facet/tcp id tcp-scope)))) + #:name `(server-poa ,id) + (server-facet/tcp id)))) diff --git a/syndicate/distributed/server/websocket.rkt b/syndicate/distributed/server/websocket.rkt index dc1d5cd..372cd07 100644 --- a/syndicate/distributed/server/websocket.rkt +++ b/syndicate/distributed/server/websocket.rkt @@ -10,10 +10,10 @@ (require/activate imperative-syndicate/drivers/timer) (require/activate imperative-syndicate/distributed/server) -(define (server-facet/websocket id scope) +(define (server-facet/websocket id) (assert (http-accepted id)) (assert (http-response-websocket id)) - (assert (server-poa id scope)) + (assert (server-poa id)) (field [ping-time-deadline 0]) (on (asserted (later-than (ping-time-deadline))) @@ -32,7 +32,6 @@ (define (spawn-websocket-server! [port default-http-server-port]) (spawn #:name 'websocket-server-listener - (during/spawn (http-request $id 'get (http-resource (http-server _ port #f) - `(,$scope ())) _ _ _) - #:name `(server-poa ,scope ,id) - (server-facet/websocket id scope)))) + (during/spawn (http-request $id 'get (http-resource (http-server _ port #f) `()) _ _ _) + #:name `(server-poa ,id) + (server-facet/websocket id)))) diff --git a/syndicate/distributed/wire-protocol.rkt b/syndicate/distributed/wire-protocol.rkt index 3382316..306de9f 100644 --- a/syndicate/distributed/wire-protocol.rkt +++ b/syndicate/distributed/wire-protocol.rkt @@ -6,6 +6,10 @@ (require bitsyntax) (require (only-in net/rfc6455 ws-idle-timeout)) +;; Enrolment +(message-struct Connect (scope)) ;; Client --> Server +(message-struct Peer (scope)) ;; Peer --> Peer + ;; Actions; Client --> Server (and Peer --> Peer, except for Message) (message-struct Assert (endpoint-name assertion)) (message-struct Clear (endpoint-name)) @@ -15,11 +19,22 @@ (message-struct Add (endpoint-name captures)) (message-struct Del (endpoint-name captures)) (message-struct Msg (endpoint-name captures)) +(message-struct Err (detail)) ;; Transport-related; Bidirectional (message-struct Ping ()) (message-struct Pong ()) +;; Peering +;; ======= +;; +;; To peer, send `(Peer Scope)` at the start of a connection instead +;; of the usual `(Connect Scope)`. +;; +;; In peer mode, *actions* and *events* travel in *both* directions, +;; but `Message`s do not appear and (for now) `Assert` is only used to +;; establish `observe`s, i.e. subscriptions. + (define (decode bs) (parameterize ((preserves:short-form-labels '#(discard capture observe))) (bit-string-case bs @@ -44,6 +59,3 @@ (handle-packet! packet))) (lambda (chunk) (buffer (bytes-append (buffer) chunk)))) - -;; Received packets from server are relayed via one of these. -(message-struct server-packet (address packet)) diff --git a/syndicate/examples/server-chat-client.rkt b/syndicate/examples/server-chat-client.rkt index f83e1c1..12bdddc 100644 --- a/syndicate/examples/server-chat-client.rkt +++ b/syndicate/examples/server-chat-client.rkt @@ -11,7 +11,7 @@ (field [username (symbol->string (strong-gensym 'chatter-))]) (define root-facet (current-facet)) - (define url standard-localhost-server/tcp) + (define url (standard-localhost-server/tcp)) (during (server-connected url) (on-start (log-info "Connected to server.")) (on-stop (log-info "Disconnected from server."))