Select client/server scope on connect; stub out federation/peering; protocol error and disconnection support
This commit is contained in:
parent
0ac2bb768e
commit
cf92ae14c5
|
@ -1,6 +1,8 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide generic-client-session-facet)
|
||||
(provide generic-client-session-facet
|
||||
(struct-out server-packet)
|
||||
(struct-out server-transport-connected))
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
|
@ -13,11 +15,21 @@
|
|||
(during (observe (from-server $a _)) (assert (server-connection a)))
|
||||
(during (observe (server-connected $a)) (assert (server-connection a))))
|
||||
|
||||
(define (generic-client-session-facet address w)
|
||||
;; Received packets from server are relayed via one of these.
|
||||
(message-struct server-packet (address packet))
|
||||
;; Like `server-connected`, but for reflecting `tcp-accepted` to the
|
||||
;; client end of a client-server connection without reordering wrt
|
||||
;; `server-packet` messages. Implementation-facing, where
|
||||
;; `server-connected` is part of the API.
|
||||
(assertion-struct server-transport-connected (address))
|
||||
|
||||
(define (generic-client-session-facet address scope w)
|
||||
(on-start (log-syndicate/distributed-info "Connected to ~v" address))
|
||||
(on-stop (log-syndicate/distributed-info "Disconnected from ~v" address))
|
||||
(assert (server-connected address))
|
||||
|
||||
(on-start (w (Connect scope)))
|
||||
|
||||
(define next-ep
|
||||
(let ((counter 0))
|
||||
(lambda ()
|
||||
|
@ -35,6 +47,10 @@
|
|||
(on (message (server-packet address (Ping)))
|
||||
(w (Pong)))
|
||||
|
||||
(on (message (server-packet address (Err $detail)))
|
||||
(log-syndicate/distributed-error "Error from ~a: ~v" address detail)
|
||||
(stop-current-facet))
|
||||
|
||||
(during (observe ($ pat (from-server address $spec)))
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep (observe spec))))
|
||||
|
|
|
@ -13,6 +13,11 @@
|
|||
(spawn #:name 'loopback-client-factory
|
||||
(during/spawn (server-connection ($ address (server-loopback-connection $scope)))
|
||||
#:name address
|
||||
(assert (server-poa address scope))
|
||||
(assert (server-poa address))
|
||||
(on (message (message-server->poa address $p)) (send! (server-packet address p)))
|
||||
(generic-client-session-facet address (lambda (x) (send! (message-poa->server address x))))))
|
||||
(on (asserted (observe (message-poa->server address _)))
|
||||
(react
|
||||
(generic-client-session-facet address
|
||||
scope
|
||||
(lambda (x)
|
||||
(send! (message-poa->server address x))))))))
|
||||
|
|
|
@ -10,18 +10,43 @@
|
|||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
|
||||
(assertion-struct server-tcp-connection (host port))
|
||||
(assertion-struct server-tcp-connection (host port scope))
|
||||
|
||||
(define standard-localhost-server/tcp (server-tcp-connection "localhost" 8001))
|
||||
(define (standard-localhost-server/tcp [scope "broker"])
|
||||
(server-tcp-connection "localhost" 8001 scope))
|
||||
|
||||
(spawn #:name 'tcp-client-factory
|
||||
(during/spawn (server-connection ($ address (server-tcp-connection $host $port)))
|
||||
(during/spawn (server-connection ($ address (server-tcp-connection $host $port $scope)))
|
||||
#:name address
|
||||
(define id (list (gensym 'client) host port))
|
||||
(reassert-on (tcp-connection id (tcp-address host port))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _)))
|
||||
(during (tcp-accepted id)
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p)))))
|
||||
(on (message (tcp-in id $bs)) (accumulate! bs))
|
||||
(generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x))))))))
|
||||
(let boot-connection ()
|
||||
(define root-facet (current-facet))
|
||||
|
||||
(reassert-on (tcp-connection id (tcp-address host port))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _))
|
||||
(retracted (server-transport-connected address)))
|
||||
|
||||
(during (tcp-accepted id)
|
||||
(assert (server-transport-connected address))
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p)))))
|
||||
(on (message (tcp-in id $bs)) (accumulate! bs)))
|
||||
|
||||
(during (server-transport-connected address)
|
||||
;; If we run generic-client-session-facet in the `tcp-accepted` handler above,
|
||||
;; then unfortunately disconnection of the TCP socket on error overtakes the error
|
||||
;; report itself, terminating the generic-client-session-facet before it has a
|
||||
;; chance to handle the error report.
|
||||
;;
|
||||
;; Could timing errors like that be something a type system could help us with?
|
||||
;; The conversation in `server-packet`s is sort-of "nested" inside the
|
||||
;; conversation in `tcp-in`s; a single facet reacting to both conversations (in
|
||||
;; this instance, to `server-packets` in an implicit frame, but explicitly to the
|
||||
;; frame of the `tcp-in`s, namely `tcp-accepted`) is probably an error. Or rather,
|
||||
;; any situation where pending "inner conversation" business could be obliterated
|
||||
;; by discarding a facet based on "outer conversation" framing is probably an
|
||||
;; error.
|
||||
;;
|
||||
(generic-client-session-facet address
|
||||
scope
|
||||
(lambda (x) (send! (tcp-out id (encode x)))))))))
|
||||
|
|
|
@ -42,10 +42,10 @@
|
|||
|
||||
(when (log-level? syndicate/distributed-logger 'debug)
|
||||
(spawn #:name 'server-debug
|
||||
(on (asserted (server-poa $id $scope))
|
||||
(log-syndicate/distributed-debug "+ ~v ~v" id scope))
|
||||
(on (retracted (server-poa $id $scope))
|
||||
(log-syndicate/distributed-debug "- ~v ~v" id scope))
|
||||
(on (asserted (server-poa $id))
|
||||
(log-syndicate/distributed-debug "+ ~v" id))
|
||||
(on (retracted (server-poa $id))
|
||||
(log-syndicate/distributed-debug "- ~v" id))
|
||||
(on (message (message-poa->server $id $p))
|
||||
(log-syndicate/distributed-debug "IN ~v ~v" id p))
|
||||
(on (message (message-server->poa $id $p))
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
(require racket/set)
|
||||
|
||||
;; Internal connection protocol
|
||||
(assertion-struct server-poa (connection-id scope)) ;; "Point of Attachment"
|
||||
(assertion-struct server-poa (connection-id)) ;; "Point of Attachment"
|
||||
(assertion-struct message-poa->server (connection-id body))
|
||||
(assertion-struct message-server->poa (connection-id body))
|
||||
|
||||
|
@ -21,44 +21,63 @@
|
|||
|
||||
(spawn #:name 'server-factory
|
||||
|
||||
(during/spawn (server-poa _ _)
|
||||
;; Previously, we just had server-envelope. Now, we have both
|
||||
;; server-envelope and server-proposal. While not everything
|
||||
;; decided is (locally) suggested, it is true that everything
|
||||
;; suggested is decided (in this implementation at least),
|
||||
;; and the following clause reflects this:
|
||||
(during (server-proposal $scope $assertion)
|
||||
(assert (server-envelope scope assertion))))
|
||||
;; Previously, we just had server-envelope. Now, we have both
|
||||
;; server-envelope and server-proposal. While not everything
|
||||
;; decided is (locally) suggested, it is true that everything
|
||||
;; suggested is decided (in this implementation at least),
|
||||
;; and the following clause reflects this:
|
||||
(during (server-proposal $scope $assertion)
|
||||
(assert (server-envelope scope assertion)))
|
||||
|
||||
(during/spawn (server-poa $id $scope)
|
||||
(define endpoints (set))
|
||||
(during/spawn (server-poa $id)
|
||||
(on (message (message-poa->server id $p))
|
||||
(match p
|
||||
[(Connect scope) (stop-current-facet (react (connected id scope)))]
|
||||
[(Peer scope) (stop-current-facet (react (peering id scope)))]
|
||||
[_ (send-error! id 'connection-not-setup)]))))
|
||||
|
||||
(on (message (message-poa->server id (Assert $ep $a)))
|
||||
(when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
(define (send-error! id detail)
|
||||
(send! (message-server->poa id (Err detail))))
|
||||
|
||||
(field [assertion a])
|
||||
(define (unhandled-message id p)
|
||||
(match p
|
||||
[(Connect _) (send-error! id 'duplicate-connection-setup)]
|
||||
[(Peer _) (send-error! id 'duplicate-connection-setup)]
|
||||
[(Ping) (send! (message-server->poa id (Pong)))]
|
||||
[_ (send-error! id 'invalid-message)]))
|
||||
|
||||
(assert (server-proposal scope (assertion)))
|
||||
(define (connected id scope)
|
||||
(define endpoints (set))
|
||||
(on (message (message-poa->server id $p))
|
||||
(match p
|
||||
[(Assert ep a) #:when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
|
||||
(let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
|
||||
(add-observer-endpoint! (lambda ()
|
||||
(let ((a (assertion)))
|
||||
(when (observe? a)
|
||||
(server-envelope scope (observe-specification a)))))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
(field [assertion a])
|
||||
|
||||
(on (message (message-poa->server id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
(assert (server-proposal scope (assertion)))
|
||||
|
||||
(stop-when (message (message-poa->server id (Clear ep)))))))
|
||||
(let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
|
||||
(add-observer-endpoint! (lambda ()
|
||||
(let ((a (assertion)))
|
||||
(when (observe? a)
|
||||
(server-envelope scope (observe-specification a)))))
|
||||
#:on-add (! Add)
|
||||
#:on-remove (! Del)
|
||||
#:on-message (! Msg)))
|
||||
|
||||
(on (message (message-poa->server id (Message $body)))
|
||||
(send! (server-envelope scope body)))
|
||||
(on (message (message-poa->server id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
||||
(on (message (message-poa->server id (Ping)))
|
||||
(send! (message-server->poa id (Pong))))))
|
||||
(stop-when (message (message-poa->server id (Clear ep)))))]
|
||||
[(Clear ep) #:when (set-member? endpoints ep)
|
||||
(void)] ;; handled by stop-when clause in facet established by Assert handler
|
||||
[(Message body)
|
||||
(send! (server-envelope scope body))]
|
||||
[other
|
||||
(unhandled-message id other)])))
|
||||
|
||||
(define (peering id scope)
|
||||
(error 'peering "Not yet implemented"))
|
||||
|
|
|
@ -9,20 +9,20 @@
|
|||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/distributed/server)
|
||||
|
||||
(define (server-facet/tcp id scope)
|
||||
(define (server-facet/tcp id)
|
||||
(assert (tcp-accepted id))
|
||||
(assert (server-poa id scope))
|
||||
(assert (server-poa id))
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(accumulate! bs))
|
||||
(on (message (message-server->poa id $p))
|
||||
(send! (tcp-out id (encode p)))))
|
||||
(send! (tcp-out id (encode p)))
|
||||
(when (Err? p) (stop-current-facet))))
|
||||
|
||||
(define default-tcp-server-port 8001)
|
||||
|
||||
(define (spawn-tcp-server! [port default-tcp-server-port])
|
||||
(spawn #:name 'tcp-server-listener
|
||||
(define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup
|
||||
(during/spawn (tcp-connection $id (tcp-listener port))
|
||||
#:name `(server-poa ,tcp-scope ,id)
|
||||
(server-facet/tcp id tcp-scope))))
|
||||
#:name `(server-poa ,id)
|
||||
(server-facet/tcp id))))
|
||||
|
|
|
@ -10,10 +10,10 @@
|
|||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate imperative-syndicate/distributed/server)
|
||||
|
||||
(define (server-facet/websocket id scope)
|
||||
(define (server-facet/websocket id)
|
||||
(assert (http-accepted id))
|
||||
(assert (http-response-websocket id))
|
||||
(assert (server-poa id scope))
|
||||
(assert (server-poa id))
|
||||
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
|
@ -32,7 +32,6 @@
|
|||
|
||||
(define (spawn-websocket-server! [port default-http-server-port])
|
||||
(spawn #:name 'websocket-server-listener
|
||||
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f)
|
||||
`(,$scope ())) _ _ _)
|
||||
#:name `(server-poa ,scope ,id)
|
||||
(server-facet/websocket id scope))))
|
||||
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f) `()) _ _ _)
|
||||
#:name `(server-poa ,id)
|
||||
(server-facet/websocket id))))
|
||||
|
|
|
@ -6,6 +6,10 @@
|
|||
(require bitsyntax)
|
||||
(require (only-in net/rfc6455 ws-idle-timeout))
|
||||
|
||||
;; Enrolment
|
||||
(message-struct Connect (scope)) ;; Client --> Server
|
||||
(message-struct Peer (scope)) ;; Peer --> Peer
|
||||
|
||||
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
|
||||
(message-struct Assert (endpoint-name assertion))
|
||||
(message-struct Clear (endpoint-name))
|
||||
|
@ -15,11 +19,22 @@
|
|||
(message-struct Add (endpoint-name captures))
|
||||
(message-struct Del (endpoint-name captures))
|
||||
(message-struct Msg (endpoint-name captures))
|
||||
(message-struct Err (detail))
|
||||
|
||||
;; Transport-related; Bidirectional
|
||||
(message-struct Ping ())
|
||||
(message-struct Pong ())
|
||||
|
||||
;; Peering
|
||||
;; =======
|
||||
;;
|
||||
;; To peer, send `(Peer Scope)` at the start of a connection instead
|
||||
;; of the usual `(Connect Scope)`.
|
||||
;;
|
||||
;; In peer mode, *actions* and *events* travel in *both* directions,
|
||||
;; but `Message`s do not appear and (for now) `Assert` is only used to
|
||||
;; establish `observe`s, i.e. subscriptions.
|
||||
|
||||
(define (decode bs)
|
||||
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
||||
(bit-string-case bs
|
||||
|
@ -44,6 +59,3 @@
|
|||
(handle-packet! packet)))
|
||||
(lambda (chunk)
|
||||
(buffer (bytes-append (buffer) chunk))))
|
||||
|
||||
;; Received packets from server are relayed via one of these.
|
||||
(message-struct server-packet (address packet))
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
(field [username (symbol->string (strong-gensym 'chatter-))])
|
||||
|
||||
(define root-facet (current-facet))
|
||||
(define url standard-localhost-server/tcp)
|
||||
(define url (standard-localhost-server/tcp))
|
||||
(during (server-connected url)
|
||||
(on-start (log-info "Connected to server."))
|
||||
(on-stop (log-info "Disconnected from server."))
|
||||
|
|
Loading…
Reference in New Issue