Change syndicate-broker to use web.rkt instead of websocket.rkt
This commit is contained in:
parent
7b5b866a6d
commit
f1a7e10fbf
|
@ -5,6 +5,7 @@
|
|||
(struct-out broker-scope)
|
||||
(struct-out broker-data))
|
||||
|
||||
(require racket/dict)
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
(require net/rfc6455)
|
||||
|
@ -16,8 +17,8 @@
|
|||
(require syndicate/demand-matcher)
|
||||
(require syndicate/protocol/advertise)
|
||||
|
||||
(require/activate syndicate/drivers/timer)
|
||||
(require/activate syndicate/drivers/websocket)
|
||||
(require/activate syndicate/drivers/timestate)
|
||||
(require/activate syndicate/drivers/web)
|
||||
(require "protocol.rkt")
|
||||
|
||||
(define-logger syndicate-broker)
|
||||
|
@ -28,56 +29,54 @@
|
|||
(define broker-data-parenthesis (struct-type->parenthesis struct:broker-data))
|
||||
(define broker-scope-parenthesis (struct-type->parenthesis struct:broker-scope))
|
||||
|
||||
;; Depends on timer driver and websocket driver running at the given metalevel.
|
||||
;; Depends on timer driver and web driver.
|
||||
;; Does not, itself, assert a web-virtual-host; the context must do this.
|
||||
(define (spawn-broker-server port
|
||||
#:ssl-options [ssl-options #f])
|
||||
(define any-client any-websocket-remote-client)
|
||||
(define server-id (websocket-local-server port ssl-options))
|
||||
(spawn-demand-matcher (inbound (advertise (websocket-message (?! any-client) server-id ?)))
|
||||
(inbound (observe (websocket-message (?! any-client) server-id ?)))
|
||||
(lambda (c) (spawn-connection-handler c server-id))
|
||||
#:name 'broker:dm))
|
||||
#:hostname [hostname ?]
|
||||
#:path [resource-path-str "/"])
|
||||
(actor #:name 'broker:dm
|
||||
(on (web-request-get (id req)
|
||||
(web-virtual-host "http" hostname port)
|
||||
,(string->resource-path resource-path-str))
|
||||
(when (equal? (dict-ref (web-request-header-headers req) 'upgrade #f) "websocket")
|
||||
(spawn-connection-handler id req)))))
|
||||
|
||||
(define (spawn-connection-handler c server-id)
|
||||
(actor #:name (list 'broker server-id)
|
||||
(define (spawn-connection-handler req-id http-req)
|
||||
(actor #:name (list 'broker:connection req-id)
|
||||
|
||||
(define scope (broker-scope (websocket-remote-client-request-host c)
|
||||
(websocket-remote-client-request-port c)
|
||||
(websocket-remote-client-request-path c)))
|
||||
(on-start (log-syndicate-broker-info "Starting broker connection ~v" req-id))
|
||||
(on-stop (log-syndicate-broker-info "Ending broker connection ~v" req-id))
|
||||
(on (asserted (web-request-peer-details req-id _ _ $addr $port))
|
||||
(log-syndicate-broker-info "Connection ~v is from ~a:~a" req-id addr port))
|
||||
|
||||
(define (arm-ping-timer!)
|
||||
(send! (outbound (set-timer c (ping-interval) 'relative))))
|
||||
(assert (web-response-websocket req-id))
|
||||
(stop-when (websocket-connection-closed req-id))
|
||||
|
||||
(define (send-event e)
|
||||
(send! (outbound (websocket-message server-id c (jsexpr->string (lift-json-event e))))))
|
||||
(websocket-message-send! req-id (jsexpr->string (lift-json-event e))))
|
||||
|
||||
(on-start (arm-ping-timer!)
|
||||
(log-syndicate-broker-info "Starting broker connection from ~v" c))
|
||||
(define http-resource (web-request-header-resource http-req))
|
||||
(define http-vh (web-resource-virtual-host http-resource))
|
||||
(define scope (broker-scope (web-virtual-host-name http-vh)
|
||||
(web-virtual-host-port http-vh)
|
||||
(resource-path->string (web-resource-path http-resource))))
|
||||
|
||||
(stop-when (retracted (inbound (advertise (websocket-message c server-id _)))))
|
||||
(assert (outbound (advertise (websocket-message server-id c _))))
|
||||
|
||||
(on (asserted (inbound
|
||||
(websocket-peer-details server-id c _ _ $remote-addr $remote-port)))
|
||||
(log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port))
|
||||
|
||||
(on (message (inbound (timer-expired c _)))
|
||||
(arm-ping-timer!)
|
||||
(field [ping-time-deadline 0])
|
||||
(on (asserted (later-than (ping-time-deadline)))
|
||||
(ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval)))
|
||||
(send-event 'ping))
|
||||
|
||||
(on (message (inbound (websocket-message c server-id $data)))
|
||||
(on (websocket-message-recv req-id $data)
|
||||
(match (drop-json-action (string->jsexpr data))
|
||||
['ping (send-event 'pong)]
|
||||
['pong (void)]
|
||||
[(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))]
|
||||
[(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))]))
|
||||
[(? patch? p) (patch! (log-packet req-id 'inbound 'patch (wrap-patch scope p)))]
|
||||
[(message body) (send! (log-packet req-id 'inbound 'message (broker-data scope body)))]))
|
||||
|
||||
(on-event
|
||||
[(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))]
|
||||
[(? patch? p) (send-event (log-packet req-id 'outbound 'patch (unwrap-patch scope p)))]
|
||||
[(message (broker-data (== scope) body))
|
||||
(send-event (message (log-packet c 'outbound 'message body)))])
|
||||
|
||||
(on-stop (log-syndicate-broker-info "Ending broker connection from ~v" c))))
|
||||
(send-event (message (log-packet req-id 'outbound 'message body)))])))
|
||||
|
||||
(define (log-packet c direction kind value)
|
||||
(log-syndicate-broker-debug "Broker: ~v: ~a ~a\n~v" c direction kind value)
|
||||
|
@ -107,18 +106,9 @@
|
|||
t
|
||||
(let ((observations (trie-step t observe-parenthesis)))
|
||||
(trie-union (trie-prepend observe-parenthesis (wrap-trie scope observations))
|
||||
(wrap-trie* scope t)))))
|
||||
|
||||
(define (wrap-trie* scope t)
|
||||
(pattern->trie #t (broker-data scope (embedded-trie t))))
|
||||
(pattern->trie #t (broker-data scope (embedded-trie t)))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(let ((ssl-options
|
||||
(match (current-command-line-arguments)
|
||||
[(vector c p) (websocket-ssl-options c p)]
|
||||
[_ #f])))
|
||||
(dataspace (schedule-action! (spawn-broker-server 8000))
|
||||
(when ssl-options
|
||||
(schedule-action! (spawn-broker-server 8443 #:ssl-options ssl-options)))
|
||||
(forever)))
|
||||
(actor #:name 'broker:vh (assert (web-virtual-host "http" _ 8000)))
|
||||
(spawn-broker-server 8000)
|
||||
|
|
Loading…
Reference in New Issue