#lang imperative-syndicate
;; Broker client over TCP.
;;
;; This module spawns a `client-factory` actor that reacts to interest in a
;; broker URL by asserting `(broker-connection url)`, and then runs one
;; connection actor per distinct URL.  Each connection actor relays local
;; `to-broker` assertions/messages to the remote end and republishes what the
;; remote end sends back as `from-broker` assertions/messages.
;;
;; NOTE(review): `to-broker`, `from-broker`, `broker-connection`,
;; `broker-connected`, the packet constructors (`Assert`, `Clear`, `Message`,
;; `Add`, `Del`, `Msg`, `Ping`, `Pong`), `encode` and `packet-accumulator` are
;; not defined in this file; presumably they come from "protocol.rkt" and
;; "wire-protocol.rkt" -- confirm there.

(provide standard-localhost-broker/tcp)

(require "wire-protocol.rkt")
(require "protocol.rkt")
(require imperative-syndicate/term)
(require imperative-syndicate/reassert)

(require/activate imperative-syndicate/drivers/tcp)

;; URL string naming a broker on localhost, TCP port 8001.
(define standard-localhost-broker/tcp "tcp://localhost:8001/")

(define-logger syndicate/broker)

;; Internal message: `packet` was produced by the packet accumulator for the
;; connection identified by `url`.
(message-struct broker-packet (url packet))

;; Factory actor.  Any of the following expresses demand for a connection to
;; broker `u`, and causes `(broker-connection u)` to be asserted:
;;   - an assertion of `(to-broker u _)`
;;   - interest in `(from-broker u _)`
;;   - interest in `(broker-connected u)`
;; For each asserted `(broker-connection url)` a dedicated actor named
;; `(client-connection ,url)` is spawned.
(spawn #:name 'client-factory
       (during (to-broker $u _) (assert (broker-connection u)))
       (during (observe (from-broker $u _)) (assert (broker-connection u)))
       (during (observe (broker-connected $u)) (assert (broker-connection u)))
       (during/spawn (broker-connection $url)
         #:name `(client-connection ,url)
         (match url
           ;; Accept exactly "tcp://HOST:PORT" with an optional trailing "/".
           ;; The `$` anchor matters: without it the `/?` is vacuous and URLs
           ;; with arbitrary trailing text would be accepted and the tail
           ;; silently ignored.
           [(pregexp #px"^tcp://([^:]+):([0-9]+)/?$" (list _ host portstr))
            (define port (string->number portstr))
            (client-tcp-session-facet url host port)]
           ;; `_` is the match wildcard; `else` is not a keyword in `match`
           ;; (it would merely bind a variable named `else`).
           [_
            (error 'client-factory "Invalid server URL: ~v" url)])))

;; client-tcp-session-facet : url host port -> void
;;
;; Sets up the endpoints for one broker session in the current actor/facet.
;;   url  -- the broker URL as given by the client; used as the key in all
;;           `to-broker` / `from-broker` / `broker-connected` records
;;   host -- host name extracted from `url`
;;   port -- port number extracted from `url`
(define (client-tcp-session-facet url host port)
  ;; Connection identifier; the gensym distinguishes separate sessions to the
  ;; same host/port.
  (define id (list (gensym 'client) host port))

  ;; Request the TCP connection.  The two trailing event patterns are handed
  ;; to `reassert-on`; presumably the request is withdrawn and re-asserted when
  ;; the connection is dropped or rejected -- see imperative-syndicate/reassert.
  (reassert-on (tcp-connection id (tcp-address host port))
               (retracted (tcp-accepted id))
               (asserted (tcp-rejected id _)))

  ;; Everything below exists only while the TCP driver reports the connection
  ;; as accepted.
  (during (tcp-accepted id)
    (on-start (log-syndicate/broker-info "Connected to ~v" url))
    (on-stop (log-syndicate/broker-info "Disconnected from ~v" url))

    ;; Tell interested parties that the session is up.
    (assert (broker-connected url))

    ;; Inbound path: bytes from the socket are fed to the accumulator, whose
    ;; callback republishes each packet `p` as a `broker-packet` message.
    (define accumulate!
      (packet-accumulator (lambda (p) (send! (broker-packet url p)))))
    (on (message (tcp-in id $bs)) (accumulate! bs))

    ;; Outbound path: encode packet `x` and send it on the socket.
    (define (w x) (send! (tcp-out id (encode x))))

    ;; Endpoint-id allocator: returns 0, 1, 2, ... on successive calls.  The
    ;; counter lives inside this `during`, so it is created afresh each time
    ;; the connection is accepted.
    (define next-ep
      (let ((counter 0))
        (lambda ()
          (begin0 counter
            (set! counter (+ counter 1))))))

    ;; Local assertion destined for the broker: send Assert while it holds,
    ;; Clear when it is withdrawn.
    (during (to-broker url $a)
      (define ep (next-ep))
      (on-start (w (Assert ep a)))
      (on-stop (w (Clear ep))))

    ;; Local message destined for the broker.
    (on (message (to-broker url $a))
        (w (Message a)))

    ;; Answer Ping packets from the peer with Pong.
    (on (message (broker-packet url (Ping)))
        (w (Pong)))

    ;; Local interest in something from the broker: register an `observe` of
    ;; `spec` under a fresh endpoint id and translate the peer's replies.
    (during (observe (from-broker url $spec))
      (define ep (next-ep))
      (on-start (w (Assert ep (observe spec))))
      (on-stop (w (Clear ep)))
      ;; Add: assert the instantiated `from-broker` record until the matching
      ;; Del arrives.  `vs` in the Del pattern is the value already bound by
      ;; `$vs`, not a fresh binder.
      (on (message (broker-packet url (Add ep $vs)))
          (react (assert (instantiate-term->value (from-broker url spec) vs))
                 (stop-when (message (broker-packet url (Del ep vs))))))
      ;; Msg: forward as a one-off message.
      (on (message (broker-packet url (Msg ep $vs)))
          (send! (instantiate-term->value (from-broker url spec) vs))))))