Switch websocket driver to use send-ground-message.
This commit is contained in:
parent
aa629c8bbe
commit
eca3e9ded0
|
@ -25,6 +25,12 @@
|
|||
(struct websocket-ssl-options (cert-file key-file) #:prefab)
|
||||
(struct websocket-message (from to body) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Ground-level communication messages
|
||||
|
||||
(struct websocket-accepted (id connection control-ch) #:prefab)
|
||||
(struct websocket-incoming-message (id message) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Driver
|
||||
|
||||
|
@ -48,17 +54,28 @@
|
|||
(begin (when shutdown-procedure (shutdown-procedure))
|
||||
(transition (struct-copy listener-state state [shutdown-procedure #f]) (quit)))
|
||||
#f)]
|
||||
[(message (event _ (list (list c connection-shutdown-procedure))) 1 #f)
|
||||
[(message (websocket-accepted id c control-ch) 1 #f)
|
||||
(transition state
|
||||
(spawn-connection (listener-state-server-addr state)
|
||||
c
|
||||
connection-shutdown-procedure))]
|
||||
(spawn-connection (listener-state-server-addr state) id c control-ch))]
|
||||
[_ #f]))
|
||||
|
||||
(define ((connection-handler listener-ch) c dummy-state)
|
||||
(define connection-ch (make-channel))
|
||||
(channel-put listener-ch (list c (lambda () (channel-put connection-ch #t))))
|
||||
(channel-get connection-ch)
|
||||
(define (connection-handler c dummy-state)
|
||||
(define control-ch (make-channel))
|
||||
(define c-input-port (ws-conn-base-ip c))
|
||||
(define id (gensym 'ws))
|
||||
(send-ground-message (websocket-accepted id c control-ch))
|
||||
(let loop ((blocked? #t))
|
||||
(sync (handle-evt control-ch
|
||||
(match-lambda
|
||||
['unblock (loop #f)]
|
||||
['quit (void)]))
|
||||
(if blocked?
|
||||
never-evt
|
||||
(handle-evt c-input-port
|
||||
(lambda (dummy)
|
||||
(define msg (ws-recv c #:payload-type 'text))
|
||||
(send-ground-message (websocket-incoming-message id msg))
|
||||
(loop (or blocked? (eof-object? msg))))))))
|
||||
(ws-close! c))
|
||||
|
||||
(define (ssl-options->ssl-tcp@ ssl-options)
|
||||
|
@ -71,42 +88,38 @@
|
|||
|
||||
(define (spawn-websocket-listener server-addr)
|
||||
(match-define (websocket-local-server port ssl-options) server-addr)
|
||||
(define ch (make-channel))
|
||||
(define shutdown-procedure (ws-serve #:port port
|
||||
#:tcp@ (if ssl-options
|
||||
(ssl-options->ssl-tcp@ ssl-options)
|
||||
tcp@)
|
||||
(connection-handler ch)))
|
||||
connection-handler))
|
||||
(spawn websocket-listener
|
||||
(listener-state shutdown-procedure server-addr)
|
||||
(gestalt-union (pub (websocket-message ? server-addr ?) #:level 2)
|
||||
(sub (event ch ?) #:meta-level 1))))
|
||||
(sub (websocket-accepted ? ? ?) #:meta-level 1))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Connection
|
||||
|
||||
(struct connection-state (seen-peer? local-addr server-addr c [shutdown-procedure #:mutable])
|
||||
#:transparent)
|
||||
(struct connection-state (seen-peer? local-addr server-addr c control-ch) #:transparent)
|
||||
|
||||
(define (shutdown-connection state)
|
||||
(when (connection-state-shutdown-procedure state)
|
||||
((connection-state-shutdown-procedure state))
|
||||
(set-connection-state-shutdown-procedure! state #f))
|
||||
(transition state (quit)))
|
||||
(transition (match (connection-state-control-ch state)
|
||||
[#f state]
|
||||
[ch (channel-put ch 'quit)
|
||||
(struct-copy connection-state state [control-ch #f])])
|
||||
(quit)))
|
||||
|
||||
(define (websocket-connection e state)
|
||||
(with-handlers [((lambda (exn) #t)
|
||||
(lambda (exn) (shutdown-connection state)))]
|
||||
(match e
|
||||
[(message (event _ _) 1 #f)
|
||||
(match-define (connection-state seen-peer? local-addr server-addr c _) state)
|
||||
(and seen-peer?
|
||||
(let ((m (ws-recv c #:payload-type 'text)))
|
||||
(if (eof-object? m)
|
||||
(shutdown-connection state)
|
||||
(transition state (send (websocket-message local-addr
|
||||
server-addr
|
||||
m))))))]
|
||||
[(message (websocket-incoming-message _ m) 1 #f)
|
||||
(if (eof-object? m)
|
||||
(shutdown-connection state)
|
||||
(transition state (send (websocket-message (connection-state-local-addr state)
|
||||
(connection-state-server-addr state)
|
||||
m))))]
|
||||
[(message (websocket-message _ _ m) 0 #f)
|
||||
(ws-send! (connection-state-c state) m)
|
||||
#f]
|
||||
|
@ -115,16 +128,17 @@
|
|||
[(and (connection-state-seen-peer? state) (gestalt-empty? g))
|
||||
(shutdown-connection state)]
|
||||
[(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g)))
|
||||
(channel-put (connection-state-control-ch state) 'unblock)
|
||||
(transition (struct-copy connection-state state [seen-peer? #t]) '())]
|
||||
[else
|
||||
#f])]
|
||||
[#f #f])))
|
||||
|
||||
(define (spawn-connection server-addr c shutdown-procedure)
|
||||
(define local-addr (websocket-remote-client (gensym 'ws)))
|
||||
(define (spawn-connection server-addr id c control-ch)
|
||||
(define local-addr (websocket-remote-client id))
|
||||
(spawn websocket-connection
|
||||
(connection-state #f local-addr server-addr c shutdown-procedure)
|
||||
(connection-state #f local-addr server-addr c control-ch)
|
||||
(gestalt-union (pub (websocket-message local-addr server-addr ?))
|
||||
(sub (websocket-message server-addr local-addr ?))
|
||||
(sub (websocket-message server-addr local-addr ?) #:level 1)
|
||||
(sub (event (ws-conn-base-ip c) ?) #:meta-level 1))))
|
||||
(sub (websocket-incoming-message id ?) #:meta-level 1))))
|
||||
|
|
Loading…
Reference in New Issue