#lang syndicate/actor
;; WebSocket front-end for the hub: accepts GET requests on the "hub" path
;; that ask for a websocket upgrade, and relays `notification` messages for
;; the requested topic down the socket as JSON text.

(require racket/dict)
(require racket/format)
(require net/url)
(require net/base64)
(require json)

(require/activate syndicate/drivers/web)
(require/activate "../config.rkt")
(require "../private/util.rkt")
(require "../protocol.rkt")

;; Listener actor. While both a `local-host` and a `canonical-local-host`
;; assertion are present, it reacts to GET requests for the path "hub" on the
;; matching virtual host and starts one subscription actor per request that
;; asks for a websocket upgrade. Other requests are left untouched here.
(actor #:name 'websocket-hub
       (during (local-host $host-name $port)
         (during (canonical-local-host $canonical-host-name $cport)
           (on (web-request-get (id req) (vh host-name port) ("hub" ()))
               ;; The Upgrade header value is compared case-insensitively
               ;; (RFC 6455 section 4.2.1), so "WebSocket" is accepted as well
               ;; as "websocket". A missing header yields #f and is rejected
               ;; by the `string?` guard.
               (let ([upgrade (dict-ref (web-request-header-headers req) 'upgrade #f)])
                 (when (and (string? upgrade) (string-ci=? upgrade "websocket"))
                   (websocket-subscription id req canonical-host-name cport)))))))

;; websocket-subscription : spawns an actor serving one websocket client.
;;
;;   id                  - identifier of the web request being answered
;;   req                 - the request header; its query parameters are read
;;   canonical-host-name - host name used to build the canonical topic URL
;;   cport               - port used to build the canonical topic URL
;;
;; Query parameters read:
;;   hub.topic                 - required; `dict-ref` has no default here, so
;;                               a request without it raises inside the actor
;;   hub.poll_interval_seconds - optional; falls back to the configured
;;                               'default-poll-interval, or "none"
(define (websocket-subscription id req canonical-host-name cport)
  (actor* #:name (list 'websocket-subscription id)
          (define params (web-request-header-query req))
          (define requested-topic (dict-ref params 'hub.topic))
          (define topic
            ;; TODO: abstract this expression out (see also subscription.rkt)
            ;; The requested topic is resolved relative to this hub's
            ;; canonical "topic/" URL.
            (url->string
             (combine-url/relative
              (string->url (canonical-url canonical-host-name cport `("topic" ("" ()))))
              requested-topic)))
          (define poll-interval-seconds
            ;; The default is passed through `~a` so that a configured value
            ;; and a query-string value are both strings when matched.
            ;; "none" means #f; anything else goes through `string->number`,
            ;; which yields #f for unparseable text.
            (match (dict-ref params
                             'hub.poll_interval_seconds
                             (~a (config-ref 'default-poll-interval "none")))
              ["none" #f]
              [n (string->number n)]))
          (react
           (on-start (log-info "Opening websocket subscription to ~v; poll interval ~v"
                               topic
                               poll-interval-seconds))
           (on-stop (log-info "Closing websocket subscription to ~v" topic))
           ;; NOTE(review): presumably `topic-demand` asks the rest of the
           ;; system to keep this topic fetched at the given interval --
           ;; confirm against protocol.rkt.
           (assert (topic-demand topic poll-interval-seconds))
           (assert (web-response-websocket id))
           ;; Each notification for this topic is sent as one JSON text
           ;; message; the content is base64-encoded (with an empty line
           ;; separator, so the encoding has no line breaks).
           (on (message ($ n (notification topic
                                           $canonical-hub
                                           $canonical-topic
                                           $content
                                           $content-type)))
               (define msg
                 (hash 'topic topic
                       'link (hash 'hub canonical-hub 'self canonical-topic)
                       'content-type content-type
                       'content-base64 (bytes->string/utf-8 (base64-encode content #""))))
               (websocket-message-send! id (jsexpr->string msg))))))