Asynchronous outbound websocket connections.

This commit is contained in:
Tony Garnock-Jones 2014-08-11 22:11:42 -07:00
parent 5bd30db9b0
commit 2286c7c617
1 changed files with 22 additions and 16 deletions

View File

@ -33,7 +33,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground-level communication messages
(struct websocket-accepted (id server-addr connection control-ch) #:prefab)
(struct websocket-connection (id local-addr remote-addr connection control-ch) #:prefab)
(struct websocket-incoming-message (id message) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -64,19 +64,15 @@
(begin (when shutdown-procedure (shutdown-procedure))
(transition (struct-copy listener-state state [shutdown-procedure #f]) (quit)))
#f)]
[(message (websocket-accepted id _ c control-ch) 1 #f)
(transition state
(spawn-connection (listener-state-server-addr state)
(websocket-remote-client id)
id
c
control-ch))]
[(message (websocket-connection id local-addr remote-addr c control-ch) 1 #f)
(transition state (spawn-connection local-addr remote-addr id c control-ch))]
[_ #f]))
(define ((connection-handler server-addr) c dummy-state)
(define control-ch (make-channel))
(define id (gensym 'ws))
(send-ground-message (websocket-accepted id server-addr c control-ch))
(send-ground-message
(websocket-connection id server-addr (websocket-remote-client id) c control-ch))
(connection-thread-loop control-ch c id))
(define (connection-thread-loop control-ch c id)
@ -115,15 +111,25 @@
(spawn websocket-listener
(listener-state shutdown-procedure server-addr)
(gestalt-union (pub (websocket-message ? server-addr ?) #:level 2)
(sub (websocket-accepted ? server-addr ? ?) #:meta-level 1))))
(sub (websocket-connection ? server-addr ? ? ?) #:meta-level 1))))
(define (spawn-websocket-connection local-addr remote-addr)
(match-define (websocket-remote-server url) remote-addr)
(define c (ws-connect (string->url url)))
(define control-ch (make-channel))
(define id (gensym 'ws))
(thread (lambda () (connection-thread-loop control-ch c id)))
(spawn-connection local-addr remote-addr id c control-ch))
(define control-ch (make-channel))
(thread
(lambda ()
(log-info "Connecting to ~a ~a" url (current-inexact-milliseconds))
(define c (with-handlers [(exn? values)] (ws-connect (string->url url))))
(log-info "Connected to ~a ~a" url (current-inexact-milliseconds))
(send-ground-message
(websocket-connection id local-addr remote-addr c control-ch))
(when (not (exn? c))
(connection-thread-loop control-ch c id))))
(actor (subscribe (websocket-connection id local-addr remote-addr ($ c) control-ch)
#:meta-level 1
(list (when (not (exn? c)) (spawn-connection local-addr remote-addr id c control-ch))
(quit)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
@ -137,7 +143,7 @@
(struct-copy connection-state state [control-ch #f])])
(quit)))
(define (websocket-connection e state)
(define (websocket-connection-behaviour e state)
(with-handlers [((lambda (exn) #t)
(lambda (exn)
(shutdown-connection state)
@ -164,7 +170,7 @@
[#f #f])))
(define (spawn-connection local-addr remote-addr id c control-ch)
(spawn websocket-connection
(spawn websocket-connection-behaviour
(connection-state #f local-addr remote-addr c control-ch)
(gestalt-union (pub (websocket-message remote-addr local-addr ?))
(sub (websocket-message local-addr remote-addr ?))