From 461f96d5d403dd1d5b80c42bd9d81366bec772c8 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 28 May 2014 16:36:52 -0400 Subject: [PATCH] Initial attempt at porting server.rkt to fastrouting --- server.rkt | 72 ++++++++++++++++++++---------------------------------- 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/server.rkt b/server.rkt index 7b9f6f5..87f7407 100644 --- a/server.rkt +++ b/server.rkt @@ -1,11 +1,11 @@ #lang minimart ;; Generic broker for WebSockets-based minimart/marketplace communication. +(require racket/set) (require net/rfc6455) (require minimart/drivers/timer) (require minimart/drivers/websocket) (require minimart/demand-matcher) -(require minimart/pattern) (require json) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -20,13 +20,10 @@ (define (spawn-server-listener port ssl-options) (define server-id (websocket-local-server port ssl-options)) - (spawn-demand-matcher (websocket-message (websocket-remote-client ?) server-id ?) + (spawn-demand-matcher (websocket-message (?! (websocket-remote-client ?)) server-id ?) #:meta-level 1 #:demand-is-subscription? #f - (match-lambda ;; arrived-demand-route, i.e. new connection publisher - [(route _ (websocket-message c _ _) 1 _) - (spawn-connection-handler c server-id)] - [_ '()]))) + (lambda (c) (spawn-connection-handler c server-id)))) (spawn-world (spawn-server-listener 8000 #f) @@ -35,36 +32,18 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Wire protocol representation of events and actions -(define (drop-json-pattern p) - (pattern-subst p (hasheq '__ "__") ?)) - -(define (drop-json-route r) - (match r - [`(,pub-or-sub ,pattern ,meta-level ,level) - (route (match pub-or-sub ["sub" #t] ["pub" #f]) - (drop-json-pattern pattern) - meta-level - level)])) - (define (drop-json-action j) (match j ["ping" 'ping] ["pong" 'pong] - [`("routes" ,routes) (routing-update (map drop-json-route routes))] + [`("routes" ,gj) (routing-update (jsexpr->gestalt gj (lambda (v) (set 'peer))))] [`("message" ,body ,meta-level ,feedback?) (message body meta-level feedback?)])) -(define (lift-json-pattern p) - (pattern-subst p ? (hasheq '__ "__"))) - -(define (lift-json-route r) - (match r - [(route sub? p ml l) `(,(if sub? "sub" "pub") ,(lift-json-pattern p) ,ml ,l)])) - (define (lift-json-event j) (match j ['ping "ping"] ['pong "pong"] - [(routing-update rs) `("routes" ,(map lift-json-route rs))] + [(routing-update g) `("routes" ,(gestalt->jsexpr g (lambda (v) #t)))] [(message body meta-level feedback?) `("message" ,body ,meta-level ,feedback?)])) (require racket/trace) @@ -74,7 +53,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connections -(struct connection-state (client-id tunnelled-routes) #:transparent) +(struct connection-state (client-id tunnelled-gestalt) #:transparent) (define (spawn-connection-handler c server-id) (define (send-event e s) @@ -82,38 +61,39 @@ (connection-state-client-id s) (jsexpr->string (lift-json-event e))) #:meta-level 1)) - (define ((handle-connection-routing-change rs) s) - (match rs - ['() (transition s (quit))] ;; websocket connection closed - [_ (transition s '())])) - (define ((handle-tunnelled-routing-change rs) s) - (transition s (send-event (routing-update rs) s))) + (define ((handle-connection-routing-change g) s) + (if (gestalt-empty? g) + (transition s (quit)) ;; websocket connection closed + #f)) + (define ((handle-tunnelled-routing-change g) s) + (transition s (send-event (routing-update g) s))) (define ((handle-tunnellable-message m) s) - (if (ormap (lambda (r) (route-accepts? r m)) (connection-state-tunnelled-routes s)) + (if (gestalt-accepts? (connection-state-tunnelled-gestalt s) m) (transition s (send-event m s)) (transition s '()))) (define relay-connections - (list (sub (timer-expired c ?) #:meta-level 1) - (sub (websocket-message c server-id ?) #:meta-level 1) - (sub (websocket-message c server-id ?) #:meta-level 1 #:level 1) - (pub (websocket-message server-id c ?) #:meta-level 1))) + (gestalt-union (sub (timer-expired c ?) #:meta-level 1) + (sub (websocket-message c server-id ?) #:meta-level 1) + (sub (websocket-message c server-id ?) #:meta-level 1 #:level 1) + (pub (websocket-message server-id c ?) #:meta-level 1))) (define (connection-handler e s) (match e - [(routing-update rs) + [(routing-update g) (sequence-transitions (transition s '()) - (handle-connection-routing-change (intersect-routes rs relay-connections)) + (handle-connection-routing-change (gestalt-filter g relay-connections)) (handle-tunnelled-routing-change - (intersect-routes rs (connection-state-tunnelled-routes s))))] + (gestalt-filter g (connection-state-tunnelled-gestalt s))))] [(? message? m) (sequence-transitions (match m [(message (websocket-message from to data) 1 #f) (match (drop-json-action (string->jsexpr data)) - [(routing-update rs-unfiltered) - (define rs (filter (lambda (r) (zero? (route-meta-level r))) rs-unfiltered)) - (transition (struct-copy connection-state s [tunnelled-routes rs]) - (routing-update (append rs relay-connections)))] + [(routing-update g-unfiltered) + (define g (gestalt-transform g-unfiltered + (lambda (ml l p) (if (zero? ml) p '(#f . #f))))) + (transition (struct-copy connection-state s [tunnelled-gestalt g]) + (routing-update (gestalt-union g relay-connections)))] [(? message? m) (transition s (if (zero? (message-meta-level m)) m '()))] ['ping @@ -129,5 +109,5 @@ [#f #f])) (list (send (set-timer c ping-interval 'relative) #:meta-level 1) (spawn connection-handler - (connection-state c '()) + (connection-state c (gestalt-empty)) relay-connections)))