Initial attempt at porting server.rkt to fastrouting
This commit is contained in:
parent
95701d39c5
commit
461f96d5d4
72
server.rkt
72
server.rkt
|
@ -1,11 +1,11 @@
|
|||
#lang minimart
|
||||
;; Generic broker for WebSockets-based minimart/marketplace communication.
|
||||
|
||||
(require racket/set)
|
||||
(require net/rfc6455)
|
||||
(require minimart/drivers/timer)
|
||||
(require minimart/drivers/websocket)
|
||||
(require minimart/demand-matcher)
|
||||
(require minimart/pattern)
|
||||
(require json)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
@ -20,13 +20,10 @@
|
|||
|
||||
(define (spawn-server-listener port ssl-options)
|
||||
(define server-id (websocket-local-server port ssl-options))
|
||||
(spawn-demand-matcher (websocket-message (websocket-remote-client ?) server-id ?)
|
||||
(spawn-demand-matcher (websocket-message (?! (websocket-remote-client ?)) server-id ?)
|
||||
#:meta-level 1
|
||||
#:demand-is-subscription? #f
|
||||
(match-lambda ;; arrived-demand-route, i.e. new connection publisher
|
||||
[(route _ (websocket-message c _ _) 1 _)
|
||||
(spawn-connection-handler c server-id)]
|
||||
[_ '()])))
|
||||
(lambda (c) (spawn-connection-handler c server-id))))
|
||||
|
||||
(spawn-world
|
||||
(spawn-server-listener 8000 #f)
|
||||
|
@ -35,36 +32,18 @@
|
|||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Wire protocol representation of events and actions
|
||||
|
||||
(define (drop-json-pattern p)
|
||||
(pattern-subst p (hasheq '__ "__") ?))
|
||||
|
||||
(define (drop-json-route r)
|
||||
(match r
|
||||
[`(,pub-or-sub ,pattern ,meta-level ,level)
|
||||
(route (match pub-or-sub ["sub" #t] ["pub" #f])
|
||||
(drop-json-pattern pattern)
|
||||
meta-level
|
||||
level)]))
|
||||
|
||||
(define (drop-json-action j)
|
||||
(match j
|
||||
["ping" 'ping]
|
||||
["pong" 'pong]
|
||||
[`("routes" ,routes) (routing-update (map drop-json-route routes))]
|
||||
[`("routes" ,gj) (routing-update (jsexpr->gestalt gj (lambda (v) (set 'peer))))]
|
||||
[`("message" ,body ,meta-level ,feedback?) (message body meta-level feedback?)]))
|
||||
|
||||
(define (lift-json-pattern p)
|
||||
(pattern-subst p ? (hasheq '__ "__")))
|
||||
|
||||
(define (lift-json-route r)
|
||||
(match r
|
||||
[(route sub? p ml l) `(,(if sub? "sub" "pub") ,(lift-json-pattern p) ,ml ,l)]))
|
||||
|
||||
(define (lift-json-event j)
|
||||
(match j
|
||||
['ping "ping"]
|
||||
['pong "pong"]
|
||||
[(routing-update rs) `("routes" ,(map lift-json-route rs))]
|
||||
[(routing-update g) `("routes" ,(gestalt->jsexpr g (lambda (v) #t)))]
|
||||
[(message body meta-level feedback?) `("message" ,body ,meta-level ,feedback?)]))
|
||||
|
||||
(require racket/trace)
|
||||
|
@ -74,7 +53,7 @@
|
|||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Connections
|
||||
|
||||
(struct connection-state (client-id tunnelled-routes) #:transparent)
|
||||
(struct connection-state (client-id tunnelled-gestalt) #:transparent)
|
||||
|
||||
(define (spawn-connection-handler c server-id)
|
||||
(define (send-event e s)
|
||||
|
@ -82,38 +61,39 @@
|
|||
(connection-state-client-id s)
|
||||
(jsexpr->string (lift-json-event e)))
|
||||
#:meta-level 1))
|
||||
(define ((handle-connection-routing-change rs) s)
|
||||
(match rs
|
||||
['() (transition s (quit))] ;; websocket connection closed
|
||||
[_ (transition s '())]))
|
||||
(define ((handle-tunnelled-routing-change rs) s)
|
||||
(transition s (send-event (routing-update rs) s)))
|
||||
(define ((handle-connection-routing-change g) s)
|
||||
(if (gestalt-empty? g)
|
||||
(transition s (quit)) ;; websocket connection closed
|
||||
#f))
|
||||
(define ((handle-tunnelled-routing-change g) s)
|
||||
(transition s (send-event (routing-update g) s)))
|
||||
(define ((handle-tunnellable-message m) s)
|
||||
(if (ormap (lambda (r) (route-accepts? r m)) (connection-state-tunnelled-routes s))
|
||||
(if (gestalt-accepts? (connection-state-tunnelled-gestalt s) m)
|
||||
(transition s (send-event m s))
|
||||
(transition s '())))
|
||||
(define relay-connections
|
||||
(list (sub (timer-expired c ?) #:meta-level 1)
|
||||
(sub (websocket-message c server-id ?) #:meta-level 1)
|
||||
(sub (websocket-message c server-id ?) #:meta-level 1 #:level 1)
|
||||
(pub (websocket-message server-id c ?) #:meta-level 1)))
|
||||
(gestalt-union (sub (timer-expired c ?) #:meta-level 1)
|
||||
(sub (websocket-message c server-id ?) #:meta-level 1)
|
||||
(sub (websocket-message c server-id ?) #:meta-level 1 #:level 1)
|
||||
(pub (websocket-message server-id c ?) #:meta-level 1)))
|
||||
(define (connection-handler e s)
|
||||
(match e
|
||||
[(routing-update rs)
|
||||
[(routing-update g)
|
||||
(sequence-transitions
|
||||
(transition s '())
|
||||
(handle-connection-routing-change (intersect-routes rs relay-connections))
|
||||
(handle-connection-routing-change (gestalt-filter g relay-connections))
|
||||
(handle-tunnelled-routing-change
|
||||
(intersect-routes rs (connection-state-tunnelled-routes s))))]
|
||||
(gestalt-filter g (connection-state-tunnelled-gestalt s))))]
|
||||
[(? message? m)
|
||||
(sequence-transitions
|
||||
(match m
|
||||
[(message (websocket-message from to data) 1 #f)
|
||||
(match (drop-json-action (string->jsexpr data))
|
||||
[(routing-update rs-unfiltered)
|
||||
(define rs (filter (lambda (r) (zero? (route-meta-level r))) rs-unfiltered))
|
||||
(transition (struct-copy connection-state s [tunnelled-routes rs])
|
||||
(routing-update (append rs relay-connections)))]
|
||||
[(routing-update g-unfiltered)
|
||||
(define g (gestalt-transform g-unfiltered
|
||||
(lambda (ml l p) (if (zero? ml) p '(#f . #f)))))
|
||||
(transition (struct-copy connection-state s [tunnelled-gestalt g])
|
||||
(routing-update (gestalt-union g relay-connections)))]
|
||||
[(? message? m)
|
||||
(transition s (if (zero? (message-meta-level m)) m '()))]
|
||||
['ping
|
||||
|
@ -129,5 +109,5 @@
|
|||
[#f #f]))
|
||||
(list (send (set-timer c ping-interval 'relative) #:meta-level 1)
|
||||
(spawn connection-handler
|
||||
(connection-state c '())
|
||||
(connection-state c (gestalt-empty))
|
||||
relay-connections)))
|
||||
|
|
Loading…
Reference in New Issue