racketmq-2017/racketmq/hub/remote-topic.rkt

140 lines
6.0 KiB
Racket

#lang syndicate/actor
(provide remote-topic-main)
(require racket/dict)
(require racket/set)
(require net/uri-codec)
(require file/sha1)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require/activate "../config.rkt")
(require "../private/util.rkt")
(require "../protocol.rkt")
;; set-minimum : (Setof (U Real #f)) -> (U Real #f)
;; Returns the smallest non-#f member of `xs`. Yields #f when `xs` has no
;; non-#f members (which includes the empty set).
(define (set-minimum xs)
  (for/fold ([smallest #f])
            ([x (in-set xs)]
             #:when x) ; #f members never take part in the comparison
    ;; Adopt `x` when nothing has been kept yet, or when it is strictly
    ;; smaller than the value kept so far; otherwise keep the current one.
    (if (or (not smallest) (< x smallest))
        x
        smallest)))
;; remote-topic-main : full-topic -> ...
;;
;; Tracks the remote topic `full-topic` (passed as-is to `web-request!` as the
;; URL to GET, and sent as `hub.topic` in subscription requests). The body:
;;   - derives a polling interval from `topic-demand` assertions for the topic;
;;   - periodically GETs the topic and sends a `notification` when the content
;;     hash or content type differs from the last one recorded;
;;   - subscribes to / unsubscribes from the upstream hub advertised in the
;;     topic's Link headers, using a callback endpoint under "sub"/<sub-id>;
;;   - serves that callback endpoint: GET echoes `hub.challenge`, POST is
;;     treated as a content delivery and re-sent as a `notification`.
(define (remote-topic-main full-topic)
;; Per-topic identifier used as the path component of the callback endpoint.
(define sub-id (random-hex-string 16))
(log-info "Remote sub endpoint ~a" sub-id)
;; current-content-hash / current-content-type: last content seen (poll or push).
;; current-upstream-hub: hub most recently advertised for this topic (or #f).
;; established-upstream-hub: hub we last sent a subscribe request to (or #f).
(field [current-content-hash #f]
[current-content-type #f]
[current-upstream-hub #f]
[established-upstream-hub #f])
;; last-upstream-check: timestamp of the last poll, in milliseconds (it is set
;; from `current-inexact-milliseconds` below). poll-interval-seconds: #f means
;; polling is disabled.
(field [last-upstream-check 0]
[poll-interval-seconds #f])
;; Collects the `i` component of every `(topic-demand full-topic i)` assertion.
(define/query-set poll-intervals (topic-demand full-topic $i) i)
;; NOTE(review): `define/query-config` comes from ../config.rkt; 60 is
;; presumably the fallback used when no configuration is present -- confirm.
(define/query-config min-poll-interval 60)
;; Derive the effective poll interval: the smallest requested interval, but
;; never below `min-poll-interval`; #f when no non-#f interval is requested.
(begin/dataflow
(define candidate (set-minimum (poll-intervals)))
(poll-interval-seconds (and candidate (max candidate (min-poll-interval)))))
;; Log the current value of the effective poll interval.
(begin/dataflow
(log-info "Poll interval for ~a is now ~a"
full-topic
(match (poll-interval-seconds)
[#f "disabled"]
[n (format "~a seconds" n)])))
;; Everything below is scoped to a `canonical-local-host` assertion, whose
;; host name and port are needed to build and serve the callback URL.
(during (canonical-local-host $canonical-host-name $cport)
;; URL handed to the upstream hub as `hub.callback`.
(define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ()))))
;; POSTs a form-encoded "subscribe" request to `current-upstream-hub` (if any)
;; and records that hub as the established one. The result of the request is
;; not inspected, so the hub is recorded as established regardless of outcome.
(define (refresh-subscription!)
;; TODO: shared secret
;; TODO: listen to lease duration and use it to refresh ourselves more intelligently
(when (current-upstream-hub)
(log-info "Subscribing to hub ~s for topic ~s" (current-upstream-hub) full-topic)
(web-request! 'post (current-upstream-hub)
#:body (string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,callback)
(hub.mode . "subscribe")
(hub.topic . ,full-topic)))))
(established-upstream-hub (current-upstream-hub))))
;; POSTs a form-encoded "unsubscribe" request to `established-upstream-hub`
;; (if any) and then clears it. As above, the request's result is ignored.
(define (unsubscribe!)
(when (established-upstream-hub)
(log-info "Unsubscribing from hub ~s for topic ~s" (established-upstream-hub) full-topic)
(web-request! 'post (established-upstream-hub)
#:body (string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,callback)
(hub.mode . "unsubscribe")
(hub.topic . ,full-topic)))))
(established-upstream-hub #f)))
;; When the advertised hub differs from the established one, drop the old
;; subscription first and then subscribe to the new hub. Both helpers are
;; no-ops when their respective hub field is #f.
(begin/dataflow
(when (not (equal? (current-upstream-hub) (established-upstream-hub)))
(unsubscribe!)
(refresh-subscription!)))
;; NOTE(review): as with min-poll-interval, 5 is presumably the fallback value.
(define/query-config max-upstream-redirects 5)
;; Polling: only active while `poll-interval-seconds` is non-#f. The deadline
;; is the last check time plus the interval; `* 1000` converts the interval
;; from seconds to the milliseconds used by `last-upstream-check`.
(on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check)
(* 1000 (or (poll-interval-seconds) 0)))))
(log-info "Checking upstream ~a" full-topic)
(define-values (resp response-body)
(web-request! 'get full-topic #:redirect-budget (max-upstream-redirects)))
;; Recorded before the response is examined, so the next deadline advances
;; for unsuccessful responses too.
(last-upstream-check (current-inexact-milliseconds))
(match (web-response-header-code-type resp)
['successful
(define new-content-hash (sha1 (open-input-bytes response-body)))
(define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
;; The 'hub and 'self Link relations give the upstream hub and topic; each
;; is #f when the corresponding relation is absent.
(define parsed-link-headers (parse-link-headers (web-response-header-headers resp)))
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
(define upstream-topic (link-header-ref parsed-link-headers 'self #f))
;; Notify only when the body hash or the content type has changed.
(when (not (and (equal? (current-content-hash) new-content-hash)
(equal? (current-content-type) new-content-type)))
(current-content-hash new-content-hash)
(current-content-type new-content-type)
(send! (notification full-topic
upstream-hub
upstream-topic
response-body
new-content-type)))
;; Same hub as before: renew the subscription on every successful poll.
;; Different hub: just record it here; the dataflow block above compares it
;; with the established hub and performs the unsubscribe/subscribe.
(if (equal? (current-upstream-hub) upstream-hub)
(refresh-subscription!)
(current-upstream-hub upstream-hub))]
[other
(log-warning "Upstream ~a yielded ~a code ~a"
full-topic
other
(web-response-header-code resp))]))
;; Verification of intent: a GET on the callback endpoint is answered with the
;; `hub.challenge` query parameter (empty body when it is missing).
;; NOTE(review): no other query parameter (e.g. mode or topic) is checked
;; before the challenge is echoed -- confirm this is intended.
(on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ())))
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
;; Content delivery: a POST on the callback endpoint carries new content in
;; `body`. Unlike the polling path, the fields are updated and a notification
;; is sent unconditionally, without comparing against the previous hash.
(on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body)
;; TODO: verify the use of the shared secret
;; NOTE(review): the field writes below happen inside `actor*`; confirm that
;; this actor's fields are accessible from that context and that the updates
;; are visible to the polling and dataflow code above.
(actor*
(define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
(define upstream-topic (link-header-ref parsed-link-headers 'self #f))
(define content-type (web-request-header-content-type req))
(log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v"
full-topic
content-type
body
upstream-hub
upstream-topic)
(current-content-hash (sha1 (open-input-bytes body)))
(current-content-type content-type)
(current-upstream-hub upstream-hub)
(send! (notification full-topic
upstream-hub
upstream-topic
body
content-type))
(web-respond/status! id 201 #"Created")))))