From f2b29a798a96a5af3b6aa4ab7ce7af6b6a8a840b Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 5 May 2019 16:37:03 +0100 Subject: [PATCH] The Great Renaming broker -> server, part 1 --- syndicate/broker/client.rkt | 22 ++++++------ syndicate/broker/client/loopback.rkt | 12 +++---- syndicate/broker/client/tcp.rkt | 12 +++---- syndicate/broker/federation.rkt | 2 +- syndicate/broker/main.rkt | 28 +++++++-------- syndicate/broker/protocol.rkt | 10 +++--- syndicate/broker/server.rkt | 35 ++++++++++--------- syndicate/broker/server/tcp.rkt | 16 ++++----- syndicate/broker/server/websocket.rkt | 18 +++++----- syndicate/broker/tf.rkt | 28 +++++++-------- syndicate/broker/wire-protocol.rkt | 10 +++--- syndicate/examples/broker-chat-client.rkt | 18 +++++----- syndicate/test/broker/nesting-confusion.rkt | 20 +++++------ .../test/broker/observation-visibility.rkt | 14 ++++---- 14 files changed, 123 insertions(+), 122 deletions(-) diff --git a/syndicate/broker/client.rkt b/syndicate/broker/client.rkt index bda73df..e0cdb9b 100644 --- a/syndicate/broker/client.rkt +++ b/syndicate/broker/client.rkt @@ -9,14 +9,14 @@ (define-logger syndicate/broker) (spawn #:name 'client-factory - (during (to-broker $a _) (assert (broker-connection a))) - (during (observe (from-broker $a _)) (assert (broker-connection a))) - (during (observe (broker-connected $a)) (assert (broker-connection a)))) + (during (to-server $a _) (assert (server-connection a))) + (during (observe (from-server $a _)) (assert (server-connection a))) + (during (observe (server-connected $a)) (assert (server-connection a)))) (define (generic-client-session-facet address w) (on-start (log-syndicate/broker-info "Connected to ~v" address)) (on-stop (log-syndicate/broker-info "Disconnected from ~v" address)) - (assert (broker-connected address)) + (assert (server-connected address)) (define next-ep (let ((counter 0)) @@ -24,23 +24,23 @@ (begin0 counter (set! counter (+ counter 1)))))) - (during (to-broker address $a) + (during (to-server address $a) (define ep (next-ep)) (on-start (w (Assert ep a))) (on-stop (w (Clear ep)))) - (on (message (to-broker address $a)) + (on (message (to-server address $a)) (w (Message a))) - (on (message (broker-packet address (Ping))) + (on (message (server-packet address (Ping))) (w (Pong))) - (during (observe ($ pat (from-broker address $spec))) + (during (observe ($ pat (from-server address $spec))) (define ep (next-ep)) (on-start (w (Assert ep (observe spec)))) (on-stop (w (Clear ep))) - (on (message (broker-packet address (Add ep $vs))) + (on (message (server-packet address (Add ep $vs))) (react (assert (instantiate-term->value pat vs)) - (stop-when (message (broker-packet address (Del ep vs)))))) - (on (message (broker-packet address (Msg ep $vs))) + (stop-when (message (server-packet address (Del ep vs)))))) + (on (message (server-packet address (Msg ep $vs))) (send! (instantiate-term->value pat vs))))) diff --git a/syndicate/broker/client/loopback.rkt b/syndicate/broker/client/loopback.rkt index 45ccea7..150830a 100644 --- a/syndicate/broker/client/loopback.rkt +++ b/syndicate/broker/client/loopback.rkt @@ -1,6 +1,6 @@ #lang imperative-syndicate -(provide (struct-out broker-loopback-connection)) +(provide (struct-out server-loopback-connection)) (require "../client.rkt") (require "../wire-protocol.rkt") @@ -8,11 +8,11 @@ (require/activate imperative-syndicate/broker/server) -(assertion-struct broker-loopback-connection (scope)) +(assertion-struct server-loopback-connection (scope)) (spawn #:name 'loopback-client-factory - (during/spawn (broker-connection ($ address (broker-loopback-connection $scope))) + (during/spawn (server-connection ($ address (server-loopback-connection $scope))) #:name address - (assert (server-connection address scope)) - (on (message (server-outbound address $p)) (send! (broker-packet address p))) - (generic-client-session-facet address (lambda (x) (send! (server-inbound address x)))))) + (assert (server-poa address scope)) + (on (message (message-server->poa address $p)) (send! (server-packet address p))) + (generic-client-session-facet address (lambda (x) (send! (message-poa->server address x)))))) diff --git a/syndicate/broker/client/tcp.rkt b/syndicate/broker/client/tcp.rkt index 53fb503..6f96fd9 100644 --- a/syndicate/broker/client/tcp.rkt +++ b/syndicate/broker/client/tcp.rkt @@ -1,7 +1,7 @@ #lang imperative-syndicate -(provide standard-localhost-broker/tcp - (struct-out broker-tcp-connection)) +(provide standard-localhost-server/tcp + (struct-out server-tcp-connection)) (require "../client.rkt") (require "../wire-protocol.rkt") @@ -10,18 +10,18 @@ (require/activate imperative-syndicate/drivers/tcp) -(assertion-struct broker-tcp-connection (host port)) +(assertion-struct server-tcp-connection (host port)) -(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001)) +(define standard-localhost-server/tcp (server-tcp-connection "localhost" 8001)) (spawn #:name 'tcp-client-factory - (during/spawn (broker-connection ($ address (broker-tcp-connection $host $port))) + (during/spawn (server-connection ($ address (server-tcp-connection $host $port))) #:name address (define id (list (gensym 'client) host port)) (reassert-on (tcp-connection id (tcp-address host port)) (retracted (tcp-accepted id)) (asserted (tcp-rejected id _))) (during (tcp-accepted id) - (define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p))))) + (define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p))))) (on (message (tcp-in id $bs)) (accumulate! bs)) (generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x)))))))) diff --git a/syndicate/broker/federation.rkt b/syndicate/broker/federation.rkt index 55f6c28..b7a615b 100644 --- a/syndicate/broker/federation.rkt +++ b/syndicate/broker/federation.rkt @@ -28,7 +28,7 @@ ;; map to multiple subscription IDs - this is the place where ;; aggregation pops up. -;; Unlike the broker protocol, both Actions and Events are +;; Unlike the server protocol, both Actions and Events are ;; BIDIRECTIONAL, travelling in both directions along edges linking ;; peer nodes. diff --git a/syndicate/broker/main.rkt b/syndicate/broker/main.rkt index 351aaa6..21781a4 100644 --- a/syndicate/broker/main.rkt +++ b/syndicate/broker/main.rkt @@ -20,33 +20,33 @@ (module+ main (require racket/cmdline) - (define tcp-port default-tcp-broker-port) - (define http-port default-http-broker-port) + (define tcp-port default-tcp-server-port) + (define http-port default-http-server-port) (command-line #:once-any ["--tcp" port - ((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port)) + ((format "Listen on plain TCP port (default ~a)" default-tcp-server-port)) (set! tcp-port (string->number port))] ["--no-tcp" "Do not listen on any plain TCP port" (set! tcp-port #f)] #:once-any ["--http" port - ((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port)) + ((format "Listen on websocket HTTP port (default ~a)" default-http-server-port)) (set! http-port (string->number port))] ["--no-http" "Do not listen on any websocket HTTP port" (set! http-port #f)]) (extend-ground-boot! (lambda () - (when tcp-port (spawn-tcp-broker! tcp-port)) - (when http-port (spawn-websocket-broker! http-port))))) + (when tcp-port (spawn-tcp-server! tcp-port)) + (when http-port (spawn-websocket-server! http-port))))) (define-logger syndicate/broker) (when (log-level? syndicate/broker-logger 'debug) (spawn #:name 'server-debug - (on (asserted (server-connection $id $scope)) - (log-syndicate/broker-debug "C+ ~v ~v" id scope)) - (on (retracted (server-connection $id $scope)) - (log-syndicate/broker-debug "C- ~v ~v" id scope)) - (on (message (server-inbound $id $p)) - (log-syndicate/broker-debug "CIN ~v ~v" id p)) - (on (message (server-outbound $id $p)) - (log-syndicate/broker-debug "COUT ~v ~v" id p)))) + (on (asserted (server-poa $id $scope)) + (log-syndicate/broker-debug "+ ~v ~v" id scope)) + (on (retracted (server-poa $id $scope)) + (log-syndicate/broker-debug "- ~v ~v" id scope)) + (on (message (message-poa->server $id $p)) + (log-syndicate/broker-debug "IN ~v ~v" id p)) + (on (message (message-server->poa $id $p)) + (log-syndicate/broker-debug "OUT ~v ~v" id p)))) diff --git a/syndicate/broker/protocol.rkt b/syndicate/broker/protocol.rkt index 1fdf3aa..b1b1528 100644 --- a/syndicate/broker/protocol.rkt +++ b/syndicate/broker/protocol.rkt @@ -3,8 +3,8 @@ (provide (all-defined-out)) ;; Client protocol -(assertion-struct to-broker (address assertion)) -(assertion-struct from-broker (address assertion)) -(assertion-struct broker-connection (address)) -(assertion-struct broker-connected (address)) -(message-struct force-broker-disconnect (address)) +(assertion-struct to-server (address assertion)) +(assertion-struct from-server (address assertion)) +(assertion-struct server-connection (address)) +(assertion-struct server-connected (address)) +(message-struct force-server-disconnect (address)) diff --git a/syndicate/broker/server.rkt b/syndicate/broker/server.rkt index 571a7aa..a178a3d 100644 --- a/syndicate/broker/server.rkt +++ b/syndicate/broker/server.rkt @@ -1,8 +1,8 @@ #lang imperative-syndicate -(provide (struct-out server-connection) - (struct-out server-inbound) - (struct-out server-outbound) +(provide (struct-out server-poa) + (struct-out message-poa->server) + (struct-out message-server->poa) (struct-out server-proposal) (struct-out server-envelope)) @@ -10,17 +10,18 @@ (require racket/set) ;; Internal connection protocol -(assertion-struct server-connection (connection-id scope)) -(assertion-struct server-inbound (connection-id body)) -(assertion-struct server-outbound (connection-id body)) +(assertion-struct server-poa (connection-id scope)) ;; "Point of Attachment" +(assertion-struct message-poa->server (connection-id body)) +(assertion-struct message-server->poa (connection-id body)) -;; Internal isolation +;; Internal isolation -- these are isomorphic to `to-server` and `from-server`! +;; (and, for that matter, to `outbound` and `inbound`!) (assertion-struct server-proposal (scope body)) ;; suggestions (~ actions) (assertion-struct server-envelope (scope body)) ;; decisions (~ events) -(spawn #:name 'server-connection-factory +(spawn #:name 'server-factory - (during/spawn (server-connection _ _) + (during/spawn (server-poa _ _) ;; Previously, we just had server-envelope. Now, we have both ;; server-envelope and server-proposal. While not everything ;; decided is (locally) suggested, it is true that everything @@ -29,10 +30,10 @@ (during (server-proposal $scope $assertion) (assert (server-envelope scope assertion)))) - (during/spawn (server-connection $id $scope) + (during/spawn (server-poa $id $scope) (define endpoints (set)) - (on (message (server-inbound id (Assert $ep $a))) + (on (message (message-poa->server id (Assert $ep $a))) (when (not (set-member? endpoints ep)) (set! endpoints (set-add endpoints ep)) (react @@ -42,7 +43,7 @@ (assert (server-proposal scope (assertion))) - (let ((! (lambda (ctor) (lambda (cs) (send! (server-outbound id (ctor ep cs))))))) + (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs))))))) (add-observer-endpoint! (lambda () (let ((a (assertion))) (when (observe? a) @@ -51,13 +52,13 @@ #:on-remove (! Del) #:on-message (! Msg))) - (on (message (server-inbound id (Assert ep $new-a))) + (on (message (message-poa->server id (Assert ep $new-a))) (assertion new-a)) - (stop-when (message (server-inbound id (Clear ep))))))) + (stop-when (message (message-poa->server id (Clear ep))))))) - (on (message (server-inbound id (Message $body))) + (on (message (message-poa->server id (Message $body))) (send! (server-envelope scope body))) - (on (message (server-inbound id (Ping))) - (send! (server-outbound id (Pong)))))) + (on (message (message-poa->server id (Ping))) + (send! (message-server->poa id (Pong)))))) diff --git a/syndicate/broker/server/tcp.rkt b/syndicate/broker/server/tcp.rkt index d3fa0d8..649127d 100644 --- a/syndicate/broker/server/tcp.rkt +++ b/syndicate/broker/server/tcp.rkt @@ -1,8 +1,8 @@ #lang imperative-syndicate (provide server-facet/tcp - default-tcp-broker-port - spawn-tcp-broker!) + default-tcp-server-port + spawn-tcp-server!) (require "../wire-protocol.rkt") @@ -11,18 +11,18 @@ (define (server-facet/tcp id scope) (assert (tcp-accepted id)) - (assert (server-connection id scope)) - (define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p))))) + (assert (server-poa id scope)) + (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p))))) (on (message (tcp-in id $bs)) (accumulate! bs)) - (on (message (server-outbound id $p)) + (on (message (message-server->poa id $p)) (send! (tcp-out id (encode p))))) -(define default-tcp-broker-port 8001) +(define default-tcp-server-port 8001) -(define (spawn-tcp-broker! [port default-tcp-broker-port]) +(define (spawn-tcp-server! [port default-tcp-server-port]) (spawn #:name 'tcp-server-listener (define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup (during/spawn (tcp-connection $id (tcp-listener port)) - #:name `(server-connection ,tcp-scope ,id) + #:name `(server-poa ,tcp-scope ,id) (server-facet/tcp id tcp-scope)))) diff --git a/syndicate/broker/server/websocket.rkt b/syndicate/broker/server/websocket.rkt index 8d2a28c..03f938a 100644 --- a/syndicate/broker/server/websocket.rkt +++ b/syndicate/broker/server/websocket.rkt @@ -1,8 +1,8 @@ #lang imperative-syndicate (provide server-facet/websocket - default-http-broker-port - spawn-websocket-broker!) + default-http-server-port + spawn-websocket-server!) (require "../wire-protocol.rkt") @@ -13,26 +13,26 @@ (define (server-facet/websocket id scope) (assert (http-accepted id)) (assert (http-response-websocket id)) - (assert (server-connection id scope)) + (assert (server-poa id scope)) (field [ping-time-deadline 0]) (on (asserted (later-than (ping-time-deadline))) (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) - (send! (server-outbound id (Ping)))) + (send! (message-server->poa id (Ping)))) (on (message (websocket-in id $body)) (define-values (packet remainder) (decode body)) (when (not (equal? remainder #"")) (error 'server-facet/websocket "Multiple packets in a single websocket message")) - (send! (server-inbound id packet))) - (on (message (server-outbound id $p)) + (send! (message-poa->server id packet))) + (on (message (message-server->poa id $p)) (send! (websocket-out id (encode p))))) -(define default-http-broker-port 8000) +(define default-http-server-port 8000) -(define (spawn-websocket-broker! [port default-http-broker-port]) +(define (spawn-websocket-server! [port default-http-server-port]) (spawn #:name 'websocket-server-listener (during/spawn (http-request $id 'get (http-resource (http-server _ port #f) `(,$scope ())) _ _ _) - #:name `(server-connection ,scope ,id) + #:name `(server-poa ,scope ,id) (server-facet/websocket id scope)))) diff --git a/syndicate/broker/tf.rkt b/syndicate/broker/tf.rkt index ef2ddb8..1e0dd59 100644 --- a/syndicate/broker/tf.rkt +++ b/syndicate/broker/tf.rkt @@ -56,8 +56,8 @@ ))) -(assertion-struct to-broker (node assertion)) -(assertion-struct from-broker (node assertion)) +(assertion-struct to-server (node assertion)) +(assertion-struct from-server (node assertion)) (define (leaf2 name node) (local-require imperative-syndicate/term) @@ -68,7 +68,7 @@ (stop-when (message (terminate name))) (field [present? #t]) - (assert #:when (present?) (to-broker node (present name))) + (assert #:when (present?) (to-server node (present name))) (on (message (change-presence name $new-presence)) (present? new-presence)) @@ -86,42 +86,42 @@ ;; get "absent" messages on clean termination, because the ;; assertion is still there even as the actor terminates! ;; - ;; (during (from-broker node (present $who)) + ;; (during (from-server node (present $who)) ;; (on-start (log-info "~a: ~a present" name who)) ;; (on-stop (log-info "~a: ~a absent" name who))) - (on (asserted (from-broker node (present $who))) (log-info "~a: ~a present" name who)) - (on (retracted (from-broker node (present $who))) (log-info "~a: ~a absent" name who)) + (on (asserted (from-server node (present $who))) (log-info "~a: ~a present" name who)) + (on (retracted (from-server node (present $who))) (log-info "~a: ~a absent" name who)) - (on (asserted (from-broker node (observe (present _)))) + (on (asserted (from-server node (observe (present _)))) (log-info "~a: someone cares about presence!" name)) - (on (message (from-broker node (says $who $what))) + (on (message (from-server node (says $who $what))) (log-info "~a: ~a says ~v" name who what)) ;;---------------------------------------- - (during (to-broker node $what) + (during (to-server node $what) ;; This takes care of the self-signalling discussed above. - (assert (from-broker node what))) + (assert (from-server node what))) (during (router-connection node name) (on (message (router-outbound name (Assert $subid (observe $spec)))) (react (let ((! (lambda (ctor) (lambda (cs) (send! (router-inbound name (ctor subid cs))))))) - (add-observer-endpoint! (lambda () (to-broker node spec)) + (add-observer-endpoint! (lambda () (to-server node spec)) #:on-add (! Add) #:on-remove (! Del) #:on-message (! Msg))) - (assert (from-broker node (observe spec))) + (assert (from-server node (observe spec))) (stop-when (message (router-outbound name (Clear subid)))))) - (during (observe ($ pat (from-broker node $spec))) + (during (observe ($ pat (from-server node $spec))) (define ep (gensym 'ep)) (on-start (send! (router-inbound name (Assert ep (observe spec))))) (on-stop (send! (router-inbound name (Clear ep)))) - (assert (from-broker node (observe spec))) ;; more self-signalling + (assert (from-server node (observe spec))) ;; more self-signalling (on (message (router-outbound name (Add ep $captures))) (react (assert (instantiate-term->value pat captures)) (stop-when (message (router-outbound name (Del ep captures)))))) diff --git a/syndicate/broker/wire-protocol.rkt b/syndicate/broker/wire-protocol.rkt index 63a9772..3382316 100644 --- a/syndicate/broker/wire-protocol.rkt +++ b/syndicate/broker/wire-protocol.rkt @@ -6,17 +6,17 @@ (require bitsyntax) (require (only-in net/rfc6455 ws-idle-timeout)) -;; Client --> Broker +;; Actions; Client --> Server (and Peer --> Peer, except for Message) (message-struct Assert (endpoint-name assertion)) (message-struct Clear (endpoint-name)) (message-struct Message (body)) -;; Broker --> Client +;; Events; Server --> Client (and Peer --> Peer) (message-struct Add (endpoint-name captures)) (message-struct Del (endpoint-name captures)) (message-struct Msg (endpoint-name captures)) -;; Bidirectional +;; Transport-related; Bidirectional (message-struct Ping ()) (message-struct Pong ()) @@ -45,5 +45,5 @@ (lambda (chunk) (buffer (bytes-append (buffer) chunk)))) -;; Received packets from broker are relayed via one of these. -(message-struct broker-packet (address packet)) +;; Received packets from server are relayed via one of these. +(message-struct server-packet (address packet)) diff --git a/syndicate/examples/broker-chat-client.rkt b/syndicate/examples/broker-chat-client.rkt index b49deb3..118ebc9 100644 --- a/syndicate/examples/broker-chat-client.rkt +++ b/syndicate/examples/broker-chat-client.rkt @@ -11,20 +11,20 @@ (field [username (symbol->string (strong-gensym 'chatter-))]) (define root-facet (current-facet)) - (define url standard-localhost-broker/tcp) - (during (broker-connected url) - (on-start (log-info "Connected to broker.")) - (on-stop (log-info "Disconnected from broker.")) + (define url standard-localhost-server/tcp) + (during (server-connected url) + (on-start (log-info "Connected to server.")) + (on-stop (log-info "Disconnected from server.")) - (on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who)) - (on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who)) - (on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what)) + (on (asserted (from-server url (Present $who))) (printf "~a arrived.\n" who)) + (on (retracted (from-server url (Present $who))) (printf "~a departed.\n" who)) + (on (message (from-server url (Says $who $what))) (printf "~a: ~a\n" who what)) - (assert (to-broker url (Present (username)))) + (assert (to-server url (Present (username)))) (define stdin-evt (read-line-evt (current-input-port) 'any)) (on (message (inbound (external-event stdin-evt (list $line)))) (match line [(? eof-object?) (stop-facet root-facet)] [(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)] - [other (send! (to-broker url (Says (username) other)))])))) + [other (send! (to-server url (Says (username) other)))])))) diff --git a/syndicate/test/broker/nesting-confusion.rkt b/syndicate/test/broker/nesting-confusion.rkt index fe743d3..badb8ac 100644 --- a/syndicate/test/broker/nesting-confusion.rkt +++ b/syndicate/test/broker/nesting-confusion.rkt @@ -6,7 +6,7 @@ (assertion-struct researcher (name topic)) -(define test-address (broker-loopback-connection "test")) +(define test-address (server-loopback-connection "test")) (define no-mention-of-discard (lambda () @@ -37,13 +37,13 @@ (assert (server-proposal "test" (researcher "Eve" "Evil")))) (spawn #:name 'all-topics - (during (broker-connected test-address) - (during (from-broker test-address (researcher _ $topic)) + (during (server-connected test-address) + (during (from-server test-address (researcher _ $topic)) (on-start (printf "Added topic: ~a\n" topic)) (on-stop (printf "Removed topic: ~a\n" topic))))) (spawn #:name 'all-researchers - (during (broker-connected test-address) - (during (from-broker test-address (researcher $name _)) + (during (server-connected test-address) + (during (from-server test-address (researcher $name _)) (on-start (printf "Added researcher: ~a\n" name)) (on-stop (printf "Removed researcher: ~a\n" name)))))] no-crashes @@ -68,11 +68,11 @@ (assert (server-proposal "test" (claim 123))) (on-start (for [(i 100)] (flush!)) (stop-current-facet))) (spawn #:name 'monitor - (during (broker-connected test-address) - (during (from-broker test-address (claim 123)) + (during (server-connected test-address) + (during (from-server test-address (claim 123)) (on-start (printf "Specific claim asserted\n")) (on-stop (printf "Specific claim retracted\n"))) - (during (from-broker test-address (claim $detail)) + (during (from-server test-address (claim $detail)) (on-start (printf "Nonspecific claim ~v asserted\n" detail)) (on-stop (printf "Nonspecific claim ~v retracted\n" detail)))))] no-crashes @@ -84,8 +84,8 @@ [(activate imperative-syndicate/broker) (spawn #:name 'inner-monitor - (during (broker-connected test-address) - (during (from-broker test-address (claim $detail)) + (during (server-connected test-address) + (during (from-server test-address (claim $detail)) (on-start (printf "Inner saw claim asserted\n")) (on-stop (printf "Inner saw claim retracted\n"))))) (spawn #:name 'claimant diff --git a/syndicate/test/broker/observation-visibility.rkt b/syndicate/test/broker/observation-visibility.rkt index ec24446..42d658c 100644 --- a/syndicate/test/broker/observation-visibility.rkt +++ b/syndicate/test/broker/observation-visibility.rkt @@ -7,23 +7,23 @@ (assertion-struct presence (who)) -(define test-address (broker-loopback-connection "test")) +(define test-address (server-loopback-connection "test")) (test-case [(activate imperative-syndicate/broker) (spawn #:name 'producer - (during (broker-connected test-address) - (assert (to-broker test-address (presence 'producer))))) + (during (server-connected test-address) + (assert (to-server test-address (presence 'producer))))) (spawn #:name 'consumer - (during (broker-connected test-address) - (on (asserted (from-broker test-address (presence $who))) + (during (server-connected test-address) + (on (asserted (from-server test-address (presence $who))) (printf "~a joined\n" who)))) (spawn #:name 'metaconsumer - (during (broker-connected test-address) - (on (asserted (from-broker test-address (observe (presence _)))) + (during (server-connected test-address) + (on (asserted (from-server test-address (observe (presence _)))) (printf "Someone cares about presence!\n")))) ] no-crashes