From 2a9273142ac4076cd13388fd8a2d547d75e40d05 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 22 Nov 2016 15:16:35 +1300 Subject: [PATCH] Split out relay subscriptions from remote topics. The previous code was trying to do too much with local dataflow. This refactoring splits (stateful) relay upstream subscriptions out into actors of their own. Previously, dataflow variables were used to hold a single upstream hub and a single upstream canonical topic URL. This causes problems when things change rapidly: an unsubscription or subscription might be in flight, and there was no way to tell what the resulting state of the remote topic as a whole should be if the fields had changed between the start and finish of an in flight action. The new design follows from the observation that an individual subscription to an upstream hub is a stateful entity: it deserves promotion to a full actor. There is now an actor for the remote topic, and one for each established upstream subscription. Upstream subscriptions are created in response to demand expressed by the remote topic actor; to switch, all it needs to do is update its demand, and new subscriptions will spring into being while surplus subscriptions gracefully terminate. Also, this commit fixes a bug where upstream subscriptions were being made to the requested-topic rather than to the upstream-topic. Consequently, self-subscriptions (with multiple `http-listener` aliases) work now without infinite mail loops. There is still an issue that blocking actions in on-stop clauses don't run to completion; that is being dealt with separately, as part of a reexamination of facet-terminating events more generally. --- racketmq/hub/remote-topic.rkt | 269 +++++++++++++++++++--------------- 1 file changed, 154 insertions(+), 115 deletions(-) diff --git a/racketmq/hub/remote-topic.rkt b/racketmq/hub/remote-topic.rkt index 376677a..aeb724e 100644 --- a/racketmq/hub/remote-topic.rkt +++ b/racketmq/hub/remote-topic.rkt @@ -32,168 +32,207 @@ (check-equal? (shrink-lease 90) 81) (check-equal? (shrink-lease 50) 45)) -(actor #:name 'remote-topic-manager - (during/actor (remote-topic-demand $full-topic) - #:name (list 'remote-topic full-topic) - (remote-topic-main full-topic))) +;; An UpstreamLink represents demand for a subscription to a remote +;; hub. Notifications received should be forwarded locally using +;; `requested-topic` as the first field of a `notification` message. +(struct upstream-link (requested-topic ;; Topic -- the topic requested by *our* subscriber + upstream-hub ;; URLString -- upstream hub + upstream-topic ;; Topic -- upstream topic (from discovery) + ) #:prefab) ;; ASSERTION -(define (remote-topic-main full-topic) - (define sub-id (random-hex-string 16)) - (log-info "Remote sub endpoint ~a for topic ~s" sub-id full-topic) +(actor #:name 'remote-topic-manager + (during/actor (remote-topic-demand $requested-topic) + #:name (list 'remote-topic requested-topic) + (remote-topic-main requested-topic))) + +(actor #:name 'upstream-link-manager + (during/actor (upstream-link $requested-topic $upstream-hub $upstream-topic) + #:name (list 'upstream-link requested-topic upstream-hub upstream-topic) + (upstream-link-main requested-topic upstream-hub upstream-topic))) + +(define (remote-topic-main requested-topic) + ;; Does discovery and occasionally polls. + (on-start (log-info "Remote topic ~s starting." requested-topic)) + (on-stop (log-info "Remote topic ~s terminating." requested-topic)) (field [current-content-hash #f] - [current-content-type #f] - [current-upstream-hub #f] - [established-upstream-hub #f]) + [current-content-type #f]) + + (field [current-upstream-hub #f] + [current-upstream-topic requested-topic]) (field [last-upstream-check 0] - [poll-interval-seconds #f] - [next-subscription-refresh #f]) + [poll-interval-seconds #f]) (define/query-config min-poll-interval 60) (define/query-config max-upstream-redirects 5) - (define/query-config subscription-retry-delay 600) - (define/query-set poll-intervals (topic-demand full-topic $i) i) + (define/query-set poll-intervals (topic-demand requested-topic $i) i) (begin/dataflow (define candidate (set-minimum (poll-intervals))) (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) (begin/dataflow - (log-info "Poll interval for ~a is now ~a" - full-topic + (log-info "Poll interval for ~s is now ~a" + requested-topic (match (poll-interval-seconds) [#f "disabled"] [n (format "~a seconds" n)]))) + (field [poll-busy? #f]) ;; Ensure we only run one poll at once + (define (poll-upstream!) + (when (not (poll-busy?)) + (define poll-upstream-topic (current-upstream-topic)) + (poll-busy? #t) + (log-info "Checking upstream ~a" poll-upstream-topic) + (define-values (resp response-body) + (web-request! 'get poll-upstream-topic #:redirect-budget (max-upstream-redirects))) + (last-upstream-check (current-inexact-milliseconds)) + (match (web-response-header-code-type resp) + ['successful + (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) + (current-upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (current-upstream-topic + (link-header-ref parsed-link-headers 'self (current-upstream-topic))) + (define new-content-hash (sha1 (open-input-bytes response-body))) + (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (when (not (and (equal? (current-content-hash) new-content-hash) + (equal? (current-content-type) new-content-type))) + (current-content-hash new-content-hash) + (current-content-type new-content-type) + (send! (notification requested-topic + (current-upstream-hub) + (current-upstream-topic) + response-body + new-content-type)))] + [other + (log-warning "Upstream ~s (original: ~s) yielded ~a code ~a during poll" + poll-upstream-topic + requested-topic + other + (web-response-header-code resp))]) + (poll-busy? #f))) + + (on-start (poll-upstream!)) ;; always check at least once, for discovery + + (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) + (* 1000 + (or (poll-interval-seconds) 0))))) + (poll-upstream!)) + + (assert #:when (current-upstream-hub) + (upstream-link requested-topic (current-upstream-hub) (current-upstream-topic))) + + (on (message (notification requested-topic + $new-hub + $new-topic + $new-body + $new-type)) + (current-upstream-hub new-hub) + (current-upstream-topic new-topic) + (current-content-hash (sha1 (open-input-bytes new-body))) + (current-content-type new-type))) + +(define (upstream-link-main requested-topic upstream-hub upstream-topic) + (define sub-id (random-hex-string 16)) + + (on-start + (log-info "Starting remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s" + sub-id requested-topic upstream-hub upstream-topic)) + + (on-stop + (log-info "Stopping remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s" + sub-id requested-topic upstream-hub upstream-topic)) + + (field [next-subscription-refresh #f]) + (begin/dataflow + (when (next-subscription-refresh) + (log-info "Next subscription refresh for topic ~s will occur in ~a seconds." + requested-topic + (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0)))) + + (define/query-config subscription-retry-delay 600) + (during (canonical-baseurl $baseurl) (define callback (canonical-url baseurl `("sub" (,sub-id ())))) (define (refresh-subscription!) ;; TODO: shared secret - (define hub (current-upstream-hub)) ;; keep a stable value, lest it change underneath us - (when hub - (log-info "Subscribing to hub ~s for topic ~s" hub full-topic) - (analyze-response - (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) - (hub.mode . "subscribe") - (hub.topic . ,full-topic)))) - (lambda () - (when (equal? (current-upstream-hub) hub) ;; it may have changed asynchronously - (established-upstream-hub hub))) - (lambda (resp response-body) - (log-warning - "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" - hub - full-topic - resp - response-body) - (next-subscription-refresh (+ (current-inexact-milliseconds) - (* 1000.0 (subscription-retry-delay)))))))) + (log-info "Subscribing endpoint ~a to hub ~s for topic ~s (originally requested as ~s)" + sub-id + upstream-hub + upstream-topic + requested-topic) + (analyze-response + (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback) + (hub.mode . "subscribe") + (hub.topic . ,upstream-topic)))) + void + (lambda (resp response-body) + (log-warning + "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + upstream-hub + upstream-topic + resp + response-body) + (next-subscription-refresh (+ (current-inexact-milliseconds) + (* 1000.0 (subscription-retry-delay))))))) (define (unsubscribe!) - (define hub (established-upstream-hub)) - (when hub - (log-info "Unsubscribing from hub ~s for topic ~s" hub full-topic) - (analyze-response - (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) - (hub.mode . "unsubscribe") - (hub.topic . ,full-topic)))) - void - (lambda (resp response-body) - (log-warning - "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" - hub - full-topic - resp - response-body))) - (when (equal? (established-upstream-hub) hub) ;; it may have changed asynchronously - (established-upstream-hub #f)))) + (log-info "Unsubscribing endpoint ~a from hub ~s for topic ~s (originally requested as ~s)" + sub-id + upstream-hub + upstream-topic + requested-topic) + (analyze-response + (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback) + (hub.mode . "unsubscribe") + (hub.topic . ,upstream-topic)))) + void + (lambda (resp response-body) + (log-warning + "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + upstream-hub + upstream-topic + resp + response-body)))) - (field [poll-busy? #f]) ;; Ensure we only run one poll at once - (define (poll-upstream!) - (when (not (poll-busy?)) - (poll-busy? #t) - (log-info "Checking upstream ~a" full-topic) - (define-values (resp response-body) - (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) - (last-upstream-check (current-inexact-milliseconds)) - (match (web-response-header-code-type resp) - ['successful - (define new-content-hash (sha1 (open-input-bytes response-body))) - (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) - (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) - (when (not (and (equal? (current-content-hash) new-content-hash) - (equal? (current-content-type) new-content-type))) - (current-content-hash new-content-hash) - (current-content-type new-content-type) - (send! (notification full-topic - upstream-hub - upstream-topic - response-body - new-content-type))) - (current-upstream-hub upstream-hub)] - [other - (log-warning "Upstream ~a yielded ~a code ~a during poll" - full-topic - other - (web-response-header-code resp))]) - (poll-busy? #f))) - - (on-start (poll-upstream!)) ;; always check at least once, for discovery + (on-start (refresh-subscription!)) (on-stop (unsubscribe!)) - (on-stop (log-info "Remote topic ~s terminating" full-topic)) - - (begin/dataflow - (when (not (equal? (current-upstream-hub) (established-upstream-hub))) - (unsubscribe!) - (refresh-subscription!))) - - (begin/dataflow - (when (next-subscription-refresh) - (log-info "Next subscription refresh for topic ~s will occur in ~a seconds." - full-topic - (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0)))) (on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh))) (next-subscription-refresh #f) (refresh-subscription!)) - (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) - (* 1000 (or (poll-interval-seconds) 0))))) - (poll-upstream!)) - (during (http-listener $host-name $port) (on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ()))) - (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (log-info "~a received verification-of-intent: ~v" sub-id (web-request-header-query req)) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (define lease-seconds-str (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) (if lease-seconds-str (next-subscription-refresh (+ (current-inexact-milliseconds) (* 1000.0 (shrink-lease (string->number lease-seconds-str))))) - (log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) + (log-warning "Hub ~s, topic ~s did not supply hub.lease_seconds" + upstream-hub + upstream-topic)) (web-respond/bytes! id (string->bytes/utf-8 challenge))) (on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body) ;; TODO: verify the use of the shared secret (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) + (define new-hub (link-header-ref parsed-link-headers 'hub #f)) + (define new-topic (link-header-ref parsed-link-headers 'self #f)) (define content-type (web-request-header-content-type req)) (web-respond/status! id 202 #"Accepted") - (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" - full-topic + (log-info "~a topic ~s got ~v message ~v; upstream hub ~s, topic ~s" + sub-id + requested-topic content-type body - upstream-hub - upstream-topic) - (current-content-hash (sha1 (open-input-bytes body))) - (current-content-type content-type) - (current-upstream-hub upstream-hub) - (send! (notification full-topic - upstream-hub - upstream-topic + new-hub + new-topic) + (send! (notification requested-topic + new-hub + new-topic body content-type))))))