diff --git a/rmq/private/util.rkt b/rmq/private/util.rkt index 68d7ed3..aa25fd7 100644 --- a/rmq/private/util.rkt +++ b/rmq/private/util.rkt @@ -5,7 +5,8 @@ extend-url-string-query web-respond/status! web-request! - parse-link-headers) + parse-link-headers + link-header-ref) (require racket/dict) (require racket/match) @@ -91,6 +92,11 @@ (for/hasheq [((rel urls-rev) (in-hash rel-dict-reverse-order))] (values rel (reverse urls-rev)))) +(define (link-header-ref parsed-link-headers rel default) + (match (hash-ref parsed-link-headers rel '()) + [(cons v _) v] + ['() default])) + (define (parse-link-header-value link-header seed) ;; This is not adequate with respect to the RFC 5988 grammar, which ;; includes a wide variety of link-params other than "rel". diff --git a/rmq/server.rkt b/rmq/server.rkt index 609d9f3..22fbe7b 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -44,8 +44,21 @@ ;; and numbers indicate a moment in time as a Unix epoch timestamp as ;; might be returned from `current-seconds`. -;; (notification Topic Bytes (Option String)) -(struct notification (topic-name content content-type) #:prefab) +;; A Notification contains both `topic-name`, indicating the *local* +;; name for the topic that is being subscribed to, as well as +;; `canonical-topic`, which is the rel=self URL given by the upstream +;; topic itself. The former is our local key for finding resources; +;; the latter is a property of the topic content, not a property of +;; the subscription. The same is true of `canonical-hub`: it pertains +;; to the topic, not the subscription. The `topic-name` is the only +;; field pertaining to a subscription; all the others pertain to the +;; content. +(struct notification (topic-name ;; Topic + canonical-hub ;; Option URLString + canonical-topic ;; Option Topic + content ;; Bytes + content-type) ;; Option String + #:prefab) ;; (update-subscription Topic URLString (Option SubscriptionSettings)) (struct update-subscription (topic callback settings) #:prefab) ;; message @@ -54,7 +67,6 @@ (struct subscription (topic callback settings-) #:prefab) ;; assertion (struct subscription-settings (expiry-deadline ;; Deadline - canonical-hub ;; URLString secret ;; Option Bytes poll-interval-seconds) ;; Option Number #:prefab) @@ -176,9 +188,6 @@ (define (canonical-url canonical-host-name path) (url->string (resource->url (web-resource (vh canonical-host-name) path)))) -(define (canonical-topic-base-url canonical-hub-str) - (combine-url/relative (string->url canonical-hub-str) "/topic/")) - (define (local-topic-main topic) (field [max-age #f] [max-count #f]) @@ -227,18 +236,20 @@ (on (web-request-incoming (id req) (vh host-name) 'head ("topic" (,topic ()))) (topic-response id #f)) (on (web-request-get (id req) (vh host-name) ("topic" (,topic ()))) - (topic-response id #t))) - - (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) - (define content-type (req-content-type req)) - (log-info "Local topic ~a got ~v message ~v" topic content-type body) - (current-content body) - (current-content-type content-type) - (last-modified-seconds (current-seconds)) - (actor* - (define full-topic (url->string (resource->url (web-request-header-resource req)))) - (send! (notification full-topic body content-type)) - (web-respond/status! id 201 #"Created"))))) + (topic-response id #t)) + (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) + (define content-type (req-content-type req)) + (log-info "Local topic ~a got ~v message ~v" topic content-type body) + (current-content body) + (current-content-type content-type) + (last-modified-seconds (current-seconds)) + (actor* + (send! (notification self-url + hub-url + self-url + body + content-type)) + (web-respond/status! id 201 #"Created")))))) (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) @@ -270,6 +281,8 @@ (define callback (canonical-url canonical-host-name `("sub" (,sub-id ())))) (define (refresh-subscription!) + ;; TODO: shared secret + ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly (when (current-upstream-hub) (web-request! 'post (current-upstream-hub) #:body (string->bytes/utf-8 @@ -303,15 +316,18 @@ ['successful (define new-content-hash (sha1 (open-input-bytes response-body))) (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) + (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) (when (not (and (equal? (current-content-hash) new-content-hash) (equal? (current-content-type) new-content-type))) (current-content-hash new-content-hash) (current-content-type new-content-type) - (send! (notification full-topic response-body new-content-type))) - (define upstream-hub - (match (hash-ref (parse-link-headers (web-response-header-headers resp)) 'hub '()) - [(cons hub-url _) hub-url] - ['() #f])) + (send! (notification full-topic + upstream-hub + upstream-topic + response-body + new-content-type))) (if (equal? (current-upstream-hub) upstream-hub) (refresh-subscription!) (current-upstream-hub upstream-hub))] @@ -327,19 +343,34 @@ (web-respond/bytes! id (string->bytes/utf-8 challenge))) (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body) + ;; TODO: verify the use of the shared secret (actor* + (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) + (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) (define content-type (req-content-type req)) - (log-info "Remote topic ~a got ~v message ~v" full-topic content-type body) + (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" + full-topic + content-type + body + upstream-hub + upstream-topic) (current-content-hash (sha1 (open-input-bytes body))) (current-content-type content-type) - (send! (notification full-topic body content-type)) + (current-upstream-hub upstream-hub) + (send! (notification full-topic + upstream-hub + upstream-topic + body + content-type)) (web-respond/status! id 201 #"Created"))))) (actor #:name 'hub (during (local-host $host-name) - (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body) - (asynchronous-verification-of-intent id req body) - (web-respond/status! id 202 #"Accepted"))) + (during (canonical-local-host $canonical-host-name) + (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body) + (asynchronous-verification-of-intent id req body canonical-host-name) + (web-respond/status! id 202 #"Accepted")))) (on (message (update-subscription $topic $callback $settings)) (retract! (subscription topic callback ?)) @@ -350,13 +381,18 @@ #:on-crash (retract! (subscription topic callback ?)) (subscription-main topic callback))) -(define (asynchronous-verification-of-intent id req body) +(define (asynchronous-verification-of-intent id req body canonical-host-name) (actor* (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) (define callback (hash-ref params 'hub.callback)) (define mode (match (hash-ref params 'hub.mode) ["subscribe" 'subscribe] ["unsubscribe" 'unsubscribe])) - (define topic (hash-ref params 'hub.topic)) + (define requested-topic (hash-ref params 'hub.topic)) + (define topic + (url->string + (combine-url/relative (string->url (canonical-url canonical-host-name + `("topic" ("" ())))) + requested-topic))) (define lease-seconds (match (hash-ref params 'hub.lease_seconds *default-lease*) ["unbounded" #f] [n (string->number n)])) @@ -367,28 +403,26 @@ (define secret-string (hash-ref params 'hub.secret #f)) (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) - (define canonical-hub (url->string (resource->url (web-request-header-resource req)))) (match mode ['subscribe (when (subscription-change-validate "subscribe" (or lease-seconds "unbounded") - topic + requested-topic callback) (send! (update-subscription topic callback (subscription-settings expiry-deadline - canonical-hub secret-bytes poll-interval-seconds))))] ['unsubscribe - (when (subscription-change-validate "unsubscribe" #f topic callback) + (when (subscription-change-validate "unsubscribe" #f requested-topic callback) (send! (update-subscription topic callback #f)))]))) -(define (subscription-change-validate mode lease topic callback) +(define (subscription-change-validate mode lease requested-topic callback) (define challenge (random-hex-string 16)) (define id (gensym 'validation)) (define extra-query (list* (cons 'hub.mode mode) - (cons 'hub.topic topic) + (cons 'hub.topic requested-topic) (cons 'hub.challenge challenge) (if lease (list (cons 'hub.lease_seconds (~a lease))) @@ -401,7 +435,12 @@ (and (eq? (web-response-header-code-type resp) 'successful) (equal? body (string->bytes/utf-8 challenge)))) -(define (subscription-main partial-topic callback) +(define (maybe-link-header urlstr rel) + (if urlstr + (list (cons 'link (format "<~a>; rel=~a" urlstr rel))) + '())) + +(define (subscription-main topic callback) (field [delivery-active? #f] [message-queue (make-queue)] [dead-letters (make-queue)]) @@ -409,15 +448,15 @@ (stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*)) (log-info "Too many dead letters for ~a" callback)) - (define (deliver-queued-notifications canonical-hub) + (define (deliver-queued-notifications) (delivery-active? #t) (let deliver-rest ((retry-delay *initial-retry-delay*)) (when (not (queue-empty? (message-queue))) (define-values (n newq) (dequeue (message-queue))) (message-queue newq) - (match-define (notification topic body content-type) n) - (define link-headers (list (cons 'link (format "<~a>; rel=hub" canonical-hub)) - (cons 'link (format "<~a>; rel=self" topic)))) + (match-define (notification _ canonical-hub canonical-topic body content-type) n) + (define link-headers (append (maybe-link-header canonical-hub 'hub) + (maybe-link-header canonical-topic 'self))) (let deliver-one ((retries-remaining *max-delivery-retries*) (retry-delay retry-delay)) (define-values (resp _body) @@ -447,20 +486,15 @@ *max-retry-delay*))])))) (delivery-active? #f)) - (during (subscription partial-topic callback (subscription-settings - $expiry-deadline - $canonical-hub - $secret-bytes - $poll-interval-seconds)) - (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) - partial-topic))) - + (during (subscription topic callback (subscription-settings + $expiry-deadline + $secret-bytes + $poll-interval-seconds)) (assert (topic-demand topic poll-interval-seconds)) (on-start (log-info "Subscription configured: ~v" `((topic ,topic) (expiry-deadline ,expiry-deadline) - (canonical-hub ,canonical-hub) (callback ,callback) (secret-bytes ,secret-bytes) (poll-interval-seconds ,poll-interval-seconds)))) @@ -468,7 +502,7 @@ (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0))) (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback)))) - (on (message ($ n (notification topic _ _))) + (on (message ($ n (notification topic _ _ _ _))) (message-queue (enqueue (message-queue) n)) (when (not (delivery-active?)) - (deliver-queued-notifications canonical-hub))))) + (deliver-queued-notifications)))))