The Great Renaming broker -> server, part 1

This commit is contained in:
Tony Garnock-Jones 2019-05-05 16:37:03 +01:00
parent 5d657d6a95
commit 9f03dbb6d3
14 changed files with 123 additions and 122 deletions

View File

@ -9,14 +9,14 @@
(define-logger syndicate/broker) (define-logger syndicate/broker)
(spawn #:name 'client-factory (spawn #:name 'client-factory
(during (to-broker $a _) (assert (broker-connection a))) (during (to-server $a _) (assert (server-connection a)))
(during (observe (from-broker $a _)) (assert (broker-connection a))) (during (observe (from-server $a _)) (assert (server-connection a)))
(during (observe (broker-connected $a)) (assert (broker-connection a)))) (during (observe (server-connected $a)) (assert (server-connection a))))
(define (generic-client-session-facet address w) (define (generic-client-session-facet address w)
(on-start (log-syndicate/broker-info "Connected to ~v" address)) (on-start (log-syndicate/broker-info "Connected to ~v" address))
(on-stop (log-syndicate/broker-info "Disconnected from ~v" address)) (on-stop (log-syndicate/broker-info "Disconnected from ~v" address))
(assert (broker-connected address)) (assert (server-connected address))
(define next-ep (define next-ep
(let ((counter 0)) (let ((counter 0))
@ -24,23 +24,23 @@
(begin0 counter (begin0 counter
(set! counter (+ counter 1)))))) (set! counter (+ counter 1))))))
(during (to-broker address $a) (during (to-server address $a)
(define ep (next-ep)) (define ep (next-ep))
(on-start (w (Assert ep a))) (on-start (w (Assert ep a)))
(on-stop (w (Clear ep)))) (on-stop (w (Clear ep))))
(on (message (to-broker address $a)) (on (message (to-server address $a))
(w (Message a))) (w (Message a)))
(on (message (broker-packet address (Ping))) (on (message (server-packet address (Ping)))
(w (Pong))) (w (Pong)))
(during (observe ($ pat (from-broker address $spec))) (during (observe ($ pat (from-server address $spec)))
(define ep (next-ep)) (define ep (next-ep))
(on-start (w (Assert ep (observe spec)))) (on-start (w (Assert ep (observe spec))))
(on-stop (w (Clear ep))) (on-stop (w (Clear ep)))
(on (message (broker-packet address (Add ep $vs))) (on (message (server-packet address (Add ep $vs)))
(react (assert (instantiate-term->value pat vs)) (react (assert (instantiate-term->value pat vs))
(stop-when (message (broker-packet address (Del ep vs)))))) (stop-when (message (server-packet address (Del ep vs))))))
(on (message (broker-packet address (Msg ep $vs))) (on (message (server-packet address (Msg ep $vs)))
(send! (instantiate-term->value pat vs))))) (send! (instantiate-term->value pat vs)))))

View File

@ -1,6 +1,6 @@
#lang imperative-syndicate #lang imperative-syndicate
(provide (struct-out broker-loopback-connection)) (provide (struct-out server-loopback-connection))
(require "../client.rkt") (require "../client.rkt")
(require "../wire-protocol.rkt") (require "../wire-protocol.rkt")
@ -8,11 +8,11 @@
(require/activate imperative-syndicate/broker/server) (require/activate imperative-syndicate/broker/server)
(assertion-struct broker-loopback-connection (scope)) (assertion-struct server-loopback-connection (scope))
(spawn #:name 'loopback-client-factory (spawn #:name 'loopback-client-factory
(during/spawn (broker-connection ($ address (broker-loopback-connection $scope))) (during/spawn (server-connection ($ address (server-loopback-connection $scope)))
#:name address #:name address
(assert (server-connection address scope)) (assert (server-poa address scope))
(on (message (server-outbound address $p)) (send! (broker-packet address p))) (on (message (message-server->poa address $p)) (send! (server-packet address p)))
(generic-client-session-facet address (lambda (x) (send! (server-inbound address x)))))) (generic-client-session-facet address (lambda (x) (send! (message-poa->server address x))))))

