diff --git a/README.md b/README.md index 3972df0..4ee490a 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ See the specification of the W3C WebSub protocol at 1. Install Racket from 2. Install RacketMQ by running `raco pkg install --auto racketmq` - 3. `racketmq --canonical-host localhost 7827` + 3. `racketmq --baseurl http://localhost:7827/ --listen localhost 7827` To install from git, replace the `raco pkg install ...` step above with an invocation of `make link` from the top directory of your git @@ -45,9 +45,8 @@ checkout. ## Configuration -RacketMQ has only one required configuration variable: you must tell -the hub its primary ("canonical") *host name* and *port number*. These -are used to build URLs for clients of the Hub to use. +The most important RacketMQ configuration variable is its canonical +base URL: the URL prefix used to build URLs for clients to use. When the RacketMQ startup script is given a "`-f` *filename*" option, it loads configuration data from the named file. The option can be @@ -59,7 +58,8 @@ For a fully-commented example configuration file, see Within each file, each configuration entry should be a list (see [Racket syntax](https://docs.racket-lang.org/reference/reader.html)) with a symbol (the "key") as its first item followed by zero or more -items. +items. Line comments start with semicolon (`;`) as usual for +S-expression languages. Each configuration file is automatically reread by the server when it is changed: if you need to make changes, consider doing so atomically @@ -68,29 +68,30 @@ by producing an updated configuration file and using ### Required configuration data - (canonical-host "localhost" 7827) + (canonical-baseurl "http://localhost:7827/") -Exactly one "canonical-host" key, containing two values, a string -hostname, and a number TCP port. This causes an HTTP server to be -spun up on the named port. +Exactly one "canonical-baseurl" key, containing a URL string naming +the base URL used for constructing URLs that are given out to third +parties, such as subscription endpoints for upstream hubs to use. -Since this is the only mandatory configuration item, RacketMQ can run -without any configuration file at all if the server is started with -the `--canonical-host` command-line argument: +This is *just* for URL construction, and does NOT create any HTTP +listeners. Those are configured with "http-listener" keys: - racketmq --canonical-host localhost 7827 - -### Optional configuration data - - (accepted-host "localhost" 7827) - (accepted-host "localhost" 80) - (accepted-host "www.example.com" 7827) + (http-listener "localhost" 7827) + ;; (http-listener "localhost" 80) + ;; (http-listener "www.example.com" 7827) + ;; ;; etc. -If you want your server to appear under several aliases, add them -here. HTTP servers will be spun up for all mentioned port numbers, -and within those servers, `Host` headers matching the given host -names will be accepted. +At least one "http-listener" key is required. These cause an HTTP +server to be spun up for each mentioned port number. Traffic will only +be accepted for HTTP Host headers mentioned in these keys. + +Since these are the only mandatory configuration item, RacketMQ can +run without any configuration file at all if the server is started +with the `--baseurl` and `--listen` command-line arguments: + + racketmq --baseurl http://localhost:7827/ --listen localhost 7827 ### Fine tuning diff --git a/racketmq/defaults.rktd b/racketmq/defaults.rktd index 7fc36c7..e9ca01e 100644 --- a/racketmq/defaults.rktd +++ b/racketmq/defaults.rktd @@ -15,24 +15,24 @@ ;;=========================================================================== ;;--------------------------------------------------------------------------- -;; Exactly one "canonical-host" key, containing two values, a string -;; hostname, and a number TCP port. This causes an HTTP server to be -;; spun up on the named port. -(canonical-host "localhost" 7827) - -;;=========================================================================== -;; OPTIONAL: -;;=========================================================================== +;; Exactly one "canonical-baseurl" key, containing a URL string naming +;; the base URL used for constructing URLs that are given out to third +;; parties, such as subscription endpoints for upstream hubs to use. +;; +;; This is *just* for URL construction, and does NOT create any HTTP +;; listeners. Those are configured with "http-listener" keys, +;; documented below. +;; +(canonical-baseurl "http://localhost:7827/") ;;--------------------------------------------------------------------------- -;; If you want your server to appear under several aliases, add them -;; here. HTTP servers will be spun up for all mentioned port numbers, -;; and within those servers, `Host` headers matching the given host -;; names will be accepted. +;; At least one "http-listener" key. These cause an HTTP server to be +;; spun up for each mentioned port number. Traffic will only be +;; accepted for HTTP Host headers mentioned in these keys. ;; -;; (accepted-host "localhost" 7827) -;; (accepted-host "localhost" 80) -;; (accepted-host "www.example.com" 7827) +(http-listener "localhost" 7827) +;; (http-listener "localhost" 80) +;; (http-listener "www.example.com" 7827) ;; ;; etc. diff --git a/racketmq/hub.rkt b/racketmq/hub.rkt index 8f5f6b8..6c96c1b 100644 --- a/racketmq/hub.rkt +++ b/racketmq/hub.rkt @@ -12,17 +12,21 @@ (require/activate "hub/static-content.rkt") (require/activate "hub/topic-demand.rkt") (require/activate "hub/local-topic.rkt") +(require/activate "hub/remote-topic.rkt") (require/activate "hub/subscription.rkt") (require/activate "hub/websocket.rkt") (command-line #:program "racketmq" #:once-each - ["--canonical-host" host port "Specify the canonical host and port for this hub" - (actor #:name (list 'command-line-canonical-host host port) - (assert (config (list 'canonical-host host (string->number port)))))] + ["--baseurl" baseurl "Specify the canonical base URL for this hub" + (actor #:name (list 'command-line-canonical-baseurl baseurl) + (assert (config (list 'canonical-baseurl baseurl))))] #:multi + [("-l" "--listen") host port "Specify one HTTP listener" + (actor #:name (list 'command-line-http-listener host port) + (assert (config (list 'http-listener host (string->number port)))))] [("-o" "--option") key vals "Specify a single configuration option" (actor #:name (list 'config-option key vals) (assert (config (cons (string->symbol key) @@ -32,21 +36,20 @@ (actor #:name 'main - (during (config (list 'canonical-host $h $p)) - (assert (canonical-local-host h p)) - (assert (local-host h p))) + (during (config (list 'canonical-baseurl $u)) + (assert (canonical-baseurl u))) - (define/query-set canonical-local-hosts ($ c (canonical-local-host _ _)) c) - (stop-when (rising-edge (> (set-count (canonical-local-hosts)) 1)) - (log-error "Too many canonical-host records in configuration.")) + (define/query-set canonical-baseurls ($ c (canonical-baseurl _)) c) + (stop-when (rising-edge (> (set-count (canonical-baseurls)) 1)) + (log-error "Too many canonical-baseurl records in configuration.")) (on-start (sleep 0.1) - (when (set-empty? (canonical-local-hosts)) - (log-error "No canonical-host records specified; try the --canonical-host command line argument"))) - ;; TODO: Make the too-many-canonical-host-records situation recoverable. + (when (set-empty? (canonical-baseurls)) + (log-error "No canonical-baseurl records specified; try the --baseurl command line argument"))) + ;; TODO: Make the too-many-canonical-baseurl-records situation recoverable. ;; TODO: And/or, make the whole application quit when it gets into a bad state. - (during (config (list 'accepted-host $h $p)) - (assert (local-host h p))) + (during (config (list 'http-listener $h $p)) + (assert (http-listener h p))) - (during (local-host $host-name $port) + (during (http-listener $host-name $port) (assert (vh host-name port)))) diff --git a/racketmq/hub/local-topic.rkt b/racketmq/hub/local-topic.rkt index 37d534b..1f6abe0 100644 --- a/racketmq/hub/local-topic.rkt +++ b/racketmq/hub/local-topic.rkt @@ -11,7 +11,7 @@ (field [topics (set)]) - (during (local-host $host-name $port) + (during (http-listener $host-name $port) (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ()))) (when (not (set-member? (topics) topic)) (topics (set-add (topics) topic)) @@ -51,10 +51,10 @@ (max-count))) (on-stop (log-info "Terminating local topic ~v" topic)) - (during (local-host $host-name $port) - (during (canonical-local-host $canonical-host-name $cport) - (define hub-url (canonical-url canonical-host-name cport `("hub" ()))) - (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ())))) + (during (http-listener $host-name $port) + (during (canonical-baseurl $baseurl) + (define hub-url (canonical-url baseurl `("hub" ()))) + (define self-url (canonical-url baseurl `("topic" (,topic ())))) (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) (cons 'link (format "<~a>; rel=self" self-url)))) diff --git a/racketmq/hub/remote-topic.rkt b/racketmq/hub/remote-topic.rkt index 9465301..0c8d51f 100644 --- a/racketmq/hub/remote-topic.rkt +++ b/racketmq/hub/remote-topic.rkt @@ -1,7 +1,5 @@ #lang syndicate/actor -(provide remote-topic-main) - (require racket/dict) (require racket/set) (require file/sha1) @@ -34,6 +32,11 @@ (check-equal? (shrink-lease 90) 81) (check-equal? (shrink-lease 50) 45)) +(actor #:name 'remote-topic-manager + (during/actor (remote-topic-demand $full-topic) + #:name (list 'remote-topic full-topic) + (remote-topic-main full-topic))) + (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) (log-info "Remote sub endpoint ~a for topic ~s" sub-id full-topic) @@ -62,8 +65,8 @@ [#f "disabled"] [n (format "~a seconds" n)]))) - (during (canonical-local-host $canonical-host-name $cport) - (define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ())))) + (during (canonical-baseurl $baseurl) + (define callback (canonical-url baseurl `("sub" (,sub-id ())))) (define (refresh-subscription!) ;; TODO: shared secret @@ -161,35 +164,36 @@ (* 1000 (or (poll-interval-seconds) 0))))) (poll-upstream!)) - (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) - (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) - (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) - (define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) - (if lease-seconds - (next-subscription-refresh (+ (current-inexact-milliseconds) - (* 1000.0 (shrink-lease lease-seconds)))) - (log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) - (web-respond/bytes! id (string->bytes/utf-8 challenge))) + (during (http-listener $host-name $port) + (on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ()))) + (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) + (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) + (define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f)) + (if lease-seconds + (next-subscription-refresh (+ (current-inexact-milliseconds) + (* 1000.0 (shrink-lease lease-seconds)))) + (log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic)) + (web-respond/bytes! id (string->bytes/utf-8 challenge))) - (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) - ;; TODO: verify the use of the shared secret - (actor* - (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) - (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) - (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) - (define content-type (web-request-header-content-type req)) - (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" - full-topic - content-type - body - upstream-hub - upstream-topic) - (current-content-hash (sha1 (open-input-bytes body))) - (current-content-type content-type) - (current-upstream-hub upstream-hub) - (send! (notification full-topic - upstream-hub - upstream-topic - body - content-type)) - (web-respond/status! id 201 #"Created"))))) + (on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body) + ;; TODO: verify the use of the shared secret + (actor* + (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) + (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) + (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) + (define content-type (web-request-header-content-type req)) + (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" + full-topic + content-type + body + upstream-hub + upstream-topic) + (current-content-hash (sha1 (open-input-bytes body))) + (current-content-type content-type) + (current-upstream-hub upstream-hub) + (send! (notification full-topic + upstream-hub + upstream-topic + body + content-type)) + (web-respond/status! id 201 #"Created")))))) diff --git a/racketmq/hub/static-content.rkt b/racketmq/hub/static-content.rkt index 8e63d4b..ecf6149 100644 --- a/racketmq/hub/static-content.rkt +++ b/racketmq/hub/static-content.rkt @@ -16,7 +16,7 @@ (actor #:name 'static-content-server (define url->path (make-url->path htdocs-path)) - (during (local-host $host-name $port) + (during (http-listener $host-name $port) (on (web-request-get (id req) (vh host-name port) ,_) (define-values (path path-pieces) (url->path (resource->url (web-request-header-resource req)))) diff --git a/racketmq/hub/subscription.rkt b/racketmq/hub/subscription.rkt index 6132412..87b3e93 100644 --- a/racketmq/hub/subscription.rkt +++ b/racketmq/hub/subscription.rkt @@ -13,10 +13,10 @@ (require "../protocol.rkt") (actor #:name 'hub - (during (local-host $host-name $port) - (during (canonical-local-host $canonical-host-name $cport) + (during (http-listener $host-name $port) + (during (canonical-baseurl $baseurl) (on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body) - (asynchronous-verification-of-intent id req body canonical-host-name cport) + (asynchronous-verification-of-intent id req body baseurl) (web-respond/status! id 202 #"Accepted")))) (on (message (update-subscription $topic $callback $settings)) @@ -28,7 +28,7 @@ #:on-crash (retract! (subscription topic callback ?)) (subscription-main topic callback))) -(define (asynchronous-verification-of-intent id req body canonical-host-name cport) +(define (asynchronous-verification-of-intent id req body baseurl) (actor* #:name 'verification-of-intent (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) (define callback (hash-ref params 'hub.callback)) @@ -38,9 +38,7 @@ (define requested-topic (hash-ref params 'hub.topic)) (define topic (url->string - (combine-url/relative (string->url (canonical-url canonical-host-name - cport - `("topic" ("" ())))) + (combine-url/relative (string->url (canonical-url baseurl `("topic" ("" ())))) requested-topic))) (define requested-lease-seconds (string->number diff --git a/racketmq/hub/topic-demand.rkt b/racketmq/hub/topic-demand.rkt index 9731437..228913d 100644 --- a/racketmq/hub/topic-demand.rkt +++ b/racketmq/hub/topic-demand.rkt @@ -2,44 +2,32 @@ (require racket/exn) (require racket/set) +(require racket/string) (require net/url) (require/activate syndicate/drivers/web) -(require/activate "remote-topic.rkt") (require "../private/util.rkt") (require "../protocol.rkt") (actor #:name 'topic-demand-analyzer - (define/query-set local-hosts ($ h (local-host _ _)) h) (during/actor (topic-demand $full-topic _) - #:name (list 'general-topic full-topic) - #:let [(local-hosts-snapshot (local-hosts))] - (with-handlers [(exn? (lambda (e) - (log-error "Topic demand error: ~a" (exn->string e))))] - (match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path) - (url->resource (string->url full-topic))) - (define maybe-local-topic - (match topic-path [`("topic" (,topic ())) topic] [_ #f])) - (general-topic-main full-topic - topic-host - topic-port - maybe-local-topic - (set-member? local-hosts-snapshot - (local-host topic-host topic-port)))))) + #:name (list 'topic-demand full-topic) -(define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?) - (define (local-state) - (react (stop-when (retracted (local-host topic-host topic-port)) - (remote-state)) - (assert (local-topic-demand maybe-local-topic)))) + (define/query-value topic-baseurl #f (canonical-baseurl $b) + (canonical-url b `("topic" ("" ())))) - (define (remote-state) - (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port)) - (local-state)) - (remote-topic-main full-topic))) + (define/dataflow state + (cond + [(not (topic-baseurl)) 'unknown] + [(string-prefix? full-topic (topic-baseurl)) 'local] + [else 'remote])) - (on-start - (if (and maybe-local-topic start-as-local?) - (local-state) - (remote-state)))) + (begin/dataflow + (log-info "Topic-demand for ~s is in state ~s." full-topic (state))) + + (assert #:when (eq? (state) 'local) + (local-topic-demand (substring full-topic (string-length (topic-baseurl))))) + + (assert #:when (eq? (state) 'remote) + (remote-topic-demand full-topic)))) diff --git a/racketmq/hub/websocket.rkt b/racketmq/hub/websocket.rkt index 8038da7..626e748 100644 --- a/racketmq/hub/websocket.rkt +++ b/racketmq/hub/websocket.rkt @@ -13,21 +13,19 @@ (require "../protocol.rkt") (actor #:name 'websocket-hub - (during (local-host $host-name $port) - (during (canonical-local-host $canonical-host-name $cport) + (during (http-listener $host-name $port) + (during (canonical-baseurl $baseurl) (on (web-request-get (id req) (vh host-name port) ("hub" ())) (when (equal? (dict-ref (web-request-header-headers req) 'upgrade #f) "websocket") - (websocket-subscription id req canonical-host-name cport)))))) + (websocket-subscription id req baseurl)))))) -(define (websocket-subscription id req canonical-host-name cport) +(define (websocket-subscription id req baseurl) (actor* #:name (list 'websocket-subscription id) (define params (web-request-header-query req)) (define requested-topic (dict-ref params 'hub.topic)) (define topic ;; TODO: abstract this expression out (see also subscription.rkt) (url->string - (combine-url/relative (string->url (canonical-url canonical-host-name - cport - `("topic" ("" ())))) + (combine-url/relative (string->url (canonical-url baseurl `("topic" ("" ())))) requested-topic))) (define poll-interval-seconds (match (dict-ref params diff --git a/racketmq/private/util.rkt b/racketmq/private/util.rkt index d8a0d68..5a42a8c 100644 --- a/racketmq/private/util.rkt +++ b/racketmq/private/util.rkt @@ -144,5 +144,7 @@ (define (vh host-name port) (web-virtual-host "http" host-name port)) -(define (canonical-url canonical-host-name cport path) - (url->string (resource->url (web-resource (vh canonical-host-name cport) path)))) +(define (canonical-url baseurl path) + (define b (url->resource (string->url baseurl))) + (url->string (resource->url (struct-copy web-resource b + [path (append-url-path (web-resource-path b) path)])))) diff --git a/racketmq/protocol.rkt b/racketmq/protocol.rkt index 51e1dcd..d69b43b 100644 --- a/racketmq/protocol.rkt +++ b/racketmq/protocol.rkt @@ -8,8 +8,9 @@ (struct-out local-topic-config) (struct-out topic-demand) (struct-out local-topic-demand) - (struct-out local-host) - (struct-out canonical-local-host)) + (struct-out remote-topic-demand) + (struct-out http-listener) + (struct-out canonical-baseurl)) ;; A Topic is a URIString. @@ -53,8 +54,11 @@ ;; (local-topic-demand String) (struct local-topic-demand (name) #:prefab) ;; ASSERTION -;; (local-host String Number) -(struct local-host (name port) #:prefab) ;; ASSERTION +;; (remote-topic-demand Topic) +(struct remote-topic-demand (topic-name) #:prefab) ;; ASSERTION -;; (canonical-local-host String Number) -(struct canonical-local-host (name port) #:prefab) ;; ASSERTION +;; (http-listener String Number) +(struct http-listener (name port) #:prefab) ;; ASSERTION + +;; (canonical-baseurl URLString) +(struct canonical-baseurl (string) #:prefab) ;; ASSERTION