From 7d61d63dc7468542112659ee3ef2d550c67e9981 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 31 Oct 2016 22:30:22 -0400 Subject: [PATCH] Defaults; more use of later-than; work on upstream subscription --- rmq/server.rkt | 218 ++++++++++++++++++++++++++++++------------------- 1 file changed, 135 insertions(+), 83 deletions(-) diff --git a/rmq/server.rkt b/rmq/server.rkt index 975759a..8cacfa0 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -22,6 +22,22 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; TODO: sane defaults +(define *max-upstream-redirects* 5) +(define *server-port* 7827) +(define *canonical-host* "localhost") +(define *accepted-hosts* (list *canonical-host*)) +(define *min-poll-interval* 10) ;; seconds +(define *default-lease* "unbounded") +(define *default-poll-interval* "none") +(define *max-dead-letters* 10) +(define *max-delivery-retries* 10) +(define *initial-retry-delay* 5.0) ;; seconds +(define *retry-delay-multiplier* 1.618) +(define *max-retry-delay* 30) ;; seconds + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;; A Topic is a URIString. ;; A Deadline is an (Option Number), where #f indicates "unbounded", @@ -73,13 +89,14 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (vh host-name) (web-virtual-host "http" host-name 7827)) +(define (vh host-name) (web-virtual-host "http" host-name *server-port*)) (actor #:name 'main - (assert (local-host "localhost")) + (for [(h *accepted-hosts*)] + (assert (local-host h))) - (assert (canonical-local-host "localhost")) + (assert (canonical-local-host *canonical-host*)) (during (local-host $host-name) (assert (vh host-name)) @@ -155,6 +172,9 @@ (define (canonical-url canonical-host-name path) (url->string (resource->url (web-resource (vh canonical-host-name) path)))) +(define (canonical-topic-base-url canonical-hub-str) + (combine-url/relative (string->url canonical-hub-str) "/topic/")) + (define (local-topic-main topic) (field [max-age #f] [max-count #f]) @@ -209,57 +229,98 @@ (send! (notification full-topic body content-type)) (web-respond/status! id 201 #"Created"))))) -(define (maintain-remote-subscription full-topic canonical-host-name sub-id) - (define callback (canonical-url canonical-host-name `("sub" (,sub-id ())))) - (react - (field [last-upstream-check 0] - [poll-interval-seconds #f]) - - (field [current-content-hash #f] - [current-content-type #f]) - - (define/query-set poll-intervals (topic-demand full-topic $i) i) - (begin/dataflow - (poll-interval-seconds (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] - (cond [(not i) candidate] - [(< candidate i) candidate] - [else i])))) - (begin/dataflow - (log-info "Poll interval for ~a is now ~a" - full-topic - (match (poll-interval-seconds) - [#f "disabled"] - [n (format "~a seconds" n)]))) - - (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) - (log-info "Checking upstream ~a" full-topic) - (define-values (resp response-body) - (web-request! 'get full-topic #:redirect-budget 5)) - (last-upstream-check (current-inexact-milliseconds)) - (match (web-response-header-code-type resp) - ['successful - (define new-content-hash (sha1 (open-input-bytes response-body))) - (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) - (when (not (and (equal? (current-content-hash) new-content-hash) - (equal? (current-content-type) new-content-type))) - (current-content-hash new-content-hash) - (current-content-type new-content-type) - (send! (notification full-topic response-body new-content-type)))] - [other - (log-warning "Upstream ~a yielded ~a code ~a" - full-topic - other - (web-response-header-code resp))])))) - (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) (log-info "Remote sub endpoint ~a" sub-id) + + (field [current-content-hash #f] + [current-content-type #f] + [current-upstream-hub #f] + [established-upstream-hub #f]) + + (field [last-upstream-check 0] + [poll-interval-seconds #f]) + + (define/query-set poll-intervals (topic-demand full-topic $i) i) + (begin/dataflow + (define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] + (cond [(not i) candidate] + [(< candidate i) candidate] + [else i]))) + (poll-interval-seconds (and candidate (max candidate *min-poll-interval*)))) + (begin/dataflow + (log-info "Poll interval for ~a is now ~a" + full-topic + (match (poll-interval-seconds) + [#f "disabled"] + [n (format "~a seconds" n)]))) + (during (canonical-local-host $canonical-host-name) - (on-start (maintain-remote-subscription full-topic canonical-host-name sub-id)) + (define callback (canonical-url canonical-host-name `("sub" (,sub-id ())))) + + (define (refresh-subscription!) + (when (current-upstream-hub) + (web-request! 'post (current-upstream-hub) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,callback) + (hub.mode . "subscribe") + (hub.topic . ,full-topic))))) + (established-upstream-hub (current-upstream-hub)))) + + (define (unsubscribe!) + (when (established-upstream-hub) + (web-request! 'post (established-upstream-hub) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,callback) + (hub.mode . "unsubscribe") + (hub.topic . ,full-topic))))) + (established-upstream-hub #f))) + + (begin/dataflow + (when (not (equal? (current-upstream-hub) (established-upstream-hub))) + (unsubscribe!) + (refresh-subscription!))) + + (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) + (log-info "Checking upstream ~a" full-topic) + (define-values (resp response-body) + (web-request! 'get full-topic #:redirect-budget *max-upstream-redirects*)) + (last-upstream-check (current-inexact-milliseconds)) + (match (web-response-header-code-type resp) + ['successful + (define new-content-hash (sha1 (open-input-bytes response-body))) + (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (when (not (and (equal? (current-content-hash) new-content-hash) + (equal? (current-content-type) new-content-type))) + (current-content-hash new-content-hash) + (current-content-type new-content-type) + (send! (notification full-topic response-body new-content-type))) + (define upstream-hub + (match (hash-ref (parse-link-headers (web-response-header-headers resp)) 'hub '()) + [(cons hub-url _) hub-url] + ['() #f])) + (if (equal? (current-upstream-hub) upstream-hub) + (refresh-subscription!) + (current-upstream-hub upstream-hub))] + [other + (log-warning "Upstream ~a yielded ~a code ~a" + full-topic + other + (web-response-header-code resp))])) + + (on (web-request-get (id req) (vh canonical-host-name) ("sub" (,sub-id ()))) + (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) + (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (web-respond/bytes! id (string->bytes/utf-8 challenge))) + (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body) (actor* (define content-type (req-content-type req)) (log-info "Remote topic ~a got ~v message ~v" full-topic content-type body) + (current-content-hash (sha1 (open-input-bytes body))) + (current-content-type content-type) (send! (notification full-topic body content-type)) (web-respond/status! id 201 #"Created"))))) @@ -283,11 +344,11 @@ ["unsubscribe" 'unsubscribe])) (define topic (hash-ref params 'hub.topic)) (define lease-seconds - (match (hash-ref params 'hub.lease_seconds "unbounded") + (match (hash-ref params 'hub.lease_seconds *default-lease*) ["unbounded" #f] [n (string->number n)])) (define poll-interval-seconds - (match (hash-ref params 'hub.poll_interval_seconds "none") + (match (hash-ref params 'hub.poll_interval_seconds *default-poll-interval*) ["none" #f] [n (string->number n)])) (define secret-string (hash-ref params 'hub.secret #f)) @@ -295,6 +356,7 @@ (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) (define canonical-hub (url->string (resource->url (web-request-header-resource req)))) + ;; TODO: asynchronous validation (match mode ['subscribe (if (subscription-change-validate "subscribe" @@ -346,29 +408,25 @@ (and (eq? (web-response-header-code-type resp) 'successful) (equal? body (string->bytes/utf-8 challenge)))) -(define (canonical-topic-base-url canonical-hub-str) - (combine-url/relative (string->url canonical-hub-str) "/topic/")) - (define (subscription-main partial-topic callback) - (field [expiry-timer-id #f] - [delivery-active? #f] + (field [delivery-active? #f] [message-queue (make-queue)] [dead-letters (make-queue)]) - (stop-when (rising-edge (> (queue-length (dead-letters)) 1)) + (stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*)) (log-info "Too many dead letters for ~a" callback)) (define (deliver-queued-notifications canonical-hub) (delivery-active? #t) - (let deliver-rest () + (let deliver-rest ((retry-delay *initial-retry-delay*)) (when (not (queue-empty? (message-queue))) (define-values (n newq) (dequeue (message-queue))) (message-queue newq) (match-define (notification topic body content-type) n) (define link-headers (list (cons 'link (format "<~a>; rel=hub" canonical-hub)) (cons 'link (format "<~a>; rel=self" topic)))) - (let deliver-one ((retries-remaining 1) - (retry-delay 5.0)) + (let deliver-one ((retries-remaining *max-delivery-retries*) + (retry-delay retry-delay)) (define-values (resp _body) (web-request! 'post callback @@ -378,11 +436,11 @@ #:body body)) (cond [(eq? (web-response-header-code-type resp) 'successful) - (deliver-rest)] + (deliver-rest *initial-retry-delay*)] [(zero? retries-remaining) (log-info "Dead letter for ~v" callback) (dead-letters (enqueue (dead-letters) n)) - (deliver-rest)] + (deliver-rest retry-delay)] [else (log-info "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v" @@ -392,7 +450,8 @@ resp) (sleep retry-delay) (deliver-one (- retries-remaining 1) - (min (* retry-delay 1.618) 30))])))) + (min (* retry-delay *retry-delay-multiplier*) + *max-retry-delay*))])))) (delivery-active? #f)) (during (subscription partial-topic @@ -401,30 +460,23 @@ callback $secret-bytes $poll-interval-seconds) - (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) - partial-topic))) + (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) + partial-topic))) - (assert (topic-demand topic poll-interval-seconds)) + (assert (topic-demand topic poll-interval-seconds)) - (on-start (log-info "Subscription configured: ~v ~v ~v ~v ~v" - topic - expiry-deadline - canonical-hub - secret-bytes - poll-interval-seconds) - (cond - [expiry-deadline - (expiry-timer-id (gensym 'subscription-expiry)) - (log-info "Subscription will expire at ~a" expiry-deadline) - (send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))] - [else - (log-info "Subscription will not expire") - (expiry-timer-id #f)])) + (on-start (log-info "Subscription configured: ~v" + `((topic ,topic) + (expiry-deadline ,expiry-deadline) + (canonical-hub ,canonical-hub) + (callback ,callback) + (secret-bytes ,secret-bytes) + (poll-interval-seconds ,poll-interval-seconds)))) - (stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _)) - (log-info "Subscription expired")) + (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0))) + (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback)))) - (on (message ($ n (notification topic _ _))) - (message-queue (enqueue (message-queue) n)) - (when (not (delivery-active?)) - (deliver-queued-notifications canonical-hub))))) + (on (message ($ n (notification topic _ _))) + (message-queue (enqueue (message-queue) n)) + (when (not (delivery-active?)) + (deliver-queued-notifications canonical-hub)))))