Defaults; more use of later-than; work on upstream subscription

This commit is contained in:
Tony Garnock-Jones 2016-10-31 22:30:22 -04:00
parent a445c6860a
commit 7d61d63dc7
1 changed files with 135 additions and 83 deletions

View File

@ -22,6 +22,22 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TODO: sane defaults
(define *max-upstream-redirects* 5)
(define *server-port* 7827)
(define *canonical-host* "localhost")
(define *accepted-hosts* (list *canonical-host*))
(define *min-poll-interval* 10) ;; seconds
(define *default-lease* "unbounded")
(define *default-poll-interval* "none")
(define *max-dead-letters* 10)
(define *max-delivery-retries* 10)
(define *initial-retry-delay* 5.0) ;; seconds
(define *retry-delay-multiplier* 1.618)
(define *max-retry-delay* 30) ;; seconds
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; A Topic is a URIString.
;; A Deadline is an (Option Number), where #f indicates "unbounded",
@ -73,13 +89,14 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (vh host-name) (web-virtual-host "http" host-name 7827))
(define (vh host-name) (web-virtual-host "http" host-name *server-port*))
(actor #:name 'main
(assert (local-host "localhost"))
(for [(h *accepted-hosts*)]
(assert (local-host h)))
(assert (canonical-local-host "localhost"))
(assert (canonical-local-host *canonical-host*))
(during (local-host $host-name)
(assert (vh host-name))
@ -155,6 +172,9 @@
(define (canonical-url canonical-host-name path)
(url->string (resource->url (web-resource (vh canonical-host-name) path))))
(define (canonical-topic-base-url canonical-hub-str)
(combine-url/relative (string->url canonical-hub-str) "/topic/"))
(define (local-topic-main topic)
(field [max-age #f]
[max-count #f])
@ -209,57 +229,98 @@
(send! (notification full-topic body content-type))
(web-respond/status! id 201 #"Created")))))
(define (maintain-remote-subscription full-topic canonical-host-name sub-id)
(define callback (canonical-url canonical-host-name `("sub" (,sub-id ()))))
(react
(field [last-upstream-check 0]
[poll-interval-seconds #f])
(field [current-content-hash #f]
[current-content-type #f])
(define/query-set poll-intervals (topic-demand full-topic $i) i)
(begin/dataflow
(poll-interval-seconds (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))]
(cond [(not i) candidate]
[(< candidate i) candidate]
[else i]))))
(begin/dataflow
(log-info "Poll interval for ~a is now ~a"
full-topic
(match (poll-interval-seconds)
[#f "disabled"]
[n (format "~a seconds" n)])))
(on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0)))))
(log-info "Checking upstream ~a" full-topic)
(define-values (resp response-body)
(web-request! 'get full-topic #:redirect-budget 5))
(last-upstream-check (current-inexact-milliseconds))
(match (web-response-header-code-type resp)
['successful
(define new-content-hash (sha1 (open-input-bytes response-body)))
(define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
(when (not (and (equal? (current-content-hash) new-content-hash)
(equal? (current-content-type) new-content-type)))
(current-content-hash new-content-hash)
(current-content-type new-content-type)
(send! (notification full-topic response-body new-content-type)))]
[other
(log-warning "Upstream ~a yielded ~a code ~a"
full-topic
other
(web-response-header-code resp))]))))
(define (remote-topic-main full-topic)
(define sub-id (random-hex-string 16))
(log-info "Remote sub endpoint ~a" sub-id)
(field [current-content-hash #f]
[current-content-type #f]
[current-upstream-hub #f]
[established-upstream-hub #f])
(field [last-upstream-check 0]
[poll-interval-seconds #f])
(define/query-set poll-intervals (topic-demand full-topic $i) i)
(begin/dataflow
(define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))]
(cond [(not i) candidate]
[(< candidate i) candidate]
[else i])))
(poll-interval-seconds (and candidate (max candidate *min-poll-interval*))))
(begin/dataflow
(log-info "Poll interval for ~a is now ~a"
full-topic
(match (poll-interval-seconds)
[#f "disabled"]
[n (format "~a seconds" n)])))
(during (canonical-local-host $canonical-host-name)
(on-start (maintain-remote-subscription full-topic canonical-host-name sub-id))
(define callback (canonical-url canonical-host-name `("sub" (,sub-id ()))))
(define (refresh-subscription!)
(when (current-upstream-hub)
(web-request! 'post (current-upstream-hub)
#:body (string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,callback)
(hub.mode . "subscribe")
(hub.topic . ,full-topic)))))
(established-upstream-hub (current-upstream-hub))))
(define (unsubscribe!)
(when (established-upstream-hub)
(web-request! 'post (established-upstream-hub)
#:body (string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,callback)
(hub.mode . "unsubscribe")
(hub.topic . ,full-topic)))))
(established-upstream-hub #f)))
(begin/dataflow
(when (not (equal? (current-upstream-hub) (established-upstream-hub)))
(unsubscribe!)
(refresh-subscription!)))
(on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0)))))
(log-info "Checking upstream ~a" full-topic)
(define-values (resp response-body)
(web-request! 'get full-topic #:redirect-budget *max-upstream-redirects*))
(last-upstream-check (current-inexact-milliseconds))
(match (web-response-header-code-type resp)
['successful
(define new-content-hash (sha1 (open-input-bytes response-body)))
(define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
(when (not (and (equal? (current-content-hash) new-content-hash)
(equal? (current-content-type) new-content-type)))
(current-content-hash new-content-hash)
(current-content-type new-content-type)
(send! (notification full-topic response-body new-content-type)))
(define upstream-hub
(match (hash-ref (parse-link-headers (web-response-header-headers resp)) 'hub '())
[(cons hub-url _) hub-url]
['() #f]))
(if (equal? (current-upstream-hub) upstream-hub)
(refresh-subscription!)
(current-upstream-hub upstream-hub))]
[other
(log-warning "Upstream ~a yielded ~a code ~a"
full-topic
other
(web-response-header-code resp))]))
(on (web-request-get (id req) (vh canonical-host-name) ("sub" (,sub-id ())))
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
(on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body)
(actor*
(define content-type (req-content-type req))
(log-info "Remote topic ~a got ~v message ~v" full-topic content-type body)
(current-content-hash (sha1 (open-input-bytes body)))
(current-content-type content-type)
(send! (notification full-topic body content-type))
(web-respond/status! id 201 #"Created")))))
@ -283,11 +344,11 @@
["unsubscribe" 'unsubscribe]))
(define topic (hash-ref params 'hub.topic))
(define lease-seconds
(match (hash-ref params 'hub.lease_seconds "unbounded")
(match (hash-ref params 'hub.lease_seconds *default-lease*)
["unbounded" #f]
[n (string->number n)]))
(define poll-interval-seconds
(match (hash-ref params 'hub.poll_interval_seconds "none")
(match (hash-ref params 'hub.poll_interval_seconds *default-poll-interval*)
["none" #f]
[n (string->number n)]))
(define secret-string (hash-ref params 'hub.secret #f))
@ -295,6 +356,7 @@
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
(define canonical-hub
(url->string (resource->url (web-request-header-resource req))))
;; TODO: asynchronous validation
(match mode
['subscribe
(if (subscription-change-validate "subscribe"
@ -346,29 +408,25 @@
(and (eq? (web-response-header-code-type resp) 'successful)
(equal? body (string->bytes/utf-8 challenge))))
(define (canonical-topic-base-url canonical-hub-str)
(combine-url/relative (string->url canonical-hub-str) "/topic/"))
(define (subscription-main partial-topic callback)
(field [expiry-timer-id #f]
[delivery-active? #f]
(field [delivery-active? #f]
[message-queue (make-queue)]
[dead-letters (make-queue)])
(stop-when (rising-edge (> (queue-length (dead-letters)) 1))
(stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*))
(log-info "Too many dead letters for ~a" callback))
(define (deliver-queued-notifications canonical-hub)
(delivery-active? #t)
(let deliver-rest ()
(let deliver-rest ((retry-delay *initial-retry-delay*))
(when (not (queue-empty? (message-queue)))
(define-values (n newq) (dequeue (message-queue)))
(message-queue newq)
(match-define (notification topic body content-type) n)
(define link-headers (list (cons 'link (format "<~a>; rel=hub" canonical-hub))
(cons 'link (format "<~a>; rel=self" topic))))
(let deliver-one ((retries-remaining 1)
(retry-delay 5.0))
(let deliver-one ((retries-remaining *max-delivery-retries*)
(retry-delay retry-delay))
(define-values (resp _body)
(web-request! 'post
callback
@ -378,11 +436,11 @@
#:body body))
(cond
[(eq? (web-response-header-code-type resp) 'successful)
(deliver-rest)]
(deliver-rest *initial-retry-delay*)]
[(zero? retries-remaining)
(log-info "Dead letter for ~v" callback)
(dead-letters (enqueue (dead-letters) n))
(deliver-rest)]
(deliver-rest retry-delay)]
[else
(log-info
"Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
@ -392,7 +450,8 @@
resp)
(sleep retry-delay)
(deliver-one (- retries-remaining 1)
(min (* retry-delay 1.618) 30))]))))
(min (* retry-delay *retry-delay-multiplier*)
*max-retry-delay*))]))))
(delivery-active? #f))
(during (subscription partial-topic
@ -401,30 +460,23 @@
callback
$secret-bytes
$poll-interval-seconds)
(define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
partial-topic)))
(define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
partial-topic)))
(assert (topic-demand topic poll-interval-seconds))
(assert (topic-demand topic poll-interval-seconds))
(on-start (log-info "Subscription configured: ~v ~v ~v ~v ~v"
topic
expiry-deadline
canonical-hub
secret-bytes
poll-interval-seconds)
(cond
[expiry-deadline
(expiry-timer-id (gensym 'subscription-expiry))
(log-info "Subscription will expire at ~a" expiry-deadline)
(send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))]
[else
(log-info "Subscription will not expire")
(expiry-timer-id #f)]))
(on-start (log-info "Subscription configured: ~v"
`((topic ,topic)
(expiry-deadline ,expiry-deadline)
(canonical-hub ,canonical-hub)
(callback ,callback)
(secret-bytes ,secret-bytes)
(poll-interval-seconds ,poll-interval-seconds))))
(stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _))
(log-info "Subscription expired"))
(stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0)))
(log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback))))
(on (message ($ n (notification topic _ _)))
(message-queue (enqueue (message-queue) n))
(when (not (delivery-active?))
(deliver-queued-notifications canonical-hub)))))
(on (message ($ n (notification topic _ _)))
(message-queue (enqueue (message-queue) n))
(when (not (delivery-active?))
(deliver-queued-notifications canonical-hub)))))