diff --git a/syndicate/broker.rkt b/syndicate/broker.rkt new file mode 100644 index 0000000..5b1d346 --- /dev/null +++ b/syndicate/broker.rkt @@ -0,0 +1,5 @@ +#lang imperative-syndicate + +(provide (all-from-out "broker/main.rkt")) +(require/activate "broker/main.rkt") +(module+ main (require (submod "broker/main.rkt" main))) diff --git a/syndicate/broker/client.rkt b/syndicate/broker/client.rkt index 102ef9a..0c8fd7d 100644 --- a/syndicate/broker/client.rkt +++ b/syndicate/broker/client.rkt @@ -1,5 +1,71 @@ #lang imperative-syndicate +(provide standard-localhost-broker/tcp) + (require "wire-protocol.rkt") (require "protocol.rkt") +(require imperative-syndicate/term) +(require imperative-syndicate/reassert) +(require/activate imperative-syndicate/drivers/tcp) + +(define standard-localhost-broker/tcp "tcp://localhost:8001/") + +(define-logger syndicate/broker) + +(message-struct broker-packet (url packet)) + +(spawn #:name 'client-factory + (during (to-broker $u _) (assert (broker-connection u))) + (during (observe (from-broker $u _)) (assert (broker-connection u))) + (during (observe (broker-connected $u)) (assert (broker-connection u))) + + (during/spawn (broker-connection $url) + #:name `(client-connection ,url) + (match url + [(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr)) + (define port (string->number portstr)) + (client-tcp-session-facet url host port)] + [else (error 'client-factory "Invalid server URL: ~v" url)]))) + +(define (client-tcp-session-facet url host port) + (define id (list (gensym 'client) host port)) + (reassert-on (tcp-connection id (tcp-address host port)) + (retracted (tcp-accepted id)) + (asserted (tcp-rejected id _))) + (during (tcp-accepted id) + (on-start (log-syndicate/broker-info "Connected to ~v" url)) + (on-stop (log-syndicate/broker-info "Disconnected from ~v" url)) + (assert (broker-connected url)) + + (define accumulate! + (packet-accumulator (lambda (p) (send! (broker-packet url p))))) + (on (message (tcp-in id $bs)) + (accumulate! bs)) + + (define (w x) (send! (tcp-out id (encode x)))) + + (define next-ep + (let ((counter 0)) + (lambda () + (begin0 counter + (set! counter (+ counter 1)))))) + + (during (to-broker url $a) + (define ep (next-ep)) + (on-start (w (Assert ep a))) + (on-stop (w (Clear ep)))) + + (on (message (to-broker url $a)) (w (Message a))) + + (on (message (broker-packet url (Ping))) (w (Pong))) + + (during (observe (from-broker url $spec)) + (define ep (next-ep)) + (on-start (w (Assert ep (observe spec)))) + (on-stop (w (Clear ep))) + (on (message (broker-packet url (Add ep $vs))) + (react (assert (instantiate-term->value (from-broker url spec) vs)) + (stop-when (message (broker-packet url (Del ep vs)))))) + (on (message (broker-packet url (Msg ep $vs))) + (send! (instantiate-term->value (from-broker url spec) vs)))))) diff --git a/syndicate/broker/server.rkt b/syndicate/broker/server.rkt index 0a3c0cc..e8d3435 100644 --- a/syndicate/broker/server.rkt +++ b/syndicate/broker/server.rkt @@ -10,6 +10,7 @@ (require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/drivers/web) +(require/activate imperative-syndicate/drivers/timer) ;; Internal connection protocol (assertion-struct server-connection (connection-id scope)) @@ -57,21 +58,17 @@ (stop-when (message (server-inbound id (Clear ep))))))) (on (message (server-inbound id (Message $body))) - (send! (envelope scope body))))) + (send! (envelope scope body))) + + (on (message (server-inbound id (Ping))) + (send! (server-outbound id (Pong)))))) (define (server-facet/tcp id scope) (assert (tcp-accepted id)) (assert (server-connection id scope)) - - (field [buffer #""]) - (begin/dataflow - (define-values (packet remainder) (decode (buffer))) - (when packet - (buffer remainder) - (send! (server-inbound id packet)))) - + (define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p))))) (on (message (tcp-in id $bs)) - (buffer (bytes-append (buffer) bs))) + (accumulate! bs)) (on (message (server-outbound id $p)) (send! (tcp-out id (encode p))))) @@ -80,6 +77,11 @@ (assert (http-response-websocket id)) (assert (server-connection id scope)) + (field [ping-time-deadline 0]) + (on (asserted (later-than (ping-time-deadline))) + (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) + (send! (server-outbound id (Ping)))) + (on (message (websocket-in id $body)) (define-values (packet remainder) (decode body)) (when (not (equal? remainder #"")) diff --git a/syndicate/broker/wire-protocol.rkt b/syndicate/broker/wire-protocol.rkt index c690386..02ef2bb 100644 --- a/syndicate/broker/wire-protocol.rkt +++ b/syndicate/broker/wire-protocol.rkt @@ -4,6 +4,7 @@ (require (prefix-in preserves: preserves)) (require bitsyntax) +(require (only-in net/rfc6455 ws-idle-timeout)) ;; Client --> Broker (message-struct Assert (endpoint-name assertion)) @@ -15,6 +16,10 @@ (message-struct Del (endpoint-name captures)) (message-struct Msg (endpoint-name captures)) +;; Bidirectional +(message-struct Ping ()) +(message-struct Pong ()) + (define (decode bs) (parameterize ((preserves:short-form-labels '#(discard capture observe))) (bit-string-case bs @@ -25,3 +30,17 @@ (define (encode v) (parameterize ((preserves:short-form-labels '#(discard capture observe))) (preserves:encode v))) + +(define (ping-interval) + (* 1000 (max (- (ws-idle-timeout) 10) + (* (ws-idle-timeout) 0.8)))) + +(define (packet-accumulator handle-packet!) + (field [buffer #""]) + (begin/dataflow + (define-values (packet remainder) (decode (buffer))) + (when packet + (buffer remainder) + (handle-packet! packet))) + (lambda (chunk) + (buffer (bytes-append (buffer) chunk)))) diff --git a/syndicate/examples/broker-chat-client.rkt b/syndicate/examples/broker-chat-client.rkt new file mode 100644 index 0000000..7e10808 --- /dev/null +++ b/syndicate/examples/broker-chat-client.rkt @@ -0,0 +1,30 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/broker) +(require/activate imperative-syndicate/drivers/external-event) +(require (only-in racket/port read-line-evt)) + +(assertion-struct Present (name)) +(message-struct Says (who what)) + +(spawn #:name 'main + (field [username (symbol->string (gensym 'chatter))]) + + (define root-facet (current-facet)) + (define url standard-localhost-broker/tcp) + (during (broker-connected url) + (on-start (log-info "Connected to broker.")) + (on-stop (log-info "Disconnected from broker.")) + + (on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who)) + (on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who)) + (on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what)) + + (assert (to-broker url (Present (username)))) + + (define stdin-evt (read-line-evt (current-input-port) 'any)) + (on (message (inbound (external-event stdin-evt (list $line)))) + (match line + [(? eof-object?) (stop-facet root-facet)] + [(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)] + [other (send! (to-broker url (Says (username) other)))]))))