The Great Renaming broker -> server, part 1

This commit is contained in:
Tony Garnock-Jones 2019-05-05 16:37:03 +01:00
parent a08574a403
commit f2b29a798a
14 changed files with 123 additions and 122 deletions

View File

@ -9,14 +9,14 @@
(define-logger syndicate/broker)
(spawn #:name 'client-factory
(during (to-broker $a _) (assert (broker-connection a)))
(during (observe (from-broker $a _)) (assert (broker-connection a)))
(during (observe (broker-connected $a)) (assert (broker-connection a))))
(during (to-server $a _) (assert (server-connection a)))
(during (observe (from-server $a _)) (assert (server-connection a)))
(during (observe (server-connected $a)) (assert (server-connection a))))
(define (generic-client-session-facet address w)
(on-start (log-syndicate/broker-info "Connected to ~v" address))
(on-stop (log-syndicate/broker-info "Disconnected from ~v" address))
(assert (broker-connected address))
(assert (server-connected address))
(define next-ep
(let ((counter 0))
@ -24,23 +24,23 @@
(begin0 counter
(set! counter (+ counter 1))))))
(during (to-broker address $a)
(during (to-server address $a)
(define ep (next-ep))
(on-start (w (Assert ep a)))
(on-stop (w (Clear ep))))
(on (message (to-broker address $a))
(on (message (to-server address $a))
(w (Message a)))
(on (message (broker-packet address (Ping)))
(on (message (server-packet address (Ping)))
(w (Pong)))
(during (observe ($ pat (from-broker address $spec)))
(during (observe ($ pat (from-server address $spec)))
(define ep (next-ep))
(on-start (w (Assert ep (observe spec))))
(on-stop (w (Clear ep)))
(on (message (broker-packet address (Add ep $vs)))
(on (message (server-packet address (Add ep $vs)))
(react (assert (instantiate-term->value pat vs))
(stop-when (message (broker-packet address (Del ep vs))))))
(on (message (broker-packet address (Msg ep $vs)))
(stop-when (message (server-packet address (Del ep vs))))))
(on (message (server-packet address (Msg ep $vs)))
(send! (instantiate-term->value pat vs)))))

View File

@ -1,6 +1,6 @@
#lang imperative-syndicate
(provide (struct-out broker-loopback-connection))
(provide (struct-out server-loopback-connection))
(require "../client.rkt")
(require "../wire-protocol.rkt")
@ -8,11 +8,11 @@
(require/activate imperative-syndicate/broker/server)
(assertion-struct broker-loopback-connection (scope))
(assertion-struct server-loopback-connection (scope))
(spawn #:name 'loopback-client-factory
(during/spawn (broker-connection ($ address (broker-loopback-connection $scope)))
(during/spawn (server-connection ($ address (server-loopback-connection $scope)))
#:name address
(assert (server-connection address scope))
(on (message (server-outbound address $p)) (send! (broker-packet address p)))
(generic-client-session-facet address (lambda (x) (send! (server-inbound address x))))))
(assert (server-poa address scope))
(on (message (message-server->poa address $p)) (send! (server-packet address p)))
(generic-client-session-facet address (lambda (x) (send! (message-poa->server address x))))))

View File

@ -1,7 +1,7 @@
#lang imperative-syndicate
(provide standard-localhost-broker/tcp
(struct-out broker-tcp-connection))
(provide standard-localhost-server/tcp
(struct-out server-tcp-connection))
(require "../client.rkt")
(require "../wire-protocol.rkt")
@ -10,18 +10,18 @@
(require/activate imperative-syndicate/drivers/tcp)
(assertion-struct broker-tcp-connection (host port))
(assertion-struct server-tcp-connection (host port))
(define standard-localhost-broker/tcp (broker-tcp-connection "localhost" 8001))
(define standard-localhost-server/tcp (server-tcp-connection "localhost" 8001))
(spawn #:name 'tcp-client-factory
(during/spawn (broker-connection ($ address (broker-tcp-connection $host $port)))
(during/spawn (server-connection ($ address (server-tcp-connection $host $port)))
#:name address
(define id (list (gensym 'client) host port))
(reassert-on (tcp-connection id (tcp-address host port))
(retracted (tcp-accepted id))
(asserted (tcp-rejected id _)))
(during (tcp-accepted id)
(define accumulate! (packet-accumulator (lambda (p) (send! (broker-packet address p)))))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-packet address p)))))
(on (message (tcp-in id $bs)) (accumulate! bs))
(generic-client-session-facet address (lambda (x) (send! (tcp-out id (encode x))))))))

View File

@ -28,7 +28,7 @@
;; map to multiple subscription IDs - this is the place where
;; aggregation pops up.
;; Unlike the broker protocol, both Actions and Events are
;; Unlike the server protocol, both Actions and Events are
;; BIDIRECTIONAL, travelling in both directions along edges linking
;; peer nodes.

View File

@ -20,33 +20,33 @@
(module+ main
(require racket/cmdline)
(define tcp-port default-tcp-broker-port)
(define http-port default-http-broker-port)
(define tcp-port default-tcp-server-port)
(define http-port default-http-server-port)
(command-line #:once-any
["--tcp" port
((format "Listen on plain TCP port (default ~a)" default-tcp-broker-port))
((format "Listen on plain TCP port (default ~a)" default-tcp-server-port))
(set! tcp-port (string->number port))]
["--no-tcp" "Do not listen on any plain TCP port"
(set! tcp-port #f)]
#:once-any
["--http" port
((format "Listen on websocket HTTP port (default ~a)" default-http-broker-port))
((format "Listen on websocket HTTP port (default ~a)" default-http-server-port))
(set! http-port (string->number port))]
["--no-http" "Do not listen on any websocket HTTP port"
(set! http-port #f)])
(extend-ground-boot! (lambda ()
(when tcp-port (spawn-tcp-broker! tcp-port))
(when http-port (spawn-websocket-broker! http-port)))))
(when tcp-port (spawn-tcp-server! tcp-port))
(when http-port (spawn-websocket-server! http-port)))))
(define-logger syndicate/broker)
(when (log-level? syndicate/broker-logger 'debug)
(spawn #:name 'server-debug
(on (asserted (server-connection $id $scope))
(log-syndicate/broker-debug "C+ ~v ~v" id scope))
(on (retracted (server-connection $id $scope))
(log-syndicate/broker-debug "C- ~v ~v" id scope))
(on (message (server-inbound $id $p))
(log-syndicate/broker-debug "CIN ~v ~v" id p))
(on (message (server-outbound $id $p))
(log-syndicate/broker-debug "COUT ~v ~v" id p))))
(on (asserted (server-poa $id $scope))
(log-syndicate/broker-debug "+ ~v ~v" id scope))
(on (retracted (server-poa $id $scope))
(log-syndicate/broker-debug "- ~v ~v" id scope))
(on (message (message-poa->server $id $p))
(log-syndicate/broker-debug "IN ~v ~v" id p))
(on (message (message-server->poa $id $p))
(log-syndicate/broker-debug "OUT ~v ~v" id p))))

View File

@ -3,8 +3,8 @@
(provide (all-defined-out))
;; Client protocol
(assertion-struct to-broker (address assertion))
(assertion-struct from-broker (address assertion))
(assertion-struct broker-connection (address))
(assertion-struct broker-connected (address))
(message-struct force-broker-disconnect (address))
(assertion-struct to-server (address assertion))
(assertion-struct from-server (address assertion))
(assertion-struct server-connection (address))
(assertion-struct server-connected (address))
(message-struct force-server-disconnect (address))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate
(provide (struct-out server-connection)
(struct-out server-inbound)
(struct-out server-outbound)
(provide (struct-out server-poa)
(struct-out message-poa->server)
(struct-out message-server->poa)
(struct-out server-proposal)
(struct-out server-envelope))
@ -10,17 +10,18 @@
(require racket/set)
;; Internal connection protocol
(assertion-struct server-connection (connection-id scope))
(assertion-struct server-inbound (connection-id body))
(assertion-struct server-outbound (connection-id body))
(assertion-struct server-poa (connection-id scope)) ;; "Point of Attachment"
(assertion-struct message-poa->server (connection-id body))
(assertion-struct message-server->poa (connection-id body))
;; Internal isolation
;; Internal isolation -- these are isomorphic to `to-server` and `from-server`!
;; (and, for that matter, to `outbound` and `inbound`!)
(assertion-struct server-proposal (scope body)) ;; suggestions (~ actions)
(assertion-struct server-envelope (scope body)) ;; decisions (~ events)
(spawn #:name 'server-connection-factory
(spawn #:name 'server-factory
(during/spawn (server-connection _ _)
(during/spawn (server-poa _ _)
;; Previously, we just had server-envelope. Now, we have both
;; server-envelope and server-proposal. While not everything
;; decided is (locally) suggested, it is true that everything
@ -29,10 +30,10 @@
(during (server-proposal $scope $assertion)
(assert (server-envelope scope assertion))))
(during/spawn (server-connection $id $scope)
(during/spawn (server-poa $id $scope)
(define endpoints (set))
(on (message (server-inbound id (Assert $ep $a)))
(on (message (message-poa->server id (Assert $ep $a)))
(when (not (set-member? endpoints ep))
(set! endpoints (set-add endpoints ep))
(react
@ -42,7 +43,7 @@
(assert (server-proposal scope (assertion)))
(let ((! (lambda (ctor) (lambda (cs) (send! (server-outbound id (ctor ep cs)))))))
(let ((! (lambda (ctor) (lambda (cs) (send! (message-server->poa id (ctor ep cs)))))))
(add-observer-endpoint! (lambda ()
(let ((a (assertion)))
(when (observe? a)
@ -51,13 +52,13 @@
#:on-remove (! Del)
#:on-message (! Msg)))
(on (message (server-inbound id (Assert ep $new-a)))
(on (message (message-poa->server id (Assert ep $new-a)))
(assertion new-a))
(stop-when (message (server-inbound id (Clear ep)))))))
(stop-when (message (message-poa->server id (Clear ep)))))))
(on (message (server-inbound id (Message $body)))
(on (message (message-poa->server id (Message $body)))
(send! (server-envelope scope body)))
(on (message (server-inbound id (Ping)))
(send! (server-outbound id (Pong))))))
(on (message (message-poa->server id (Ping)))
(send! (message-server->poa id (Pong))))))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate
(provide server-facet/tcp
default-tcp-broker-port
spawn-tcp-broker!)
default-tcp-server-port
spawn-tcp-server!)
(require "../wire-protocol.rkt")
@ -11,18 +11,18 @@
(define (server-facet/tcp id scope)
(assert (tcp-accepted id))
(assert (server-connection id scope))
(define accumulate! (packet-accumulator (lambda (p) (send! (server-inbound id p)))))
(assert (server-poa id scope))
(define accumulate! (packet-accumulator (lambda (p) (send! (message-poa->server id p)))))
(on (message (tcp-in id $bs))
(accumulate! bs))
(on (message (server-outbound id $p))
(on (message (message-server->poa id $p))
(send! (tcp-out id (encode p)))))
(define default-tcp-broker-port 8001)
(define default-tcp-server-port 8001)
(define (spawn-tcp-broker! [port default-tcp-broker-port])
(define (spawn-tcp-server! [port default-tcp-server-port])
(spawn #:name 'tcp-server-listener
(define tcp-scope "broker") ;; TODO: allow this to be negotiated during protocol startup
(during/spawn (tcp-connection $id (tcp-listener port))
#:name `(server-connection ,tcp-scope ,id)
#:name `(server-poa ,tcp-scope ,id)
(server-facet/tcp id tcp-scope))))

View File

@ -1,8 +1,8 @@
#lang imperative-syndicate
(provide server-facet/websocket
default-http-broker-port
spawn-websocket-broker!)
default-http-server-port
spawn-websocket-server!)
(require "../wire-protocol.rkt")
@ -13,26 +13,26 @@
(define (server-facet/websocket id scope)
(assert (http-accepted id))
(assert (http-response-websocket id))
(assert (server-connection id scope))
(assert (server-poa id scope))
(field [ping-time-deadline 0])
(on (asserted (later-than (ping-time-deadline)))
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
(send! (server-outbound id (Ping))))
(send! (message-server->poa id (Ping))))
(on (message (websocket-in id $body))
(define-values (packet remainder) (decode body))
(when (not (equal? remainder #""))
(error 'server-facet/websocket "Multiple packets in a single websocket message"))
(send! (server-inbound id packet)))
(on (message (server-outbound id $p))
(send! (message-poa->server id packet)))
(on (message (message-server->poa id $p))
(send! (websocket-out id (encode p)))))
(define default-http-broker-port 8000)
(define default-http-server-port 8000)
(define (spawn-websocket-broker! [port default-http-broker-port])
(define (spawn-websocket-server! [port default-http-server-port])
(spawn #:name 'websocket-server-listener
(during/spawn (http-request $id 'get (http-resource (http-server _ port #f)
`(,$scope ())) _ _ _)
#:name `(server-connection ,scope ,id)
#:name `(server-poa ,scope ,id)
(server-facet/websocket id scope))))

View File

@ -56,8 +56,8 @@
)))
(assertion-struct to-broker (node assertion))
(assertion-struct from-broker (node assertion))
(assertion-struct to-server (node assertion))
(assertion-struct from-server (node assertion))
(define (leaf2 name node)
(local-require imperative-syndicate/term)
@ -68,7 +68,7 @@
(stop-when (message (terminate name)))
(field [present? #t])
(assert #:when (present?) (to-broker node (present name)))
(assert #:when (present?) (to-server node (present name)))
(on (message (change-presence name $new-presence))
(present? new-presence))
@ -86,42 +86,42 @@
;; get "absent" messages on clean termination, because the
;; assertion is still there even as the actor terminates!
;;
;; (during (from-broker node (present $who))
;; (during (from-server node (present $who))
;; (on-start (log-info "~a: ~a present" name who))
;; (on-stop (log-info "~a: ~a absent" name who)))
(on (asserted (from-broker node (present $who))) (log-info "~a: ~a present" name who))
(on (retracted (from-broker node (present $who))) (log-info "~a: ~a absent" name who))
(on (asserted (from-server node (present $who))) (log-info "~a: ~a present" name who))
(on (retracted (from-server node (present $who))) (log-info "~a: ~a absent" name who))
(on (asserted (from-broker node (observe (present _))))
(on (asserted (from-server node (observe (present _))))
(log-info "~a: someone cares about presence!" name))
(on (message (from-broker node (says $who $what)))
(on (message (from-server node (says $who $what)))
(log-info "~a: ~a says ~v" name who what))
;;----------------------------------------
(during (to-broker node $what)
(during (to-server node $what)
;; This takes care of the self-signalling discussed above.
(assert (from-broker node what)))
(assert (from-server node what)))
(during (router-connection node name)
(on (message (router-outbound name (Assert $subid (observe $spec))))
(react
(let ((! (lambda (ctor)
(lambda (cs) (send! (router-inbound name (ctor subid cs)))))))
(add-observer-endpoint! (lambda () (to-broker node spec))
(add-observer-endpoint! (lambda () (to-server node spec))
#:on-add (! Add)
#:on-remove (! Del)
#:on-message (! Msg)))
(assert (from-broker node (observe spec)))
(assert (from-server node (observe spec)))
(stop-when (message (router-outbound name (Clear subid))))))
(during (observe ($ pat (from-broker node $spec)))
(during (observe ($ pat (from-server node $spec)))
(define ep (gensym 'ep))
(on-start (send! (router-inbound name (Assert ep (observe spec)))))
(on-stop (send! (router-inbound name (Clear ep))))
(assert (from-broker node (observe spec))) ;; more self-signalling
(assert (from-server node (observe spec))) ;; more self-signalling
(on (message (router-outbound name (Add ep $captures)))
(react (assert (instantiate-term->value pat captures))
(stop-when (message (router-outbound name (Del ep captures))))))

View File

@ -6,17 +6,17 @@
(require bitsyntax)
(require (only-in net/rfc6455 ws-idle-timeout))
;; Client --> Broker
;; Actions; Client --> Server (and Peer --> Peer, except for Message)
(message-struct Assert (endpoint-name assertion))
(message-struct Clear (endpoint-name))
(message-struct Message (body))
;; Broker --> Client
;; Events; Server --> Client (and Peer --> Peer)
(message-struct Add (endpoint-name captures))
(message-struct Del (endpoint-name captures))
(message-struct Msg (endpoint-name captures))
;; Bidirectional
;; Transport-related; Bidirectional
(message-struct Ping ())
(message-struct Pong ())
@ -45,5 +45,5 @@
(lambda (chunk)
(buffer (bytes-append (buffer) chunk))))
;; Received packets from broker are relayed via one of these.
(message-struct broker-packet (address packet))
;; Received packets from server are relayed via one of these.
(message-struct server-packet (address packet))

View File

@ -11,20 +11,20 @@
(field [username (symbol->string (strong-gensym 'chatter-))])
(define root-facet (current-facet))
(define url standard-localhost-broker/tcp)
(during (broker-connected url)
(on-start (log-info "Connected to broker."))
(on-stop (log-info "Disconnected from broker."))
(define url standard-localhost-server/tcp)
(during (server-connected url)
(on-start (log-info "Connected to server."))
(on-stop (log-info "Disconnected from server."))
(on (asserted (from-broker url (Present $who))) (printf "~a arrived.\n" who))
(on (retracted (from-broker url (Present $who))) (printf "~a departed.\n" who))
(on (message (from-broker url (Says $who $what))) (printf "~a: ~a\n" who what))
(on (asserted (from-server url (Present $who))) (printf "~a arrived.\n" who))
(on (retracted (from-server url (Present $who))) (printf "~a departed.\n" who))
(on (message (from-server url (Says $who $what))) (printf "~a: ~a\n" who what))
(assert (to-broker url (Present (username))))
(assert (to-server url (Present (username))))
(define stdin-evt (read-line-evt (current-input-port) 'any))
(on (message (inbound (external-event stdin-evt (list $line))))
(match line
[(? eof-object?) (stop-facet root-facet)]
[(pregexp #px"^/nick (.+)$" (list _ newnick)) (username newnick)]
[other (send! (to-broker url (Says (username) other)))]))))
[other (send! (to-server url (Says (username) other)))]))))

View File

@ -6,7 +6,7 @@
(assertion-struct researcher (name topic))
(define test-address (broker-loopback-connection "test"))
(define test-address (server-loopback-connection "test"))
(define no-mention-of-discard
(lambda ()
@ -37,13 +37,13 @@
(assert (server-proposal "test" (researcher "Eve" "Evil"))))
(spawn #:name 'all-topics
(during (broker-connected test-address)
(during (from-broker test-address (researcher _ $topic))
(during (server-connected test-address)
(during (from-server test-address (researcher _ $topic))
(on-start (printf "Added topic: ~a\n" topic))
(on-stop (printf "Removed topic: ~a\n" topic)))))
(spawn #:name 'all-researchers
(during (broker-connected test-address)
(during (from-broker test-address (researcher $name _))
(during (server-connected test-address)
(during (from-server test-address (researcher $name _))
(on-start (printf "Added researcher: ~a\n" name))
(on-stop (printf "Removed researcher: ~a\n" name)))))]
no-crashes
@ -68,11 +68,11 @@
(assert (server-proposal "test" (claim 123)))
(on-start (for [(i 100)] (flush!)) (stop-current-facet)))
(spawn #:name 'monitor
(during (broker-connected test-address)
(during (from-broker test-address (claim 123))
(during (server-connected test-address)
(during (from-server test-address (claim 123))
(on-start (printf "Specific claim asserted\n"))
(on-stop (printf "Specific claim retracted\n")))
(during (from-broker test-address (claim $detail))
(during (from-server test-address (claim $detail))
(on-start (printf "Nonspecific claim ~v asserted\n" detail))
(on-stop (printf "Nonspecific claim ~v retracted\n" detail)))))]
no-crashes
@ -84,8 +84,8 @@
[(activate imperative-syndicate/broker)
(spawn #:name 'inner-monitor
(during (broker-connected test-address)
(during (from-broker test-address (claim $detail))
(during (server-connected test-address)
(during (from-server test-address (claim $detail))
(on-start (printf "Inner saw claim asserted\n"))
(on-stop (printf "Inner saw claim retracted\n")))))
(spawn #:name 'claimant

View File

@ -7,23 +7,23 @@
(assertion-struct presence (who))
(define test-address (broker-loopback-connection "test"))
(define test-address (server-loopback-connection "test"))
(test-case
[(activate imperative-syndicate/broker)
(spawn #:name 'producer
(during (broker-connected test-address)
(assert (to-broker test-address (presence 'producer)))))
(during (server-connected test-address)
(assert (to-server test-address (presence 'producer)))))
(spawn #:name 'consumer
(during (broker-connected test-address)
(on (asserted (from-broker test-address (presence $who)))
(during (server-connected test-address)
(on (asserted (from-server test-address (presence $who)))
(printf "~a joined\n" who))))
(spawn #:name 'metaconsumer
(during (broker-connected test-address)
(on (asserted (from-broker test-address (observe (presence _))))
(during (server-connected test-address)
(on (asserted (from-server test-address (observe (presence _))))
(printf "Someone cares about presence!\n"))))
]
no-crashes