diff --git a/racket/syndicate/broker/server.rkt b/racket/syndicate/broker/server.rkt index 2dae7cf..5ba3c5a 100644 --- a/racket/syndicate/broker/server.rkt +++ b/racket/syndicate/broker/server.rkt @@ -5,6 +5,7 @@ (struct-out broker-scope) (struct-out broker-data)) +(require racket/dict) (require racket/set) (require racket/match) (require net/rfc6455) @@ -16,8 +17,8 @@ (require syndicate/demand-matcher) (require syndicate/protocol/advertise) -(require/activate syndicate/drivers/timer) -(require/activate syndicate/drivers/websocket) +(require/activate syndicate/drivers/timestate) +(require/activate syndicate/drivers/web) (require "protocol.rkt") (define-logger syndicate-broker) @@ -28,56 +29,54 @@ (define broker-data-parenthesis (struct-type->parenthesis struct:broker-data)) (define broker-scope-parenthesis (struct-type->parenthesis struct:broker-scope)) -;; Depends on timer driver and websocket driver running at the given metalevel. +;; Depends on timer driver and web driver. +;; Does not, itself, assert a web-virtual-host; the context must do this. (define (spawn-broker-server port - #:ssl-options [ssl-options #f]) - (define any-client any-websocket-remote-client) - (define server-id (websocket-local-server port ssl-options)) - (spawn-demand-matcher (inbound (advertise (websocket-message (?! any-client) server-id ?))) - (inbound (observe (websocket-message (?! any-client) server-id ?))) - (lambda (c) (spawn-connection-handler c server-id)) - #:name 'broker:dm)) + #:hostname [hostname ?] + #:path [resource-path-str "/"]) + (actor #:name 'broker:dm + (on (web-request-get (id req) + (web-virtual-host "http" hostname port) + ,(string->resource-path resource-path-str)) + (when (equal? (dict-ref (web-request-header-headers req) 'upgrade #f) "websocket") + (spawn-connection-handler id req))))) -(define (spawn-connection-handler c server-id) - (actor #:name (list 'broker server-id) +(define (spawn-connection-handler req-id http-req) + (actor #:name (list 'broker:connection req-id) - (define scope (broker-scope (websocket-remote-client-request-host c) - (websocket-remote-client-request-port c) - (websocket-remote-client-request-path c))) + (on-start (log-syndicate-broker-info "Starting broker connection ~v" req-id)) + (on-stop (log-syndicate-broker-info "Ending broker connection ~v" req-id)) + (on (asserted (web-request-peer-details req-id _ _ $addr $port)) + (log-syndicate-broker-info "Connection ~v is from ~a:~a" req-id addr port)) - (define (arm-ping-timer!) - (send! (outbound (set-timer c (ping-interval) 'relative)))) + (assert (web-response-websocket req-id)) + (stop-when (websocket-connection-closed req-id)) (define (send-event e) - (send! (outbound (websocket-message server-id c (jsexpr->string (lift-json-event e)))))) + (websocket-message-send! req-id (jsexpr->string (lift-json-event e)))) - (on-start (arm-ping-timer!) - (log-syndicate-broker-info "Starting broker connection from ~v" c)) + (define http-resource (web-request-header-resource http-req)) + (define http-vh (web-resource-virtual-host http-resource)) + (define scope (broker-scope (web-virtual-host-name http-vh) + (web-virtual-host-port http-vh) + (resource-path->string (web-resource-path http-resource)))) - (stop-when (retracted (inbound (advertise (websocket-message c server-id _))))) - (assert (outbound (advertise (websocket-message server-id c _)))) - - (on (asserted (inbound - (websocket-peer-details server-id c _ _ $remote-addr $remote-port))) - (log-syndicate-broker-info "Connection ~v is from ~a:~a" c remote-addr remote-port)) - - (on (message (inbound (timer-expired c _))) - (arm-ping-timer!) + (field [ping-time-deadline 0]) + (on (asserted (later-than (ping-time-deadline))) + (ping-time-deadline (+ (current-inexact-milliseconds) (ping-interval))) (send-event 'ping)) - (on (message (inbound (websocket-message c server-id $data))) + (on (websocket-message-recv req-id $data) (match (drop-json-action (string->jsexpr data)) ['ping (send-event 'pong)] ['pong (void)] - [(? patch? p) (patch! (log-packet c 'inbound 'patch (wrap-patch scope p)))] - [(message body) (send! (log-packet c 'inbound 'message (broker-data scope body)))])) + [(? patch? p) (patch! (log-packet req-id 'inbound 'patch (wrap-patch scope p)))] + [(message body) (send! (log-packet req-id 'inbound 'message (broker-data scope body)))])) (on-event - [(? patch? p) (send-event (log-packet c 'outbound 'patch (unwrap-patch scope p)))] + [(? patch? p) (send-event (log-packet req-id 'outbound 'patch (unwrap-patch scope p)))] [(message (broker-data (== scope) body)) - (send-event (message (log-packet c 'outbound 'message body)))]) - - (on-stop (log-syndicate-broker-info "Ending broker connection from ~v" c)))) + (send-event (message (log-packet req-id 'outbound 'message body)))]))) (define (log-packet c direction kind value) (log-syndicate-broker-debug "Broker: ~v: ~a ~a\n~v" c direction kind value) @@ -107,18 +106,9 @@ t (let ((observations (trie-step t observe-parenthesis))) (trie-union (trie-prepend observe-parenthesis (wrap-trie scope observations)) - (wrap-trie* scope t))))) - -(define (wrap-trie* scope t) - (pattern->trie #t (broker-data scope (embedded-trie t)))) + (pattern->trie #t (broker-data scope (embedded-trie t))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(let ((ssl-options - (match (current-command-line-arguments) - [(vector c p) (websocket-ssl-options c p)] - [_ #f]))) - (dataspace (schedule-action! (spawn-broker-server 8000)) - (when ssl-options - (schedule-action! (spawn-broker-server 8443 #:ssl-options ssl-options))) - (forever))) +(actor #:name 'broker:vh (assert (web-virtual-host "http" _ 8000))) +(spawn-broker-server 8000)