racketmq-2017/racketmq/hub/subscription.rkt

162 lines
7.5 KiB
Racket

#lang syndicate/actor
(require racket/format)
(require net/url)
(require net/uri-codec)
(require syndicate/functional-queue)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require/activate "../config.rkt")
(require "../private/util.rkt")
(require "../protocol.rkt")
(actor #:name 'hub
(during (local-host $host-name $port)
(during (canonical-local-host $canonical-host-name $cport)
(on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body)
(asynchronous-verification-of-intent id req body canonical-host-name cport)
(web-respond/status! id 202 #"Accepted"))))
(on (message (update-subscription $topic $callback $settings))
(retract! (subscription topic callback ?))
(when settings (assert! (subscription topic callback settings))))
(during/actor (subscription $topic $callback _)
#:name (list 'subscription topic callback)
#:on-crash (retract! (subscription topic callback ?))
(subscription-main topic callback)))
(define (asynchronous-verification-of-intent id req body canonical-host-name cport)
(actor* #:name 'verification-of-intent
(define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body))))
(define callback (hash-ref params 'hub.callback))
(define mode (match (hash-ref params 'hub.mode)
["subscribe" 'subscribe]
["unsubscribe" 'unsubscribe]))
(define requested-topic (hash-ref params 'hub.topic))
(define topic
(url->string
(combine-url/relative (string->url (canonical-url canonical-host-name
cport
`("topic" ("" ()))))
requested-topic)))
(define requested-lease-seconds
(string->number
(hash-ref params 'hub.lease_seconds (~a (config-ref 'default-lease 86400)))))
(define lease-seconds (min requested-lease-seconds (config-ref 'max-lease 604800)))
(define poll-interval-seconds
(match (hash-ref params
'hub.poll_interval_seconds
(~a (config-ref 'default-poll-interval "none")))
["none" #f]
[n (string->number n)]))
(define secret-string (hash-ref params 'hub.secret #f))
(define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
(match mode
['subscribe
(when (subscription-change-validate "subscribe"
lease-seconds
requested-topic
callback)
(send! (update-subscription topic
callback
(subscription-settings expiry-deadline
secret-bytes
poll-interval-seconds))))]
['unsubscribe
(when (subscription-change-validate "unsubscribe" #f requested-topic callback)
(send! (update-subscription topic callback #f)))])))
(define (subscription-change-validate mode lease requested-topic callback)
(define challenge (random-hex-string 16))
(define id (gensym 'validation))
(define extra-query (list* (cons 'hub.mode mode)
(cons 'hub.topic requested-topic)
(cons 'hub.challenge challenge)
(if lease
(list (cons 'hub.lease_seconds (~a lease)))
(list))))
(define-values (resp body)
(web-request! 'get
(let* ((u (string->url callback)))
(url->string
(struct-copy url u [query (append (url-query u) extra-query)])))))
(and (eq? (web-response-header-code-type resp) 'successful)
(equal? body (string->bytes/utf-8 challenge))))
(define (subscription-main topic callback)
(field [delivery-active? #f]
[message-queue (make-queue)]
[dead-letters (make-queue)])
(define/query-config max-dead-letters 10)
(define/query-config initial-retry-delay 5.0)
(define/query-config max-delivery-retries 10)
(define/query-config retry-delay-multiplier 1.618)
(define/query-config max-retry-delay 30)
(stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters)))
(log-info "Too many dead letters for ~a" callback))
(define (deliver-queued-notifications)
(delivery-active? #t)
(let deliver-rest ((retry-delay (initial-retry-delay)))
(when (not (queue-empty? (message-queue)))
(define-values (n newq) (dequeue (message-queue)))
(message-queue newq)
(match-define (notification _ canonical-hub canonical-topic body content-type) n)
(define link-headers (append (maybe-link-header canonical-hub 'hub)
(maybe-link-header canonical-topic 'self)))
(let deliver-one ((retries-remaining (max-delivery-retries))
(retry-delay retry-delay))
(define-values (resp _body)
(web-request! 'post
callback
#:headers (if content-type
(cons (cons 'content-type content-type) link-headers)
link-headers)
#:body body))
(cond
[(eq? (web-response-header-code-type resp) 'successful)
(deliver-rest (initial-retry-delay))]
[(zero? retries-remaining)
(log-info "Dead letter for ~v" callback)
(dead-letters (enqueue (dead-letters) n))
(deliver-rest retry-delay)]
[else
(log-info
"Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
callback
retry-delay
retries-remaining
resp)
(sleep retry-delay)
(deliver-one (- retries-remaining 1)
(min (* retry-delay (retry-delay-multiplier))
(max-retry-delay)))]))))
(delivery-active? #f))
(during (subscription topic callback (subscription-settings
$expiry-deadline
$secret-bytes
$poll-interval-seconds))
(assert (topic-demand topic poll-interval-seconds))
(on-start (log-info "Subscription configured: ~v"
`((topic ,topic)
(expiry-deadline ,expiry-deadline)
(callback ,callback)
(secret-bytes ,secret-bytes)
(poll-interval-seconds ,poll-interval-seconds))))
(stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0)))
(log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback))))
(on (message ($ n (notification topic _ _ _ _)))
(message-queue (enqueue (message-queue) n))
(when (not (delivery-active?))
(deliver-queued-notifications)))))