From 51088343c46f0bf4e135ad0c8d4caa17b36e2ffa Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 22 Nov 2016 10:04:07 +1300 Subject: [PATCH] Fixes and improvements toward proper upstream subscriptions --- README.md | 5 ++ racketmq/defaults.rktd | 6 ++ racketmq/hub/remote-topic.rkt | 131 ++++++++++++++++++++++++---------- racketmq/hub/subscription.rkt | 4 +- racketmq/poke.rkt | 5 +- racketmq/private/util.rkt | 16 +++++ 6 files changed, 125 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index d494c72..3972df0 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,11 @@ will occur; otherwise, polling will occur at the fastest requested rate, but never more frequently than every `min-poll-interval` seconds. + (subscription-retry-delay 600) ;; seconds + +If subscription to an upstream hub fails immediately, we will +schedule a retry in this many seconds. + (max-dead-letters 10) (max-delivery-retries 10) (initial-retry-delay 5.0) ;; seconds diff --git a/racketmq/defaults.rktd b/racketmq/defaults.rktd index 5fe0a2c..7fc36c7 100644 --- a/racketmq/defaults.rktd +++ b/racketmq/defaults.rktd @@ -69,6 +69,12 @@ ;; (min-poll-interval 60) ;; seconds ;; (default-poll-interval "none") ;; seconds, or "none" +;;--------------------------------------------------------------------------- +;; If subscription to an upstream hub fails immediately, we will +;; schedule a retry in this many seconds. +;; +;; (subscription-retry-delay 600) ;; seconds + ;;--------------------------------------------------------------------------- ;; Subscriptions last until explicitly terminated by an unsubscription ;; request, implicitly terminated by lease expiry, or implicitly diff --git a/racketmq/hub/remote-topic.rkt b/racketmq/hub/remote-topic.rkt index 900c802..2c10942 100644 --- a/racketmq/hub/remote-topic.rkt +++ b/racketmq/hub/remote-topic.rkt @@ -4,7 +4,6 @@ (require racket/dict) (require racket/set) -(require net/uri-codec) (require file/sha1) (require/activate syndicate/drivers/timestate) @@ -14,6 +13,8 @@ (require "../private/util.rkt") (require "../protocol.rkt") +(module+ test (require rackunit)) + (define (set-minimum xs) (for/fold [(i #f)] [(candidate (in-set xs))] (cond [(not candidate) i] @@ -21,6 +22,18 @@ [(< candidate i) candidate] [else i]))) +(define (shrink-lease seconds) + (if (< seconds 100) + (* seconds 9/10) + (- seconds 10))) + +(module+ test + (check-equal? (shrink-lease 1000) 990) + (check-equal? (shrink-lease 101) 91) + (check-equal? (shrink-lease 100) 90) + (check-equal? (shrink-lease 90) 81) + (check-equal? (shrink-lease 50) 45)) + (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) (log-info "Remote sub endpoint ~a" sub-id) @@ -31,10 +44,14 @@ [established-upstream-hub #f]) (field [last-upstream-check 0] - [poll-interval-seconds #f]) + [poll-interval-seconds #f] + [next-subscription-refresh #f]) + + (define/query-config min-poll-interval 60) + (define/query-config max-upstream-redirects 5) + (define/query-config subscription-retry-delay 600) (define/query-set poll-intervals (topic-demand full-topic $i) i) - (define/query-config min-poll-interval 60) (begin/dataflow (define candidate (set-minimum (poll-intervals))) (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) @@ -50,39 +67,49 @@ (define (refresh-subscription!) ;; TODO: shared secret - ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly - (when (current-upstream-hub) - (log-info "Subscribing to hub ~s for topic ~s" (current-upstream-hub) full-topic) - (web-request! 'post (current-upstream-hub) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . ,callback) - (hub.mode . "subscribe") - (hub.topic . ,full-topic))))) - (established-upstream-hub (current-upstream-hub)))) + (define hub (current-upstream-hub)) ;; keep a stable value, lest it change underneath us + (when hub + (log-info "Subscribing to hub ~s for topic ~s" hub full-topic) + (analyze-response + (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) + (hub.mode . "subscribe") + (hub.topic . ,full-topic)))) + (lambda () + (when (equal? (current-upstream-hub) hub) ;; it may have changed asynchronously + (established-upstream-hub hub))) + (lambda (resp response-body) + (log-warning + "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + hub + full-topic + resp + response-body) + (next-subscription-refresh (+ (current-inexact-milliseconds) + (* 1000.0 (subscription-retry-delay)))))))) (define (unsubscribe!) - (when (established-upstream-hub) - (log-info "Unsubscribing from hub ~s for topic ~s" (established-upstream-hub) full-topic) - (web-request! 'post (established-upstream-hub) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . ,callback) - (hub.mode . "unsubscribe") - (hub.topic . ,full-topic))))) - (established-upstream-hub #f))) + (define hub (established-upstream-hub)) + (when hub + (log-info "Unsubscribing from hub ~s for topic ~s" hub full-topic) + (analyze-response + (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) + (hub.mode . "unsubscribe") + (hub.topic . ,full-topic)))) + void + (lambda (resp response-body) + (log-warning + "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + hub + full-topic + resp + response-body))) + (when (equal? (established-upstream-hub) hub) ;; it may have changed asynchronously + (established-upstream-hub #f)))) - (on-stop (unsubscribe!)) - - (begin/dataflow - (when (not (equal? (current-upstream-hub) (established-upstream-hub))) - (unsubscribe!) - (refresh-subscription!))) - - (define/query-config max-upstream-redirects 5) - - (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) - (* 1000 (or (poll-interval-seconds) 0))))) + (field [poll-busy? #f]) ;; Ensure we only run one poll at once + (define (poll-upstream!) + (when (not (poll-busy?)) + (poll-busy? #t) (log-info "Checking upstream ~a" full-topic) (define-values (resp response-body) (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) @@ -103,18 +130,44 @@ upstream-topic response-body new-content-type))) - (if (equal? (current-upstream-hub) upstream-hub) - (refresh-subscription!) - (current-upstream-hub upstream-hub))] + (current-upstream-hub upstream-hub)] [other - (log-warning "Upstream ~a yielded ~a code ~a" + (log-warning "Upstream ~a yielded ~a code ~a during poll" full-topic other - (web-response-header-code resp))])) + (web-response-header-code resp))]) + (poll-busy? #f))) + + (on-start (poll-upstream!)) ;; always check at least once, for discovery + (on-stop (unsubscribe!)) + + (begin/dataflow + (when (not (equal? (current-upstream-hub) (established-upstream-hub))) + (unsubscribe!) + (refresh-subscription!))) + + (begin/dataflow + (when (next-subscription-refresh) + (log-info "Next subscription refresh for topic ~s will occur in ~a seconds." + full-topic + (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0)))) + + (on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh))) + (next-subscription-refresh #f) + (refresh-subscription!)) + + (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) + (* 1000 (or (poll-interval-seconds) 0))))) + (poll-upstream!)) (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) - (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) + (define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) + (if lease-seconds + (next-subscription-refresh (+ (current-inexact-milliseconds) + (* 1000.0 (shrink-lease lease-seconds)))) + (log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) (web-respond/bytes! id (string->bytes/utf-8 challenge))) (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) diff --git a/racketmq/hub/subscription.rkt b/racketmq/hub/subscription.rkt index 88b1593..6132412 100644 --- a/racketmq/hub/subscription.rkt +++ b/racketmq/hub/subscription.rkt @@ -84,7 +84,7 @@ (let* ((u (string->url callback))) (url->string (struct-copy url u [query (append (url-query u) extra-query)]))))) - (and (eq? (web-response-header-code-type resp) 'successful) + (and (web-response-successful? resp) (equal? body (string->bytes/utf-8 challenge)))) (define (subscription-main topic callback) @@ -120,7 +120,7 @@ link-headers) #:body body)) (cond - [(eq? (web-response-header-code-type resp) 'successful) + [(web-response-successful? resp) (deliver-rest (initial-retry-delay))] [(zero? retries-remaining) (log-info "Dead letter for ~v" callback) diff --git a/racketmq/poke.rkt b/racketmq/poke.rkt index bc21ddd..5a33b6b 100644 --- a/racketmq/poke.rkt +++ b/racketmq/poke.rkt @@ -21,9 +21,10 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define (request! verb path #:body [body #""]) +(define (request! verb path #:headers [headers '()] #:body [body #""]) (web-request! verb (url->string (resource->url (struct-copy web-resource server-res [path path]))) + #:headers headers #:body body)) (let ((e (read-bytes-line-evt (current-input-port) 'any))) @@ -89,6 +90,7 @@ (define (spawn-subscriber topic) (request! 'post `("hub" ()) + #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . "http://localhost:7000/sink") @@ -99,6 +101,7 @@ (define (unsubscribe-from topic) (request! 'post `("hub" ()) + #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . "http://localhost:7000/sink") diff --git a/racketmq/private/util.rkt b/racketmq/private/util.rkt index bc39949..41f7fe0 100644 --- a/racketmq/private/util.rkt +++ b/racketmq/private/util.rkt @@ -4,6 +4,8 @@ extend-url-string-query web-respond/status! web-request! + web-post/form-parameters! + analyze-response parse-link-headers link-header-ref maybe-link-header @@ -16,6 +18,7 @@ (require (only-in file/sha1 bytes->hex-string)) (require (only-in racket/random crypto-random-bytes)) (require net/url) +(require net/uri-codec) (require syndicate/actor) (require syndicate/drivers/timer) (require syndicate/drivers/web) @@ -80,6 +83,19 @@ #:redirect-budget (- redirect-budget 1)) (values resp response-body))) +(define (web-post/form-parameters! urlstr query-alist + #:headers [headers '()] + #:redirect-budget [redirect-budget 0]) + (web-request! 'post urlstr + #:headers (cons (cons 'content-type "application/x-www-form-urlencoded") headers) + #:body (string->bytes/utf-8 (alist->form-urlencoded query-alist)))) + +(define (analyze-response req-thunk k-successful k-unsuccessful) + (define-values (resp response-body) (req-thunk)) + (if (web-response-successful? resp) + (k-successful) + (k-unsuccessful resp response-body))) + (define (parse-link-headers headers) (define rel-dict-reverse-order (for/fold [(seed (hasheq))] [(header headers)]