diff --git a/rmq/poke.rkt b/rmq/poke.rkt index 2a93041..4a4760b 100644 --- a/rmq/poke.rkt +++ b/rmq/poke.rkt @@ -91,6 +91,7 @@ #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . "http://localhost:7000/sink") + (hub.poll_interval_seconds . "10") (hub.mode . "subscribe") (hub.topic . ,topic)))))) diff --git a/rmq/server.rkt b/rmq/server.rkt index 4e4932d..975759a 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -8,6 +8,7 @@ (require racket/set) (require net/url) (require net/uri-codec) +(require file/sha1) (require/activate syndicate/drivers/timer) (require/activate syndicate/drivers/timestate) @@ -27,17 +28,22 @@ ;; and numbers indicate a moment in time as a Unix epoch timestamp as ;; might be returned from `current-seconds`. -;; (notification Topic Bytes String (Option URIString)) +;; (notification Topic Bytes (Option String)) (struct notification (topic-name content content-type) #:prefab) -;; (subscription Topic Deadline URLString URLString (Option Bytes)) -(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab) +(struct subscription (topic-name ;; Topic + expiry-deadline ;; Deadline + canonical-hub ;; URLString + callback ;; URLString + secret ;; Option Bytes + poll-interval-seconds) ;; Option Number + #:prefab) ;; (local-topic-config Topic (Option Number) (Option Number)) (struct local-topic-config (name max-age max-count) #:prefab) -;; (topic-demand Topic) -(struct topic-demand (topic-name) #:prefab) +;; (topic-demand Topic (Option Number)) +(struct topic-demand (topic-name poll-interval-seconds) #:prefab) ;; (local-topic-demand String) (struct local-topic-demand (name) #:prefab) @@ -115,7 +121,7 @@ (actor #:name 'topic-demand-analyzer (define/query-set local-hosts (local-host $host-name) host-name) - (during/actor (topic-demand $full-topic) + (during/actor (topic-demand $full-topic _) #:name (list 'general-topic full-topic) #:let [(local-hosts-snapshot (local-hosts))] (with-handlers [(exn? (lambda (e) @@ -124,11 +130,10 @@ (url->resource (string->url full-topic))) (define maybe-local-topic (match topic-path [`("topic" (,topic ())) topic] [_ #f])) - (on-start - (general-topic-main full-topic - topic-host - maybe-local-topic - (set-member? local-hosts-snapshot topic-host)))))) + (general-topic-main full-topic + topic-host + maybe-local-topic + (set-member? local-hosts-snapshot topic-host))))) (define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?) (define (local-state) @@ -139,17 +144,25 @@ (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state)) (remote-topic-main full-topic))) - (if (and maybe-local-topic start-as-local?) - (local-state) - (remote-state))) + (on-start + (if (and maybe-local-topic start-as-local?) + (local-state) + (remote-state)))) (define (req-content-type req) - (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) + (dict-ref (web-request-header-headers req) 'content-type #f)) + +(define (canonical-url canonical-host-name path) + (url->string (resource->url (web-resource (vh canonical-host-name) path)))) (define (local-topic-main topic) (field [max-age #f] [max-count #f]) + (field [current-content #f] + [current-content-type #f] + [last-modified-seconds #f]) + (on (asserted (local-topic-config topic $age $count)) (max-age age) (max-count count)) @@ -163,25 +176,80 @@ (on-stop (log-info "Terminating local topic ~v" topic)) (during (local-host $host-name) + (during (canonical-local-host $canonical-host-name) + (define hub-url (canonical-url canonical-host-name `("hub" ()))) + (define self-url (canonical-url canonical-host-name `("topic" (,topic ())))) + (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) + (cons 'link (format "<~a>; rel=self" self-url)))) + (on (web-request-get (id req) (vh host-name) ("topic" (,topic ()))) + (if (current-content) + (web-respond/bytes! id + #:header (web-response-header + #:last-modified-seconds (last-modified-seconds) + #:mime-type (and (current-content-type) + (string->bytes/utf-8 + (current-content-type))) + #:headers discovery-headers) + (current-content)) + (web-respond/bytes! id + #:header (web-response-header + #:code 204 + #:message #"No Content" + #:mime-type #f + #:headers discovery-headers) + #"")))) ;; MUST NOT include a response body for 204 (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) + (define content-type (req-content-type req)) + (log-info "Local topic ~a got ~v message ~v" topic content-type body) + (current-content body) + (current-content-type content-type) + (last-modified-seconds (current-seconds)) (actor* - (define content-type (req-content-type req)) - (log-info "Local topic ~a got ~v message ~v" topic content-type body) (define full-topic (url->string (resource->url (web-request-header-resource req)))) (send! (notification full-topic body content-type)) (web-respond/status! id 201 #"Created"))))) (define (maintain-remote-subscription full-topic canonical-host-name sub-id) - (define callback (url->string - (resource->url - (web-resource (vh canonical-host-name) `("sub" (,sub-id ())))))) + (define callback (canonical-url canonical-host-name `("sub" (,sub-id ())))) (react - (field [next-upstream-check (current-inexact-milliseconds)]) - (on (asserted (later-than (next-upstream-check))) - (log-info "Rechecking upstream for ~a" full-topic) + (field [last-upstream-check 0] + [poll-interval-seconds #f]) + + (field [current-content-hash #f] + [current-content-type #f]) + + (define/query-set poll-intervals (topic-demand full-topic $i) i) + (begin/dataflow + (poll-interval-seconds (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] + (cond [(not i) candidate] + [(< candidate i) candidate] + [else i])))) + (begin/dataflow + (log-info "Poll interval for ~a is now ~a" + full-topic + (match (poll-interval-seconds) + [#f "disabled"] + [n (format "~a seconds" n)]))) + + (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) + (log-info "Checking upstream ~a" full-topic) (define-values (resp response-body) (web-request! 'get full-topic #:redirect-budget 5)) - (log-info "Resource: ~v" resp)))) + (last-upstream-check (current-inexact-milliseconds)) + (match (web-response-header-code-type resp) + ['successful + (define new-content-hash (sha1 (open-input-bytes response-body))) + (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (when (not (and (equal? (current-content-hash) new-content-hash) + (equal? (current-content-type) new-content-type))) + (current-content-hash new-content-hash) + (current-content-type new-content-type) + (send! (notification full-topic response-body new-content-type)))] + [other + (log-warning "Upstream ~a yielded ~a code ~a" + full-topic + other + (web-response-header-code resp))])))) (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) @@ -218,6 +286,10 @@ (match (hash-ref params 'hub.lease_seconds "unbounded") ["unbounded" #f] [n (string->number n)])) + (define poll-interval-seconds + (match (hash-ref params 'hub.poll_interval_seconds "none") + ["none" #f] + [n (string->number n)])) (define secret-string (hash-ref params 'hub.secret #f)) (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) @@ -230,9 +302,13 @@ topic callback) (begin - (retract! (subscription topic ? ? callback ?)) - (assert! - (subscription topic expiry-deadline canonical-hub callback secret-bytes)) + (retract! (subscription topic ? ? callback ? ?)) + (assert! (subscription topic + expiry-deadline + canonical-hub + callback + secret-bytes + poll-interval-seconds)) #t) #f)] ['unsubscribe @@ -241,16 +317,16 @@ topic callback) (begin - (retract! (subscription topic ? ? callback ?)) + (retract! (subscription topic ? ? callback ? ?)) #t) #f)]))) (if ok? (web-respond/status! id 202 #"Accepted") (web-respond/status! id 403 #"Forbidden" #"Validation failed")))) - (during/actor (subscription $partial-topic _ _ $callback _) + (during/actor (subscription $partial-topic _ _ $callback _ _) #:name (list 'subscription partial-topic callback) - #:on-crash (retract! (subscription partial-topic ? ? callback ?)) + #:on-crash (retract! (subscription partial-topic ? ? callback ? ?)) (subscription-main partial-topic callback))) (define (subscription-change-validate mode lease topic callback) @@ -289,14 +365,16 @@ (define-values (n newq) (dequeue (message-queue))) (message-queue newq) (match-define (notification topic body content-type) n) + (define link-headers (list (cons 'link (format "<~a>; rel=hub" canonical-hub)) + (cons 'link (format "<~a>; rel=self" topic)))) (let deliver-one ((retries-remaining 1) (retry-delay 5.0)) (define-values (resp _body) (web-request! 'post callback - #:headers (list (cons 'content-type content-type) - (cons 'link (format "<~a>; rel=hub" canonical-hub)) - (cons 'link (format "<~a>; rel=self" topic))) + #:headers (if content-type + (cons (cons 'content-type content-type) link-headers) + link-headers) #:body body)) (cond [(eq? (web-response-header-code-type resp) 'successful) @@ -317,17 +395,23 @@ (min (* retry-delay 1.618) 30))])))) (delivery-active? #f)) - (during (subscription partial-topic $expiry-deadline $canonical-hub callback $secret-bytes) + (during (subscription partial-topic + $expiry-deadline + $canonical-hub + callback + $secret-bytes + $poll-interval-seconds) (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) partial-topic))) - (assert (topic-demand topic)) + (assert (topic-demand topic poll-interval-seconds)) - (on-start (log-info "Subscription configured: ~v ~v ~v ~v" + (on-start (log-info "Subscription configured: ~v ~v ~v ~v ~v" topic expiry-deadline canonical-hub - secret-bytes) + secret-bytes + poll-interval-seconds) (cond [expiry-deadline (expiry-timer-id (gensym 'subscription-expiry))