Split out relay subscriptions from remote topics.

The previous code was trying to do too much with local dataflow. This
refactoring splits (stateful) relay upstream subscriptions out into
actors of their own.

Previously, dataflow variables were used to hold a single upstream hub
and a single upstream canonical topic URL. This causes problems when
things change rapidly: an unsubscription or subscription might be in
flight, and there was no way to tell what the resulting state of the
remote topic as a whole should be if the fields had changed between
the start and finish of an in flight action.

The new design follows from the observation that an individual
subscription to an upstream hub is a stateful entity: it deserves
promotion to a full actor. There is now an actor for the remote topic,
and one for each established upstream subscription. Upstream
subscriptions are created in response to demand expressed by the
remote topic actor; to switch, all it needs to do is update its
demand, and new subscriptions will spring into being while surplus
subscriptions gracefully terminate.

Also, this commit fixes a bug where upstream subscriptions were being
made to the requested-topic rather than to the upstream-topic.
Consequently, self-subscriptions (with multiple `http-listener`
aliases) work now without infinite mail loops.

There is still an issue that blocking actions in on-stop clauses don't
run to completion; that is being dealt with separately, as part of a
reexamination of facet-terminating events more generally.
This commit is contained in:
Tony Garnock-Jones 2016-11-22 15:16:35 +13:00
parent d2c161fd3e
commit 2a9273142a
1 changed files with 154 additions and 115 deletions

View File

