diff --git a/racketmq/hub.rkt b/racketmq/hub.rkt index b4c7975..8f5f6b8 100644 --- a/racketmq/hub.rkt +++ b/racketmq/hub.rkt @@ -13,6 +13,7 @@ (require/activate "hub/topic-demand.rkt") (require/activate "hub/local-topic.rkt") (require/activate "hub/subscription.rkt") +(require/activate "hub/websocket.rkt") (command-line #:program "racketmq" diff --git a/racketmq/hub/websocket.rkt b/racketmq/hub/websocket.rkt new file mode 100644 index 0000000..13a8312 --- /dev/null +++ b/racketmq/hub/websocket.rkt @@ -0,0 +1,59 @@ +#lang syndicate/actor + +(require racket/dict) +(require racket/format) +(require net/url) +(require net/base64) +(require json) + +(require/activate syndicate/drivers/web) +(require/activate "../config.rkt") + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(actor #:name 'websocket-hub + (during (local-host $host-name $port) + (during (canonical-local-host $canonical-host-name $cport) + (on (web-request-get (id req) (vh host-name port) ("hub" ())) + (when (equal? (dict-ref (web-request-header-headers req) 'upgrade #f) "websocket") + (websocket-subscription id req canonical-host-name cport)))))) + +(define (websocket-subscription id req canonical-host-name cport) + (actor* #:name (list 'websocket-subscription id) + (define params (web-request-header-query req)) + (define requested-topic (dict-ref params 'hub.topic)) + (define topic ;; TODO: abstract this expression out (see also subscription.rkt) + (url->string + (combine-url/relative (string->url (canonical-url canonical-host-name + cport + `("topic" ("" ())))) + requested-topic))) + (define poll-interval-seconds + (match (dict-ref params + 'hub.poll_interval_seconds + (~a (config-ref 'default-poll-interval "none"))) + ["none" #f] + [n (string->number n)])) + + (react + (on-start (log-info "Opening websocket subscription to ~v; poll interval ~v" + topic + poll-interval-seconds)) + (on-stop (log-info "Closing websocket subscription to ~v" topic)) + + (assert (topic-demand topic poll-interval-seconds)) + + (assert (web-response-websocket id)) + + (on (message ($ n (notification topic + $canonical-hub + $canonical-topic + $content + $content-type))) + (define msg (hash 'topic topic + 'link (hash 'hub canonical-hub + 'self canonical-topic) + 'content-type content-type + 'content-base64 (bytes->string/utf-8 (base64-encode content #"")))) + (websocket-message-send! id (jsexpr->string msg))))))