diff --git a/racketmq/hub/remote-topic.rkt b/racketmq/hub/remote-topic.rkt index 376677a..aeb724e 100644 --- a/racketmq/hub/remote-topic.rkt +++ b/racketmq/hub/remote-topic.rkt @@ -32,168 +32,207 @@ (check-equal? (shrink-lease 90) 81) (check-equal? (shrink-lease 50) 45)) -(actor #:name 'remote-topic-manager - (during/actor (remote-topic-demand $full-topic) - #:name (list 'remote-topic full-topic) - (remote-topic-main full-topic))) +;; An UpstreamLink represents demand for a subscription to a remote +;; hub. Notifications received should be forwarded locally using +;; `requested-topic` as the first field of a `notification` message. +(struct upstream-link (requested-topic ;; Topic -- the topic requested by *our* subscriber + upstream-hub ;; URLString -- upstream hub + upstream-topic ;; Topic -- upstream topic (from discovery) + ) #:prefab) ;; ASSERTION -(define (remote-topic-main full-topic) - (define sub-id (random-hex-string 16)) - (log-info "Remote sub endpoint ~a for topic ~s" sub-id full-topic) +(actor #:name 'remote-topic-manager + (during/actor (remote-topic-demand $requested-topic) + #:name (list 'remote-topic requested-topic) + (remote-topic-main requested-topic))) + +(actor #:name 'upstream-link-manager + (during/actor (upstream-link $requested-topic $upstream-hub $upstream-topic) + #:name (list 'upstream-link requested-topic upstream-hub upstream-topic) + (upstream-link-main requested-topic upstream-hub upstream-topic))) + +(define (remote-topic-main requested-topic) + ;; Does discovery and occasionally polls. + (on-start (log-info "Remote topic ~s starting." requested-topic)) + (on-stop (log-info "Remote topic ~s terminating." requested-topic)) (field [current-content-hash #f] - [current-content-type #f] - [current-upstream-hub #f] - [established-upstream-hub #f]) + [current-content-type #f]) + + (field [current-upstream-hub #f] + [current-upstream-topic requested-topic]) (field [last-upstream-check 0] - [poll-interval-seconds #f] - [next-subscription-refresh #f]) + [poll-interval-seconds #f]) (define/query-config min-poll-interval 60) (define/query-config max-upstream-redirects 5) - (define/query-config subscription-retry-delay 600) - (define/query-set poll-intervals (topic-demand full-topic $i) i) + (define/query-set poll-intervals (topic-demand requested-topic $i) i) (begin/dataflow (define candidate (set-minimum (poll-intervals))) (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) (begin/dataflow - (log-info "Poll interval for ~a is now ~a" - full-topic + (log-info "Poll interval for ~s is now ~a" + requested-topic (match (poll-interval-seconds) [#f "disabled"] [n (format "~a seconds" n)]))) + (field [poll-busy? #f]) ;; Ensure we only run one poll at once + (define (poll-upstream!) + (when (not (poll-busy?)) + (define poll-upstream-topic (current-upstream-topic)) + (poll-busy? #t) + (log-info "Checking upstream ~a" poll-upstream-topic) + (define-values (resp response-body) + (web-request! 'get poll-upstream-topic #:redirect-budget (max-upstream-redirects))) + (last-upstream-check (current-inexact-milliseconds)) + (match (web-response-header-code-type resp) + ['successful + (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) + (current-upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (current-upstream-topic + (link-header-ref parsed-link-headers 'self (current-upstream-topic))) + (define new-content-hash (sha1 (open-input-bytes response-body))) + (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (when (not (and (equal? (current-content-hash) new-content-hash) + (equal? (current-content-type) new-content-type))) + (current-content-hash new-content-hash) + (current-content-type new-content-type) + (send! (notification requested-topic + (current-upstream-hub) + (current-upstream-topic) + response-body + new-content-type)))] + [other + (log-warning "Upstream ~s (original: ~s) yielded ~a code ~a during poll" + poll-upstream-topic + requested-topic + other + (web-response-header-code resp))]) + (poll-busy? #f))) + + (on-start (poll-upstream!)) ;; always check at least once, for discovery + + (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) + (* 1000 + (or (poll-interval-seconds) 0))))) + (poll-upstream!)) + + (assert #:when (current-upstream-hub) + (upstream-link requested-topic (current-upstream-hub) (current-upstream-topic))) + + (on (message (notification requested-topic + $new-hub + $new-topic + $new-body + $new-type)) + (current-upstream-hub new-hub) + (current-upstream-topic new-topic) + (current-content-hash (sha1 (open-input-bytes new-body))) + (current-content-type new-type))) + +(define (upstream-link-main requested-topic upstream-hub upstream-topic) + (define sub-id (random-hex-string 16)) + + (on-start + (log-info "Starting remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s" + sub-id requested-topic upstream-hub upstream-topic)) + + (on-stop + (log-info "Stopping remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s" + sub-id requested-topic upstream-hub upstream-topic)) + + (field [next-subscription-refresh #f]) + (begin/dataflow + (when (next-subscription-refresh) + (log-info "Next subscription refresh for topic ~s will occur in ~a seconds." + requested-topic + (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0)))) + + (define/query-config subscription-retry-delay 600) + (during (canonical-baseurl $baseurl) (define callback (canonical-url baseurl `("sub" (,sub-id ())))) (define (refresh-subscription!) ;; TODO: shared secret - (define hub (current-upstream-hub)) ;; keep a stable value, lest it change underneath us - (when hub - (log-info "Subscribing to hub ~s for topic ~s" hub full-topic) - (analyze-response - (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) - (hub.mode . "subscribe") - (hub.topic . ,full-topic)))) - (lambda () - (when (equal? (current-upstream-hub) hub) ;; it may have changed asynchronously - (established-upstream-hub hub))) - (lambda (resp response-body) - (log-warning - "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" - hub - full-topic - resp - response-body) - (next-subscription-refresh (+ (current-inexact-milliseconds) - (* 1000.0 (subscription-retry-delay)))))))) + (log-info "Subscribing endpoint ~a to hub ~s for topic ~s (originally requested as ~s)" + sub-id + upstream-hub + upstream-topic + requested-topic) + (analyze-response + (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback) + (hub.mode . "subscribe") + (hub.topic . ,upstream-topic)))) + void + (lambda (resp response-body) + (log-warning + "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + upstream-hub + upstream-topic + resp + response-body) + (next-subscription-refresh (+ (current-inexact-milliseconds) + (* 1000.0 (subscription-retry-delay))))))) (define (unsubscribe!) - (define hub (established-upstream-hub)) - (when hub - (log-info "Unsubscribing from hub ~s for topic ~s" hub full-topic) - (analyze-response - (lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) - (hub.mode . "unsubscribe") - (hub.topic . ,full-topic)))) - void - (lambda (resp response-body) - (log-warning - "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" - hub - full-topic - resp - response-body))) - (when (equal? (established-upstream-hub) hub) ;; it may have changed asynchronously - (established-upstream-hub #f)))) + (log-info "Unsubscribing endpoint ~a from hub ~s for topic ~s (originally requested as ~s)" + sub-id + upstream-hub + upstream-topic + requested-topic) + (analyze-response + (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback) + (hub.mode . "unsubscribe") + (hub.topic . ,upstream-topic)))) + void + (lambda (resp response-body) + (log-warning + "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" + upstream-hub + upstream-topic + resp + response-body)))) - (field [poll-busy? #f]) ;; Ensure we only run one poll at once - (define (poll-upstream!) - (when (not (poll-busy?)) - (poll-busy? #t) - (log-info "Checking upstream ~a" full-topic) - (define-values (resp response-body) - (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) - (last-upstream-check (current-inexact-milliseconds)) - (match (web-response-header-code-type resp) - ['successful - (define new-content-hash (sha1 (open-input-bytes response-body))) - (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) - (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) - (when (not (and (equal? (current-content-hash) new-content-hash) - (equal? (current-content-type) new-content-type))) - (current-content-hash new-content-hash) - (current-content-type new-content-type) - (send! (notification full-topic - upstream-hub - upstream-topic - response-body - new-content-type))) - (current-upstream-hub upstream-hub)] - [other - (log-warning "Upstream ~a yielded ~a code ~a during poll" - full-topic - other - (web-response-header-code resp))]) - (poll-busy? #f))) - - (on-start (poll-upstream!)) ;; always check at least once, for discovery + (on-start (refresh-subscription!)) (on-stop (unsubscribe!)) - (on-stop (log-info "Remote topic ~s terminating" full-topic)) - - (begin/dataflow - (when (not (equal? (current-upstream-hub) (established-upstream-hub))) - (unsubscribe!) - (refresh-subscription!))) - - (begin/dataflow - (when (next-subscription-refresh) - (log-info "Next subscription refresh for topic ~s will occur in ~a seconds." - full-topic - (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0)))) (on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh))) (next-subscription-refresh #f) (refresh-subscription!)) - (on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check) - (* 1000 (or (poll-interval-seconds) 0))))) - (poll-upstream!)) - (during (http-listener $host-name $port) (on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ()))) - (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (log-info "~a received verification-of-intent: ~v" sub-id (web-request-header-query req)) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (define lease-seconds-str (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) (if lease-seconds-str (next-subscription-refresh (+ (current-inexact-milliseconds) (* 1000.0 (shrink-lease (string->number lease-seconds-str))))) - (log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) + (log-warning "Hub ~s, topic ~s did not supply hub.lease_seconds" + upstream-hub + upstream-topic)) (web-respond/bytes! id (string->bytes/utf-8 challenge))) (on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body) ;; TODO: verify the use of the shared secret (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) + (define new-hub (link-header-ref parsed-link-headers 'hub #f)) + (define new-topic (link-header-ref parsed-link-headers 'self #f)) (define content-type (web-request-header-content-type req)) (web-respond/status! id 202 #"Accepted") - (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" - full-topic + (log-info "~a topic ~s got ~v message ~v; upstream hub ~s, topic ~s" + sub-id + requested-topic content-type body - upstream-hub - upstream-topic) - (current-content-hash (sha1 (open-input-bytes body))) - (current-content-type content-type) - (current-upstream-hub upstream-hub) - (send! (notification full-topic - upstream-hub - upstream-topic + new-hub + new-topic) + (send! (notification requested-topic + new-hub + new-topic body content-type))))))