From 2286c7c617f5772de807fab8725aae95d12634e0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 11 Aug 2014 22:11:42 -0700 Subject: [PATCH] Asynchronous outbound websocket connections. --- minimart/drivers/websocket.rkt | 38 ++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/minimart/drivers/websocket.rkt b/minimart/drivers/websocket.rkt index 1f85df8..90c36b1 100644 --- a/minimart/drivers/websocket.rkt +++ b/minimart/drivers/websocket.rkt @@ -33,7 +33,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Ground-level communication messages -(struct websocket-accepted (id server-addr connection control-ch) #:prefab) +(struct websocket-connection (id local-addr remote-addr connection control-ch) #:prefab) (struct websocket-incoming-message (id message) #:prefab) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -64,19 +64,15 @@ (begin (when shutdown-procedure (shutdown-procedure)) (transition (struct-copy listener-state state [shutdown-procedure #f]) (quit))) #f)] - [(message (websocket-accepted id _ c control-ch) 1 #f) - (transition state - (spawn-connection (listener-state-server-addr state) - (websocket-remote-client id) - id - c - control-ch))] + [(message (websocket-connection id local-addr remote-addr c control-ch) 1 #f) + (transition state (spawn-connection local-addr remote-addr id c control-ch))] [_ #f])) (define ((connection-handler server-addr) c dummy-state) (define control-ch (make-channel)) (define id (gensym 'ws)) - (send-ground-message (websocket-accepted id server-addr c control-ch)) + (send-ground-message + (websocket-connection id server-addr (websocket-remote-client id) c control-ch)) (connection-thread-loop control-ch c id)) (define (connection-thread-loop control-ch c id) @@ -115,15 +111,25 @@ (spawn websocket-listener (listener-state shutdown-procedure server-addr) (gestalt-union (pub (websocket-message ? server-addr ?) #:level 2) - (sub (websocket-accepted ? server-addr ? ?) #:meta-level 1)))) + (sub (websocket-connection ? server-addr ? ? ?) #:meta-level 1)))) (define (spawn-websocket-connection local-addr remote-addr) (match-define (websocket-remote-server url) remote-addr) - (define c (ws-connect (string->url url))) - (define control-ch (make-channel)) (define id (gensym 'ws)) - (thread (lambda () (connection-thread-loop control-ch c id))) - (spawn-connection local-addr remote-addr id c control-ch)) + (define control-ch (make-channel)) + (thread + (lambda () + (log-info "Connecting to ~a ~a" url (current-inexact-milliseconds)) + (define c (with-handlers [(exn? values)] (ws-connect (string->url url)))) + (log-info "Connected to ~a ~a" url (current-inexact-milliseconds)) + (send-ground-message + (websocket-connection id local-addr remote-addr c control-ch)) + (when (not (exn? c)) + (connection-thread-loop control-ch c id)))) + (actor (subscribe (websocket-connection id local-addr remote-addr ($ c) control-ch) + #:meta-level 1 + (list (when (not (exn? c)) (spawn-connection local-addr remote-addr id c control-ch)) + (quit))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connection @@ -137,7 +143,7 @@ (struct-copy connection-state state [control-ch #f])]) (quit))) -(define (websocket-connection e state) +(define (websocket-connection-behaviour e state) (with-handlers [((lambda (exn) #t) (lambda (exn) (shutdown-connection state) @@ -164,7 +170,7 @@ [#f #f]))) (define (spawn-connection local-addr remote-addr id c control-ch) - (spawn websocket-connection + (spawn websocket-connection-behaviour (connection-state #f local-addr remote-addr c control-ch) (gestalt-union (pub (websocket-message remote-addr local-addr ?)) (sub (websocket-message local-addr remote-addr ?))