@ -32,168 +32,207 @@
(check-equal? (shrink-lease 90) 81) (check-equal? (shrink-lease 90) 81)
(check-equal? (shrink-lease 50) 45)) (check-equal? (shrink-lease 50) 45))
(actor #:name 'remote-topic-manager ;; An UpstreamLink represents demand for a subscription to a remote
(during/actor (remote-topic-demand $full-topic) ;; hub. Notifications received should be forwarded locally using
#:name (list 'remote-topic full-topic) ;; `requested-topic` as the first field of a `notification` message.
(remote-topic-main full-topic))) (struct upstream-link (requested-topic ;; Topic -- the topic requested by *our* subscriber
upstream-hub ;; URLString -- upstream hub
upstream-topic ;; Topic -- upstream topic (from discovery)
) #:prefab) ;; ASSERTION
(define (remote-topic-main full-topic) (actor #:name 'remote-topic-manager
(define sub-id (random-hex-string 16)) (during/actor (remote-topic-demand $requested-topic)
(log-info "Remote sub endpoint ~a for topic ~s" sub-id full-topic) #:name (list 'remote-topic requested-topic)
(remote-topic-main requested-topic)))
(actor #:name 'upstream-link-manager
(during/actor (upstream-link $requested-topic $upstream-hub $upstream-topic)
#:name (list 'upstream-link requested-topic upstream-hub upstream-topic)
(upstream-link-main requested-topic upstream-hub upstream-topic)))
(define (remote-topic-main requested-topic)
;; Does discovery and occasionally polls.
(on-start (log-info "Remote topic ~s starting." requested-topic))
(on-stop (log-info "Remote topic ~s terminating." requested-topic))
(field [current-content-hash #f] (field [current-content-hash #f]
[current-content-type #f] [current-content-type #f])
[current-upstream-hub #f]
[established-upstream-hub #f]) (field [current-upstream-hub #f]
[current-upstream-topic requested-topic])
(field [last-upstream-check 0] (field [last-upstream-check 0]
[poll-interval-seconds #f] [poll-interval-seconds #f])
[next-subscription-refresh #f])
(define/query-config min-poll-interval 60) (define/query-config min-poll-interval 60)
(define/query-config max-upstream-redirects 5) (define/query-config max-upstream-redirects 5)
(define/query-config subscription-retry-delay 600)
(define/query-set poll-intervals (topic-demand full-topic $i) i) (define/query-set poll-intervals (topic-demand requested-topic $i) i)
(begin/dataflow (begin/dataflow
(define candidate (set-minimum (poll-intervals))) (define candidate (set-minimum (poll-intervals)))
(poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) (poll-interval-seconds (and candidate (max candidate (min-poll-interval)))))
(begin/dataflow (begin/dataflow
(log-info "Poll interval for ~a is now ~a" (log-info "Poll interval for ~s is now ~a"
full-topic requested-topic
(match (poll-interval-seconds) (match (poll-interval-seconds)
[#f "disabled"] [#f "disabled"]
[n (format "~a seconds" n)]))) [n (format "~a seconds" n)])))
(field [poll-busy? #f]) ;; Ensure we only run one poll at once
(define (poll-upstream!)
(when (not (poll-busy?))
(define poll-upstream-topic (current-upstream-topic))
(poll-busy? #t)
(log-info "Checking upstream ~a" poll-upstream-topic)
(define-values (resp response-body)
(web-request! 'get poll-upstream-topic #:redirect-budget (max-upstream-redirects)))
(last-upstream-check (current-inexact-milliseconds))
(match (web-response-header-code-type resp)
['successful
(define parsed-link-headers (parse-link-headers (web-response-header-headers resp)))
(current-upstream-hub (link-header-ref parsed-link-headers 'hub #f))
(current-upstream-topic
(link-header-ref parsed-link-headers 'self (current-upstream-topic)))
(define new-content-hash (sha1 (open-input-bytes response-body)))
(define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
(when (not (and (equal? (current-content-hash) new-content-hash)
(equal? (current-content-type) new-content-type)))
(current-content-hash new-content-hash)
(current-content-type new-content-type)
(send! (notification requested-topic
(current-upstream-hub)
(current-upstream-topic)
response-body
new-content-type)))]
[other
(log-warning "Upstream ~s (original: ~s) yielded ~a code ~a during poll"
poll-upstream-topic
requested-topic
other
(web-response-header-code resp))])
(poll-busy? #f)))
(on-start (poll-upstream!)) ;; always check at least once, for discovery
(on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check)
(* 1000
(or (poll-interval-seconds) 0)))))
(poll-upstream!))
(assert #:when (current-upstream-hub)
(upstream-link requested-topic (current-upstream-hub) (current-upstream-topic)))
(on (message (notification requested-topic
$new-hub
$new-topic
$new-body
$new-type))
(current-upstream-hub new-hub)
(current-upstream-topic new-topic)
(current-content-hash (sha1 (open-input-bytes new-body)))
(current-content-type new-type)))
(define (upstream-link-main requested-topic upstream-hub upstream-topic)
(define sub-id (random-hex-string 16))
(on-start
(log-info "Starting remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s"
sub-id requested-topic upstream-hub upstream-topic))
(on-stop
(log-info "Stopping remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s"
sub-id requested-topic upstream-hub upstream-topic))
(field [next-subscription-refresh #f])
(begin/dataflow
(when (next-subscription-refresh)
(log-info "Next subscription refresh for topic ~s will occur in ~a seconds."
requested-topic
(/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0))))
(define/query-config subscription-retry-delay 600)
(during (canonical-baseurl $baseurl) (during (canonical-baseurl $baseurl)
(define callback (canonical-url baseurl `("sub" (,sub-id ())))) (define callback (canonical-url baseurl `("sub" (,sub-id ()))))
(define (refresh-subscription!) (define (refresh-subscription!)
;; TODO: shared secret ;; TODO: shared secret
(define hub (current-upstream-hub)) ;; keep a stable value, lest it change underneath us (log-info "Subscribing endpoint ~a to hub ~s for topic ~s (originally requested as ~s)"
(when hub sub-id
(log-info "Subscribing to hub ~s for topic ~s" hub full-topic) upstream-hub
(analyze-response upstream-topic
(lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) requested-topic)
(hub.mode . "subscribe") (analyze-response
(hub.topic . ,full-topic)))) (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback)
(lambda () (hub.mode . "subscribe")
(when (equal? (current-upstream-hub) hub) ;; it may have changed asynchronously (hub.topic . ,upstream-topic))))
(established-upstream-hub hub))) void
(lambda (resp response-body) (lambda (resp response-body)
(log-warning (log-warning
"Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
hub upstream-hub
full-topic upstream-topic
resp resp
response-body) response-body)
(next-subscription-refresh (+ (current-inexact-milliseconds) (next-subscription-refresh (+ (current-inexact-milliseconds)
(* 1000.0 (subscription-retry-delay)))))))) (* 1000.0 (subscription-retry-delay)))))))
(define (unsubscribe!) (define (unsubscribe!)
(define hub (established-upstream-hub)) (log-info "Unsubscribing endpoint ~a from hub ~s for topic ~s (originally requested as ~s)"
(when hub sub-id
(log-info "Unsubscribing from hub ~s for topic ~s" hub full-topic) upstream-hub
(analyze-response upstream-topic
(lambda () (web-post/form-parameters! hub `((hub.callback . ,callback) requested-topic)
(hub.mode . "unsubscribe") (analyze-response
(hub.topic . ,full-topic)))) (lambda () (web-post/form-parameters! upstream-hub `((hub.callback . ,callback)
void (hub.mode . "unsubscribe")
(lambda (resp response-body) (hub.topic . ,upstream-topic))))
(log-warning void
"Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s" (lambda (resp response-body)
hub (log-warning
full-topic "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
resp upstream-hub
response-body))) upstream-topic
(when (equal? (established-upstream-hub) hub) ;; it may have changed asynchronously resp
(established-upstream-hub #f)))) response-body))))
(field [poll-busy? #f]) ;; Ensure we only run one poll at once (on-start (refresh-subscription!))
(define (poll-upstream!)
(when (not (poll-busy?))
(poll-busy? #t)
(log-info "Checking upstream ~a" full-topic)
(define-values (resp response-body)
(web-request! 'get full-topic #:redirect-budget (max-upstream-redirects)))
(last-upstream-check (current-inexact-milliseconds))
(match (web-response-header-code-type resp)
['successful
(define new-content-hash (sha1 (open-input-bytes response-body)))
(define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
(define parsed-link-headers (parse-link-headers (web-response-header-headers resp)))
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
(define upstream-topic (link-header-ref parsed-link-headers 'self #f))
(when (not (and (equal? (current-content-hash) new-content-hash)
(equal? (current-content-type) new-content-type)))
(current-content-hash new-content-hash)
(current-content-type new-content-type)
(send! (notification full-topic
upstream-hub
upstream-topic
response-body
new-content-type)))
(current-upstream-hub upstream-hub)]
[other
(log-warning "Upstream ~a yielded ~a code ~a during poll"
full-topic
other
(web-response-header-code resp))])
(poll-busy? #f)))
(on-start (poll-upstream!)) ;; always check at least once, for discovery
(on-stop (unsubscribe!)) (on-stop (unsubscribe!))
(on-stop (log-info "Remote topic ~s terminating" full-topic))
(begin/dataflow
(when (not (equal? (current-upstream-hub) (established-upstream-hub)))
(unsubscribe!)
(refresh-subscription!)))
(begin/dataflow
(when (next-subscription-refresh)
(log-info "Next subscription refresh for topic ~s will occur in ~a seconds."
full-topic
(/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0))))
(on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh))) (on #:when (next-subscription-refresh) (asserted (later-than (next-subscription-refresh)))
(next-subscription-refresh #f) (next-subscription-refresh #f)
(refresh-subscription!)) (refresh-subscription!))
(on #:when (poll-interval-seconds) (asserted (later-than (+ (last-upstream-check)
(* 1000 (or (poll-interval-seconds) 0)))))
(poll-upstream!))
(during (http-listener $host-name $port) (during (http-listener $host-name $port)
(on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ()))) (on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ())))
(log-info "Received verification-of-intent: ~v" (web-request-header-query req)) (log-info "~a received verification-of-intent: ~v" sub-id (web-request-header-query req))
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
(define lease-seconds-str (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) (define lease-seconds-str (dict-ref (web-request-header-query req) 'hub.lease_seconds #f))
(if lease-seconds-str (if lease-seconds-str
(next-subscription-refresh (+ (current-inexact-milliseconds) (next-subscription-refresh (+ (current-inexact-milliseconds)
(* 1000.0 (* 1000.0
(shrink-lease (string->number lease-seconds-str))))) (shrink-lease (string->number lease-seconds-str)))))
(log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) (log-warning "Hub ~s, topic ~s did not supply hub.lease_seconds"
upstream-hub
upstream-topic))
(web-respond/bytes! id (string->bytes/utf-8 challenge))) (web-respond/bytes! id (string->bytes/utf-8 challenge)))
(on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body) (on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body)
;; TODO: verify the use of the shared secret ;; TODO: verify the use of the shared secret
(define parsed-link-headers (parse-link-headers (web-request-header-headers req))) (define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) (define new-hub (link-header-ref parsed-link-headers 'hub #f))
(define upstream-topic (link-header-ref parsed-link-headers 'self #f)) (define new-topic (link-header-ref parsed-link-headers 'self #f))
(define content-type (web-request-header-content-type req)) (define content-type (web-request-header-content-type req))
(web-respond/status! id 202 #"Accepted") (web-respond/status! id 202 #"Accepted")
(log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" (log-info "~a topic ~s got ~v message ~v; upstream hub ~s, topic ~s"
full-topic sub-id
requested-topic
content-type content-type
body body
upstream-hub new-hub
upstream-topic) new-topic)
(current-content-hash (sha1 (open-input-bytes body))) (send! (notification requested-topic
(current-content-type content-type) new-hub
(current-upstream-hub upstream-hub) new-topic
(send! (notification full-topic
upstream-hub
upstream-topic
body body
content-type)))))) content-type))))))