View File

@ -1,7 +1,7 @@
#lang imperative-syndicate #lang imperative-syndicate
(provide standard-localhost-broker/tcp (provide standard-localhost-server/tcp
(struct-out broker-tcp-connection)) (struct-out server-tcp-connection))
(require "../client.rkt") (require "../client.rkt")
(require "../wire-protocol.rkt") (require "../wire-protocol.rkt")
@ -10,18 +10,18 @@
(require/activate imperative-syndicate/drivers/tcp) (require/activate imperative-syndicate/drivers/tcp)
(assertion-struct broker-tcp-connection (host port)) (assertion-struct server-tcp-connection (host port))
(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001)) (define standard-localhost-server/tcp (server-tcp-connection "localhost" 8001))
(spawn #:name 'tcp-client-factory (spawn #:name 'tcp-client-factory
(during/spawn (broker-connection ($ address (broker-tcp-connection $host $port))) (during/spawn (server-connection ($ address (server-tcp-connection $host $port)))
#:name address #:name address
(define id (list (gensym 'client) host port)) (define id (list (gensym 'client) host port))
(reassert-on (tcp-connection id (tcp-address host port)) (reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id)) (retracted (tcp-accepted id))
(asserted (tcp-rejected id _))) (asserted (tcp-rejected id _)))
(during (tcp-accepted id) (during (tcp-accepted id)
(define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p))))) (define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p)))))
(on (message (tcp-in id $bs)) (accumulate! bs)) (on (message (tcp-in id $bs)) (accumulate! bs))
(generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x)))))))) (generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x))))))))

View File

@ -28,7 +28,7 @@
;; map to multiple subscription IDs - this is the place where ;; map to multiple subscription IDs - this is the place where
;; aggregation pops up. ;; aggregation pops up.
;; Unlike the broker protocol, both Actions and Events are ;; Unlike the server protocol, both Actions and Events are
;; BIDIRECTIONAL, travelling in both directions along edges linking ;; BIDIRECTIONAL, travelling in both directions along edges linking
;; peer nodes. ;; peer nodes.

View File

