broker: client; ping for keepalive; client example

This commit is contained in:
Tony Garnock-Jones 2019-03-18 23:29:12 +00:00
parent 4e33429b19
commit 3ff50a6f65
5 changed files with 132 additions and 10 deletions

5
syndicate/broker.rkt Normal file
View File

@ -0,0 +1,5 @@
#lang imperative-syndicate
(provide (all-from-out "broker/main.rkt"))
(require/activate "broker/main.rkt")
(module+ main (require (submod "broker/main.rkt" main)))

View File

@ -1,5 +1,71 @@
#lang imperative-syndicate
(provide standard-localhost-broker/tcp)
(require "wire-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require imperative-syndicate/reassert)
(require/activate imperative-syndicate/drivers/tcp)
(define standard-localhost-broker/tcp "tcp://localhost:8001/")
(define-logger syndicate/broker)
(message-struct broker-packet (url packet))
(spawn #:name 'client-factory
(during (to-broker $u _) (assert (broker-connection u)))
(during (observe (from-broker $u _)) (assert (broker-connection u)))
(during (observe (broker-connected $u)) (assert (broker-connection u)))
(during/spawn (broker-connection $url)
#:name `(client-connection ,url)
(match url
[(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr))
(define port (string->number portstr))
(client-tcp-session-facet url host port)]
[else (error 'client-factory "Invalid server URL: ~v" url)])))
(define (client-tcp-session-facet url host port)
(define id (list (gensym 'client) host port))
(reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id))
(asserted (tcp-rejected id _)))
(during (tcp-accepted id)
(on-start (log-syndicate/broker-info "Connected to ~v" url))
(on-stop (log-syndicate/broker-info "Disconnected from ~v" url))
(assert (broker-connected url))
(define accumulate!
(packet-accumulator (lambda (p) (send! (broker-packet url p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(define (w x) (send! (tcp-out id (encode x))))
(define next-ep
(let ((counter 0))
(lambda ()
(begin0 counter
(set! counter (+ counter 1))))))
(during (to-broker url $a)
(define ep (next-ep))
(on-start (w (Assert ep a)))
(on-stop (w (Clear ep))))
(on (message (to-broker url $a)) (w (Message a)))
(on (message (broker-packet url (Ping))) (w (Pong)))
(during (observe (from-broker url $spec))
(define ep (next-ep))
(on-start (w (Assert ep (observe spec))))
(on-stop (w (Clear ep)))
(on (message (broker-packet url (Add ep $vs)))
(react (assert (instantiate-term->value (from-broker url spec) vs))
(stop-when (message (broker-packet url (Del ep vs))))))
(on (message (broker-packet url (Msg ep $vs)))
(send! (instantiate-term->value (from-broker url spec) vs))))))

View File

@ -10,6 +10,7 @@
(require/activate imperative-syndicate/drivers/tcp)
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
;; Internal connection protocol
(assertion-struct server-connection (connection-id scope))
@ -57,21 +58,17 @@
(stop-when (message (server-inbound id (Clear ep)))))))
(on (message (server-inbound id (Message $body)))
(send! (envelope scope body)))))
(send! (envelope scope body)))
(on (message (server-inbound id (Ping)))
(send! (server-outbound id (Pong))))))
(define (server-facet/tcp id scope)
(assert (tcp-accepted id))
(assert (server-connection id scope))
(field [buffer #""])
(begin/dataflow
(define-values (packet remainder) (decode (buffer)))
(when packet
(buffer remainder)
(send! (server-inbound id packet))))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
(on (message (tcp-in id $bs))
(buffer (bytes-append (buffer) bs)))
(accumulate! bs))
(on (message (server-outbound id $p))
(send! (tcp-out id (encode p)))))
@ -80,6 +77,11 @@
(assert (http-response-websocket id))
(assert (server-connection id scope))
(field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (server-outbound id (Ping))))
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))

View File

@ -4,6 +4,7 @@
(require (prefix-in preserves: preserves))
(require bitsyntax)
(require (only-in net/rfc6455 ws-idle-timeout))
;; Client --> Broker
(message-struct Assert (endpoint-name assertion))
@ -15,6 +16,10 @@
(message-struct Del (endpoint-name captures))
(message-struct Msg (endpoint-name captures))
;; Bidirectional
(message-struct Ping ())
(message-struct Pong ())
(define (decode bs)
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
(bit-string-case bs
@ -25,3 +30,17 @@
(define (encode v)
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
(preserves:encode v)))
(define (ping-interval)
(* 1000 (max (- (ws-idle-timeout) 10)
(* (ws-idle-timeout) 0.8))))
(define (packet-accumulator handle-packet!)
(field [buffer #""])
(begin/dataflow
(define-values (packet remainder) (decode (buffer)))
(when packet
(buffer remainder)
(handle-packet! packet)))
(lambda (chunk)
(buffer (bytes-append (buffer) chunk))))

View File

@ -0,0 +1,30 @@
#lang imperative-syndicate
(require/activate imperative-syndicate/broker)
(require/activate imperative-syndicate/drivers/external-event)
(require (only-in racket/port read-line-evt))
(assertion-struct Present (name))
(message-struct Says (who what))
(spawn #:name 'main
(field [username (symbol->string (gensym 'chatter))])
(define root-facet (current-facet))
(define url standard-localhost-broker/tcp)
(during (broker-connected url)
(on-start (log-info "Connected to broker."))
(on-stop (log-info "Disconnected from broker."))
(on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who))
(on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who))
(on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what))
(assert (to-broker url (Present (username))))
(define stdin-evt (read-line-evt (current-input-port) 'any))
(on (message (inbound (external-event stdin-evt (list $line))))
(match line
[(? eof-object?) (stop-facet root-facet)]
[(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)]
[other (send! (to-broker url (Says (username) other)))]))))