diff --git a/syndicate/broker/client.rkt b/syndicate/broker/client.rkt index 0c8fd7d..c207294 100644 --- a/syndicate/broker/client.rkt +++ b/syndicate/broker/client.rkt @@ -1,71 +1,46 @@ #lang imperative-syndicate -(provide standard-localhost-broker/tcp) +(provide generic-client-session-facet) (require "wire-protocol.rkt") (require "protocol.rkt") (require imperative-syndicate/term) -(require imperative-syndicate/reassert) - -(require/activate imperative-syndicate/drivers/tcp) - -(define standard-localhost-broker/tcp "tcp://localhost:8001/") (define-logger syndicate/broker) -(message-struct broker-packet (url packet)) - (spawn #:name 'client-factory - (during (to-broker $u _) (assert (broker-connection u))) - (during (observe (from-broker $u _)) (assert (broker-connection u))) - (during (observe (broker-connected $u)) (assert (broker-connection u))) + (during (to-broker $a _) (assert (broker-connection a))) + (during (observe (from-broker $a _)) (assert (broker-connection a))) + (during (observe (broker-connected $a)) (assert (broker-connection a)))) - (during/spawn (broker-connection $url) - #:name `(client-connection ,url) - (match url - [(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr)) - (define port (string->number portstr)) - (client-tcp-session-facet url host port)] - [else (error 'client-factory "Invalid server URL: ~v" url)]))) +(define (generic-client-session-facet address w) + (on-start (log-syndicate/broker-info "Connected to ~v" address)) + (on-stop (log-syndicate/broker-info "Disconnected from ~v" address)) + (assert (broker-connected address)) -(define (client-tcp-session-facet url host port) - (define id (list (gensym 'client) host port)) - (reassert-on (tcp-connection id (tcp-address host port)) - (retracted (tcp-accepted id)) - (asserted (tcp-rejected id _))) - (during (tcp-accepted id) - (on-start (log-syndicate/broker-info "Connected to ~v" url)) - (on-stop (log-syndicate/broker-info "Disconnected from ~v" url)) - (assert (broker-connected url)) + (define next-ep + (let ((counter 0)) + (lambda () + (begin0 counter + (set! counter (+ counter 1)))))) - (define accumulate! - (packet-accumulator (lambda (p) (send! (broker-packet url p))))) - (on (message (tcp-in id $bs)) - (accumulate! bs)) + (during (to-broker address $a) + (define ep (next-ep)) + (on-start (w (Assert ep a))) + (on-stop (w (Clear ep)))) - (define (w x) (send! (tcp-out id (encode x)))) + (on (message (to-broker address $a)) + (w (Message a))) - (define next-ep - (let ((counter 0)) - (lambda () - (begin0 counter - (set! counter (+ counter 1)))))) + (on (message (broker-packet address (Ping))) + (w (Pong))) - (during (to-broker url $a) - (define ep (next-ep)) - (on-start (w (Assert ep a))) - (on-stop (w (Clear ep)))) - - (on (message (to-broker url $a)) (w (Message a))) - - (on (message (broker-packet url (Ping))) (w (Pong))) - - (during (observe (from-broker url $spec)) - (define ep (next-ep)) - (on-start (w (Assert ep (observe spec)))) - (on-stop (w (Clear ep))) - (on (message (broker-packet url (Add ep $vs))) - (react (assert (instantiate-term->value (from-broker url spec) vs)) - (stop-when (message (broker-packet url (Del ep vs)))))) - (on (message (broker-packet url (Msg ep $vs))) - (send! (instantiate-term->value (from-broker url spec) vs)))))) + (during (observe (from-broker address $spec)) + (define ep (next-ep)) + (on-start (w (Assert ep (observe spec)))) + (on-stop (w (Clear ep))) + (on (message (broker-packet address (Add ep $vs))) + (react (assert (instantiate-term->value (from-broker address spec) vs)) + (stop-when (message (broker-packet address (Del ep vs)))))) + (on (message (broker-packet address (Msg ep $vs))) + (send! (instantiate-term->value (from-broker address spec) vs))))) diff --git a/syndicate/broker/client/loopback.rkt b/syndicate/broker/client/loopback.rkt new file mode 100644 index 0000000..45ccea7 --- /dev/null +++ b/syndicate/broker/client/loopback.rkt @@ -0,0 +1,18 @@ +#lang imperative-syndicate + +(provide (struct-out broker-loopback-connection)) + +(require "../client.rkt") +(require "../wire-protocol.rkt") +(require "../protocol.rkt") + +(require/activate imperative-syndicate/broker/server) + +(assertion-struct broker-loopback-connection (scope)) + +(spawn #:name 'loopback-client-factory + (during/spawn (broker-connection ($ address (broker-loopback-connection $scope))) + #:name address + (assert (server-connection address scope)) + (on (message (server-outbound address $p)) (send! (broker-packet address p))) + (generic-client-session-facet address (lambda (x) (send! (server-inbound address x)))))) diff --git a/syndicate/broker/client/tcp.rkt b/syndicate/broker/client/tcp.rkt new file mode 100644 index 0000000..53fb503 --- /dev/null +++ b/syndicate/broker/client/tcp.rkt @@ -0,0 +1,27 @@ +#lang imperative-syndicate + +(provide standard-localhost-broker/tcp + (struct-out broker-tcp-connection)) + +(require "../client.rkt") +(require "../wire-protocol.rkt") +(require "../protocol.rkt") +(require imperative-syndicate/reassert) + +(require/activate imperative-syndicate/drivers/tcp) + +(assertion-struct broker-tcp-connection (host port)) + +(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001)) + +(spawn #:name 'tcp-client-factory + (during/spawn (broker-connection ($ address (broker-tcp-connection $host $port))) + #:name address + (define id (list (gensym 'client) host port)) + (reassert-on (tcp-connection id (tcp-address host port)) + (retracted (tcp-accepted id)) + (asserted (tcp-rejected id _))) + (during (tcp-accepted id) + (define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p))))) + (on (message (tcp-in id $bs)) (accumulate! bs)) + (generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x)))))))) diff --git a/syndicate/broker/main.rkt b/syndicate/broker/main.rkt index 02000c6..351aaa6 100644 --- a/syndicate/broker/main.rkt +++ b/syndicate/broker/main.rkt @@ -2,47 +2,51 @@ (provide (all-from-out "protocol.rkt") (all-from-out "client.rkt") - (all-from-out "server.rkt")) + (all-from-out "client/tcp.rkt") + (all-from-out "client/loopback.rkt") + (all-from-out "server.rkt") + (all-from-out "server/tcp.rkt") + (all-from-out "server/websocket.rkt")) (require "protocol.rkt") + (require/activate "client.rkt") +(require/activate "client/tcp.rkt") +(require/activate "client/loopback.rkt") + (require/activate "server.rkt") - -(require/activate imperative-syndicate/drivers/tcp) -(require/activate imperative-syndicate/drivers/web) - -(define *default-tcp-port* 8001) -(define *default-http-port* 8000) - -(define (main #:tcp-port [tcp-port *default-tcp-port*] - #:http-port [http-port *default-http-port*]) - (spawn #:name 'server-listener - (when tcp-port - (define tcp-scope "broker") - (during/spawn (tcp-connection $id (tcp-listener tcp-port)) - #:name `(server-connection ,tcp-scope ,id) - (server-facet/tcp id tcp-scope))) - (when http-port - (during/spawn (http-request $id 'get (http-resource (http-server _ http-port #f) - `(,$scope ())) _ _ _) - #:name `(server-connection ,scope ,id) - (server-facet/websocket id scope))))) +(require/activate "server/tcp.rkt") +(require/activate "server/websocket.rkt") (module+ main (require racket/cmdline) - (define tcp-port *default-tcp-port*) - (define http-port *default-http-port*) + (define tcp-port default-tcp-broker-port) + (define http-port default-http-broker-port) (command-line #:once-any ["--tcp" port - ((format "Listen on plain TCP port (default ~a)" *default-tcp-port*)) + ((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port)) (set! tcp-port (string->number port))] ["--no-tcp" "Do not listen on any plain TCP port" (set! tcp-port #f)] #:once-any ["--http" port - ((format "Listen on websocket HTTP port (default ~a)" *default-http-port*)) + ((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port)) (set! http-port (string->number port))] ["--no-http" "Do not listen on any websocket HTTP port" (set! http-port #f)]) - (extend-ground-boot! (lambda () (main #:tcp-port tcp-port - #:http-port http-port)))) + (extend-ground-boot! (lambda () + (when tcp-port (spawn-tcp-broker! tcp-port)) + (when http-port (spawn-websocket-broker! http-port))))) + +(define-logger syndicate/broker) + +(when (log-level? syndicate/broker-logger 'debug) + (spawn #:name 'server-debug + (on (asserted (server-connection $id $scope)) + (log-syndicate/broker-debug "C+ ~v ~v" id scope)) + (on (retracted (server-connection $id $scope)) + (log-syndicate/broker-debug "C- ~v ~v" id scope)) + (on (message (server-inbound $id $p)) + (log-syndicate/broker-debug "CIN ~v ~v" id p)) + (on (message (server-outbound $id $p)) + (log-syndicate/broker-debug "COUT ~v ~v" id p)))) diff --git a/syndicate/broker/protocol.rkt b/syndicate/broker/protocol.rkt index 3b2a0a9..1fdf3aa 100644 --- a/syndicate/broker/protocol.rkt +++ b/syndicate/broker/protocol.rkt @@ -3,8 +3,8 @@ (provide (all-defined-out)) ;; Client protocol -(assertion-struct to-broker (url assertion)) -(assertion-struct from-broker (url assertion)) -(assertion-struct broker-connection (url)) -(assertion-struct broker-connected (url)) -(message-struct force-broker-disconnect (url)) +(assertion-struct to-broker (address assertion)) +(assertion-struct from-broker (address assertion)) +(assertion-struct broker-connection (address)) +(assertion-struct broker-connected (address)) +(message-struct force-broker-disconnect (address)) diff --git a/syndicate/broker/server-connection.rkt b/syndicate/broker/server-connection.rkt deleted file mode 100644 index 797e67b..0000000 --- a/syndicate/broker/server-connection.rkt +++ /dev/null @@ -1,59 +0,0 @@ -#lang imperative-syndicate - -(provide (struct-out server-connection) - (struct-out server-inbound) - (struct-out server-outbound) - (struct-out server-envelope)) - -(require "wire-protocol.rkt") -(require imperative-syndicate/term) -(require racket/set) - -;; Internal connection protocol -(assertion-struct server-connection (connection-id scope)) -(assertion-struct server-inbound (connection-id body)) -(assertion-struct server-outbound (connection-id body)) - -;; Internal isolation -(assertion-struct server-envelope (scope body)) - -(spawn #:name 'server-connection-factory - (during/spawn (server-connection $id $scope) - (define endpoints (set)) - - (on (message (server-inbound id (Assert $ep $a))) - (when (not (set-member? endpoints ep)) - (set! endpoints (set-add endpoints ep)) - (react - (on-stop (set! endpoints (set-remove endpoints ep))) - - (field [assertion a]) - - (define (recompute-endpoint) - (define a (assertion)) - (if (observe? a) - (let* ((pattern (observe-specification a)) - (spec (server-envelope scope pattern))) - (values (observe spec) - (term->skeleton-interest - spec - (capture-facet-context - (lambda (op . captured-values) - (schedule-script! - (current-actor) - (lambda () - (define ctor (match op ['+ Add] ['- Del] ['! Msg])) - (send! (server-outbound id (ctor ep captured-values)))))))))) - (values (server-envelope scope a) #f))) - (add-endpoint! (current-facet) "server" #t recompute-endpoint) - - (on (message (server-inbound id (Assert ep $new-a))) - (assertion new-a)) - - (stop-when (message (server-inbound id (Clear ep))))))) - - (on (message (server-inbound id (Message $body))) - (send! (server-envelope scope body))) - - (on (message (server-inbound id (Ping))) - (send! (server-outbound id (Pong)))))) diff --git a/syndicate/broker/server.rkt b/syndicate/broker/server.rkt index 3d54ce2..797e67b 100644 --- a/syndicate/broker/server.rkt +++ b/syndicate/broker/server.rkt @@ -1,51 +1,59 @@ #lang imperative-syndicate -(provide server-facet/tcp - server-facet/websocket) +(provide (struct-out server-connection) + (struct-out server-inbound) + (struct-out server-outbound) + (struct-out server-envelope)) (require "wire-protocol.rkt") +(require imperative-syndicate/term) +(require racket/set) -(require/activate imperative-syndicate/drivers/tcp) -(require/activate imperative-syndicate/drivers/web) -(require/activate imperative-syndicate/drivers/timer) -(require/activate imperative-syndicate/broker/server-connection) +;; Internal connection protocol +(assertion-struct server-connection (connection-id scope)) +(assertion-struct server-inbound (connection-id body)) +(assertion-struct server-outbound (connection-id body)) -(define-logger syndicate/broker) +;; Internal isolation +(assertion-struct server-envelope (scope body)) -(define (server-facet/tcp id scope) - (assert (tcp-accepted id)) - (assert (server-connection id scope)) - (define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p))))) - (on (message (tcp-in id $bs)) - (accumulate! bs)) - (on (message (server-outbound id $p)) - (send! (tcp-out id (encode p))))) +(spawn #:name 'server-connection-factory + (during/spawn (server-connection $id $scope) + (define endpoints (set)) -(define (server-facet/websocket id scope) - (assert (http-accepted id)) - (assert (http-response-websocket id)) - (assert (server-connection id scope)) + (on (message (server-inbound id (Assert $ep $a))) + (when (not (set-member? endpoints ep)) + (set! endpoints (set-add endpoints ep)) + (react + (on-stop (set! endpoints (set-remove endpoints ep))) - (field [ping-time-deadline 0]) - (on (asserted (later-than (ping-time-deadline))) - (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) - (send! (server-outbound id (Ping)))) + (field [assertion a]) - (on (message (websocket-in id $body)) - (define-values (packet remainder) (decode body)) - (when (not (equal? remainder #"")) - (error 'server-facet/websocket "Multiple packets in a single websocket message")) - (send! (server-inbound id packet))) - (on (message (server-outbound id $p)) - (send! (websocket-out id (encode p))))) + (define (recompute-endpoint) + (define a (assertion)) + (if (observe? a) + (let* ((pattern (observe-specification a)) + (spec (server-envelope scope pattern))) + (values (observe spec) + (term->skeleton-interest + spec + (capture-facet-context + (lambda (op . captured-values) + (schedule-script! + (current-actor) + (lambda () + (define ctor (match op ['+ Add] ['- Del] ['! Msg])) + (send! (server-outbound id (ctor ep captured-values)))))))))) + (values (server-envelope scope a) #f))) + (add-endpoint! (current-facet) "server" #t recompute-endpoint) -(when (log-level? syndicate/broker-logger 'debug) - (spawn #:name 'server-debug - (on (asserted (server-connection $id $scope)) - (log-syndicate/broker-debug "C+ ~v ~v" id scope)) - (on (retracted (server-connection $id $scope)) - (log-syndicate/broker-debug "C- ~v ~v" id scope)) - (on (message (server-inbound $id $p)) - (log-syndicate/broker-debug "CIN ~v ~v" id p)) - (on (message (server-outbound $id $p)) - (log-syndicate/broker-debug "COUT ~v ~v" id p)))) + (on (message (server-inbound id (Assert ep $new-a))) + (assertion new-a)) + + (stop-when (message (server-inbound id (Clear ep))))))) + + (on (message (server-inbound id (Message $body))) + (send! (server-envelope scope body))) + + (on (message (server-inbound id (Ping))) + (send! (server-outbound id (Pong)))))) diff --git a/syndicate/broker/server/tcp.rkt b/syndicate/broker/server/tcp.rkt new file mode 100644 index 0000000..d3fa0d8 --- /dev/null +++ b/syndicate/broker/server/tcp.rkt @@ -0,0 +1,28 @@ +#lang imperative-syndicate + +(provide server-facet/tcp + default-tcp-broker-port + spawn-tcp-broker!) + +(require "../wire-protocol.rkt") + +(require/activate imperative-syndicate/drivers/tcp) +(require/activate imperative-syndicate/broker/server) + +(define (server-facet/tcp id scope) + (assert (tcp-accepted id)) + (assert (server-connection id scope)) + (define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p))))) + (on (message (tcp-in id $bs)) + (accumulate! bs)) + (on (message (server-outbound id $p)) + (send! (tcp-out id (encode p))))) + +(define default-tcp-broker-port 8001) + +(define (spawn-tcp-broker! [port default-tcp-broker-port]) + (spawn #:name 'tcp-server-listener + (define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup + (during/spawn (tcp-connection $id (tcp-listener port)) + #:name `(server-connection ,tcp-scope ,id) + (server-facet/tcp id tcp-scope)))) diff --git a/syndicate/broker/server/websocket.rkt b/syndicate/broker/server/websocket.rkt new file mode 100644 index 0000000..8d2a28c --- /dev/null +++ b/syndicate/broker/server/websocket.rkt @@ -0,0 +1,38 @@ +#lang imperative-syndicate + +(provide server-facet/websocket + default-http-broker-port + spawn-websocket-broker!) + +(require "../wire-protocol.rkt") + +(require/activate imperative-syndicate/drivers/web) +(require/activate imperative-syndicate/drivers/timer) +(require/activate imperative-syndicate/broker/server) + +(define (server-facet/websocket id scope) + (assert (http-accepted id)) + (assert (http-response-websocket id)) + (assert (server-connection id scope)) + + (field [ping-time-deadline 0]) + (on (asserted (later-than (ping-time-deadline))) + (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) + (send! (server-outbound id (Ping)))) + + (on (message (websocket-in id $body)) + (define-values (packet remainder) (decode body)) + (when (not (equal? remainder #"")) + (error 'server-facet/websocket "Multiple packets in a single websocket message")) + (send! (server-inbound id packet))) + (on (message (server-outbound id $p)) + (send! (websocket-out id (encode p))))) + +(define default-http-broker-port 8000) + +(define (spawn-websocket-broker! [port default-http-broker-port]) + (spawn #:name 'websocket-server-listener + (during/spawn (http-request $id 'get (http-resource (http-server _ port #f) + `(,$scope ())) _ _ _) + #:name `(server-connection ,scope ,id) + (server-facet/websocket id scope)))) diff --git a/syndicate/broker/wire-protocol.rkt b/syndicate/broker/wire-protocol.rkt index 02ef2bb..63a9772 100644 --- a/syndicate/broker/wire-protocol.rkt +++ b/syndicate/broker/wire-protocol.rkt @@ -44,3 +44,6 @@ (handle-packet! packet))) (lambda (chunk) (buffer (bytes-append (buffer) chunk)))) + +;; Received packets from broker are relayed via one of these. +(message-struct broker-packet (address packet))