diff --git a/racketmq/config.rkt b/racketmq/config.rkt new file mode 100644 index 0000000..30c1651 --- /dev/null +++ b/racketmq/config.rkt @@ -0,0 +1,32 @@ +#lang syndicate/actor +;; Server Configuration + +(provide (struct-out config) + spawn-configuration + define/query-config + config-ref) + +(require racket/file) +(require/activate syndicate/drivers/filesystem) + +(struct config (item) #:prefab) + +(define (spawn-configuration path) + (actor #:name (list 'configuration-monitor path) + (during (file-content path file->list $items) + (log-info "config ~s: ~s" path items) + (for [(item items)] + (assert (config item)))))) + +(define-syntax define/query-config + (syntax-rules () + [(_ id default) + (define/query-config id id default)] + [(_ id key default) + (define/query-value id default (config (list 'key $val)) val)])) + +(define (config-ref key default) + (react/suspend (k) + (define/query-value actual default (config (list key $val)) val) + (on-start (flush!) + (k (actual))))) diff --git a/racketmq/defaults.rktd b/racketmq/defaults.rktd new file mode 100644 index 0000000..2ded0b5 --- /dev/null +++ b/racketmq/defaults.rktd @@ -0,0 +1,22 @@ +;; REQUIRED: +;; (canonical-host "localhost" 7827) + +;; OPTIONAL: + +;; (accepted-host "localhost" 7827) +;; (accepted-host "localhost" 80) +;; etc. + +(max-upstream-redirects 5) + +(min-poll-interval 60) + +(default-lease 86400) ;; seconds + +(default-poll-interval "none") ;; seconds, or "none" + +(max-dead-letters 10) +(max-delivery-retries 10) +(initial-retry-delay 5.0) ;; seconds +(retry-delay-multiplier 1.618) +(max-retry-delay 30) diff --git a/racketmq/protocol.rkt b/racketmq/protocol.rkt new file mode 100644 index 0000000..29fa88f --- /dev/null +++ b/racketmq/protocol.rkt @@ -0,0 +1,60 @@ +#lang racket/base +;; Assertions, Messages and Protocols for RacketMQ + +(provide (struct-out notification) + (struct-out update-subscription) + (struct-out subscription) + (struct-out subscription-settings) + (struct-out local-topic-config) + (struct-out topic-demand) + (struct-out local-topic-demand) + (struct-out local-host) + (struct-out canonical-local-host)) + +;; A Topic is a URIString. + +;; A Deadline is an (Option Number), where #f indicates "unbounded", +;; and numbers indicate a moment in time as a Unix epoch timestamp as +;; might be returned from `current-seconds`. + +;; A Notification contains both `topic-name`, indicating the *local* +;; name for the topic that is being subscribed to, as well as +;; `canonical-topic`, which is the rel=self URL given by the upstream +;; topic itself. The former is our local key for finding resources; +;; the latter is a property of the topic content, not a property of +;; the subscription. The same is true of `canonical-hub`: it pertains +;; to the topic, not the subscription. The `topic-name` is the only +;; field pertaining to a subscription; all the others pertain to the +;; content. +(struct notification (topic-name ;; Topic + canonical-hub ;; Option URLString + canonical-topic ;; Option Topic + content ;; Bytes + content-type) ;; Option String + #:prefab) ;; MESSAGE + +;; (update-subscription Topic URLString (Option SubscriptionSettings)) +(struct update-subscription (topic callback settings) #:prefab) ;; MESSAGE + +;; (subscription Topic URLString SubscriptionSettings) +(struct subscription (topic callback settings-) #:prefab) ;; ASSERTION + +(struct subscription-settings (expiry-deadline ;; Deadline + secret ;; Option Bytes + poll-interval-seconds) ;; Option Number + #:prefab) + +;; (local-topic-config Topic (Option Number) (Option Number)) +(struct local-topic-config (name max-age max-count) #:prefab) ;; ASSERTION + +;; (topic-demand Topic (Option Number)) +(struct topic-demand (topic-name poll-interval-seconds) #:prefab) ;; ASSERTION + +;; (local-topic-demand String) +(struct local-topic-demand (name) #:prefab) ;; ASSERTION + +;; (local-host String) +(struct local-host (name port) #:prefab) ;; ASSERTION + +;; (canonical-local-host String) +(struct canonical-local-host (name port) #:prefab) ;; ASSERTION diff --git a/racketmq/server.rkt b/racketmq/server.rkt index 5a27659..e3a5d4b 100644 --- a/racketmq/server.rkt +++ b/racketmq/server.rkt @@ -21,107 +21,26 @@ (require syndicate/functional-queue) (require "private/util.rkt") +(require "protocol.rkt") +(require "config.rkt") -(module+ test (require rackunit)) +(define (vh host-name port) (web-virtual-host "http" host-name port)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; TODO: sane defaults -(define *max-upstream-redirects* 5) -(define *server-port* 7827) -(define *canonical-host* "localhost") -(define *accepted-hosts* (list *canonical-host*)) -(define *min-poll-interval* 10) ;; seconds -(define *default-lease* "unbounded") -(define *default-poll-interval* "none") -(define *max-dead-letters* 10) -(define *max-delivery-retries* 10) -(define *initial-retry-delay* 5.0) ;; seconds -(define *retry-delay-multiplier* 1.618) -(define *max-retry-delay* 30) ;; seconds - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; A Topic is a URIString. - -;; A Deadline is an (Option Number), where #f indicates "unbounded", -;; and numbers indicate a moment in time as a Unix epoch timestamp as -;; might be returned from `current-seconds`. - -;; A Notification contains both `topic-name`, indicating the *local* -;; name for the topic that is being subscribed to, as well as -;; `canonical-topic`, which is the rel=self URL given by the upstream -;; topic itself. The former is our local key for finding resources; -;; the latter is a property of the topic content, not a property of -;; the subscription. The same is true of `canonical-hub`: it pertains -;; to the topic, not the subscription. The `topic-name` is the only -;; field pertaining to a subscription; all the others pertain to the -;; content. -(struct notification (topic-name ;; Topic - canonical-hub ;; Option URLString - canonical-topic ;; Option Topic - content ;; Bytes - content-type) ;; Option String - #:prefab) - -;; (update-subscription Topic URLString (Option SubscriptionSettings)) -(struct update-subscription (topic callback settings) #:prefab) ;; message - -;; (subscription Topic URLString SubscriptionSettings) -(struct subscription (topic callback settings-) #:prefab) ;; assertion - -(struct subscription-settings (expiry-deadline ;; Deadline - secret ;; Option Bytes - poll-interval-seconds) ;; Option Number - #:prefab) - -;; (local-topic-config Topic (Option Number) (Option Number)) -(struct local-topic-config (name max-age max-count) #:prefab) - -;; (topic-demand Topic (Option Number)) -(struct topic-demand (topic-name poll-interval-seconds) #:prefab) - -;; (local-topic-demand String) -(struct local-topic-demand (name) #:prefab) - -;; (local-host String) -(struct local-host (name) #:prefab) - -;; (canonical-local-host String) -(struct canonical-local-host (name) #:prefab) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; (define (number->path n) -;; (let ((top (quotient n 256)) -;; (bot (remainder n 256))) -;; (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0")) -;; (if (zero? top) -;; bot-piece -;; (string-append (number->path top) "-/" bot-piece)))) - -;; (module+ test -;; (check-equal? (number->path 0) "00") -;; (check-equal? (number->path 123) "7b") -;; (check-equal? (number->path 12345) "30-/39") -;; (check-equal? (number->path 123456789012345678901234567890) -;; "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2")) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define (vh host-name) (web-virtual-host "http" host-name *server-port*)) +(spawn-configuration "defaults.rktd") (actor #:name 'main - (for [(h *accepted-hosts*)] - (assert (local-host h))) + (during (config (list 'canonical-host $h $p)) + (assert (canonical-local-host h p)) + (assert (local-host h p))) - (assert (canonical-local-host *canonical-host*)) + (during (config (list 'accepted-host $h $p)) + (assert (local-host h p))) - (during (local-host $host-name) - (assert (vh host-name)) + (during (local-host $host-name $port) + (assert (vh host-name port)) - (on (web-request-get (id req) (vh host-name) ("" ())) + (on (web-request-get (id req) (vh host-name port) ("" ())) (actor* (web-respond/xexpr! id `(html @@ -138,8 +57,8 @@ (actor #:name 'static-content-server (define url->path (make-url->path htdocs-path)) - (during (local-host $host-name) - (on (web-request-get (id req) (vh host-name) ,_) + (during (local-host $host-name $port) + (on (web-request-get (id req) (vh host-name port) ,_) (define-values (path path-pieces) (url->path (resource->url (web-request-header-resource req)))) (when (file-exists? path) @@ -151,8 +70,8 @@ (field [topics (set)]) - (during (local-host $host-name) - (on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ()))) + (during (local-host $host-name $port) + (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ()))) (when (not (set-member? (topics) topic)) (topics (set-add (topics) topic)) (assert! (local-topic-demand topic)) @@ -160,7 +79,7 @@ (assert! (local-topic-config topic #f #f))) ;; TODO: maximums (web-respond/bytes! id #"")) - (on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ()))) + (on (web-request-incoming (id req) (vh host-name port) 'delete ("topic" (,$topic ()))) (when (set-member? (topics) topic) (topics (set-remove (topics) topic)) (retract! (local-topic-demand topic)) @@ -172,28 +91,32 @@ (local-topic-main topic))) (actor #:name 'topic-demand-analyzer - (define/query-set local-hosts (local-host $host-name) host-name) + (define/query-set local-hosts ($ h (local-host _ _)) h) (during/actor (topic-demand $full-topic _) #:name (list 'general-topic full-topic) #:let [(local-hosts-snapshot (local-hosts))] (with-handlers [(exn? (lambda (e) (log-error "Topic demand error: ~a" (exn->string e))))] - (match-define (web-resource (web-virtual-host _ topic-host _) topic-path) + (match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path) (url->resource (string->url full-topic))) (define maybe-local-topic (match topic-path [`("topic" (,topic ())) topic] [_ #f])) (general-topic-main full-topic topic-host + topic-port maybe-local-topic - (set-member? local-hosts-snapshot topic-host))))) + (set-member? local-hosts-snapshot + (local-host topic-host topic-port)))))) -(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?) +(define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?) (define (local-state) - (react (stop-when (retracted (local-host topic-host)) (remote-state)) + (react (stop-when (retracted (local-host topic-host topic-port)) + (remote-state)) (assert (local-topic-demand maybe-local-topic)))) (define (remote-state) - (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state)) + (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port)) + (local-state)) (remote-topic-main full-topic))) (on-start @@ -204,8 +127,8 @@ (define (req-content-type req) (dict-ref (web-request-header-headers req) 'content-type #f)) -(define (canonical-url canonical-host-name path) - (url->string (resource->url (web-resource (vh canonical-host-name) path)))) +(define (canonical-url canonical-host-name cport path) + (url->string (resource->url (web-resource (vh canonical-host-name cport) path)))) (define (local-topic-main topic) (field [max-age #f] @@ -227,10 +150,10 @@ (max-count))) (on-stop (log-info "Terminating local topic ~v" topic)) - (during (local-host $host-name) - (during (canonical-local-host $canonical-host-name) - (define hub-url (canonical-url canonical-host-name `("hub" ()))) - (define self-url (canonical-url canonical-host-name `("topic" (,topic ())))) + (during (local-host $host-name $port) + (during (canonical-local-host $canonical-host-name $cport) + (define hub-url (canonical-url canonical-host-name cport `("hub" ()))) + (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ())))) (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) (cons 'link (format "<~a>; rel=self" self-url)))) @@ -252,11 +175,11 @@ #:headers discovery-headers) #""))) ;; MUST NOT include a response body for 204 - (on (web-request-incoming (id req) (vh host-name) 'head ("topic" (,topic ()))) + (on (web-request-incoming (id req) (vh host-name port) 'head ("topic" (,topic ()))) (topic-response id #f)) - (on (web-request-get (id req) (vh host-name) ("topic" (,topic ()))) + (on (web-request-get (id req) (vh host-name port) ("topic" (,topic ()))) (topic-response id #t)) - (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) + (on (web-request-incoming (id req) (vh host-name port) 'post ("topic" (,topic ())) $body) (define content-type (req-content-type req)) (log-info "Local topic ~a got ~v message ~v" topic content-type body) (current-content body) @@ -283,12 +206,13 @@ [poll-interval-seconds #f]) (define/query-set poll-intervals (topic-demand full-topic $i) i) + (define/query-config min-poll-interval 60) (begin/dataflow (define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] (cond [(not i) candidate] [(< candidate i) candidate] [else i]))) - (poll-interval-seconds (and candidate (max candidate *min-poll-interval*)))) + (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) (begin/dataflow (log-info "Poll interval for ~a is now ~a" full-topic @@ -296,8 +220,8 @@ [#f "disabled"] [n (format "~a seconds" n)]))) - (during (canonical-local-host $canonical-host-name) - (define callback (canonical-url canonical-host-name `("sub" (,sub-id ())))) + (during (canonical-local-host $canonical-host-name $cport) + (define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ())))) (define (refresh-subscription!) ;; TODO: shared secret @@ -326,10 +250,12 @@ (unsubscribe!) (refresh-subscription!))) + (define/query-config max-upstream-redirects 5) + (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) (log-info "Checking upstream ~a" full-topic) (define-values (resp response-body) - (web-request! 'get full-topic #:redirect-budget *max-upstream-redirects*)) + (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) (last-upstream-check (current-inexact-milliseconds)) (match (web-response-header-code-type resp) ['successful @@ -356,12 +282,12 @@ other (web-response-header-code resp))])) - (on (web-request-get (id req) (vh canonical-host-name) ("sub" (,sub-id ()))) + (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) (web-respond/bytes! id (string->bytes/utf-8 challenge))) - (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body) + (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) ;; TODO: verify the use of the shared secret (actor* (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) @@ -385,10 +311,10 @@ (web-respond/status! id 201 #"Created"))))) (actor #:name 'hub - (during (local-host $host-name) - (during (canonical-local-host $canonical-host-name) - (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body) - (asynchronous-verification-of-intent id req body canonical-host-name) + (during (local-host $host-name $port) + (during (canonical-local-host $canonical-host-name $cport) + (on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body) + (asynchronous-verification-of-intent id req body canonical-host-name cport) (web-respond/status! id 202 #"Accepted")))) (on (message (update-subscription $topic $callback $settings)) @@ -400,7 +326,7 @@ #:on-crash (retract! (subscription topic callback ?)) (subscription-main topic callback))) -(define (asynchronous-verification-of-intent id req body canonical-host-name) +(define (asynchronous-verification-of-intent id req body canonical-host-name cport) (actor* (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) (define callback (hash-ref params 'hub.callback)) (define mode (match (hash-ref params 'hub.mode) @@ -410,13 +336,18 @@ (define topic (url->string (combine-url/relative (string->url (canonical-url canonical-host-name + cport `("topic" ("" ())))) requested-topic))) - (define lease-seconds (match (hash-ref params 'hub.lease_seconds *default-lease*) + (define lease-seconds (match (hash-ref params + 'hub.lease_seconds + (config-ref 'default-lease 86400)) ["unbounded" #f] [n (string->number n)])) (define poll-interval-seconds - (match (hash-ref params 'hub.poll_interval_seconds *default-poll-interval*) + (match (hash-ref params + 'hub.poll_interval_seconds + (config-ref 'default-poll-interval "none")) ["none" #f] [n (string->number n)])) (define secret-string (hash-ref params 'hub.secret #f)) @@ -464,19 +395,25 @@ [message-queue (make-queue)] [dead-letters (make-queue)]) - (stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*)) + (define/query-config max-dead-letters 10) + (define/query-config initial-retry-delay 5.0) + (define/query-config max-delivery-retries 10) + (define/query-config retry-delay-multiplier 1.618) + (define/query-config max-retry-delay 30) + + (stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters))) (log-info "Too many dead letters for ~a" callback)) (define (deliver-queued-notifications) (delivery-active? #t) - (let deliver-rest ((retry-delay *initial-retry-delay*)) + (let deliver-rest ((retry-delay (initial-retry-delay))) (when (not (queue-empty? (message-queue))) (define-values (n newq) (dequeue (message-queue))) (message-queue newq) (match-define (notification _ canonical-hub canonical-topic body content-type) n) (define link-headers (append (maybe-link-header canonical-hub 'hub) (maybe-link-header canonical-topic 'self))) - (let deliver-one ((retries-remaining *max-delivery-retries*) + (let deliver-one ((retries-remaining (max-delivery-retries)) (retry-delay retry-delay)) (define-values (resp _body) (web-request! 'post @@ -487,7 +424,7 @@ #:body body)) (cond [(eq? (web-response-header-code-type resp) 'successful) - (deliver-rest *initial-retry-delay*)] + (deliver-rest (initial-retry-delay))] [(zero? retries-remaining) (log-info "Dead letter for ~v" callback) (dead-letters (enqueue (dead-letters) n)) @@ -501,8 +438,8 @@ resp) (sleep retry-delay) (deliver-one (- retries-remaining 1) - (min (* retry-delay *retry-delay-multiplier*) - *max-retry-delay*))])))) + (min (* retry-delay (retry-delay-multiplier)) + (max-retry-delay)))])))) (delivery-active? #f)) (during (subscription topic callback (subscription-settings