@ -20,33 +20,33 @@
(module+ main (module+ main
(require racket/cmdline) (require racket/cmdline)
(define tcp-port default-tcp-broker-port) (define tcp-port default-tcp-server-port)
(define http-port default-http-broker-port) (define http-port default-http-server-port)
(command-line #:once-any (command-line #:once-any
["--tcp" port ["--tcp" port
((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port)) ((format "Listen on plain TCP port (default ~a)" default-tcp-server-port))
(set! tcp-port (string->number port))] (set! tcp-port (string->number port))]
["--no-tcp" "Do not listen on any plain TCP port" ["--no-tcp" "Do not listen on any plain TCP port"
(set! tcp-port #f)] (set! tcp-port #f)]
#:once-any #:once-any
["--http" port ["--http" port
((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port)) ((format "Listen on websocket HTTP port (default ~a)" default-http-server-port))
(set! http-port (string->number port))] (set! http-port (string->number port))]
["--no-http" "Do not listen on any websocket HTTP port" ["--no-http" "Do not listen on any websocket HTTP port"
(set! http-port #f)]) (set! http-port #f)])
(extend-ground-boot! (lambda () (extend-ground-boot! (lambda ()
(when tcp-port (spawn-tcp-broker! tcp-port)) (when tcp-port (spawn-tcp-server! tcp-port))
(when http-port (spawn-websocket-broker! http-port))))) (when http-port (spawn-websocket-server! http-port)))))
(define-logger syndicate/broker) (define-logger syndicate/broker)
(when (log-level? syndicate/broker-logger 'debug) (when (log-level? syndicate/broker-logger 'debug)
(spawn #:name 'server-debug (spawn #:name 'server-debug
(on (asserted (server-connection $id $scope)) (on (asserted (server-poa $id $scope))
(log-syndicate/broker-debug "C+ ~v ~v" id scope)) (log-syndicate/broker-debug "+ ~v ~v" id scope))
(on (retracted (server-connection $id $scope)) (on (retracted (server-poa $id $scope))
(log-syndicate/broker-debug "C- ~v ~v" id scope)) (log-syndicate/broker-debug "- ~v ~v" id scope))
(on (message (server-inbound $id $p)) (on (message (message-poa->server $id $p))
(log-syndicate/broker-debug "CIN ~v ~v" id p)) (log-syndicate/broker-debug "IN ~v ~v" id p))
(on (message (server-outbound $id $p)) (on (message (message-server->poa $id $p))
(log-syndicate/broker-debug "COUT ~v ~v" id p)))) (log-syndicate/broker-debug "OUT ~v ~v" id p))))

View File

@ -3,8 +3,8 @@
(provide (all-defined-out)) (provide (all-defined-out))
;; Client protocol ;; Client protocol
(assertion-struct to-broker (address assertion)) (assertion-struct to-server (address assertion))
(assertion-struct from-broker (address assertion)) (assertion-struct from-server (address assertion))
(assertion-struct broker-connection (address)) (assertion-struct server-connection (address))
(assertion-struct broker-connected (address)) (assertion-struct server-connected (address))
(message-struct force-broker-disconnect (address)) (message-struct force-server-disconnect (address))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate #lang imperative-syndicate
(provide (struct-out server-connection) (provide (struct-out server-poa)
(struct-out server-inbound) (struct-out message-poa->server)
(struct-out server-outbound) (struct-out message-server->poa)
(struct-out server-proposal) (struct-out server-proposal)
(struct-out server-envelope)) (struct-out server-envelope))
@ -10,17 +10,18 @@
(require racket/set) (require racket/set)
;; Internal connection protocol ;; Internal connection protocol
(assertion-struct server-connection (connection-id scope)) (assertion-struct server-poa (connection-id scope)) ;; "Point of Attachment"
(assertion-struct server-inbound (connection-id body)) (assertion-struct message-poa->server (connection-id body))
(assertion-struct server-outbound (connection-id body)) (assertion-struct message-server->poa (connection-id body))
;; Internal isolation ;; Internal isolation -- these are isomorphic to `to-server` and `from-server`!
;; (and, for that matter, to `outbound` and `inbound`!)
(assertion-struct server-proposal (scope body)) ;; suggestions (~ actions) (assertion-struct server-proposal (scope body)) ;; suggestions (~ actions)
(assertion-struct server-envelope (scope body)) ;; decisions (~ events) (assertion-struct server-envelope (scope body)) ;; decisions (~ events)
(spawn #:name 'server-connection-factory (spawn #:name 'server-factory
(during/spawn (server-connection _ _) (during/spawn (server-poa _ _)
;; Previously, we just had server-envelope. Now, we have both ;; Previously, we just had server-envelope. Now, we have both
;; server-envelope and server-proposal. While not everything ;; server-envelope and server-proposal. While not everything
;; decided is (locally) suggested, it is true that everything ;; decided is (locally) suggested, it is true that everything
@ -29,10 +30,10 @@
(during (server-proposal $scope $assertion) (during (server-proposal $scope $assertion)
(assert (server-envelope scope assertion)))) (assert (server-envelope scope assertion))))
(during/spawn (server-connection $id $scope) (during/spawn (server-poa $id $scope)
(define endpoints (set)) (define endpoints (set))
(on (message (server-inbound id (Assert $ep $a))) (on (message (message-poa->server id (Assert $ep $a)))
(when (not (set-member? endpoints ep)) (when (not (set-member? endpoints ep))
(set! endpoints (set-add endpoints ep)) (set! endpoints (set-add endpoints ep))
(react (react
@ -42,7 +43,7 @@
(assert (server-proposal scope (assertion))) (assert (server-proposal scope (assertion)))
(let ((! (lambda (ctor) (lambda (cs) (send! (server-outbound id (ctor ep cs))))))) (let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
(add-observer-endpoint! (lambda () (add-observer-endpoint! (lambda ()
(let ((a (assertion))) (let ((a (assertion)))
(when (observe? a) (when (observe? a)
@ -51,13 +52,13 @@
#:on-remove (! Del) #:on-remove (! Del)
#:on-message (! Msg))) #:on-message (! Msg)))
(on (message (server-inbound id (Assert ep $new-a))) (on (message (message-poa->server id (Assert ep $new-a)))
(assertion new-a)) (assertion new-a))
(stop-when (message (server-inbound id (Clear ep))))))) (stop-when (message (message-poa->server id (Clear ep)))))))
(on (message (server-inbound id (Message $body))) (on (message (message-poa->server id (Message $body)))
(send! (server-envelope scope body))) (send! (server-envelope scope body)))
(on (message (server-inbound id (Ping))) (on (message (message-poa->server id (Ping)))
(send! (server-outbound id (Pong)))))) (send! (message-server->poa id (Pong))))))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate #lang imperative-syndicate
(provide server-facet/tcp (provide server-facet/tcp
default-tcp-broker-port default-tcp-server-port
spawn-tcp-broker!) spawn-tcp-server!)
(require "../wire-protocol.rkt") (require "../wire-protocol.rkt")
@ -11,18 +11,18 @@
(define (server-facet/tcp id scope) (define (server-facet/tcp id scope)
(assert (tcp-accepted id)) (assert (tcp-accepted id))
(assert (server-connection id scope)) (assert (server-poa id scope))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p))))) (define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
(on (message (tcp-in id $bs)) (on (message (tcp-in id $bs))
(accumulate! bs)) (accumulate! bs))
(on (message (server-outbound id $p)) (on (message (message-server->poa id $p))
(send! (tcp-out id (encode p))))) (send! (tcp-out id (encode p)))))
(define default-tcp-broker-port 8001) (define default-tcp-server-port 8001)
(define (spawn-tcp-broker! [port default-tcp-broker-port]) (define (spawn-tcp-server! [port default-tcp-server-port])
(spawn #:name 'tcp-server-listener (spawn #:name 'tcp-server-listener
(define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup (define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup
(during/spawn (tcp-connection $id (tcp-listener port)) (during/spawn (tcp-connection $id (tcp-listener port))
#:name `(server-connection ,tcp-scope ,id) #:name `(server-poa ,tcp-scope ,id)
(server-facet/tcp id tcp-scope)))) (server-facet/tcp id tcp-scope))))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate #lang imperative-syndicate
(provide server-facet/websocket (provide server-facet/websocket
default-http-broker-port default-http-server-port
spawn-websocket-broker!) spawn-websocket-server!)
(require "../wire-protocol.rkt") (require "../wire-protocol.rkt")
@ -13,26 +13,26 @@
(define (server-facet/websocket id scope) (define (server-facet/websocket id scope)
(assert (http-accepted id)) (assert (http-accepted id))
(assert (http-response-websocket id)) (assert (http-response-websocket id))
(assert (server-connection id scope)) (assert (server-poa id scope))
(field [ping-time-deadline 0]) (field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline))) (on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (server-outbound id (Ping)))) (send! (message-server->poa id (Ping))))
(on (message (websocket-in id $body)) (on (message (websocket-in id $body))
(define-values (packet remainder) (decode body)) (define-values (packet remainder) (decode body))
(when (not (equal? remainder #"")) (when (not (equal? remainder #""))
(error 'server-facet/websocket "Multiple packets in a single websocket message")) (error 'server-facet/websocket "Multiple packets in a single websocket message"))
(send! (server-inbound id packet))) (send! (message-poa->server id packet)))
(on (message (server-outbound id $p)) (on (message (message-server->poa id $p))
(send! (websocket-out id (encode p))))) (send! (websocket-out id (encode p)))))
(define default-http-broker-port 8000) (define default-http-server-port 8000)
(define (spawn-websocket-broker! [port default-http-broker-port]) (define (spawn-websocket-server! [port default-http-server-port])
(spawn #:name 'websocket-server-listener (spawn #:name 'websocket-server-listener
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f) (during/spawn (http-request $id 'get (http-resource (http-server _ port #f)
`(,$scope ())) _ _ _) `(,$scope ())) _ _ _)
#:name `(server-connection ,scope ,id) #:name `(server-poa ,scope ,id)
(server-facet/websocket id scope)))) (server-facet/websocket id scope))))

View File

@ -56,8 +56,8 @@
))) )))
(assertion-struct to-broker (node assertion)) (assertion-struct to-server (node assertion))
(assertion-struct from-broker (node assertion)) (assertion-struct from-server (node assertion))
(define (leaf2 name node) (define (leaf2 name node)
(local-require imperative-syndicate/term) (local-require imperative-syndicate/term)
@ -68,7 +68,7 @@
(stop-when (message (terminate name))) (stop-when (message (terminate name)))
(field [present? #t]) (field [present? #t])
(assert #:when (present?) (to-broker node (present name))) (assert #:when (present?) (to-server node (present name)))
(on (message (change-presence name $new-presence)) (on (message (change-presence name $new-presence))
(present? new-presence)) (present? new-presence))
@ -86,42 +86,42 @@
;; get "absent" messages on clean termination, because the ;; get "absent" messages on clean termination, because the
;; assertion is still there even as the actor terminates! ;; assertion is still there even as the actor terminates!
;; ;;
;; (during (from-broker node (present $who)) ;; (during (from-server node (present $who))
;; (on-start (log-info "~a: ~a present" name who)) ;; (on-start (log-info "~a: ~a present" name who))
;; (on-stop (log-info "~a: ~a absent" name who))) ;; (on-stop (log-info "~a: ~a absent" name who)))
(on (asserted (from-broker node (present $who))) (log-info "~a: ~a present" name who)) (on (asserted (from-server node (present $who))) (log-info "~a: ~a present" name who))
(on (retracted (from-broker node (present $who))) (log-info "~a: ~a absent" name who)) (on (retracted (from-server node (present $who))) (log-info "~a: ~a absent" name who))
(on (asserted (from-broker node (observe (present _)))) (on (asserted (from-server node (observe (present _))))
(log-info "~a: someone cares about presence!" name)) (log-info "~a: someone cares about presence!" name))
(on (message (from-broker node (says $who $what))) (on (message (from-server node (says $who $what)))
(log-info "~a: ~a says ~v" name who what)) (log-info "~a: ~a says ~v" name who what))
;;---------------------------------------- ;;----------------------------------------
(during (to-broker node $what) (during (to-server node $what)
;; This takes care of the self-signalling discussed above. ;; This takes care of the self-signalling discussed above.
(assert (from-broker node what))) (assert (from-server node what)))
(during (router-connection node name) (during (router-connection node name)
(on (message (router-outbound name (Assert $subid (observe $spec)))) (on (message (router-outbound name (Assert $subid (observe $spec))))
(react (react
(let ((! (lambda (ctor) (let ((! (lambda (ctor)
(lambda (cs) (send! (router-inbound name (ctor subid cs))))))) (lambda (cs) (send! (router-inbound name (ctor subid cs)))))))
(add-observer-endpoint! (lambda () (to-broker node spec)) (add-observer-endpoint! (lambda () (to-server node spec))
#:on-add (! Add) #:on-add (! Add)
#:on-remove (! Del) #:on-remove (! Del)
#:on-message (! Msg))) #:on-message (! Msg)))
(assert (from-broker node (observe spec))) (assert (from-server node (observe spec)))
(stop-when (message (router-outbound name (Clear subid)))))) (stop-when (message (router-outbound name (Clear subid))))))
(during (observe ($ pat (from-broker node $spec))) (during (observe ($ pat (from-server node $spec)))
(define ep (gensym 'ep)) (define ep (gensym 'ep))
(on-start (send! (router-inbound name (Assert ep (observe spec))))) (on-start (send! (router-inbound name (Assert ep (observe spec)))))
(on-stop (send! (router-inbound name (Clear ep)))) (on-stop (send! (router-inbound name (Clear ep))))
(assert (from-broker node (observe spec))) ;; more self-signalling (assert (from-server node (observe spec))) ;; more self-signalling
(on (message (router-outbound name (Add ep $captures))) (on (message (router-outbound name (Add ep $captures)))
(react (assert (instantiate-term->value pat captures)) (react (assert (instantiate-term->value pat captures))
(stop-when (message (router-outbound name (Del ep captures)))))) (stop-when (message (router-outbound name (Del ep captures))))))

View File

@ -6,17 +6,17 @@
(require bitsyntax) (require bitsyntax)
(require (only-in net/rfc6455 ws-idle-timeout)) (require (only-in net/rfc6455 ws-idle-timeout))
;; Client --> Broker ;; Actions; Client --> Server (and Peer --> Peer, except for Message)
(message-struct Assert (endpoint-name assertion)) (message-struct Assert (endpoint-name assertion))
(message-struct Clear (endpoint-name)) (message-struct Clear (endpoint-name))
(message-struct Message (body)) (message-struct Message (body))
;; Broker --> Client ;; Events; Server --> Client (and Peer --> Peer)
(message-struct Add (endpoint-name captures)) (message-struct Add (endpoint-name captures))
(message-struct Del (endpoint-name captures)) (message-struct Del (endpoint-name captures))
(message-struct Msg (endpoint-name captures)) (message-struct Msg (endpoint-name captures))
;; Bidirectional ;; Transport-related; Bidirectional
(message-struct Ping ()) (message-struct Ping ())
(message-struct Pong ()) (message-struct Pong ())
@ -45,5 +45,5 @@
(lambda (chunk) (lambda (chunk)
(buffer (bytes-append (buffer) chunk)))) (buffer (bytes-append (buffer) chunk))))
;; Received packets from broker are relayed via one of these. ;; Received packets from server are relayed via one of these.
(message-struct broker-packet (address packet)) (message-struct server-packet (address packet))

View File

@ -11,20 +11,20 @@
(field [username (symbol->string (strong-gensym 'chatter-))]) (field [username (symbol->string (strong-gensym 'chatter-))])
(define root-facet (current-facet)) (define root-facet (current-facet))
(define url standard-localhost-broker/tcp) (define url standard-localhost-server/tcp)
(during (broker-connected url) (during (server-connected url)
(on-start (log-info "Connected to broker.")) (on-start (log-info "Connected to server."))
(on-stop (log-info "Disconnected from broker.")) (on-stop (log-info "Disconnected from server."))
(on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who)) (on (asserted (from-server url (Present $who))) (printf "~a arrived.\n" who))
(on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who)) (on (retracted (from-server url (Present $who))) (printf "~a departed.\n" who))
(on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what)) (on (message (from-server url (Says $who $what))) (printf "~a: ~a\n" who what))
(assert (to-broker url (Present (username)))) (assert (to-server url (Present (username))))
(define stdin-evt (read-line-evt (current-input-port) 'any)) (define stdin-evt (read-line-evt (current-input-port) 'any))
(on (message (inbound (external-event stdin-evt (list $line)))) (on (message (inbound (external-event stdin-evt (list $line))))
(match line (match line
[(? eof-object?) (stop-facet root-facet)] [(? eof-object?) (stop-facet root-facet)]
[(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)] [(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)]
[other (send! (to-broker url (Says (username) other)))])))) [other (send! (to-server url (Says (username) other)))]))))

View File

@ -6,7 +6,7 @@
(assertion-struct researcher (name topic)) (assertion-struct researcher (name topic))
(define test-address (broker-loopback-connection "test")) (define test-address (server-loopback-connection "test"))
(define no-mention-of-discard (define no-mention-of-discard
(lambda () (lambda ()
@ -37,13 +37,13 @@
(assert (server-proposal "test" (researcher "Eve" "Evil")))) (assert (server-proposal "test" (researcher "Eve" "Evil"))))
(spawn #:name 'all-topics (spawn #:name 'all-topics
(during (broker-connected test-address) (during (server-connected test-address)
(during (from-broker test-address (researcher _ $topic)) (during (from-server test-address (researcher _ $topic))
(on-start (printf "Added topic: ~a\n" topic)) (on-start (printf "Added topic: ~a\n" topic))
(on-stop (printf "Removed topic: ~a\n" topic))))) (on-stop (printf "Removed topic: ~a\n" topic)))))
(spawn #:name 'all-researchers (spawn #:name 'all-researchers
(during (broker-connected test-address) (during (server-connected test-address)
(during (from-broker test-address (researcher $name _)) (during (from-server test-address (researcher $name _))
(on-start (printf "Added researcher: ~a\n" name)) (on-start (printf "Added researcher: ~a\n" name))
(on-stop (printf "Removed researcher: ~a\n" name)))))] (on-stop (printf "Removed researcher: ~a\n" name)))))]
no-crashes no-crashes
@ -68,11 +68,11 @@
(assert (server-proposal "test" (claim 123))) (assert (server-proposal "test" (claim 123)))
(on-start (for [(i 100)] (flush!)) (stop-current-facet))) (on-start (for [(i 100)] (flush!)) (stop-current-facet)))
(spawn #:name 'monitor (spawn #:name 'monitor
(during (broker-connected test-address) (during (server-connected test-address)
(during (from-broker test-address (claim 123)) (during (from-server test-address (claim 123))
(on-start (printf "Specific claim asserted\n")) (on-start (printf "Specific claim asserted\n"))
(on-stop (printf "Specific claim retracted\n"))) (on-stop (printf "Specific claim retracted\n")))
(during (from-broker test-address (claim $detail)) (during (from-server test-address (claim $detail))
(on-start (printf "Nonspecific claim ~v asserted\n" detail)) (on-start (printf "Nonspecific claim ~v asserted\n" detail))
(on-stop (printf "Nonspecific claim ~v retracted\n" detail)))))] (on-stop (printf "Nonspecific claim ~v retracted\n" detail)))))]
no-crashes no-crashes
@ -84,8 +84,8 @@
[(activate imperative-syndicate/broker) [(activate imperative-syndicate/broker)
(spawn #:name 'inner-monitor (spawn #:name 'inner-monitor
(during (broker-connected test-address) (during (server-connected test-address)
(during (from-broker test-address (claim $detail)) (during (from-server test-address (claim $detail))
(on-start (printf "Inner saw claim asserted\n")) (on-start (printf "Inner saw claim asserted\n"))
(on-stop (printf "Inner saw claim retracted\n"))))) (on-stop (printf "Inner saw claim retracted\n")))))
(spawn #:name 'claimant (spawn #:name 'claimant

View File

@ -7,23 +7,23 @@
(assertion-struct presence (who)) (assertion-struct presence (who))
(define test-address (broker-loopback-connection "test")) (define test-address (server-loopback-connection "test"))
(test-case (test-case
[(activate imperative-syndicate/broker) [(activate imperative-syndicate/broker)
(spawn #:name 'producer (spawn #:name 'producer
(during (broker-connected test-address) (during (server-connected test-address)
(assert (to-broker test-address (presence 'producer))))) (assert (to-server test-address (presence 'producer)))))
(spawn #:name 'consumer (spawn #:name 'consumer
(during (broker-connected test-address) (during (server-connected test-address)
(on (asserted (from-broker test-address (presence $who))) (on (asserted (from-server test-address (presence $who)))
(printf "~a joined\n" who)))) (printf "~a joined\n" who))))
(spawn #:name 'metaconsumer (spawn #:name 'metaconsumer
(during (broker-connected test-address) (during (server-connected test-address)
(on (asserted (from-broker test-address (observe (presence _)))) (on (asserted (from-server test-address (observe (presence _))))
(printf "Someone cares about presence!\n")))) (printf "Someone cares about presence!\n"))))
] ]
no-crashes no-crashes