From eca3e9ded045e6139002a8dbb70fe25496b7de4f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 11 Jun 2014 16:03:22 -0400 Subject: [PATCH] Switch websocket driver to use send-ground-message. --- minimart/drivers/websocket.rkt | 74 ++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/minimart/drivers/websocket.rkt b/minimart/drivers/websocket.rkt index a723a4f..9c7a663 100644 --- a/minimart/drivers/websocket.rkt +++ b/minimart/drivers/websocket.rkt @@ -25,6 +25,12 @@ (struct websocket-ssl-options (cert-file key-file) #:prefab) (struct websocket-message (from to body) #:prefab) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(struct websocket-accepted (id connection control-ch) #:prefab) +(struct websocket-incoming-message (id message) #:prefab) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Driver @@ -48,17 +54,28 @@ (begin (when shutdown-procedure (shutdown-procedure)) (transition (struct-copy listener-state state [shutdown-procedure #f]) (quit))) #f)] - [(message (event _ (list (list c connection-shutdown-procedure))) 1 #f) + [(message (websocket-accepted id c control-ch) 1 #f) (transition state - (spawn-connection (listener-state-server-addr state) - c - connection-shutdown-procedure))] + (spawn-connection (listener-state-server-addr state) id c control-ch))] [_ #f])) -(define ((connection-handler listener-ch) c dummy-state) - (define connection-ch (make-channel)) - (channel-put listener-ch (list c (lambda () (channel-put connection-ch #t)))) - (channel-get connection-ch) +(define (connection-handler c dummy-state) + (define control-ch (make-channel)) + (define c-input-port (ws-conn-base-ip c)) + (define id (gensym 'ws)) + (send-ground-message (websocket-accepted id c control-ch)) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt c-input-port + (lambda (dummy) + (define msg (ws-recv c #:payload-type 'text)) + (send-ground-message (websocket-incoming-message id msg)) + (loop (or blocked? (eof-object? msg)))))))) (ws-close! c)) (define (ssl-options->ssl-tcp@ ssl-options) @@ -71,42 +88,38 @@ (define (spawn-websocket-listener server-addr) (match-define (websocket-local-server port ssl-options) server-addr) - (define ch (make-channel)) (define shutdown-procedure (ws-serve #:port port #:tcp@ (if ssl-options (ssl-options->ssl-tcp@ ssl-options) tcp@) - (connection-handler ch))) + connection-handler)) (spawn websocket-listener (listener-state shutdown-procedure server-addr) (gestalt-union (pub (websocket-message ? server-addr ?) #:level 2) - (sub (event ch ?) #:meta-level 1)))) + (sub (websocket-accepted ? ? ?) #:meta-level 1)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connection -(struct connection-state (seen-peer? local-addr server-addr c [shutdown-procedure #:mutable]) - #:transparent) +(struct connection-state (seen-peer? local-addr server-addr c control-ch) #:transparent) (define (shutdown-connection state) - (when (connection-state-shutdown-procedure state) - ((connection-state-shutdown-procedure state)) - (set-connection-state-shutdown-procedure! state #f)) - (transition state (quit))) + (transition (match (connection-state-control-ch state) + [#f state] + [ch (channel-put ch 'quit) + (struct-copy connection-state state [control-ch #f])]) + (quit))) (define (websocket-connection e state) (with-handlers [((lambda (exn) #t) (lambda (exn) (shutdown-connection state)))] (match e - [(message (event _ _) 1 #f) - (match-define (connection-state seen-peer? local-addr server-addr c _) state) - (and seen-peer? - (let ((m (ws-recv c #:payload-type 'text))) - (if (eof-object? m) - (shutdown-connection state) - (transition state (send (websocket-message local-addr - server-addr - m))))))] + [(message (websocket-incoming-message _ m) 1 #f) + (if (eof-object? m) + (shutdown-connection state) + (transition state (send (websocket-message (connection-state-local-addr state) + (connection-state-server-addr state) + m))))] [(message (websocket-message _ _ m) 0 #f) (ws-send! (connection-state-c state) m) #f] @@ -115,16 +128,17 @@ [(and (connection-state-seen-peer? state) (gestalt-empty? g)) (shutdown-connection state)] [(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g))) + (channel-put (connection-state-control-ch state) 'unblock) (transition (struct-copy connection-state state [seen-peer? #t]) '())] [else #f])] [#f #f]))) -(define (spawn-connection server-addr c shutdown-procedure) - (define local-addr (websocket-remote-client (gensym 'ws))) +(define (spawn-connection server-addr id c control-ch) + (define local-addr (websocket-remote-client id)) (spawn websocket-connection - (connection-state #f local-addr server-addr c shutdown-procedure) + (connection-state #f local-addr server-addr c control-ch) (gestalt-union (pub (websocket-message local-addr server-addr ?)) (sub (websocket-message server-addr local-addr ?)) (sub (websocket-message server-addr local-addr ?) #:level 1) - (sub (event (ws-conn-base-ip c) ?) #:meta-level 1)))) + (sub (websocket-incoming-message id ?) #:meta-level 1))))