From 9f6a73a8b02a633720cbc1729947682fe5154882 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 21 Nov 2016 10:13:40 +1300 Subject: [PATCH] Split server.rkt into hub.rkt and hub/*.rkt --- racketmq/client.rkt | 16 -- racketmq/hub.rkt | 25 ++ racketmq/hub/local-topic.rkt | 95 +++++++ racketmq/hub/remote-topic.rkt | 132 +++++++++ racketmq/hub/static-content.rkt | 26 ++ racketmq/hub/subscription.rkt | 161 +++++++++++ racketmq/hub/topic-demand.rkt | 45 ++++ racketmq/poke.rkt | 2 +- racketmq/private/util.rkt | 15 +- racketmq/run | 2 +- racketmq/server.rkt | 464 -------------------------------- 11 files changed, 500 insertions(+), 483 deletions(-) delete mode 100644 racketmq/client.rkt create mode 100644 racketmq/hub.rkt create mode 100644 racketmq/hub/local-topic.rkt create mode 100644 racketmq/hub/remote-topic.rkt create mode 100644 racketmq/hub/static-content.rkt create mode 100644 racketmq/hub/subscription.rkt create mode 100644 racketmq/hub/topic-demand.rkt delete mode 100644 racketmq/server.rkt diff --git a/racketmq/client.rkt b/racketmq/client.rkt deleted file mode 100644 index c09b8ee..0000000 --- a/racketmq/client.rkt +++ /dev/null @@ -1,16 +0,0 @@ -#lang racket/base - -(provide ) - -(require "private/util.rkt") - -(module+ test (require rackunit)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(struct client (callback-url-base - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define (subscribe! client topic [callback #f] - diff --git a/racketmq/hub.rkt b/racketmq/hub.rkt new file mode 100644 index 0000000..6a52d70 --- /dev/null +++ b/racketmq/hub.rkt @@ -0,0 +1,25 @@ +#lang syndicate/actor + +(require "private/util.rkt") +(require "protocol.rkt") + +(require/activate "config.rkt") +(require/activate "hub/static-content.rkt") +(require/activate "hub/topic-demand.rkt") +(require/activate "hub/local-topic.rkt") +(require/activate "hub/subscription.rkt") + +;; Spawn configuration in parallel: +(spawn-configuration "defaults.rktd") +;; +;; ... or assert the required configuration directly: +;; (actor (assert (config (list 'canonical-host "localhost" 7827)))) + +(actor #:name 'main + (during (config (list 'canonical-host $h $p)) + (assert (canonical-local-host h p)) + (assert (local-host h p))) + (during (config (list 'accepted-host $h $p)) + (assert (local-host h p))) + (during (local-host $host-name $port) + (assert (vh host-name port)))) diff --git a/racketmq/hub/local-topic.rkt b/racketmq/hub/local-topic.rkt new file mode 100644 index 0000000..37d534b --- /dev/null +++ b/racketmq/hub/local-topic.rkt @@ -0,0 +1,95 @@ +#lang syndicate/actor + +(require racket/set) + +(require/activate syndicate/drivers/web) + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(actor #:name 'local-topic-manager + + (field [topics (set)]) + + (during (local-host $host-name $port) + (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ()))) + (when (not (set-member? (topics) topic)) + (topics (set-add (topics) topic)) + (assert! (local-topic-demand topic)) + (retract! (local-topic-config topic ? ?)) + (assert! (local-topic-config topic #f #f))) ;; TODO: maximums + (web-respond/bytes! id #"")) + + (on (web-request-incoming (id req) (vh host-name port) 'delete ("topic" (,$topic ()))) + (when (set-member? (topics) topic) + (topics (set-remove (topics) topic)) + (retract! (local-topic-demand topic)) + (retract! (local-topic-config topic ? ?))) + (web-respond/bytes! id #""))) + + (during/actor (local-topic-demand $topic) + #:name (list 'local-topic topic) + (local-topic-main topic))) + +(define (local-topic-main topic) + (field [max-age #f] + [max-count #f]) + + (field [current-content #f] + [current-content-type #f] + [last-modified-seconds #f]) + + (on (asserted (local-topic-config topic $age $count)) + (max-age age) + (max-count count)) + + (on-start (log-info "Creating local topic ~v" topic)) + (begin/dataflow + (log-info "Configured local topic ~v, max-age ~v, max-count ~v" + topic + (max-age) + (max-count))) + (on-stop (log-info "Terminating local topic ~v" topic)) + + (during (local-host $host-name $port) + (during (canonical-local-host $canonical-host-name $cport) + (define hub-url (canonical-url canonical-host-name cport `("hub" ()))) + (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ())))) + (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) + (cons 'link (format "<~a>; rel=self" self-url)))) + + (define (topic-response id include-content?) ;; Used in both GET and HEAD requests + (if (current-content) + (web-respond/bytes! id + #:header (web-response-header + #:last-modified-seconds (last-modified-seconds) + #:mime-type (and (current-content-type) + (string->bytes/utf-8 + (current-content-type))) + #:headers discovery-headers) + (if include-content? (current-content) #"")) + (web-respond/bytes! id + #:header (web-response-header + #:code 204 + #:message #"No Content" + #:mime-type #f + #:headers discovery-headers) + #""))) ;; MUST NOT include a response body for 204 + + (on (web-request-incoming (id req) (vh host-name port) 'head ("topic" (,topic ()))) + (topic-response id #f)) + (on (web-request-get (id req) (vh host-name port) ("topic" (,topic ()))) + (topic-response id #t)) + (on (web-request-incoming (id req) (vh host-name port) 'post ("topic" (,topic ())) $body) + (define content-type (web-request-header-content-type req)) + (log-info "Local topic ~a got ~v message ~v" topic content-type body) + (current-content body) + (current-content-type content-type) + (last-modified-seconds (current-seconds)) + (actor* + (send! (notification self-url + hub-url + self-url + body + content-type)) + (web-respond/status! id 201 #"Created")))))) diff --git a/racketmq/hub/remote-topic.rkt b/racketmq/hub/remote-topic.rkt new file mode 100644 index 0000000..31e1c4a --- /dev/null +++ b/racketmq/hub/remote-topic.rkt @@ -0,0 +1,132 @@ +#lang syndicate/actor + +(provide remote-topic-main) + +(require racket/dict) +(require racket/set) +(require net/uri-codec) +(require file/sha1) + +(require/activate syndicate/drivers/timestate) +(require/activate syndicate/drivers/web) +(require/activate "../config.rkt") + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(define (remote-topic-main full-topic) + (define sub-id (random-hex-string 16)) + (log-info "Remote sub endpoint ~a" sub-id) + + (field [current-content-hash #f] + [current-content-type #f] + [current-upstream-hub #f] + [established-upstream-hub #f]) + + (field [last-upstream-check 0] + [poll-interval-seconds #f]) + + (define/query-set poll-intervals (topic-demand full-topic $i) i) + (define/query-config min-poll-interval 60) + (begin/dataflow + (define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] + (cond [(not i) candidate] + [(< candidate i) candidate] + [else i]))) + (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) + (begin/dataflow + (log-info "Poll interval for ~a is now ~a" + full-topic + (match (poll-interval-seconds) + [#f "disabled"] + [n (format "~a seconds" n)]))) + + (during (canonical-local-host $canonical-host-name $cport) + (define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ())))) + + (define (refresh-subscription!) + ;; TODO: shared secret + ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly + (when (current-upstream-hub) + (web-request! 'post (current-upstream-hub) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,callback) + (hub.mode . "subscribe") + (hub.topic . ,full-topic))))) + (established-upstream-hub (current-upstream-hub)))) + + (define (unsubscribe!) + (when (established-upstream-hub) + (web-request! 'post (established-upstream-hub) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,callback) + (hub.mode . "unsubscribe") + (hub.topic . ,full-topic))))) + (established-upstream-hub #f))) + + (begin/dataflow + (when (not (equal? (current-upstream-hub) (established-upstream-hub))) + (unsubscribe!) + (refresh-subscription!))) + + (define/query-config max-upstream-redirects 5) + + (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) + (log-info "Checking upstream ~a" full-topic) + (define-values (resp response-body) + (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) + (last-upstream-check (current-inexact-milliseconds)) + (match (web-response-header-code-type resp) + ['successful + (define new-content-hash (sha1 (open-input-bytes response-body))) + (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) + (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) + (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) + (when (not (and (equal? (current-content-hash) new-content-hash) + (equal? (current-content-type) new-content-type))) + (current-content-hash new-content-hash) + (current-content-type new-content-type) + (send! (notification full-topic + upstream-hub + upstream-topic + response-body + new-content-type))) + (if (equal? (current-upstream-hub) upstream-hub) + (refresh-subscription!) + (current-upstream-hub upstream-hub))] + [other + (log-warning "Upstream ~a yielded ~a code ~a" + full-topic + other + (web-response-header-code resp))])) + + (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) + (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) + (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (web-respond/bytes! id (string->bytes/utf-8 challenge))) + + (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) + ;; TODO: verify the use of the shared secret + (actor* + (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) + (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) + (define content-type (web-request-header-content-type req)) + (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" + full-topic + content-type + body + upstream-hub + upstream-topic) + (current-content-hash (sha1 (open-input-bytes body))) + (current-content-type content-type) + (current-upstream-hub upstream-hub) + (send! (notification full-topic + upstream-hub + upstream-topic + body + content-type)) + (web-respond/status! id 201 #"Created"))))) diff --git a/racketmq/hub/static-content.rkt b/racketmq/hub/static-content.rkt new file mode 100644 index 0000000..8e63d4b --- /dev/null +++ b/racketmq/hub/static-content.rkt @@ -0,0 +1,26 @@ +#lang syndicate/actor + +(require racket/file) +(require racket/runtime-path) +(require web-server/dispatchers/filesystem-map) +(require web-server/private/mime-types) + +(require/activate syndicate/drivers/web) + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(begin-for-declarations + (define-runtime-path htdocs-path "../htdocs") + (define path->mime-type (make-path->mime-type "/etc/mime.types"))) + +(actor #:name 'static-content-server + (define url->path (make-url->path htdocs-path)) + (during (local-host $host-name $port) + (on (web-request-get (id req) (vh host-name port) ,_) + (define-values (path path-pieces) + (url->path (resource->url (web-request-header-resource req)))) + (when (file-exists? path) + (web-respond/bytes! id + #:header (web-response-header #:mime-type (path->mime-type path)) + (file->bytes path)))))) diff --git a/racketmq/hub/subscription.rkt b/racketmq/hub/subscription.rkt new file mode 100644 index 0000000..88b1593 --- /dev/null +++ b/racketmq/hub/subscription.rkt @@ -0,0 +1,161 @@ +#lang syndicate/actor + +(require racket/format) +(require net/url) +(require net/uri-codec) +(require syndicate/functional-queue) + +(require/activate syndicate/drivers/timestate) +(require/activate syndicate/drivers/web) +(require/activate "../config.rkt") + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(actor #:name 'hub + (during (local-host $host-name $port) + (during (canonical-local-host $canonical-host-name $cport) + (on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body) + (asynchronous-verification-of-intent id req body canonical-host-name cport) + (web-respond/status! id 202 #"Accepted")))) + + (on (message (update-subscription $topic $callback $settings)) + (retract! (subscription topic callback ?)) + (when settings (assert! (subscription topic callback settings)))) + + (during/actor (subscription $topic $callback _) + #:name (list 'subscription topic callback) + #:on-crash (retract! (subscription topic callback ?)) + (subscription-main topic callback))) + +(define (asynchronous-verification-of-intent id req body canonical-host-name cport) + (actor* #:name 'verification-of-intent + (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) + (define callback (hash-ref params 'hub.callback)) + (define mode (match (hash-ref params 'hub.mode) + ["subscribe" 'subscribe] + ["unsubscribe" 'unsubscribe])) + (define requested-topic (hash-ref params 'hub.topic)) + (define topic + (url->string + (combine-url/relative (string->url (canonical-url canonical-host-name + cport + `("topic" ("" ())))) + requested-topic))) + (define requested-lease-seconds + (string->number + (hash-ref params 'hub.lease_seconds (~a (config-ref 'default-lease 86400))))) + (define lease-seconds (min requested-lease-seconds (config-ref 'max-lease 604800))) + (define poll-interval-seconds + (match (hash-ref params + 'hub.poll_interval_seconds + (~a (config-ref 'default-poll-interval "none"))) + ["none" #f] + [n (string->number n)])) + (define secret-string (hash-ref params 'hub.secret #f)) + (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) + (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) + (match mode + ['subscribe + (when (subscription-change-validate "subscribe" + lease-seconds + requested-topic + callback) + (send! (update-subscription topic + callback + (subscription-settings expiry-deadline + secret-bytes + poll-interval-seconds))))] + ['unsubscribe + (when (subscription-change-validate "unsubscribe" #f requested-topic callback) + (send! (update-subscription topic callback #f)))]))) + +(define (subscription-change-validate mode lease requested-topic callback) + (define challenge (random-hex-string 16)) + (define id (gensym 'validation)) + (define extra-query (list* (cons 'hub.mode mode) + (cons 'hub.topic requested-topic) + (cons 'hub.challenge challenge) + (if lease + (list (cons 'hub.lease_seconds (~a lease))) + (list)))) + (define-values (resp body) + (web-request! 'get + (let* ((u (string->url callback))) + (url->string + (struct-copy url u [query (append (url-query u) extra-query)]))))) + (and (eq? (web-response-header-code-type resp) 'successful) + (equal? body (string->bytes/utf-8 challenge)))) + +(define (subscription-main topic callback) + (field [delivery-active? #f] + [message-queue (make-queue)] + [dead-letters (make-queue)]) + + (define/query-config max-dead-letters 10) + (define/query-config initial-retry-delay 5.0) + (define/query-config max-delivery-retries 10) + (define/query-config retry-delay-multiplier 1.618) + (define/query-config max-retry-delay 30) + + (stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters))) + (log-info "Too many dead letters for ~a" callback)) + + (define (deliver-queued-notifications) + (delivery-active? #t) + (let deliver-rest ((retry-delay (initial-retry-delay))) + (when (not (queue-empty? (message-queue))) + (define-values (n newq) (dequeue (message-queue))) + (message-queue newq) + (match-define (notification _ canonical-hub canonical-topic body content-type) n) + (define link-headers (append (maybe-link-header canonical-hub 'hub) + (maybe-link-header canonical-topic 'self))) + (let deliver-one ((retries-remaining (max-delivery-retries)) + (retry-delay retry-delay)) + (define-values (resp _body) + (web-request! 'post + callback + #:headers (if content-type + (cons (cons 'content-type content-type) link-headers) + link-headers) + #:body body)) + (cond + [(eq? (web-response-header-code-type resp) 'successful) + (deliver-rest (initial-retry-delay))] + [(zero? retries-remaining) + (log-info "Dead letter for ~v" callback) + (dead-letters (enqueue (dead-letters) n)) + (deliver-rest retry-delay)] + [else + (log-info + "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v" + callback + retry-delay + retries-remaining + resp) + (sleep retry-delay) + (deliver-one (- retries-remaining 1) + (min (* retry-delay (retry-delay-multiplier)) + (max-retry-delay)))])))) + (delivery-active? #f)) + + (during (subscription topic callback (subscription-settings + $expiry-deadline + $secret-bytes + $poll-interval-seconds)) + (assert (topic-demand topic poll-interval-seconds)) + + (on-start (log-info "Subscription configured: ~v" + `((topic ,topic) + (expiry-deadline ,expiry-deadline) + (callback ,callback) + (secret-bytes ,secret-bytes) + (poll-interval-seconds ,poll-interval-seconds)))) + + (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0))) + (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback)))) + + (on (message ($ n (notification topic _ _ _ _))) + (message-queue (enqueue (message-queue) n)) + (when (not (delivery-active?)) + (deliver-queued-notifications))))) diff --git a/racketmq/hub/topic-demand.rkt b/racketmq/hub/topic-demand.rkt new file mode 100644 index 0000000..9731437 --- /dev/null +++ b/racketmq/hub/topic-demand.rkt @@ -0,0 +1,45 @@ +#lang syndicate/actor + +(require racket/exn) +(require racket/set) +(require net/url) + +(require/activate syndicate/drivers/web) +(require/activate "remote-topic.rkt") + +(require "../private/util.rkt") +(require "../protocol.rkt") + +(actor #:name 'topic-demand-analyzer + (define/query-set local-hosts ($ h (local-host _ _)) h) + (during/actor (topic-demand $full-topic _) + #:name (list 'general-topic full-topic) + #:let [(local-hosts-snapshot (local-hosts))] + (with-handlers [(exn? (lambda (e) + (log-error "Topic demand error: ~a" (exn->string e))))] + (match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path) + (url->resource (string->url full-topic))) + (define maybe-local-topic + (match topic-path [`("topic" (,topic ())) topic] [_ #f])) + (general-topic-main full-topic + topic-host + topic-port + maybe-local-topic + (set-member? local-hosts-snapshot + (local-host topic-host topic-port)))))) + +(define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?) + (define (local-state) + (react (stop-when (retracted (local-host topic-host topic-port)) + (remote-state)) + (assert (local-topic-demand maybe-local-topic)))) + + (define (remote-state) + (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port)) + (local-state)) + (remote-topic-main full-topic))) + + (on-start + (if (and maybe-local-topic start-as-local?) + (local-state) + (remote-state)))) diff --git a/racketmq/poke.rkt b/racketmq/poke.rkt index a4b50ee..8a09571 100644 --- a/racketmq/poke.rkt +++ b/racketmq/poke.rkt @@ -11,7 +11,7 @@ (require/activate syndicate/drivers/timer) (require/activate syndicate/drivers/web) -(require "private/util.rkt") +(require (except-in "private/util.rkt" vh)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/racketmq/private/util.rkt b/racketmq/private/util.rkt index aa25fd7..661b244 100644 --- a/racketmq/private/util.rkt +++ b/racketmq/private/util.rkt @@ -6,7 +6,10 @@ web-respond/status! web-request! parse-link-headers - link-header-ref) + link-header-ref + maybe-link-header + vh + canonical-url) (require racket/dict) (require racket/match) @@ -123,3 +126,13 @@ (hasheq 'hub (list "http://phubb.cweiske.de/hub.php" "http://localhost:7827/hub") 'self (list "http://push-tester.cweiske.de/feed.php")))) + +(define (maybe-link-header urlstr rel) + (if urlstr + (list (cons 'link (format "<~a>; rel=~a" urlstr rel))) + '())) + +(define (vh host-name port) (web-virtual-host "http" host-name port)) + +(define (canonical-url canonical-host-name cport path) + (url->string (resource->url (web-resource (vh canonical-host-name cport) path)))) diff --git a/racketmq/run b/racketmq/run index c8239af..e4a5801 100755 --- a/racketmq/run +++ b/racketmq/run @@ -6,4 +6,4 @@ set -e echo "Starting RacketMQ" exec 2>&1 export PLTSTDERR="$PLTSTDERR debug@racketmq info@syndicate/drivers/web error" -exec racket server.rkt +exec racket hub.rkt diff --git a/racketmq/server.rkt b/racketmq/server.rkt deleted file mode 100644 index e3a5d4b..0000000 --- a/racketmq/server.rkt +++ /dev/null @@ -1,464 +0,0 @@ -#lang syndicate/actor - -(provide ) - -(require racket/dict) -(require racket/exn) -(require racket/file) -(require racket/format) -(require racket/runtime-path) -(require racket/set) -(require net/url) -(require net/uri-codec) -(require file/sha1) -(require web-server/dispatchers/filesystem-map) -(require web-server/private/mime-types) - -(require/activate syndicate/drivers/timer) -(require/activate syndicate/drivers/timestate) -(require/activate syndicate/drivers/web) -(require syndicate/protocol/advertise) -(require syndicate/functional-queue) - -(require "private/util.rkt") -(require "protocol.rkt") -(require "config.rkt") - -(define (vh host-name port) (web-virtual-host "http" host-name port)) - -(spawn-configuration "defaults.rktd") - -(actor #:name 'main - - (during (config (list 'canonical-host $h $p)) - (assert (canonical-local-host h p)) - (assert (local-host h p))) - - (during (config (list 'accepted-host $h $p)) - (assert (local-host h p))) - - (during (local-host $host-name $port) - (assert (vh host-name port)) - - (on (web-request-get (id req) (vh host-name port) ("" ())) - (actor* - (web-respond/xexpr! id - `(html - (body - (h1 "Status" - " " - ,(url->string - (resource->url - (web-request-header-resource req))))))))))) - -(begin-for-declarations - (define-runtime-path htdocs-path "htdocs") - (define path->mime-type (make-path->mime-type "/etc/mime.types"))) - -(actor #:name 'static-content-server - (define url->path (make-url->path htdocs-path)) - (during (local-host $host-name $port) - (on (web-request-get (id req) (vh host-name port) ,_) - (define-values (path path-pieces) - (url->path (resource->url (web-request-header-resource req)))) - (when (file-exists? path) - (web-respond/bytes! id - #:header (web-response-header #:mime-type (path->mime-type path)) - (file->bytes path)))))) - -(actor #:name 'local-topic-manager - - (field [topics (set)]) - - (during (local-host $host-name $port) - (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ()))) - (when (not (set-member? (topics) topic)) - (topics (set-add (topics) topic)) - (assert! (local-topic-demand topic)) - (retract! (local-topic-config topic ? ?)) - (assert! (local-topic-config topic #f #f))) ;; TODO: maximums - (web-respond/bytes! id #"")) - - (on (web-request-incoming (id req) (vh host-name port) 'delete ("topic" (,$topic ()))) - (when (set-member? (topics) topic) - (topics (set-remove (topics) topic)) - (retract! (local-topic-demand topic)) - (retract! (local-topic-config topic ? ?))) - (web-respond/bytes! id #""))) - - (during/actor (local-topic-demand $topic) - #:name (list 'local-topic topic) - (local-topic-main topic))) - -(actor #:name 'topic-demand-analyzer - (define/query-set local-hosts ($ h (local-host _ _)) h) - (during/actor (topic-demand $full-topic _) - #:name (list 'general-topic full-topic) - #:let [(local-hosts-snapshot (local-hosts))] - (with-handlers [(exn? (lambda (e) - (log-error "Topic demand error: ~a" (exn->string e))))] - (match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path) - (url->resource (string->url full-topic))) - (define maybe-local-topic - (match topic-path [`("topic" (,topic ())) topic] [_ #f])) - (general-topic-main full-topic - topic-host - topic-port - maybe-local-topic - (set-member? local-hosts-snapshot - (local-host topic-host topic-port)))))) - -(define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?) - (define (local-state) - (react (stop-when (retracted (local-host topic-host topic-port)) - (remote-state)) - (assert (local-topic-demand maybe-local-topic)))) - - (define (remote-state) - (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port)) - (local-state)) - (remote-topic-main full-topic))) - - (on-start - (if (and maybe-local-topic start-as-local?) - (local-state) - (remote-state)))) - -(define (req-content-type req) - (dict-ref (web-request-header-headers req) 'content-type #f)) - -(define (canonical-url canonical-host-name cport path) - (url->string (resource->url (web-resource (vh canonical-host-name cport) path)))) - -(define (local-topic-main topic) - (field [max-age #f] - [max-count #f]) - - (field [current-content #f] - [current-content-type #f] - [last-modified-seconds #f]) - - (on (asserted (local-topic-config topic $age $count)) - (max-age age) - (max-count count)) - - (on-start (log-info "Creating local topic ~v" topic)) - (begin/dataflow - (log-info "Configured local topic ~v, max-age ~v, max-count ~v" - topic - (max-age) - (max-count))) - (on-stop (log-info "Terminating local topic ~v" topic)) - - (during (local-host $host-name $port) - (during (canonical-local-host $canonical-host-name $cport) - (define hub-url (canonical-url canonical-host-name cport `("hub" ()))) - (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ())))) - (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) - (cons 'link (format "<~a>; rel=self" self-url)))) - - (define (topic-response id include-content?) ;; Used in both GET and HEAD requests - (if (current-content) - (web-respond/bytes! id - #:header (web-response-header - #:last-modified-seconds (last-modified-seconds) - #:mime-type (and (current-content-type) - (string->bytes/utf-8 - (current-content-type))) - #:headers discovery-headers) - (if include-content? (current-content) #"")) - (web-respond/bytes! id - #:header (web-response-header - #:code 204 - #:message #"No Content" - #:mime-type #f - #:headers discovery-headers) - #""))) ;; MUST NOT include a response body for 204 - - (on (web-request-incoming (id req) (vh host-name port) 'head ("topic" (,topic ()))) - (topic-response id #f)) - (on (web-request-get (id req) (vh host-name port) ("topic" (,topic ()))) - (topic-response id #t)) - (on (web-request-incoming (id req) (vh host-name port) 'post ("topic" (,topic ())) $body) - (define content-type (req-content-type req)) - (log-info "Local topic ~a got ~v message ~v" topic content-type body) - (current-content body) - (current-content-type content-type) - (last-modified-seconds (current-seconds)) - (actor* - (send! (notification self-url - hub-url - self-url - body - content-type)) - (web-respond/status! id 201 #"Created")))))) - -(define (remote-topic-main full-topic) - (define sub-id (random-hex-string 16)) - (log-info "Remote sub endpoint ~a" sub-id) - - (field [current-content-hash #f] - [current-content-type #f] - [current-upstream-hub #f] - [established-upstream-hub #f]) - - (field [last-upstream-check 0] - [poll-interval-seconds #f]) - - (define/query-set poll-intervals (topic-demand full-topic $i) i) - (define/query-config min-poll-interval 60) - (begin/dataflow - (define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] - (cond [(not i) candidate] - [(< candidate i) candidate] - [else i]))) - (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) - (begin/dataflow - (log-info "Poll interval for ~a is now ~a" - full-topic - (match (poll-interval-seconds) - [#f "disabled"] - [n (format "~a seconds" n)]))) - - (during (canonical-local-host $canonical-host-name $cport) - (define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ())))) - - (define (refresh-subscription!) - ;; TODO: shared secret - ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly - (when (current-upstream-hub) - (web-request! 'post (current-upstream-hub) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . ,callback) - (hub.mode . "subscribe") - (hub.topic . ,full-topic))))) - (established-upstream-hub (current-upstream-hub)))) - - (define (unsubscribe!) - (when (established-upstream-hub) - (web-request! 'post (established-upstream-hub) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . ,callback) - (hub.mode . "unsubscribe") - (hub.topic . ,full-topic))))) - (established-upstream-hub #f))) - - (begin/dataflow - (when (not (equal? (current-upstream-hub) (established-upstream-hub))) - (unsubscribe!) - (refresh-subscription!))) - - (define/query-config max-upstream-redirects 5) - - (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) - (log-info "Checking upstream ~a" full-topic) - (define-values (resp response-body) - (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) - (last-upstream-check (current-inexact-milliseconds)) - (match (web-response-header-code-type resp) - ['successful - (define new-content-hash (sha1 (open-input-bytes response-body))) - (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) - (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) - (when (not (and (equal? (current-content-hash) new-content-hash) - (equal? (current-content-type) new-content-type))) - (current-content-hash new-content-hash) - (current-content-type new-content-type) - (send! (notification full-topic - upstream-hub - upstream-topic - response-body - new-content-type))) - (if (equal? (current-upstream-hub) upstream-hub) - (refresh-subscription!) - (current-upstream-hub upstream-hub))] - [other - (log-warning "Upstream ~a yielded ~a code ~a" - full-topic - other - (web-response-header-code resp))])) - - (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) - (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) - (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) - (web-respond/bytes! id (string->bytes/utf-8 challenge))) - - (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) - ;; TODO: verify the use of the shared secret - (actor* - (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) - (define content-type (req-content-type req)) - (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" - full-topic - content-type - body - upstream-hub - upstream-topic) - (current-content-hash (sha1 (open-input-bytes body))) - (current-content-type content-type) - (current-upstream-hub upstream-hub) - (send! (notification full-topic - upstream-hub - upstream-topic - body - content-type)) - (web-respond/status! id 201 #"Created"))))) - -(actor #:name 'hub - (during (local-host $host-name $port) - (during (canonical-local-host $canonical-host-name $cport) - (on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body) - (asynchronous-verification-of-intent id req body canonical-host-name cport) - (web-respond/status! id 202 #"Accepted")))) - - (on (message (update-subscription $topic $callback $settings)) - (retract! (subscription topic callback ?)) - (when settings (assert! (subscription topic callback settings)))) - - (during/actor (subscription $topic $callback _) - #:name (list 'subscription topic callback) - #:on-crash (retract! (subscription topic callback ?)) - (subscription-main topic callback))) - -(define (asynchronous-verification-of-intent id req body canonical-host-name cport) - (actor* (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) - (define callback (hash-ref params 'hub.callback)) - (define mode (match (hash-ref params 'hub.mode) - ["subscribe" 'subscribe] - ["unsubscribe" 'unsubscribe])) - (define requested-topic (hash-ref params 'hub.topic)) - (define topic - (url->string - (combine-url/relative (string->url (canonical-url canonical-host-name - cport - `("topic" ("" ())))) - requested-topic))) - (define lease-seconds (match (hash-ref params - 'hub.lease_seconds - (config-ref 'default-lease 86400)) - ["unbounded" #f] - [n (string->number n)])) - (define poll-interval-seconds - (match (hash-ref params - 'hub.poll_interval_seconds - (config-ref 'default-poll-interval "none")) - ["none" #f] - [n (string->number n)])) - (define secret-string (hash-ref params 'hub.secret #f)) - (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) - (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) - (match mode - ['subscribe - (when (subscription-change-validate "subscribe" - (or lease-seconds "unbounded") - requested-topic - callback) - (send! (update-subscription topic - callback - (subscription-settings expiry-deadline - secret-bytes - poll-interval-seconds))))] - ['unsubscribe - (when (subscription-change-validate "unsubscribe" #f requested-topic callback) - (send! (update-subscription topic callback #f)))]))) - -(define (subscription-change-validate mode lease requested-topic callback) - (define challenge (random-hex-string 16)) - (define id (gensym 'validation)) - (define extra-query (list* (cons 'hub.mode mode) - (cons 'hub.topic requested-topic) - (cons 'hub.challenge challenge) - (if lease - (list (cons 'hub.lease_seconds (~a lease))) - (list)))) - (define-values (resp body) - (web-request! 'get - (let* ((u (string->url callback))) - (url->string - (struct-copy url u [query (append (url-query u) extra-query)]))))) - (and (eq? (web-response-header-code-type resp) 'successful) - (equal? body (string->bytes/utf-8 challenge)))) - -(define (maybe-link-header urlstr rel) - (if urlstr - (list (cons 'link (format "<~a>; rel=~a" urlstr rel))) - '())) - -(define (subscription-main topic callback) - (field [delivery-active? #f] - [message-queue (make-queue)] - [dead-letters (make-queue)]) - - (define/query-config max-dead-letters 10) - (define/query-config initial-retry-delay 5.0) - (define/query-config max-delivery-retries 10) - (define/query-config retry-delay-multiplier 1.618) - (define/query-config max-retry-delay 30) - - (stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters))) - (log-info "Too many dead letters for ~a" callback)) - - (define (deliver-queued-notifications) - (delivery-active? #t) - (let deliver-rest ((retry-delay (initial-retry-delay))) - (when (not (queue-empty? (message-queue))) - (define-values (n newq) (dequeue (message-queue))) - (message-queue newq) - (match-define (notification _ canonical-hub canonical-topic body content-type) n) - (define link-headers (append (maybe-link-header canonical-hub 'hub) - (maybe-link-header canonical-topic 'self))) - (let deliver-one ((retries-remaining (max-delivery-retries)) - (retry-delay retry-delay)) - (define-values (resp _body) - (web-request! 'post - callback - #:headers (if content-type - (cons (cons 'content-type content-type) link-headers) - link-headers) - #:body body)) - (cond - [(eq? (web-response-header-code-type resp) 'successful) - (deliver-rest (initial-retry-delay))] - [(zero? retries-remaining) - (log-info "Dead letter for ~v" callback) - (dead-letters (enqueue (dead-letters) n)) - (deliver-rest retry-delay)] - [else - (log-info - "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v" - callback - retry-delay - retries-remaining - resp) - (sleep retry-delay) - (deliver-one (- retries-remaining 1) - (min (* retry-delay (retry-delay-multiplier)) - (max-retry-delay)))])))) - (delivery-active? #f)) - - (during (subscription topic callback (subscription-settings - $expiry-deadline - $secret-bytes - $poll-interval-seconds)) - (assert (topic-demand topic poll-interval-seconds)) - - (on-start (log-info "Subscription configured: ~v" - `((topic ,topic) - (expiry-deadline ,expiry-deadline) - (callback ,callback) - (secret-bytes ,secret-bytes) - (poll-interval-seconds ,poll-interval-seconds)))) - - (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0))) - (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback)))) - - (on (message ($ n (notification topic _ _ _ _))) - (message-queue (enqueue (message-queue) n)) - (when (not (delivery-active?)) - (deliver-queued-notifications)))))