racketmq-2017/racketmq/hub/websocket.rkt

60 lines
2.4 KiB
Racket

#lang syndicate/actor
(require racket/dict)
(require racket/format)
(require net/url)
(require net/base64)
(require json)
(require/activate syndicate/drivers/web)
(require/activate syndicate/drivers/config)
(require "../private/util.rkt")
(require "../protocol.rkt")
(actor #:name 'websocket-hub
(during (http-listener $host-name $port)
(during (canonical-baseurl $baseurl)
(on (web-request-get (id req) (vh host-name port) ("hub" ()))
(when (web-request-header-websocket-upgrade? req)
(websocket-subscription id req baseurl))))))
(define (websocket-subscription id req baseurl)
(actor* #:name (list 'websocket-subscription id)
(define params (web-request-header-query req))
(define requested-topic (dict-ref params 'hub.topic))
(define topic ;; TODO: abstract this expression out (see also subscription.rkt)
(url->string
(combine-url/relative (string->url (canonical-url baseurl `("topic" ("" ()))))
requested-topic)))
(define poll-interval-seconds
(match (dict-ref params
'hub.poll_interval_seconds
(~a (config-ref 'default-poll-interval "none")))
["none" #f]
[n (string->number n)]))
(react
(on-start (log-info "Opening websocket subscription to ~v; poll interval ~v"
topic
poll-interval-seconds))
(on-stop (log-info "Closing websocket subscription to ~v" topic))
(assert (topic-demand topic poll-interval-seconds))
(assert (web-response-websocket id))
(stop-when (websocket-connection-closed id))
(on (message ($ n (notification topic
$canonical-hub
$canonical-topic
$content
$content-type)))
(define msg (hash 'topic topic
'link (hash 'hub canonical-hub
'self canonical-topic)
'content-type content-type
'content-base64 (bytes->string/utf-8 (base64-encode content #""))))
(websocket-message-send! id (jsexpr->string msg))))))