broker: client; ping for keepalive; client example
This commit is contained in:
parent
3bfef265a5
commit
239b0810e5
|
@ -0,0 +1,5 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide (all-from-out "broker/main.rkt"))
|
||||
(require/activate "broker/main.rkt")
|
||||
(module+ main (require (submod "broker/main.rkt" main)))
|
|
@ -1,5 +1,71 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(provide standard-localhost-broker/tcp)
|
||||
|
||||
(require "wire-protocol.rkt")
|
||||
(require "protocol.rkt")
|
||||
(require imperative-syndicate/term)
|
||||
(require imperative-syndicate/reassert)
|
||||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
|
||||
(define standard-localhost-broker/tcp "tcp://localhost:8001/")
|
||||
|
||||
(define-logger syndicate/broker)
|
||||
|
||||
(message-struct broker-packet (url packet))
|
||||
|
||||
(spawn #:name 'client-factory
|
||||
(during (to-broker $u _) (assert (broker-connection u)))
|
||||
(during (observe (from-broker $u _)) (assert (broker-connection u)))
|
||||
(during (observe (broker-connected $u)) (assert (broker-connection u)))
|
||||
|
||||
(during/spawn (broker-connection $url)
|
||||
#:name `(client-connection ,url)
|
||||
(match url
|
||||
[(pregexp #px"^tcp://([^:]+):([0-9]+)/?" (list _ host portstr))
|
||||
(define port (string->number portstr))
|
||||
(client-tcp-session-facet url host port)]
|
||||
[else (error 'client-factory "Invalid server URL: ~v" url)])))
|
||||
|
||||
(define (client-tcp-session-facet url host port)
|
||||
(define id (list (gensym 'client) host port))
|
||||
(reassert-on (tcp-connection id (tcp-address host port))
|
||||
(retracted (tcp-accepted id))
|
||||
(asserted (tcp-rejected id _)))
|
||||
(during (tcp-accepted id)
|
||||
(on-start (log-syndicate/broker-info "Connected to ~v" url))
|
||||
(on-stop (log-syndicate/broker-info "Disconnected from ~v" url))
|
||||
(assert (broker-connected url))
|
||||
|
||||
(define accumulate!
|
||||
(packet-accumulator (lambda (p) (send! (broker-packet url p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(accumulate! bs))
|
||||
|
||||
(define (w x) (send! (tcp-out id (encode x))))
|
||||
|
||||
(define next-ep
|
||||
(let ((counter 0))
|
||||
(lambda ()
|
||||
(begin0 counter
|
||||
(set! counter (+ counter 1))))))
|
||||
|
||||
(during (to-broker url $a)
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep a)))
|
||||
(on-stop (w (Clear ep))))
|
||||
|
||||
(on (message (to-broker url $a)) (w (Message a)))
|
||||
|
||||
(on (message (broker-packet url (Ping))) (w (Pong)))
|
||||
|
||||
(during (observe (from-broker url $spec))
|
||||
(define ep (next-ep))
|
||||
(on-start (w (Assert ep (observe spec))))
|
||||
(on-stop (w (Clear ep)))
|
||||
(on (message (broker-packet url (Add ep $vs)))
|
||||
(react (assert (instantiate-term->value (from-broker url spec) vs))
|
||||
(stop-when (message (broker-packet url (Del ep vs))))))
|
||||
(on (message (broker-packet url (Msg ep $vs)))
|
||||
(send! (instantiate-term->value (from-broker url spec) vs))))))
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
(require/activate imperative-syndicate/drivers/tcp)
|
||||
(require/activate imperative-syndicate/drivers/web)
|
||||
(require/activate imperative-syndicate/drivers/timer)
|
||||
|
||||
;; Internal connection protocol
|
||||
(assertion-struct server-connection (connection-id scope))
|
||||
|
@ -57,21 +58,17 @@
|
|||
(stop-when (message (server-inbound id (Clear ep)))))))
|
||||
|
||||
(on (message (server-inbound id (Message $body)))
|
||||
(send! (envelope scope body)))))
|
||||
(send! (envelope scope body)))
|
||||
|
||||
(on (message (server-inbound id (Ping)))
|
||||
(send! (server-outbound id (Pong))))))
|
||||
|
||||
(define (server-facet/tcp id scope)
|
||||
(assert (tcp-accepted id))
|
||||
(assert (server-connection id scope))
|
||||
|
||||
(field [buffer #""])
|
||||
(begin/dataflow
|
||||
(define-values (packet remainder) (decode (buffer)))
|
||||
(when packet
|
||||
(buffer remainder)
|
||||
(send! (server-inbound id packet))))
|
||||
|
||||
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
|
||||
(on (message (tcp-in id $bs))
|
||||
(buffer (bytes-append (buffer) bs)))
|
||||
(accumulate! bs))
|
||||
(on (message (server-outbound id $p))
|
||||
(send! (tcp-out id (encode p)))))
|
||||
|
||||
|
@ -80,6 +77,11 @@
|
|||
(assert (http-response-websocket id))
|
||||
(assert (server-connection id scope))
|
||||
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||
(send! (server-outbound id (Ping))))
|
||||
|
||||
(on (message (websocket-in id $body))
|
||||
(define-values (packet remainder) (decode body))
|
||||
(when (not (equal? remainder #""))
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
(require (prefix-in preserves: preserves))
|
||||
(require bitsyntax)
|
||||
(require (only-in net/rfc6455 ws-idle-timeout))
|
||||
|
||||
;; Client --> Broker
|
||||
(message-struct Assert (endpoint-name assertion))
|
||||
|
@ -15,6 +16,10 @@
|
|||
(message-struct Del (endpoint-name captures))
|
||||
(message-struct Msg (endpoint-name captures))
|
||||
|
||||
;; Bidirectional
|
||||
(message-struct Ping ())
|
||||
(message-struct Pong ())
|
||||
|
||||
(define (decode bs)
|
||||
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
||||
(bit-string-case bs
|
||||
|
@ -25,3 +30,17 @@
|
|||
(define (encode v)
|
||||
(parameterize ((preserves:short-form-labels '#(discard capture observe)))
|
||||
(preserves:encode v)))
|
||||
|
||||
(define (ping-interval)
|
||||
(* 1000 (max (- (ws-idle-timeout) 10)
|
||||
(* (ws-idle-timeout) 0.8))))
|
||||
|
||||
(define (packet-accumulator handle-packet!)
|
||||
(field [buffer #""])
|
||||
(begin/dataflow
|
||||
(define-values (packet remainder) (decode (buffer)))
|
||||
(when packet
|
||||
(buffer remainder)
|
||||
(handle-packet! packet)))
|
||||
(lambda (chunk)
|
||||
(buffer (bytes-append (buffer) chunk))))
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
#lang imperative-syndicate
|
||||
|
||||
(require/activate imperative-syndicate/broker)
|
||||
(require/activate imperative-syndicate/drivers/external-event)
|
||||
(require (only-in racket/port read-line-evt))
|
||||
|
||||
(assertion-struct Present (name))
|
||||
(message-struct Says (who what))
|
||||
|
||||
(spawn #:name 'main
|
||||
(field [username (symbol->string (gensym 'chatter))])
|
||||
|
||||
(define root-facet (current-facet))
|
||||
(define url standard-localhost-broker/tcp)
|
||||
(during (broker-connected url)
|
||||
(on-start (log-info "Connected to broker."))
|
||||
(on-stop (log-info "Disconnected from broker."))
|
||||
|
||||
(on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who))
|
||||
(on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who))
|
||||
(on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what))
|
||||
|
||||
(assert (to-broker url (Present (username))))
|
||||
|
||||
(define stdin-evt (read-line-evt (current-input-port) 'any))
|
||||
(on (message (inbound (external-event stdin-evt (list $line))))
|
||||
(match line
|
||||
[(? eof-object?) (stop-facet root-facet)]
|
||||
[(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)]
|
||||
[other (send! (to-broker url (Says (username) other)))]))))
|
Loading…
Reference in New Issue