Restructure broker protocol adapters; add loopback
This commit is contained in:
parent
ab6f83a281
commit
f669f053ea
|
@ -1,49 +1,22 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide standard-localhost-broker/tcp)
|
||||
(provide generic-client-session-facet)
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
(require imperative-syndicate/reassert)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
|
||||
(define standard-localhost-broker/tcp "tcp://localhost:8001/")
|
||||
|
||||
(define-logger syndicate/broker)
|
||||
|
||||
(message-struct broker-packet (url packet))
|
||||
|
||||
(spawn #:name 'client-factory
|
||||
(during (to-broker $u _) (assert (broker-connection u)))
|
||||
(during (observe (from-broker $u _)) (assert (broker-connection u)))
|
||||
(during (observe (broker-connected $u)) (assert (broker-connection u)))
|
||||
(during (to-broker $a _) (assert (broker-connection a)))
|
||||
(during (observe (from-broker $a _)) (assert (broker-connection a)))
|
||||
(during (observe (broker-connected $a)) (assert (broker-connection a))))
|
||||
|
||||
(during/spawn (broker-connection $url)
|
||||
#:name `(client-connection ,url)
|
||||
(match url
|
||||
[(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr))
|
||||
(define port (string->number portstr))
|
||||
(client-tcp-session-facet url host port)]
|
||||
[else (error 'client-factory "Invalid server URL: ~v" url)])))
|
||||
|
||||
(define (client-tcp-session-facet url host port)
|
||||
(define id (list (gensym 'client) host port))
|
||||
(reassert-on (tcp-connection id (tcp-address host port))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _)))
|
||||
(during (tcp-accepted id)
|
||||
(on-start (log-syndicate/broker-info "Connected to ~v" url))
|
||||
(on-stop (log-syndicate/broker-info "Disconnected from ~v" url))
|
||||
(assert (broker-connected url))
|
||||
|
||||
(define accumulate!
|
||||
(packet-accumulator (lambda (p) (send! (broker-packet url p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(accumulate! bs))
|
||||
|
||||
(define (w x) (send! (tcp-out id (encode x))))
|
||||
(define (generic-client-session-facet address w)
|
||||
(on-start (log-syndicate/broker-info "Connected to ~v" address))
|
||||
(on-stop (log-syndicate/broker-info "Disconnected from ~v" address))
|
||||
(assert (broker-connected address))
|
||||
|
||||
(define next-ep
|
||||
(let ((counter 0))
|
||||
|
@ -51,21 +24,23 @@
|
|||
(begin0 counter
|
||||
(set! counter (+ counter 1))))))
|
||||
|
||||
(during (to-broker url $a)
|
||||
(during (to-broker address $a)
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep a)))
|
||||
(on-stop (w (Clear ep))))
|
||||
|
||||
(on (message (to-broker url $a)) (w (Message a)))
|
||||
(on (message (to-broker address $a))
|
||||
(w (Message a)))
|
||||
|
||||
(on (message (broker-packet url (Ping))) (w (Pong)))
|
||||
(on (message (broker-packet address (Ping)))
|
||||
(w (Pong)))
|
||||
|
||||
(during (observe (from-broker url $spec))
|
||||
(during (observe (from-broker address $spec))
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep (observe spec))))
|
||||
(on-stop (w (Clear ep)))
|
||||
(on (message (broker-packet url (Add ep $vs)))
|
||||
(react (assert (instantiate-term->value (from-broker url spec) vs))
|
||||
(stop-when (message (broker-packet url (Del ep vs))))))
|
||||
(on (message (broker-packet url (Msg ep $vs)))
|
||||
(send! (instantiate-term->value (from-broker url spec) vs))))))
|
||||
(on (message (broker-packet address (Add ep $vs)))
|
||||
(react (assert (instantiate-term->value (from-broker address spec) vs))
|
||||
(stop-when (message (broker-packet address (Del ep vs))))))
|
||||
(on (message (broker-packet address (Msg ep $vs)))
|
||||
(send! (instantiate-term->value (from-broker address spec) vs)))))
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out broker-loopback-connection))
|
||||
|
||||
(require "../client.rkt")
|
||||
(require "../wire-protocol.rkt")
|
||||
(require "../protocol.rkt")
|
||||
|
||||
(require/activate imperative-syndicate/broker/server)
|
||||
|
||||
(assertion-struct broker-loopback-connection (scope))
|
||||
|
||||
(spawn #:name 'loopback-client-factory
|
||||
(during/spawn (broker-connection ($ address (broker-loopback-connection $scope)))
|
||||
#:name address
|
||||
(assert (server-connection address scope))
|
||||
(on (message (server-outbound address $p)) (send! (broker-packet address p)))
|
||||
(generic-client-session-facet address (lambda (x) (send! (server-inbound address x))))))
|
|
@ -0,0 +1,27 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide standard-localhost-broker/tcp
|
||||
(struct-out broker-tcp-connection))
|
||||
|
||||
(require "../client.rkt")
|
||||
(require "../wire-protocol.rkt")
|
||||
(require "../protocol.rkt")
|
||||
(require imperative-syndicate/reassert)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
|
||||
(assertion-struct broker-tcp-connection (host port))
|
||||
|
||||
(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001))
|
||||
|
||||
(spawn #:name 'tcp-client-factory
|
||||
(during/spawn (broker-connection ($ address (broker-tcp-connection $host $port)))
|
||||
#:name address
|
||||
(define id (list (gensym 'client) host port))
|
||||
(reassert-on (tcp-connection id (tcp-address host port))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _)))
|
||||
(during (tcp-accepted id)
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p)))))
|
||||
(on (message (tcp-in id $bs)) (accumulate! bs))
|
||||
(generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x))))))))
|
|
@ -2,47 +2,51 @@
|
|||
|
||||
(provide (all-from-out "protocol.rkt")
|
||||
(all-from-out "client.rkt")
|
||||
(all-from-out "server.rkt"))
|
||||
(all-from-out "client/tcp.rkt")
|
||||
(all-from-out "client/loopback.rkt")
|
||||
(all-from-out "server.rkt")
|
||||
(all-from-out "server/tcp.rkt")
|
||||
(all-from-out "server/websocket.rkt"))
|
||||
|
||||
(require "protocol.rkt")
|
||||
|
||||
(require/activate "client.rkt")
|
||||
(require/activate "client/tcp.rkt")
|
||||
(require/activate "client/loopback.rkt")
|
||||
|
||||
(require/activate "server.rkt")
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
|
||||
(define *default-tcp-port* 8001)
|
||||
(define *default-http-port* 8000)
|
||||
|
||||
(define (main #:tcp-port [tcp-port *default-tcp-port*]
|
||||
#:http-port [http-port *default-http-port*])
|
||||
(spawn #:name 'server-listener
|
||||
(when tcp-port
|
||||
(define tcp-scope "broker")
|
||||
(during/spawn (tcp-connection $id (tcp-listener tcp-port))
|
||||
#:name `(server-connection ,tcp-scope ,id)
|
||||
(server-facet/tcp id tcp-scope)))
|
||||
(when http-port
|
||||
(during/spawn (http-request $id 'get (http-resource (http-server _ http-port #f)
|
||||
`(,$scope ())) _ _ _)
|
||||
#:name `(server-connection ,scope ,id)
|
||||
(server-facet/websocket id scope)))))
|
||||
(require/activate "server/tcp.rkt")
|
||||
(require/activate "server/websocket.rkt")
|
||||
|
||||
(module+ main
|
||||
(require racket/cmdline)
|
||||
(define tcp-port *default-tcp-port*)
|
||||
(define http-port *default-http-port*)
|
||||
(define tcp-port default-tcp-broker-port)
|
||||
(define http-port default-http-broker-port)
|
||||
(command-line #:once-any
|
||||
["--tcp" port
|
||||
((format "Listen on plain TCP port (default ~a)" *default-tcp-port*))
|
||||
((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port))
|
||||
(set! tcp-port (string->number port))]
|
||||
["--no-tcp" "Do not listen on any plain TCP port"
|
||||
(set! tcp-port #f)]
|
||||
#:once-any
|
||||
["--http" port
|
||||
((format "Listen on websocket HTTP port (default ~a)" *default-http-port*))
|
||||
((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port))
|
||||
(set! http-port (string->number port))]
|
||||
["--no-http" "Do not listen on any websocket HTTP port"
|
||||
(set! http-port #f)])
|
||||
(extend-ground-boot! (lambda () (main #:tcp-port tcp-port
|
||||
#:http-port http-port))))
|
||||
(extend-ground-boot! (lambda ()
|
||||
(when tcp-port (spawn-tcp-broker! tcp-port))
|
||||
(when http-port (spawn-websocket-broker! http-port)))))
|
||||
|
||||
(define-logger syndicate/broker)
|
||||
|
||||
(when (log-level? syndicate/broker-logger 'debug)
|
||||
(spawn #:name 'server-debug
|
||||
(on (asserted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
|
||||
(on (retracted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C- ~v ~v" id scope))
|
||||
(on (message (server-inbound $id $p))
|
||||
(log-syndicate/broker-debug "CIN ~v ~v" id p))
|
||||
(on (message (server-outbound $id $p))
|
||||
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
|
||||
|
|
|
@ -3,8 +3,8 @@
|
|||
(provide (all-defined-out))
|
||||
|
||||
;; Client protocol
|
||||
(assertion-struct to-broker (url assertion))
|
||||
(assertion-struct from-broker (url assertion))
|
||||
(assertion-struct broker-connection (url))
|
||||
(assertion-struct broker-connected (url))
|
||||
(message-struct force-broker-disconnect (url))
|
||||
(assertion-struct to-broker (address assertion))
|
||||
(assertion-struct from-broker (address assertion))
|
||||
(assertion-struct broker-connection (address))
|
||||
(assertion-struct broker-connected (address))
|
||||
(message-struct force-broker-disconnect (address))
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (struct-out server-connection)
|
||||
(struct-out server-inbound)
|
||||
(struct-out server-outbound)
|
||||
(struct-out server-envelope))
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
(require racket/set)
|
||||
|
||||
;; Internal connection protocol
|
||||
(assertion-struct server-connection (connection-id scope))
|
||||
(assertion-struct server-inbound (connection-id body))
|
||||
(assertion-struct server-outbound (connection-id body))
|
||||
|
||||
;; Internal isolation
|
||||
(assertion-struct server-envelope (scope body))
|
||||
|
||||
(spawn #:name 'server-connection-factory
|
||||
(during/spawn (server-connection $id $scope)
|
||||
(define endpoints (set))
|
||||
|
||||
(on (message (server-inbound id (Assert $ep $a)))
|
||||
(when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
|
||||
(field [assertion a])
|
||||
|
||||
(define (recompute-endpoint)
|
||||
(define a (assertion))
|
||||
(if (observe? a)
|
||||
(let* ((pattern (observe-specification a))
|
||||
(spec (server-envelope scope pattern)))
|
||||
(values (observe spec)
|
||||
(term->skeleton-interest
|
||||
spec
|
||||
(capture-facet-context
|
||||
(lambda (op . captured-values)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (server-outbound id (ctor ep captured-values))))))))))
|
||||
(values (server-envelope scope a) #f)))
|
||||
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
|
||||
|
||||
(on (message (server-inbound id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
||||
(stop-when (message (server-inbound id (Clear ep)))))))
|
||||
|
||||
(on (message (server-inbound id (Message $body)))
|
||||
(send! (server-envelope scope body)))
|
||||
|
||||
(on (message (server-inbound id (Ping)))
|
||||
(send! (server-outbound id (Pong))))))
|
|
@ -1,51 +1,59 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide server-facet/tcp
|
||||
server-facet/websocket)
|
||||
(provide (struct-out server-connection)
|
||||
(struct-out server-inbound)
|
||||
(struct-out server-outbound)
|
||||
(struct-out server-envelope))
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
(require racket/set)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate imperative-syndicate/broker/server-connection)
|
||||
;; Internal connection protocol
|
||||
(assertion-struct server-connection (connection-id scope))
|
||||
(assertion-struct server-inbound (connection-id body))
|
||||
(assertion-struct server-outbound (connection-id body))
|
||||
|
||||
(define-logger syndicate/broker)
|
||||
;; Internal isolation
|
||||
(assertion-struct server-envelope (scope body))
|
||||
|
||||
(define (server-facet/tcp id scope)
|
||||
(assert (tcp-accepted id))
|
||||
(assert (server-connection id scope))
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(accumulate! bs))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (tcp-out id (encode p)))))
|
||||
(spawn #:name 'server-connection-factory
|
||||
(during/spawn (server-connection $id $scope)
|
||||
(define endpoints (set))
|
||||
|
||||
(define (server-facet/websocket id scope)
|
||||
(assert (http-accepted id))
|
||||
(assert (http-response-websocket id))
|
||||
(assert (server-connection id scope))
|
||||
(on (message (server-inbound id (Assert $ep $a)))
|
||||
(when (not (set-member? endpoints ep))
|
||||
(set! endpoints (set-add endpoints ep))
|
||||
(react
|
||||
(on-stop (set! endpoints (set-remove endpoints ep)))
|
||||
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||
(send! (server-outbound id (Ping))))
|
||||
(field [assertion a])
|
||||
|
||||
(on (message (websocket-in id $body))
|
||||
(define-values (packet remainder) (decode body))
|
||||
(when (not (equal? remainder #""))
|
||||
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
||||
(send! (server-inbound id packet)))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (websocket-out id (encode p)))))
|
||||
(define (recompute-endpoint)
|
||||
(define a (assertion))
|
||||
(if (observe? a)
|
||||
(let* ((pattern (observe-specification a))
|
||||
(spec (server-envelope scope pattern)))
|
||||
(values (observe spec)
|
||||
(term->skeleton-interest
|
||||
spec
|
||||
(capture-facet-context
|
||||
(lambda (op . captured-values)
|
||||
(schedule-script!
|
||||
(current-actor)
|
||||
(lambda ()
|
||||
(define ctor (match op ['+ Add] ['- Del] ['! Msg]))
|
||||
(send! (server-outbound id (ctor ep captured-values))))))))))
|
||||
(values (server-envelope scope a) #f)))
|
||||
(add-endpoint! (current-facet) "server" #t recompute-endpoint)
|
||||
|
||||
(when (log-level? syndicate/broker-logger 'debug)
|
||||
(spawn #:name 'server-debug
|
||||
(on (asserted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
|
||||
(on (retracted (server-connection $id $scope))
|
||||
(log-syndicate/broker-debug "C- ~v ~v" id scope))
|
||||
(on (message (server-inbound $id $p))
|
||||
(log-syndicate/broker-debug "CIN ~v ~v" id p))
|
||||
(on (message (server-outbound $id $p))
|
||||
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
|
||||
(on (message (server-inbound id (Assert ep $new-a)))
|
||||
(assertion new-a))
|
||||
|
||||
(stop-when (message (server-inbound id (Clear ep)))))))
|
||||
|
||||
(on (message (server-inbound id (Message $body)))
|
||||
(send! (server-envelope scope body)))
|
||||
|
||||
(on (message (server-inbound id (Ping)))
|
||||
(send! (server-outbound id (Pong))))))
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide server-facet/tcp
|
||||
default-tcp-broker-port
|
||||
spawn-tcp-broker!)
|
||||
|
||||
(require "../wire-protocol.rkt")
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/broker/server)
|
||||
|
||||
(define (server-facet/tcp id scope)
|
||||
(assert (tcp-accepted id))
|
||||
(assert (server-connection id scope))
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(accumulate! bs))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (tcp-out id (encode p)))))
|
||||
|
||||
(define default-tcp-broker-port 8001)
|
||||
|
||||
(define (spawn-tcp-broker! [port default-tcp-broker-port])
|
||||
(spawn #:name 'tcp-server-listener
|
||||
(define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup
|
||||
(during/spawn (tcp-connection $id (tcp-listener port))
|
||||
#:name `(server-connection ,tcp-scope ,id)
|
||||
(server-facet/tcp id tcp-scope))))
|
|
@ -0,0 +1,38 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide server-facet/websocket
|
||||
default-http-broker-port
|
||||
spawn-websocket-broker!)
|
||||
|
||||
(require "../wire-protocol.rkt")
|
||||
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
(require/activate imperative-syndicate/broker/server)
|
||||
|
||||
(define (server-facet/websocket id scope)
|
||||
(assert (http-accepted id))
|
||||
(assert (http-response-websocket id))
|
||||
(assert (server-connection id scope))
|
||||
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||
(send! (server-outbound id (Ping))))
|
||||
|
||||
(on (message (websocket-in id $body))
|
||||
(define-values (packet remainder) (decode body))
|
||||
(when (not (equal? remainder #""))
|
||||
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
|
||||
(send! (server-inbound id packet)))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (websocket-out id (encode p)))))
|
||||
|
||||
(define default-http-broker-port 8000)
|
||||
|
||||
(define (spawn-websocket-broker! [port default-http-broker-port])
|
||||
(spawn #:name 'websocket-server-listener
|
||||
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f)
|
||||
`(,$scope ())) _ _ _)
|
||||
#:name `(server-connection ,scope ,id)
|
||||
(server-facet/websocket id scope))))
|
|
@ -44,3 +44,6 @@
|
|||
(handle-packet! packet)))
|
||||
(lambda (chunk)
|
||||
(buffer (bytes-append (buffer) chunk))))
|
||||
|
||||
;; Received packets from broker are relayed via one of these.
|
||||
(message-struct broker-packet (address packet))
|
||||
|
|
Loading…
Reference in New Issue