Fixes and improvements toward proper upstream subscriptions
This commit is contained in:
parent
79e8d87e5f
commit
51088343c4
|
@ -122,6 +122,11 @@ will occur; otherwise, polling will occur at the fastest requested
|
||||||
rate, but never more frequently than every `min-poll-interval`
|
rate, but never more frequently than every `min-poll-interval`
|
||||||
seconds.
|
seconds.
|
||||||
|
|
||||||
|
(subscription-retry-delay 600) ;; seconds
|
||||||
|
|
||||||
|
If subscription to an upstream hub fails immediately, we will
|
||||||
|
schedule a retry in this many seconds.
|
||||||
|
|
||||||
(max-dead-letters 10)
|
(max-dead-letters 10)
|
||||||
(max-delivery-retries 10)
|
(max-delivery-retries 10)
|
||||||
(initial-retry-delay 5.0) ;; seconds
|
(initial-retry-delay 5.0) ;; seconds
|
||||||
|
|
|
@ -69,6 +69,12 @@
|
||||||
;; (min-poll-interval 60) ;; seconds
|
;; (min-poll-interval 60) ;; seconds
|
||||||
;; (default-poll-interval "none") ;; seconds, or "none"
|
;; (default-poll-interval "none") ;; seconds, or "none"
|
||||||
|
|
||||||
|
;;---------------------------------------------------------------------------
|
||||||
|
;; If subscription to an upstream hub fails immediately, we will
|
||||||
|
;; schedule a retry in this many seconds.
|
||||||
|
;;
|
||||||
|
;; (subscription-retry-delay 600) ;; seconds
|
||||||
|
|
||||||
;;---------------------------------------------------------------------------
|
;;---------------------------------------------------------------------------
|
||||||
;; Subscriptions last until explicitly terminated by an unsubscription
|
;; Subscriptions last until explicitly terminated by an unsubscription
|
||||||
;; request, implicitly terminated by lease expiry, or implicitly
|
;; request, implicitly terminated by lease expiry, or implicitly
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
|
|
||||||
(require racket/dict)
|
(require racket/dict)
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
(require net/uri-codec)
|
|
||||||
(require file/sha1)
|
(require file/sha1)
|
||||||
|
|
||||||
(require/activate syndicate/drivers/timestate)
|
(require/activate syndicate/drivers/timestate)
|
||||||
|
@ -14,6 +13,8 @@
|
||||||
(require "../private/util.rkt")
|
(require "../private/util.rkt")
|
||||||
(require "../protocol.rkt")
|
(require "../protocol.rkt")
|
||||||
|
|
||||||
|
(module+ test (require rackunit))
|
||||||
|
|
||||||
(define (set-minimum xs)
|
(define (set-minimum xs)
|
||||||
(for/fold [(i #f)] [(candidate (in-set xs))]
|
(for/fold [(i #f)] [(candidate (in-set xs))]
|
||||||
(cond [(not candidate) i]
|
(cond [(not candidate) i]
|
||||||
|
@ -21,6 +22,18 @@
|
||||||
[(< candidate i) candidate]
|
[(< candidate i) candidate]
|
||||||
[else i])))
|
[else i])))
|
||||||
|
|
||||||
|
(define (shrink-lease seconds)
|
||||||
|
(if (< seconds 100)
|
||||||
|
(* seconds 9/10)
|
||||||
|
(- seconds 10)))
|
||||||
|
|
||||||
|
(module+ test
|
||||||
|
(check-equal? (shrink-lease 1000) 990)
|
||||||
|
(check-equal? (shrink-lease 101) 91)
|
||||||
|
(check-equal? (shrink-lease 100) 90)
|
||||||
|
(check-equal? (shrink-lease 90) 81)
|
||||||
|
(check-equal? (shrink-lease 50) 45))
|
||||||
|
|
||||||
(define (remote-topic-main full-topic)
|
(define (remote-topic-main full-topic)
|
||||||
(define sub-id (random-hex-string 16))
|
(define sub-id (random-hex-string 16))
|
||||||
(log-info "Remote sub endpoint ~a" sub-id)
|
(log-info "Remote sub endpoint ~a" sub-id)
|
||||||
|
@ -31,10 +44,14 @@
|
||||||
[established-upstream-hub #f])
|
[established-upstream-hub #f])
|
||||||
|
|
||||||
(field [last-upstream-check 0]
|
(field [last-upstream-check 0]
|
||||||
[poll-interval-seconds #f])
|
[poll-interval-seconds #f]
|
||||||
|
[next-subscription-refresh #f])
|
||||||
|
|
||||||
|
(define/query-config min-poll-interval 60)
|
||||||
|
(define/query-config max-upstream-redirects 5)
|
||||||
|
(define/query-config subscription-retry-delay 600)
|
||||||
|
|
||||||
(define/query-set poll-intervals (topic-demand full-topic $i) i)
|
(define/query-set poll-intervals (topic-demand full-topic $i) i)
|
||||||
(define/query-config min-poll-interval 60)
|
|
||||||
(begin/dataflow
|
(begin/dataflow
|
||||||
(define candidate (set-minimum (poll-intervals)))
|
(define candidate (set-minimum (poll-intervals)))
|
||||||
(poll-interval-seconds (and candidate (max candidate (min-poll-interval)))))
|
(poll-interval-seconds (and candidate (max candidate (min-poll-interval)))))
|
||||||
|
@ -50,39 +67,49 @@
|
||||||
|
|
||||||
(define (refresh-subscription!)
|
(define (refresh-subscription!)
|
||||||
;; TODO: shared secret
|
;; TODO: shared secret
|
||||||
;; TODO: listen to lease duration and use it to refresh ourselves more smarterly
|
(define hub (current-upstream-hub)) ;; keep a stable value, lest it change underneath us
|
||||||
(when (current-upstream-hub)
|
(when hub
|
||||||
(log-info "Subscribing to hub ~s for topic ~s" (current-upstream-hub) full-topic)
|
(log-info "Subscribing to hub ~s for topic ~s" hub full-topic)
|
||||||
(web-request! 'post (current-upstream-hub)
|
(analyze-response
|
||||||
#:body (string->bytes/utf-8
|
(lambda () (web-post/form-parameters! hub `((hub.callback . ,callback)
|
||||||
(alist->form-urlencoded
|
(hub.mode . "subscribe")
|
||||||
`((hub.callback . ,callback)
|
(hub.topic . ,full-topic))))
|
||||||
(hub.mode . "subscribe")
|
(lambda ()
|
||||||
(hub.topic . ,full-topic)))))
|
(when (equal? (current-upstream-hub) hub) ;; it may have changed asynchronously
|
||||||
(established-upstream-hub (current-upstream-hub))))
|
(established-upstream-hub hub)))
|
||||||
|
(lambda (resp response-body)
|
||||||
|
(log-warning
|
||||||
|
"Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
|
||||||
|
hub
|
||||||
|
full-topic
|
||||||
|
resp
|
||||||
|
response-body)
|
||||||
|
(next-subscription-refresh (+ (current-inexact-milliseconds)
|
||||||
|
(* 1000.0 (subscription-retry-delay))))))))
|
||||||
|
|
||||||
(define (unsubscribe!)
|
(define (unsubscribe!)
|
||||||
(when (established-upstream-hub)
|
(define hub (established-upstream-hub))
|
||||||
(log-info "Unsubscribing from hub ~s for topic ~s" (established-upstream-hub) full-topic)
|
(when hub
|
||||||
(web-request! 'post (established-upstream-hub)
|
(log-info "Unsubscribing from hub ~s for topic ~s" hub full-topic)
|
||||||
#:body (string->bytes/utf-8
|
(analyze-response
|
||||||
(alist->form-urlencoded
|
(lambda () (web-post/form-parameters! hub `((hub.callback . ,callback)
|
||||||
`((hub.callback . ,callback)
|
(hub.mode . "unsubscribe")
|
||||||
(hub.mode . "unsubscribe")
|
(hub.topic . ,full-topic))))
|
||||||
(hub.topic . ,full-topic)))))
|
void
|
||||||
(established-upstream-hub #f)))
|
(lambda (resp response-body)
|
||||||
|
(log-warning
|
||||||
|
"Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
|
||||||
|
hub
|
||||||
|
full-topic
|
||||||
|
resp
|
||||||
|
response-body)))
|
||||||
|
(when (equal? (established-upstream-hub) hub) ;; it may have changed asynchronously
|
||||||
|
(established-upstream-hub #f))))
|
||||||
|
|
||||||
(on-stop (unsubscribe!))
|
(field [poll-busy? #f]) ;; Ensure we only run one poll at once
|
||||||
|
(define (poll-upstream!)
|
||||||
(begin/dataflow
|
(when (not (poll-busy?))
|
||||||
(when (not (equal? (current-upstream-hub) (established-upstream-hub)))
|
(poll-busy? #t)
|
||||||
(unsubscribe!)
|
|
||||||
(refresh-subscription!)))
|
|
||||||
|
|
||||||
(define/query-config max-upstream-redirects 5)
|
|
||||||
|
|
||||||
(on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check)
|
|
||||||
(* 1000 (or (poll-interval-seconds) 0)))))
|
|
||||||
(log-info "Checking upstream ~a" full-topic)
|
(log-info "Checking upstream ~a" full-topic)
|
||||||
(define-values (resp response-body)
|
(define-values (resp response-body)
|
||||||
(web-request! 'get full-topic #:redirect-budget (max-upstream-redirects)))
|
(web-request! 'get full-topic #:redirect-budget (max-upstream-redirects)))
|
||||||
|
@ -103,18 +130,44 @@
|
||||||
upstream-topic
|
upstream-topic
|
||||||
response-body
|
response-body
|
||||||
new-content-type)))
|
new-content-type)))
|
||||||
(if (equal? (current-upstream-hub) upstream-hub)
|
(current-upstream-hub upstream-hub)]
|
||||||
(refresh-subscription!)
|
|
||||||
(current-upstream-hub upstream-hub))]
|
|
||||||
[other
|
[other
|
||||||
(log-warning "Upstream ~a yielded ~a code ~a"
|
(log-warning "Upstream ~a yielded ~a code ~a during poll"
|
||||||
full-topic
|
full-topic
|
||||||
other
|
other
|
||||||
(web-response-header-code resp))]))
|
(web-response-header-code resp))])
|
||||||
|
(poll-busy? #f)))
|
||||||
|
|
||||||
|
(on-start (poll-upstream!)) ;; always check at least once, for discovery
|
||||||
|
(on-stop (unsubscribe!))
|
||||||
|
|
||||||
|
(begin/dataflow
|
||||||
|
(when (not (equal? (current-upstream-hub) (established-upstream-hub)))
|
||||||
|
(unsubscribe!)
|
||||||
|
(refresh-subscription!)))
|
||||||
|
|
||||||
|
(begin/dataflow
|
||||||
|
(when (next-subscription-refresh)
|
||||||
|
(log-info "Next subscription refresh for topic ~s will occur in ~a seconds."
|
||||||
|
full-topic
|
||||||
|
(/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0))))
|
||||||
|
|
||||||
|
(on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh)))
|
||||||
|
(next-subscription-refresh #f)
|
||||||
|
(refresh-subscription!))
|
||||||
|
|
||||||
|
(on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check)
|
||||||
|
(* 1000 (or (poll-interval-seconds) 0)))))
|
||||||
|
(poll-upstream!))
|
||||||
|
|
||||||
(on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ())))
|
(on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ())))
|
||||||
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
|
||||||
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
|
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
|
||||||
|
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
||||||
|
(define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f))
|
||||||
|
(if lease-seconds
|
||||||
|
(next-subscription-refresh (+ (current-inexact-milliseconds)
|
||||||
|
(* 1000.0 (shrink-lease lease-seconds))))
|
||||||
|
(log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic))
|
||||||
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
|
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
|
||||||
|
|
||||||
(on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body)
|
(on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body)
|
||||||
|
|
|
@ -84,7 +84,7 @@
|
||||||
(let* ((u (string->url callback)))
|
(let* ((u (string->url callback)))
|
||||||
(url->string
|
(url->string
|
||||||
(struct-copy url u [query (append (url-query u) extra-query)])))))
|
(struct-copy url u [query (append (url-query u) extra-query)])))))
|
||||||
(and (eq? (web-response-header-code-type resp) 'successful)
|
(and (web-response-successful? resp)
|
||||||
(equal? body (string->bytes/utf-8 challenge))))
|
(equal? body (string->bytes/utf-8 challenge))))
|
||||||
|
|
||||||
(define (subscription-main topic callback)
|
(define (subscription-main topic callback)
|
||||||
|
@ -120,7 +120,7 @@
|
||||||
link-headers)
|
link-headers)
|
||||||
#:body body))
|
#:body body))
|
||||||
(cond
|
(cond
|
||||||
[(eq? (web-response-header-code-type resp) 'successful)
|
[(web-response-successful? resp)
|
||||||
(deliver-rest (initial-retry-delay))]
|
(deliver-rest (initial-retry-delay))]
|
||||||
[(zero? retries-remaining)
|
[(zero? retries-remaining)
|
||||||
(log-info "Dead letter for ~v" callback)
|
(log-info "Dead letter for ~v" callback)
|
||||||
|
|
|
@ -21,9 +21,10 @@
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(define (request! verb path #:body [body #""])
|
(define (request! verb path #:headers [headers '()] #:body [body #""])
|
||||||
(web-request! verb
|
(web-request! verb
|
||||||
(url->string (resource->url (struct-copy web-resource server-res [path path])))
|
(url->string (resource->url (struct-copy web-resource server-res [path path])))
|
||||||
|
#:headers headers
|
||||||
#:body body))
|
#:body body))
|
||||||
|
|
||||||
(let ((e (read-bytes-line-evt (current-input-port) 'any)))
|
(let ((e (read-bytes-line-evt (current-input-port) 'any)))
|
||||||
|
@ -89,6 +90,7 @@
|
||||||
|
|
||||||
(define (spawn-subscriber topic)
|
(define (spawn-subscriber topic)
|
||||||
(request! 'post `("hub" ())
|
(request! 'post `("hub" ())
|
||||||
|
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
|
||||||
#:body (string->bytes/utf-8
|
#:body (string->bytes/utf-8
|
||||||
(alist->form-urlencoded
|
(alist->form-urlencoded
|
||||||
`((hub.callback . "http://localhost:7000/sink")
|
`((hub.callback . "http://localhost:7000/sink")
|
||||||
|
@ -99,6 +101,7 @@
|
||||||
|
|
||||||
(define (unsubscribe-from topic)
|
(define (unsubscribe-from topic)
|
||||||
(request! 'post `("hub" ())
|
(request! 'post `("hub" ())
|
||||||
|
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
|
||||||
#:body (string->bytes/utf-8
|
#:body (string->bytes/utf-8
|
||||||
(alist->form-urlencoded
|
(alist->form-urlencoded
|
||||||
`((hub.callback . "http://localhost:7000/sink")
|
`((hub.callback . "http://localhost:7000/sink")
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
extend-url-string-query
|
extend-url-string-query
|
||||||
web-respond/status!
|
web-respond/status!
|
||||||
web-request!
|
web-request!
|
||||||
|
web-post/form-parameters!
|
||||||
|
analyze-response
|
||||||
parse-link-headers
|
parse-link-headers
|
||||||
link-header-ref
|
link-header-ref
|
||||||
maybe-link-header
|
maybe-link-header
|
||||||
|
@ -16,6 +18,7 @@
|
||||||
(require (only-in file/sha1 bytes->hex-string))
|
(require (only-in file/sha1 bytes->hex-string))
|
||||||
(require (only-in racket/random crypto-random-bytes))
|
(require (only-in racket/random crypto-random-bytes))
|
||||||
(require net/url)
|
(require net/url)
|
||||||
|
(require net/uri-codec)
|
||||||
(require syndicate/actor)
|
(require syndicate/actor)
|
||||||
(require syndicate/drivers/timer)
|
(require syndicate/drivers/timer)
|
||||||
(require syndicate/drivers/web)
|
(require syndicate/drivers/web)
|
||||||
|
@ -80,6 +83,19 @@
|
||||||
#:redirect-budget (- redirect-budget 1))
|
#:redirect-budget (- redirect-budget 1))
|
||||||
(values resp response-body)))
|
(values resp response-body)))
|
||||||
|
|
||||||
|
(define (web-post/form-parameters! urlstr query-alist
|
||||||
|
#:headers [headers '()]
|
||||||
|
#:redirect-budget [redirect-budget 0])
|
||||||
|
(web-request! 'post urlstr
|
||||||
|
#:headers (cons (cons 'content-type "application/x-www-form-urlencoded") headers)
|
||||||
|
#:body (string->bytes/utf-8 (alist->form-urlencoded query-alist))))
|
||||||
|
|
||||||
|
(define (analyze-response req-thunk k-successful k-unsuccessful)
|
||||||
|
(define-values (resp response-body) (req-thunk))
|
||||||
|
(if (web-response-successful? resp)
|
||||||
|
(k-successful)
|
||||||
|
(k-unsuccessful resp response-body)))
|
||||||
|
|
||||||
(define (parse-link-headers headers)
|
(define (parse-link-headers headers)
|
||||||
(define rel-dict-reverse-order
|
(define rel-dict-reverse-order
|
||||||
(for/fold [(seed (hasheq))] [(header headers)]
|
(for/fold [(seed (hasheq))] [(header headers)]
|
||||||
|
|
Loading…
Reference in New Issue