Websocket driver and example

This commit is contained in:
Tony Garnock-Jones 2013-10-28 19:08:41 +00:00
parent 235b0f2969
commit ceada73709
2 changed files with 145 additions and 0 deletions

View File

@ -0,0 +1,108 @@
#lang racket/base
(require racket/match)
(require net/rfc6455)
(require "../main.rkt")
(require "../demand-matcher.rkt")
(provide (struct-out websocket-remote-client)
(struct-out websocket-server)
(struct-out websocket-message)
spawn-websocket-driver)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(struct websocket-remote-client (id) #:prefab)
(struct websocket-server (port) #:prefab)
(struct websocket-message (from to body) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Driver
(define (spawn-websocket-driver)
(spawn-demand-matcher (websocket-message ? (websocket-server ?) ?)
#:demand-level 1
#:supply-level 2
(match-lambda
[(route _ (websocket-message _ (websocket-server port) _) _ _)
(spawn-websocket-listener port)])))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener
(struct listener-state (shutdown-procedure port) #:transparent)
(define (websocket-listener e state)
(match e
[(routing-update routes)
(match-define (listener-state shutdown-procedure port) state)
(define peer-listener-route (pub (websocket-message ? (websocket-server port) ?) #:level 2))
(if (for/or ((r routes)) (pair? (intersect-routes (list r) (list peer-listener-route))))
#f
(begin (when shutdown-procedure (shutdown-procedure))
(transition (struct-copy listener-state state [shutdown-procedure #f]) (quit))))]
[(message (event _ (list (list c connection-shutdown-procedure))) 1 #f)
(transition state
(spawn-connection (listener-state-port state)
c
connection-shutdown-procedure))]
[_ #f]))
(define ((connection-handler listener-ch) c dummy-state)
(define connection-ch (make-channel))
(channel-put listener-ch (list c (lambda () (channel-put connection-ch #t))))
(channel-get connection-ch)
(ws-close! c))
(define (spawn-websocket-listener port)
(define ch (make-channel))
(define shutdown-procedure (ws-serve #:port port (connection-handler ch)))
(spawn websocket-listener
(listener-state shutdown-procedure port)
(list (pub (websocket-message ? (websocket-server port) ?) #:level 2)
(sub (event ch ?) #:meta-level 1))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
(struct connection-state (seen-peer? local-id port c shutdown-procedure) #:transparent)
(define (shutdown-connection state)
(if (connection-state-shutdown-procedure state)
(begin ((connection-state-shutdown-procedure state))
(transition (struct-copy connection-state state [shutdown-procedure #f]) (quit)))
state))
(define (websocket-connection e state)
(match e
[(message (event _ _) 1 #f)
(match-define (connection-state seen-peer? local-id port c _) state)
(and seen-peer?
(let ((m (ws-recv c #:payload-type 'text)))
(if (eof-object? m)
(shutdown-connection state)
(transition state (send (websocket-message local-id
(websocket-server port)
m))))))]
[(message (websocket-message _ _ m) 0 #f)
(ws-send! (connection-state-c state) m)
#f]
[(routing-update routes)
(cond
[(and (connection-state-seen-peer? state) (null? routes))
(shutdown-connection state)]
[(and (not (connection-state-seen-peer? state)) (pair? routes))
(transition (struct-copy connection-state state [seen-peer? #t]) '())]
[else
#f])]
[#f #f]))
(define (spawn-connection port c shutdown-procedure)
(define local-id (websocket-remote-client (gensym 'ws)))
(spawn websocket-connection
(connection-state #f local-id port c shutdown-procedure)
(list (pub (websocket-message local-id (websocket-server port) ?))
(sub (websocket-message (websocket-server port) local-id ?))
(sub (websocket-message (websocket-server port) local-id ?) #:level 1)
(sub (event c ?) #:meta-level 1))))

View File

@ -0,0 +1,37 @@
#lang minimart
(require "../drivers/websocket.rkt")
(require "../demand-matcher.rkt")
(spawn-websocket-driver)
(define any-client (websocket-remote-client ?))
(define server-id (websocket-server 8081))
(spawn-demand-matcher (websocket-message any-client server-id ?)
#:demand-is-subscription? #f
#:demand-level 0
#:supply-level 0
(match-lambda ;; arrived-demand-route, i.e. new connection publisher
[(route _ (websocket-message c _ _) _ _)
(spawn-connection-handler c)]
[_ '()])
(lambda (departed-supply-route)
(log-info "Connection handler decided to exit")
'()))
(define (spawn-connection-handler c)
(log-info "spawn-connection-handler ~v" c)
(define (connection-handler e n)
(when e (log-info "connection-handler ~v: ~v /// ~v" c e n))
(match e
[(routing-update '()) (transition n (quit))]
[_
(if (< n 20)
(transition (+ n 1) (send (websocket-message server-id c (format "msg ~v" n))))
#f)]))
(spawn connection-handler
0
(list (sub (websocket-message c server-id ?))
(sub (websocket-message c server-id ?) #:level 1)
(pub (websocket-message server-id c ?)))))