#lang syndicate/actor
;; Remote topic support: discovers the hub advertised for a requested
;; topic by polling it, maintains a subscription at that hub, and
;; relays whatever arrives (by poll or by push) as local `notification`
;; messages keyed by the originally requested topic.

(require racket/dict)
(require racket/set)
(require file/sha1)

(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require/activate syndicate/drivers/config)

(require "../private/util.rkt")
(require "../protocol.rkt")

(module+ test (require rackunit))

;; (Setof (Option Real)) -> (Option Real)
;; Smallest non-#f member of `xs`; #f when `xs` has no non-#f member.
(define (set-minimum xs)
  (for/fold [(i #f)] [(candidate (in-set xs))]
    (cond [(not candidate) i]
          [(not i) candidate]
          [(< candidate i) candidate]
          [else i])))

;; Real -> Real
;; Shortens a lease duration: leases under 100 seconds are scaled to
;; 9/10 of their length, longer ones lose a flat 10 seconds. The result
;; is used below to schedule the subscription refresh.
(define (shrink-lease seconds)
  (if (< seconds 100)
      (* seconds 9/10)
      (- seconds 10)))

(module+ test
  (check-equal? (shrink-lease 1000) 990)
  (check-equal? (shrink-lease 101) 91)
  (check-equal? (shrink-lease 100) 90)
  (check-equal? (shrink-lease 90) 81)
  (check-equal? (shrink-lease 50) 45))

;; Any -> (Option Real)
;; Parses a `hub.lease_seconds` query value supplied by a remote hub.
;; Yields the lease as a positive, finite real number, or #f when the
;; value is not a string, is not numeric, or is zero, negative,
;; infinite, NaN or complex. The value is untrusted input, so it must
;; be validated before being handed to `shrink-lease`.
(define (parse-lease-seconds str)
  (define n (and (string? str) (string->number str)))
  (and n (rational? n) (positive? n) n))

(module+ test
  (check-equal? (parse-lease-seconds "3600") 3600)
  (check-equal? (parse-lease-seconds "50") 50)
  (check-equal? (parse-lease-seconds "bogus") #f)
  (check-equal? (parse-lease-seconds "") #f)
  (check-equal? (parse-lease-seconds "0") #f)
  (check-equal? (parse-lease-seconds "-5") #f)
  (check-equal? (parse-lease-seconds "+inf.0") #f)
  (check-equal? (parse-lease-seconds #f) #f))

;; An UpstreamLink represents demand for a subscription to a remote
;; hub. Notifications received should be forwarded locally using
;; `requested-topic` as the first field of a `notification` message.
(struct upstream-link (requested-topic ;; Topic -- the topic requested by *our* subscriber
                       upstream-hub ;; URLString -- upstream hub
                       upstream-topic ;; Topic -- upstream topic (from discovery)
                       ) #:prefab) ;; ASSERTION

;; Spawns one `remote-topic-main` actor for as long as each
;; `remote-topic-demand` assertion is present.
(actor #:name 'remote-topic-manager
       (during/actor (remote-topic-demand $requested-topic)
         #:name (list 'remote-topic requested-topic)
         (remote-topic-main requested-topic)))

;; Spawns one `upstream-link-main` actor for as long as each
;; `upstream-link` assertion is present.
(actor #:name 'upstream-link-manager
       (during/actor (upstream-link $requested-topic $upstream-hub $upstream-topic)
         #:name (list 'upstream-link requested-topic upstream-hub upstream-topic)
         (upstream-link-main requested-topic upstream-hub upstream-topic)))

;; Topic -> Void
;; Tracks a single requested topic: fetches it to discover its hub and
;; canonical ("self") URL, re-fetches it at the configured poll
;; interval, sends a `notification` whenever the fetched content or
;; content type changes, and asserts an `upstream-link` while a hub is
;; known.
(define (remote-topic-main requested-topic)
  ;; Does discovery and occasionally polls.
  (on-start (log-info "Remote topic ~s starting." requested-topic))
  (on-stop (log-info "Remote topic ~s terminating." requested-topic))

  ;; Hash and content type of the most recently seen content; used to
  ;; suppress notifications for unchanged content.
  (field [current-content-hash #f]
         [current-content-type #f])
  ;; Hub and topic URL as of the most recent poll or notification.
  (field [current-upstream-hub #f]
         [current-upstream-topic requested-topic])
  ;; `last-upstream-check` is a `current-inexact-milliseconds` timestamp.
  (field [last-upstream-check 0]
         [poll-interval-seconds #f])

  (define/query-config _ min-poll-interval 60)
  (define/query-config _ max-upstream-redirects 5)

  (define/query-set poll-intervals (topic-demand requested-topic $i) i)

  ;; The effective interval is the smallest one demanded, but never
  ;; below `min-poll-interval`; #f (no polling) when none is demanded.
  (begin/dataflow
    (define candidate (set-minimum (poll-intervals)))
    (poll-interval-seconds (and candidate (max candidate (min-poll-interval)))))

  (begin/dataflow
    (log-info "Poll interval for ~s is now ~a"
              requested-topic
              (match (poll-interval-seconds)
                [#f "disabled"]
                [n (format "~a seconds" n)])))

  (field [poll-busy? #f]) ;; Ensure we only run one poll at once
  (define (poll-upstream!)
    (when (not (poll-busy?))
      (define poll-upstream-topic (current-upstream-topic))
      (poll-busy? #t)
      (log-info "Checking upstream ~a" poll-upstream-topic)
      (define-values (resp response-body)
        (web-request! 'get poll-upstream-topic #:redirect-budget (max-upstream-redirects)))
      (last-upstream-check (current-inexact-milliseconds))
      (match (web-response-header-code-type resp)
        ['successful
         (define parsed-link-headers (parse-link-headers (web-response-header-headers resp)))
         ;; No rel=hub link means no hub; no rel=self link means the
         ;; topic URL is unchanged.
         (current-upstream-hub (link-header-ref parsed-link-headers 'hub #f))
         (current-upstream-topic
          (link-header-ref parsed-link-headers 'self (current-upstream-topic)))
         (define new-content-hash (sha1 (open-input-bytes response-body)))
         (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
         (when (not (and (equal? (current-content-hash) new-content-hash)
                         (equal? (current-content-type) new-content-type)))
           (current-content-hash new-content-hash)
           (current-content-type new-content-type)
           (send! (notification requested-topic
                                (current-upstream-hub)
                                (current-upstream-topic)
                                response-body
                                new-content-type)))]
        [other
         (log-warning "Upstream ~s (original: ~s) yielded ~a code ~a during poll"
                      poll-upstream-topic
                      requested-topic
                      other
                      (web-response-header-code resp))])
      (poll-busy? #f)))

  (on-start (poll-upstream!)) ;; always check at least once, for discovery

  (on #:when (poll-interval-seconds)
      (asserted (later-than (+ (last-upstream-check)
                               (* 1000 (or (poll-interval-seconds) 0)))))
      (poll-upstream!))

  (assert #:when (current-upstream-hub)
          (upstream-link requested-topic (current-upstream-hub) (current-upstream-topic)))

  ;; Adopt the hub, topic and content described by any notification for
  ;; this topic (including the ones sent by `poll-upstream!` above).
  (on (message (notification requested-topic $new-hub $new-topic $new-body $new-type))
      (current-upstream-hub new-hub)
      (current-upstream-topic new-topic)
      (current-content-hash (sha1 (open-input-bytes new-body)))
      (current-content-type new-type)))

;; Topic URLString Topic -> Void
;; Maintains one subscription at `upstream-hub` for `upstream-topic`,
;; using a randomly named callback endpoint under "sub". Answers the
;; hub's GET requests to that endpoint by echoing `hub.challenge`,
;; schedules a refresh from the granted lease, and turns POSTs to the
;; endpoint into `notification` messages for `requested-topic`.
(define (upstream-link-main requested-topic upstream-hub upstream-topic)
  (define sub-id (random-hex-string 16))

  (on-start (log-info "Starting remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s"
                      sub-id requested-topic upstream-hub upstream-topic))
  (on-stop (log-info "Stopping remote sub endpoint ~a for requested topic ~s at hub ~s, topic ~s"
                     sub-id requested-topic upstream-hub upstream-topic))

  ;; `current-inexact-milliseconds` deadline for the next (re)subscription
  ;; attempt, or #f when none is scheduled.
  (field [next-subscription-refresh #f])
  (begin/dataflow
    (when (next-subscription-refresh)
      (log-info "Next subscription refresh for topic ~s will occur in ~a seconds."
                requested-topic
                (/ (- (next-subscription-refresh) (current-inexact-milliseconds)) 1000.0))))

  (define/query-config _ subscription-retry-delay 600)

  (during (canonical-baseurl $baseurl)
    (define callback (canonical-url baseurl `("sub" (,sub-id ()))))

    (define (refresh-subscription!)
      ;; TODO: shared secret
      (log-info "Subscribing endpoint ~a to hub ~s for topic ~s (originally requested as ~s)"
                sub-id upstream-hub upstream-topic requested-topic)
      (analyze-response
       (lambda ()
         (web-post/form-parameters! upstream-hub
                                    `((hub.callback . ,callback)
                                      (hub.mode . "subscribe")
                                      (hub.topic . ,upstream-topic))))
       void
       (lambda (resp response-body)
         (log-warning "Upstream subscription to hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
                      upstream-hub upstream-topic resp response-body)
         ;; Try again after the configured retry delay.
         (next-subscription-refresh
          (+ (current-inexact-milliseconds) (* 1000.0 (subscription-retry-delay)))))))

    (define (unsubscribe!)
      (log-info "Unsubscribing endpoint ~a from hub ~s for topic ~s (originally requested as ~s)"
                sub-id upstream-hub upstream-topic requested-topic)
      (analyze-response
       (lambda ()
         (web-post/form-parameters! upstream-hub
                                    `((hub.callback . ,callback)
                                      (hub.mode . "unsubscribe")
                                      (hub.topic . ,upstream-topic))))
       void
       (lambda (resp response-body)
         (log-warning "Upstream unsubscription from hub ~s for topic ~s failed:\n - headers: ~v\n - body: ~s"
                      upstream-hub upstream-topic resp response-body))))

    (on-start (refresh-subscription!))
    (on-stop (unsubscribe!))

    (on #:when (next-subscription-refresh)
        (asserted (later-than (next-subscription-refresh)))
        (next-subscription-refresh #f)
        (refresh-subscription!))

    (during (http-listener $host-name $port)
      (on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ())))
          (log-info "~a received verification-of-intent: ~v" sub-id (web-request-header-query req))
          (define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
          (define lease-seconds-str
            (dict-ref (web-request-header-query req) 'hub.lease_seconds #f))
          ;; The lease comes from the remote hub; a malformed value must
          ;; not raise inside this handler, so validate it first.
          (define lease-seconds (parse-lease-seconds lease-seconds-str))
          (cond
            [lease-seconds
             (next-subscription-refresh
              (+ (current-inexact-milliseconds)
                 (* 1000.0 (shrink-lease lease-seconds))))]
            [lease-seconds-str
             (log-warning "Hub ~s, topic ~s supplied unusable hub.lease_seconds ~s"
                          upstream-hub upstream-topic lease-seconds-str)]
            [else
             (log-warning "Hub ~s, topic ~s did not supply hub.lease_seconds"
                          upstream-hub upstream-topic)])
          (web-respond/bytes! id (string->bytes/utf-8 challenge)))

      (on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body)
          ;; TODO: verify the use of the shared secret
          (define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
          ;; When the delivery carries no rel=hub / rel=self link, keep
          ;; the hub and topic this link was created for. Forwarding #f
          ;; would make `remote-topic-main` adopt #f as its upstream
          ;; topic and hub, breaking its next poll.
          (define new-hub (link-header-ref parsed-link-headers 'hub upstream-hub))
          (define new-topic (link-header-ref parsed-link-headers 'self upstream-topic))
          (define content-type (web-request-header-content-type req))
          (web-respond/status! id 202 #"Accepted")
          (log-info "~a topic ~s got ~v message ~v; upstream hub ~s, topic ~s"
                    sub-id requested-topic content-type body new-hub new-topic)
          (send! (notification requested-topic new-hub new-topic body content-type))))))