2016-11-20 21:13:40 +00:00
|
|
|
#lang syndicate/actor
|
|
|
|
|
|
|
|
(require racket/format)
|
|
|
|
(require net/url)
|
|
|
|
(require net/uri-codec)
|
|
|
|
(require syndicate/functional-queue)
|
|
|
|
|
|
|
|
(require/activate syndicate/drivers/timestate)
|
|
|
|
(require/activate syndicate/drivers/web)
|
|
|
|
(require/activate "../config.rkt")
|
|
|
|
|
|
|
|
(require "../private/util.rkt")
|
|
|
|
(require "../protocol.rkt")
|
|
|
|
|
|
|
|
(actor #:name 'hub
|
|
|
|
(during (local-host $host-name $port)
|
|
|
|
(during (canonical-local-host $canonical-host-name $cport)
|
|
|
|
(on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body)
|
|
|
|
(asynchronous-verification-of-intent id req body canonical-host-name cport)
|
|
|
|
(web-respond/status! id 202 #"Accepted"))))
|
|
|
|
|
|
|
|
(on (message (update-subscription $topic $callback $settings))
|
|
|
|
(retract! (subscription topic callback ?))
|
|
|
|
(when settings (assert! (subscription topic callback settings))))
|
|
|
|
|
|
|
|
(during/actor (subscription $topic $callback _)
|
|
|
|
#:name (list 'subscription topic callback)
|
|
|
|
#:on-crash (retract! (subscription topic callback ?))
|
|
|
|
(subscription-main topic callback)))
|
|
|
|
|
|
|
|
(define (asynchronous-verification-of-intent id req body canonical-host-name cport)
|
|
|
|
(actor* #:name 'verification-of-intent
|
|
|
|
(define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body))))
|
|
|
|
(define callback (hash-ref params 'hub.callback))
|
|
|
|
(define mode (match (hash-ref params 'hub.mode)
|
|
|
|
["subscribe" 'subscribe]
|
|
|
|
["unsubscribe" 'unsubscribe]))
|
|
|
|
(define requested-topic (hash-ref params 'hub.topic))
|
|
|
|
(define topic
|
|
|
|
(url->string
|
|
|
|
(combine-url/relative (string->url (canonical-url canonical-host-name
|
|
|
|
cport
|
|
|
|
`("topic" ("" ()))))
|
|
|
|
requested-topic)))
|
|
|
|
(define requested-lease-seconds
|
|
|
|
(string->number
|
|
|
|
(hash-ref params 'hub.lease_seconds (~a (config-ref 'default-lease 86400)))))
|
|
|
|
(define lease-seconds (min requested-lease-seconds (config-ref 'max-lease 604800)))
|
|
|
|
(define poll-interval-seconds
|
|
|
|
(match (hash-ref params
|
|
|
|
'hub.poll_interval_seconds
|
|
|
|
(~a (config-ref 'default-poll-interval "none")))
|
|
|
|
["none" #f]
|
|
|
|
[n (string->number n)]))
|
|
|
|
(define secret-string (hash-ref params 'hub.secret #f))
|
|
|
|
(define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
|
|
|
|
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
|
|
|
|
(match mode
|
|
|
|
['subscribe
|
|
|
|
(when (subscription-change-validate "subscribe"
|
|
|
|
lease-seconds
|
|
|
|
requested-topic
|
|
|
|
callback)
|
|
|
|
(send! (update-subscription topic
|
|
|
|
callback
|
|
|
|
(subscription-settings expiry-deadline
|
|
|
|
secret-bytes
|
|
|
|
poll-interval-seconds))))]
|
|
|
|
['unsubscribe
|
|
|
|
(when (subscription-change-validate "unsubscribe" #f requested-topic callback)
|
|
|
|
(send! (update-subscription topic callback #f)))])))
|
|
|
|
|
|
|
|
(define (subscription-change-validate mode lease requested-topic callback)
|
|
|
|
(define challenge (random-hex-string 16))
|
|
|
|
(define id (gensym 'validation))
|
|
|
|
(define extra-query (list* (cons 'hub.mode mode)
|
|
|
|
(cons 'hub.topic requested-topic)
|
|
|
|
(cons 'hub.challenge challenge)
|
|
|
|
(if lease
|
|
|
|
(list (cons 'hub.lease_seconds (~a lease)))
|
|
|
|
(list))))
|
|
|
|
(define-values (resp body)
|
|
|
|
(web-request! 'get
|
|
|
|
(let* ((u (string->url callback)))
|
|
|
|
(url->string
|
|
|
|
(struct-copy url u [query (append (url-query u) extra-query)])))))
|
2016-11-21 21:04:07 +00:00
|
|
|
(and (web-response-successful? resp)
|
2016-11-20 21:13:40 +00:00
|
|
|
(equal? body (string->bytes/utf-8 challenge))))
|
|
|
|
|
|
|
|
(define (subscription-main topic callback)
|
|
|
|
(field [delivery-active? #f]
|
|
|
|
[message-queue (make-queue)]
|
|
|
|
[dead-letters (make-queue)])
|
|
|
|
|
|
|
|
(define/query-config max-dead-letters 10)
|
|
|
|
(define/query-config initial-retry-delay 5.0)
|
|
|
|
(define/query-config max-delivery-retries 10)
|
|
|
|
(define/query-config retry-delay-multiplier 1.618)
|
|
|
|
(define/query-config max-retry-delay 30)
|
|
|
|
|
|
|
|
(stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters)))
|
|
|
|
(log-info "Too many dead letters for ~a" callback))
|
|
|
|
|
|
|
|
(define (deliver-queued-notifications)
|
|
|
|
(delivery-active? #t)
|
|
|
|
(let deliver-rest ((retry-delay (initial-retry-delay)))
|
|
|
|
(when (not (queue-empty? (message-queue)))
|
|
|
|
(define-values (n newq) (dequeue (message-queue)))
|
|
|
|
(message-queue newq)
|
|
|
|
(match-define (notification _ canonical-hub canonical-topic body content-type) n)
|
|
|
|
(define link-headers (append (maybe-link-header canonical-hub 'hub)
|
|
|
|
(maybe-link-header canonical-topic 'self)))
|
|
|
|
(let deliver-one ((retries-remaining (max-delivery-retries))
|
|
|
|
(retry-delay retry-delay))
|
|
|
|
(define-values (resp _body)
|
|
|
|
(web-request! 'post
|
|
|
|
callback
|
|
|
|
#:headers (if content-type
|
|
|
|
(cons (cons 'content-type content-type) link-headers)
|
|
|
|
link-headers)
|
|
|
|
#:body body))
|
|
|
|
(cond
|
2016-11-21 21:04:07 +00:00
|
|
|
[(web-response-successful? resp)
|
2016-11-20 21:13:40 +00:00
|
|
|
(deliver-rest (initial-retry-delay))]
|
|
|
|
[(zero? retries-remaining)
|
|
|
|
(log-info "Dead letter for ~v" callback)
|
|
|
|
(dead-letters (enqueue (dead-letters) n))
|
|
|
|
(deliver-rest retry-delay)]
|
|
|
|
[else
|
|
|
|
(log-info
|
|
|
|
"Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
|
|
|
|
callback
|
|
|
|
retry-delay
|
|
|
|
retries-remaining
|
|
|
|
resp)
|
|
|
|
(sleep retry-delay)
|
|
|
|
(deliver-one (- retries-remaining 1)
|
|
|
|
(min (* retry-delay (retry-delay-multiplier))
|
|
|
|
(max-retry-delay)))]))))
|
|
|
|
(delivery-active? #f))
|
|
|
|
|
|
|
|
(during (subscription topic callback (subscription-settings
|
|
|
|
$expiry-deadline
|
|
|
|
$secret-bytes
|
|
|
|
$poll-interval-seconds))
|
|
|
|
(assert (topic-demand topic poll-interval-seconds))
|
|
|
|
|
|
|
|
(on-start (log-info "Subscription configured: ~v"
|
|
|
|
`((topic ,topic)
|
|
|
|
(expiry-deadline ,expiry-deadline)
|
|
|
|
(callback ,callback)
|
|
|
|
(secret-bytes ,secret-bytes)
|
|
|
|
(poll-interval-seconds ,poll-interval-seconds))))
|
|
|
|
|
|
|
|
(stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0)))
|
|
|
|
(log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback))))
|
|
|
|
|
|
|
|
(on (message ($ n (notification topic _ _ _ _)))
|
|
|
|
(message-queue (enqueue (message-queue) n))
|
|
|
|
(when (not (delivery-active?))
|
|
|
|
(deliver-queued-notifications)))))
|