Restructure broker protocol adapters; add loopback

This commit is contained in:
Tony Garnock-Jones 2019-03-25 11:44:12 +00:00
parent 25970d9f16
commit a39bd458d9
10 changed files with 228 additions and 186 deletions

View File

@ -1,71 +1,46 @@
#lang imperative-syndicate
(provide standard-localhost-broker/tcp)
(provide generic-client-session-facet)
(require "wire-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require imperative-syndicate/reassert)
(require/activate imperative-syndicate/drivers/tcp)
(define standard-localhost-broker/tcp "tcp://localhost:8001/")
(define-logger syndicate/broker)
(message-struct broker-packet (url packet))
(spawn #:name 'client-factory
(during (to-broker $u _) (assert (broker-connection u)))
(during (observe (from-broker $u _)) (assert (broker-connection u)))
(during (observe (broker-connected $u)) (assert (broker-connection u)))
(during (to-broker $a _) (assert (broker-connection a)))
(during (observe (from-broker $a _)) (assert (broker-connection a)))
(during (observe (broker-connected $a)) (assert (broker-connection a))))
(during/spawn (broker-connection $url)
#:name `(client-connection ,url)
(match url
[(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr))
(define port (string->number portstr))
(client-tcp-session-facet url host port)]
[else (error 'client-factory "Invalid server URL: ~v" url)])))
(define (generic-client-session-facet address w)
(on-start (log-syndicate/broker-info "Connected to ~v" address))
(on-stop (log-syndicate/broker-info "Disconnected from ~v" address))
(assert (broker-connected address))
(define (client-tcp-session-facet url host port)
(define id (list (gensym 'client) host port))
(reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id))
(asserted (tcp-rejected id _)))
(during (tcp-accepted id)
(on-start (log-syndicate/broker-info "Connected to ~v" url))
(on-stop (log-syndicate/broker-info "Disconnected from ~v" url))
(assert (broker-connected url))
(define next-ep
(let ((counter 0))
(lambda ()
(begin0 counter
(set! counter (+ counter 1))))))
(define accumulate!
(packet-accumulator (lambda (p) (send! (broker-packet url p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(during (to-broker address $a)
(define ep (next-ep))
(on-start (w (Assert ep a)))
(on-stop (w (Clear ep))))
(define (w x) (send! (tcp-out id (encode x))))
(on (message (to-broker address $a))
(w (Message a)))
(define next-ep
(let ((counter 0))
(lambda ()
(begin0 counter
(set! counter (+ counter 1))))))
(on (message (broker-packet address (Ping)))
(w (Pong)))
(during (to-broker url $a)
(define ep (next-ep))
(on-start (w (Assert ep a)))
(on-stop (w (Clear ep))))
(on (message (to-broker url $a)) (w (Message a)))
(on (message (broker-packet url (Ping))) (w (Pong)))
(during (observe (from-broker url $spec))
(define ep (next-ep))
(on-start (w (Assert ep (observe spec))))
(on-stop (w (Clear ep)))
(on (message (broker-packet url (Add ep $vs)))
(react (assert (instantiate-term->value (from-broker url spec) vs))
(stop-when (message (broker-packet url (Del ep vs))))))
(on (message (broker-packet url (Msg ep $vs)))
(send! (instantiate-term->value (from-broker url spec) vs))))))
(during (observe (from-broker address $spec))
(define ep (next-ep))
(on-start (w (Assert ep (observe spec))))
(on-stop (w (Clear ep)))
(on (message (broker-packet address (Add ep $vs)))
(react (assert (instantiate-term->value (from-broker address spec) vs))
(stop-when (message (broker-packet address (Del ep vs))))))
(on (message (broker-packet address (Msg ep $vs)))
(send! (instantiate-term->value (from-broker address spec) vs)))))

View File

@ -0,0 +1,18 @@
#lang imperative-syndicate
(provide (struct-out broker-loopback-connection))
(require "../client.rkt")
(require "../wire-protocol.rkt")
(require "../protocol.rkt")
(require/activate imperative-syndicate/broker/server)
(assertion-struct broker-loopback-connection (scope))
(spawn #:name 'loopback-client-factory
(during/spawn (broker-connection ($ address (broker-loopback-connection $scope)))
#:name address
(assert (server-connection address scope))
(on (message (server-outbound address $p)) (send! (broker-packet address p)))
(generic-client-session-facet address (lambda (x) (send! (server-inbound address x))))))

View File

@ -0,0 +1,27 @@
#lang imperative-syndicate
(provide standard-localhost-broker/tcp
(struct-out broker-tcp-connection))
(require "../client.rkt")
(require "../wire-protocol.rkt")
(require "../protocol.rkt")
(require imperative-syndicate/reassert)
(require/activate imperative-syndicate/drivers/tcp)
(assertion-struct broker-tcp-connection (host port))
(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001))
(spawn #:name 'tcp-client-factory
(during/spawn (broker-connection ($ address (broker-tcp-connection $host $port)))
#:name address
(define id (list (gensym 'client) host port))
(reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id))
(asserted (tcp-rejected id _)))
(during (tcp-accepted id)
(define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p)))))
(on (message (tcp-in id $bs)) (accumulate! bs))
(generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x))))))))

View File

@ -2,47 +2,51 @@
(provide (all-from-out "protocol.rkt")
(all-from-out "client.rkt")
(all-from-out "server.rkt"))
(all-from-out "client/tcp.rkt")
(all-from-out "client/loopback.rkt")
(all-from-out "server.rkt")
(all-from-out "server/tcp.rkt")
(all-from-out "server/websocket.rkt"))
(require "protocol.rkt")
(require/activate "client.rkt")
(require/activate "client/tcp.rkt")
(require/activate "client/loopback.rkt")
(require/activate "server.rkt")
(require/activate imperative-syndicate/drivers/tcp)
(require/activate imperative-syndicate/drivers/web)
(define *default-tcp-port* 8001)
(define *default-http-port* 8000)
(define (main #:tcp-port [tcp-port *default-tcp-port*]
#:http-port [http-port *default-http-port*])
(spawn #:name 'server-listener
(when tcp-port
(define tcp-scope "broker")
(during/spawn (tcp-connection $id (tcp-listener tcp-port))
#:name `(server-connection ,tcp-scope ,id)
(server-facet/tcp id tcp-scope)))
(when http-port
(during/spawn (http-request $id 'get (http-resource (http-server _ http-port #f)
`(,$scope ())) _ _ _)
#:name `(server-connection ,scope ,id)
(server-facet/websocket id scope)))))
(require/activate "server/tcp.rkt")
(require/activate "server/websocket.rkt")
(module+ main
(require racket/cmdline)
(define tcp-port *default-tcp-port*)
(define http-port *default-http-port*)
(define tcp-port default-tcp-broker-port)
(define http-port default-http-broker-port)
(command-line #:once-any
["--tcp" port
((format "Listen on plain TCP port (default ~a)" *default-tcp-port*))
((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port))
(set! tcp-port (string->number port))]
["--no-tcp" "Do not listen on any plain TCP port"
(set! tcp-port #f)]
#:once-any
["--http" port
((format "Listen on websocket HTTP port (default ~a)" *default-http-port*))
((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port))
(set! http-port (string->number port))]
["--no-http" "Do not listen on any websocket HTTP port"
(set! http-port #f)])
(extend-ground-boot! (lambda () (main #:tcp-port tcp-port
#:http-port http-port))))
(extend-ground-boot! (lambda ()
(when tcp-port (spawn-tcp-broker! tcp-port))
(when http-port (spawn-websocket-broker! http-port)))))
(define-logger syndicate/broker)
(when (log-level? syndicate/broker-logger 'debug)
(spawn #:name 'server-debug
(on (asserted (server-connection $id $scope))
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
(on (retracted (server-connection $id $scope))
(log-syndicate/broker-debug "C- ~v ~v" id scope))
(on (message (server-inbound $id $p))
(log-syndicate/broker-debug "CIN ~v ~v" id p))
(on (message (server-outbound $id $p))
(log-syndicate/broker-debug "COUT ~v ~v" id p))))

View File

@ -3,8 +3,8 @@
(provide (all-defined-out))
;; Client protocol
(assertion-struct to-broker (url assertion))
(assertion-struct from-broker (url assertion))
(assertion-struct broker-connection (url))
(assertion-struct broker-connected (url))
(message-struct force-broker-disconnect (url))
(assertion-struct to-broker (address assertion))
(assertion-struct from-broker (address assertion))
(assertion-struct broker-connection (address))
(assertion-struct broker-connected (address))
(message-struct force-broker-disconnect (address))

View File

@ -1,59 +0,0 @@
#lang imperative-syndicate
(provide (struct-out server-connection)
(struct-out server-inbound)
(struct-out server-outbound)
(struct-out server-envelope))
(require "wire-protocol.rkt")
(require imperative-syndicate/term)
(require racket/set)
;; Internal connection protocol
(assertion-struct server-connection (connection-id scope))
(assertion-struct server-inbound (connection-id body))
(assertion-struct server-outbound (connection-id body))
;; Internal isolation
(assertion-struct server-envelope (scope body))
(spawn #:name 'server-connection-factory
(during/spawn (server-connection $id $scope)
(define endpoints (set))
(on (message (server-inbound id (Assert $ep $a)))
(when (not (set-member? endpoints ep))
(set! endpoints (set-add endpoints ep))
(react
(on-stop (set! endpoints (set-remove endpoints ep)))
(field [assertion a])
(define (recompute-endpoint)
(define a (assertion))
(if (observe? a)
(let* ((pattern (observe-specification a))
(spec (server-envelope scope pattern)))
(values (observe spec)
(term->skeleton-interest
spec
(capture-facet-context
(lambda (op . captured-values)
(schedule-script!
(current-actor)
(lambda ()
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
(send! (server-outbound id (ctor ep captured-values))))))))))
(values (server-envelope scope a) #f)))
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
(on (message (server-inbound id (Assert ep $new-a)))
(assertion new-a))
(stop-when (message (server-inbound id (Clear ep)))))))
(on (message (server-inbound id (Message $body)))
(send! (server-envelope scope body)))
(on (message (server-inbound id (Ping)))
(send! (server-outbound id (Pong))))))

View File

@ -1,51 +1,59 @@
#lang imperative-syndicate
(provide server-facet/tcp
server-facet/websocket)
(provide (struct-out server-connection)
(struct-out server-inbound)
(struct-out server-outbound)
(struct-out server-envelope))
(require "wire-protocol.rkt")
(require imperative-syndicate/term)
(require racket/set)
(require/activate imperative-syndicate/drivers/tcp)
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/broker/server-connection)
;; Internal connection protocol
(assertion-struct server-connection (connection-id scope))
(assertion-struct server-inbound (connection-id body))
(assertion-struct server-outbound (connection-id body))
(define-logger syndicate/broker)
;; Internal isolation
(assertion-struct server-envelope (scope body))
(define (server-facet/tcp id scope)
(assert (tcp-accepted id))
(assert (server-connection id scope))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(on (message (server-outbound id $p))
(send! (tcp-out id (encode p)))))
(spawn #:name 'server-connection-factory
(during/spawn (server-connection $id $scope)
(define endpoints (set))
(define (server-facet/websocket id scope)
(assert (http-accepted id))
(assert (http-response-websocket id))
(assert (server-connection id scope))
(on (message (server-inbound id (Assert $ep $a)))
(when (not (set-member? endpoints ep))
(set! endpoints (set-add endpoints ep))
(react
(on-stop (set! endpoints (set-remove endpoints ep)))
(field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (server-outbound id (Ping))))
(field [assertion a])
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
(send! (server-inbound id packet)))
(on (message (server-outbound id $p))
(send! (websocket-out id (encode p)))))
(define (recompute-endpoint)
(define a (assertion))
(if (observe? a)
(let* ((pattern (observe-specification a))
(spec (server-envelope scope pattern)))
(values (observe spec)
(term->skeleton-interest
spec
(capture-facet-context
(lambda (op . captured-values)
(schedule-script!
(current-actor)
(lambda ()
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
(send! (server-outbound id (ctor ep captured-values))))))))))
(values (server-envelope scope a) #f)))
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
(when (log-level? syndicate/broker-logger 'debug)
(spawn #:name 'server-debug
(on (asserted (server-connection $id $scope))
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
(on (retracted (server-connection $id $scope))
(log-syndicate/broker-debug "C- ~v ~v" id scope))
(on (message (server-inbound $id $p))
(log-syndicate/broker-debug "CIN ~v ~v" id p))
(on (message (server-outbound $id $p))
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
(on (message (server-inbound id (Assert ep $new-a)))
(assertion new-a))
(stop-when (message (server-inbound id (Clear ep)))))))
(on (message (server-inbound id (Message $body)))
(send! (server-envelope scope body)))
(on (message (server-inbound id (Ping)))
(send! (server-outbound id (Pong))))))

View File

@ -0,0 +1,28 @@
#lang imperative-syndicate
(provide server-facet/tcp
default-tcp-broker-port
spawn-tcp-broker!)
(require "../wire-protocol.rkt")
(require/activate imperative-syndicate/drivers/tcp)
(require/activate imperative-syndicate/broker/server)
(define (server-facet/tcp id scope)
(assert (tcp-accepted id))
(assert (server-connection id scope))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(on (message (server-outbound id $p))
(send! (tcp-out id (encode p)))))
(define default-tcp-broker-port 8001)
(define (spawn-tcp-broker! [port default-tcp-broker-port])
(spawn #:name 'tcp-server-listener
(define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup
(during/spawn (tcp-connection $id (tcp-listener port))
#:name `(server-connection ,tcp-scope ,id)
(server-facet/tcp id tcp-scope))))

View File

@ -0,0 +1,38 @@
#lang imperative-syndicate
(provide server-facet/websocket
default-http-broker-port
spawn-websocket-broker!)
(require "../wire-protocol.rkt")
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
(require/activate imperative-syndicate/broker/server)
(define (server-facet/websocket id scope)
(assert (http-accepted id))
(assert (http-response-websocket id))
(assert (server-connection id scope))
(field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (server-outbound id (Ping))))
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
(send! (server-inbound id packet)))
(on (message (server-outbound id $p))
(send! (websocket-out id (encode p)))))
(define default-http-broker-port 8000)
(define (spawn-websocket-broker! [port default-http-broker-port])
(spawn #:name 'websocket-server-listener
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f)
`(,$scope ())) _ _ _)
#:name `(server-connection ,scope ,id)
(server-facet/websocket id scope))))

View File

@ -44,3 +44,6 @@
(handle-packet! packet)))
(lambda (chunk)
(buffer (bytes-append (buffer) chunk))))
;; Received packets from broker are relayed via one of these.
(message-struct broker-packet (address packet))