Reinterpret canonical-local-host -> canonical-baseurl.
In order to support HTTPS nginx proxying, we need to maintain a "fictional" baseurl which is not directly connected to this server's listen ports. This has caused a change to configuration: now, a baseurl is specified along with one or more listen specifications.
This commit is contained in:
parent
1802a36ce3
commit
f14e0acfcb
47
README.md
47
README.md
|
@ -20,7 +20,7 @@ See the specification of the W3C WebSub protocol at
|
|||
|
||||
1. Install Racket from <http://download.racket-lang.org/>
|
||||
2. Install RacketMQ by running `raco pkg install --auto racketmq`
|
||||
3. `racketmq --canonical-host localhost 7827`
|
||||
3. `racketmq --baseurl http://localhost:7827/ --listen localhost 7827`
|
||||
|
||||
To install from git, replace the `raco pkg install ...` step above
|
||||
with an invocation of `make link` from the top directory of your git
|
||||
|
@ -45,9 +45,8 @@ checkout.
|
|||
|
||||
## Configuration
|
||||
|
||||
RacketMQ has only one required configuration variable: you must tell
|
||||
the hub its primary ("canonical") *host name* and *port number*. These
|
||||
are used to build URLs for clients of the Hub to use.
|
||||
The most important RacketMQ configuration variable is its canonical
|
||||
base URL: the URL prefix used to build URLs for clients to use.
|
||||
|
||||
When the RacketMQ startup script is given a "`-f` *filename*" option,
|
||||
it loads configuration data from the named file. The option can be
|
||||
|
@ -59,7 +58,8 @@ For a fully-commented example configuration file, see
|
|||
Within each file, each configuration entry should be a list (see
|
||||
[Racket syntax](https://docs.racket-lang.org/reference/reader.html))
|
||||
with a symbol (the "key") as its first item followed by zero or more
|
||||
items.
|
||||
items. Line comments start with semicolon (`;`) as usual for
|
||||
S-expression languages.
|
||||
|
||||
Each configuration file is automatically reread by the server when it
|
||||
is changed: if you need to make changes, consider doing so atomically
|
||||
|
@ -68,29 +68,30 @@ by producing an updated configuration file and using
|
|||
|
||||
### Required configuration data
|
||||
|
||||
(canonical-host "localhost" 7827)
|
||||
(canonical-baseurl "http://localhost:7827/")
|
||||
|
||||
Exactly one "canonical-host" key, containing two values, a string
|
||||
hostname, and a number TCP port. This causes an HTTP server to be
|
||||
spun up on the named port.
|
||||
Exactly one "canonical-baseurl" key, containing a URL string naming
|
||||
the base URL used for constructing URLs that are given out to third
|
||||
parties, such as subscription endpoints for upstream hubs to use.
|
||||
|
||||
Since this is the only mandatory configuration item, RacketMQ can run
|
||||
without any configuration file at all if the server is started with
|
||||
the `--canonical-host` command-line argument:
|
||||
This is *just* for URL construction, and does NOT create any HTTP
|
||||
listeners. Those are configured with "http-listener" keys:
|
||||
|
||||
racketmq --canonical-host localhost 7827
|
||||
|
||||
### Optional configuration data
|
||||
|
||||
(accepted-host "localhost" 7827)
|
||||
(accepted-host "localhost" 80)
|
||||
(accepted-host "www.example.com" 7827)
|
||||
(http-listener "localhost" 7827)
|
||||
;; (http-listener "localhost" 80)
|
||||
;; (http-listener "www.example.com" 7827)
|
||||
;;
|
||||
;; etc.
|
||||
|
||||
If you want your server to appear under several aliases, add them
|
||||
here. HTTP servers will be spun up for all mentioned port numbers,
|
||||
and within those servers, `Host` headers matching the given host
|
||||
names will be accepted.
|
||||
At least one "http-listener" key is required. These cause an HTTP
|
||||
server to be spun up for each mentioned port number. Traffic will only
|
||||
be accepted for HTTP Host headers mentioned in these keys.
|
||||
|
||||
Since these are the only mandatory configuration item, RacketMQ can
|
||||
run without any configuration file at all if the server is started
|
||||
with the `--baseurl` and `--listen` command-line arguments:
|
||||
|
||||
racketmq --baseurl http://localhost:7827/ --listen localhost 7827
|
||||
|
||||
### Fine tuning
|
||||
|
||||
|
|
|
@ -15,24 +15,24 @@
|
|||
;;===========================================================================
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; Exactly one "canonical-host" key, containing two values, a string
|
||||
;; hostname, and a number TCP port. This causes an HTTP server to be
|
||||
;; spun up on the named port.
|
||||
(canonical-host "localhost" 7827)
|
||||
|
||||
;;===========================================================================
|
||||
;; OPTIONAL:
|
||||
;;===========================================================================
|
||||
;; Exactly one "canonical-baseurl" key, containing a URL string naming
|
||||
;; the base URL used for constructing URLs that are given out to third
|
||||
;; parties, such as subscription endpoints for upstream hubs to use.
|
||||
;;
|
||||
;; This is *just* for URL construction, and does NOT create any HTTP
|
||||
;; listeners. Those are configured with "http-listener" keys,
|
||||
;; documented below.
|
||||
;;
|
||||
(canonical-baseurl "http://localhost:7827/")
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; If you want your server to appear under several aliases, add them
|
||||
;; here. HTTP servers will be spun up for all mentioned port numbers,
|
||||
;; and within those servers, `Host` headers matching the given host
|
||||
;; names will be accepted.
|
||||
;; At least one "http-listener" key. These cause an HTTP server to be
|
||||
;; spun up for each mentioned port number. Traffic will only be
|
||||
;; accepted for HTTP Host headers mentioned in these keys.
|
||||
;;
|
||||
;; (accepted-host "localhost" 7827)
|
||||
;; (accepted-host "localhost" 80)
|
||||
;; (accepted-host "www.example.com" 7827)
|
||||
(http-listener "localhost" 7827)
|
||||
;; (http-listener "localhost" 80)
|
||||
;; (http-listener "www.example.com" 7827)
|
||||
;;
|
||||
;; etc.
|
||||
|
||||
|
|
|
@ -12,17 +12,21 @@
|
|||
(require/activate "hub/static-content.rkt")
|
||||
(require/activate "hub/topic-demand.rkt")
|
||||
(require/activate "hub/local-topic.rkt")
|
||||
(require/activate "hub/remote-topic.rkt")
|
||||
(require/activate "hub/subscription.rkt")
|
||||
(require/activate "hub/websocket.rkt")
|
||||
|
||||
(command-line #:program "racketmq"
|
||||
|
||||
#:once-each
|
||||
["--canonical-host" host port "Specify the canonical host and port for this hub"
|
||||
(actor #:name (list 'command-line-canonical-host host port)
|
||||
(assert (config (list 'canonical-host host (string->number port)))))]
|
||||
["--baseurl" baseurl "Specify the canonical base URL for this hub"
|
||||
(actor #:name (list 'command-line-canonical-baseurl baseurl)
|
||||
(assert (config (list 'canonical-baseurl baseurl))))]
|
||||
|
||||
#:multi
|
||||
[("-l" "--listen") host port "Specify one HTTP listener"
|
||||
(actor #:name (list 'command-line-http-listener host port)
|
||||
(assert (config (list 'http-listener host (string->number port)))))]
|
||||
[("-o" "--option") key vals "Specify a single configuration option"
|
||||
(actor #:name (list 'config-option key vals)
|
||||
(assert (config (cons (string->symbol key)
|
||||
|
@ -32,21 +36,20 @@
|
|||
|
||||
(actor #:name 'main
|
||||
|
||||
(during (config (list 'canonical-host $h $p))
|
||||
(assert (canonical-local-host h p))
|
||||
(assert (local-host h p)))
|
||||
(during (config (list 'canonical-baseurl $u))
|
||||
(assert (canonical-baseurl u)))
|
||||
|
||||
(define/query-set canonical-local-hosts ($ c (canonical-local-host _ _)) c)
|
||||
(stop-when (rising-edge (> (set-count (canonical-local-hosts)) 1))
|
||||
(log-error "Too many canonical-host records in configuration."))
|
||||
(define/query-set canonical-baseurls ($ c (canonical-baseurl _)) c)
|
||||
(stop-when (rising-edge (> (set-count (canonical-baseurls)) 1))
|
||||
(log-error "Too many canonical-baseurl records in configuration."))
|
||||
(on-start (sleep 0.1)
|
||||
(when (set-empty? (canonical-local-hosts))
|
||||
(log-error "No canonical-host records specified; try the --canonical-host command line argument")))
|
||||
;; TODO: Make the too-many-canonical-host-records situation recoverable.
|
||||
(when (set-empty? (canonical-baseurls))
|
||||
(log-error "No canonical-baseurl records specified; try the --baseurl command line argument")))
|
||||
;; TODO: Make the too-many-canonical-baseurl-records situation recoverable.
|
||||
;; TODO: And/or, make the whole application quit when it gets into a bad state.
|
||||
|
||||
(during (config (list 'accepted-host $h $p))
|
||||
(assert (local-host h p)))
|
||||
(during (config (list 'http-listener $h $p))
|
||||
(assert (http-listener h p)))
|
||||
|
||||
(during (local-host $host-name $port)
|
||||
(during (http-listener $host-name $port)
|
||||
(assert (vh host-name port))))
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
|
||||
(field [topics (set)])
|
||||
|
||||
(during (local-host $host-name $port)
|
||||
(during (http-listener $host-name $port)
|
||||
(on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ())))
|
||||
(when (not (set-member? (topics) topic))
|
||||
(topics (set-add (topics) topic))
|
||||
|
@ -51,10 +51,10 @@
|
|||
(max-count)))
|
||||
(on-stop (log-info "Terminating local topic ~v" topic))
|
||||
|
||||
(during (local-host $host-name $port)
|
||||
(during (canonical-local-host $canonical-host-name $cport)
|
||||
(define hub-url (canonical-url canonical-host-name cport `("hub" ())))
|
||||
(define self-url (canonical-url canonical-host-name cport `("topic" (,topic ()))))
|
||||
(during (http-listener $host-name $port)
|
||||
(during (canonical-baseurl $baseurl)
|
||||
(define hub-url (canonical-url baseurl `("hub" ())))
|
||||
(define self-url (canonical-url baseurl `("topic" (,topic ()))))
|
||||
(define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url))
|
||||
(cons 'link (format "<~a>; rel=self" self-url))))
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
#lang syndicate/actor
|
||||
|
||||
(provide remote-topic-main)
|
||||
|
||||
(require racket/dict)
|
||||
(require racket/set)
|
||||
(require file/sha1)
|
||||
|
@ -34,6 +32,11 @@
|
|||
(check-equal? (shrink-lease 90) 81)
|
||||
(check-equal? (shrink-lease 50) 45))
|
||||
|
||||
(actor #:name 'remote-topic-manager
|
||||
(during/actor (remote-topic-demand $full-topic)
|
||||
#:name (list 'remote-topic full-topic)
|
||||
(remote-topic-main full-topic)))
|
||||
|
||||
(define (remote-topic-main full-topic)
|
||||
(define sub-id (random-hex-string 16))
|
||||
(log-info "Remote sub endpoint ~a for topic ~s" sub-id full-topic)
|
||||
|
@ -62,8 +65,8 @@
|
|||
[#f "disabled"]
|
||||
[n (format "~a seconds" n)])))
|
||||
|
||||
(during (canonical-local-host $canonical-host-name $cport)
|
||||
(define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ()))))
|
||||
(during (canonical-baseurl $baseurl)
|
||||
(define callback (canonical-url baseurl `("sub" (,sub-id ()))))
|
||||
|
||||
(define (refresh-subscription!)
|
||||
;; TODO: shared secret
|
||||
|
@ -161,35 +164,36 @@
|
|||
(* 1000 (or (poll-interval-seconds) 0)))))
|
||||
(poll-upstream!))
|
||||
|
||||
(on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ())))
|
||||
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
|
||||
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
||||
(define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f))
|
||||
(if lease-seconds
|
||||
(next-subscription-refresh (+ (current-inexact-milliseconds)
|
||||
(* 1000.0 (shrink-lease lease-seconds))))
|
||||
(log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic))
|
||||
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
|
||||
(during (http-listener $host-name $port)
|
||||
(on (web-request-get (id req) (vh host-name port) ("sub" (,sub-id ())))
|
||||
(log-info "Received verification-of-intent: ~v" (web-request-header-query req))
|
||||
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
||||
(define lease-seconds (dict-ref (web-request-header-query req) 'hub.lease_seconds #f))
|
||||
(if lease-seconds
|
||||
(next-subscription-refresh (+ (current-inexact-milliseconds)
|
||||
(* 1000.0 (shrink-lease lease-seconds))))
|
||||
(log-warning "Upstream hub for topic ~s did not supply hub.lease_seconds" full-topic))
|
||||
(web-respond/bytes! id (string->bytes/utf-8 challenge)))
|
||||
|
||||
(on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body)
|
||||
;; TODO: verify the use of the shared secret
|
||||
(actor*
|
||||
(define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
|
||||
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
|
||||
(define upstream-topic (link-header-ref parsed-link-headers 'self #f))
|
||||
(define content-type (web-request-header-content-type req))
|
||||
(log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v"
|
||||
full-topic
|
||||
content-type
|
||||
body
|
||||
upstream-hub
|
||||
upstream-topic)
|
||||
(current-content-hash (sha1 (open-input-bytes body)))
|
||||
(current-content-type content-type)
|
||||
(current-upstream-hub upstream-hub)
|
||||
(send! (notification full-topic
|
||||
upstream-hub
|
||||
upstream-topic
|
||||
body
|
||||
content-type))
|
||||
(web-respond/status! id 201 #"Created")))))
|
||||
(on (web-request-incoming (id req) (vh host-name port) 'post ("sub" (,sub-id ())) $body)
|
||||
;; TODO: verify the use of the shared secret
|
||||
(actor*
|
||||
(define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
|
||||
(define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
|
||||
(define upstream-topic (link-header-ref parsed-link-headers 'self #f))
|
||||
(define content-type (web-request-header-content-type req))
|
||||
(log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v"
|
||||
full-topic
|
||||
content-type
|
||||
body
|
||||
upstream-hub
|
||||
upstream-topic)
|
||||
(current-content-hash (sha1 (open-input-bytes body)))
|
||||
(current-content-type content-type)
|
||||
(current-upstream-hub upstream-hub)
|
||||
(send! (notification full-topic
|
||||
upstream-hub
|
||||
upstream-topic
|
||||
body
|
||||
content-type))
|
||||
(web-respond/status! id 201 #"Created"))))))
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
(actor #:name 'static-content-server
|
||||
(define url->path (make-url->path htdocs-path))
|
||||
(during (local-host $host-name $port)
|
||||
(during (http-listener $host-name $port)
|
||||
(on (web-request-get (id req) (vh host-name port) ,_)
|
||||
(define-values (path path-pieces)
|
||||
(url->path (resource->url (web-request-header-resource req))))
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
(require "../protocol.rkt")
|
||||
|
||||
(actor #:name 'hub
|
||||
(during (local-host $host-name $port)
|
||||
(during (canonical-local-host $canonical-host-name $cport)
|
||||
(during (http-listener $host-name $port)
|
||||
(during (canonical-baseurl $baseurl)
|
||||
(on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body)
|
||||
(asynchronous-verification-of-intent id req body canonical-host-name cport)
|
||||
(asynchronous-verification-of-intent id req body baseurl)
|
||||
(web-respond/status! id 202 #"Accepted"))))
|
||||
|
||||
(on (message (update-subscription $topic $callback $settings))
|
||||
|
@ -28,7 +28,7 @@
|
|||
#:on-crash (retract! (subscription topic callback ?))
|
||||
(subscription-main topic callback)))
|
||||
|
||||
(define (asynchronous-verification-of-intent id req body canonical-host-name cport)
|
||||
(define (asynchronous-verification-of-intent id req body baseurl)
|
||||
(actor* #:name 'verification-of-intent
|
||||
(define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body))))
|
||||
(define callback (hash-ref params 'hub.callback))
|
||||
|
@ -38,9 +38,7 @@
|
|||
(define requested-topic (hash-ref params 'hub.topic))
|
||||
(define topic
|
||||
(url->string
|
||||
(combine-url/relative (string->url (canonical-url canonical-host-name
|
||||
cport
|
||||
`("topic" ("" ()))))
|
||||
(combine-url/relative (string->url (canonical-url baseurl `("topic" ("" ()))))
|
||||
requested-topic)))
|
||||
(define requested-lease-seconds
|
||||
(string->number
|
||||
|
|
|
@ -2,44 +2,32 @@
|
|||
|
||||
(require racket/exn)
|
||||
(require racket/set)
|
||||
(require racket/string)
|
||||
(require net/url)
|
||||
|
||||
(require/activate syndicate/drivers/web)
|
||||
(require/activate "remote-topic.rkt")
|
||||
|
||||
(require "../private/util.rkt")
|
||||
(require "../protocol.rkt")
|
||||
|
||||
(actor #:name 'topic-demand-analyzer
|
||||
(define/query-set local-hosts ($ h (local-host _ _)) h)
|
||||
(during/actor (topic-demand $full-topic _)
|
||||
#:name (list 'general-topic full-topic)
|
||||
#:let [(local-hosts-snapshot (local-hosts))]
|
||||
(with-handlers [(exn? (lambda (e)
|
||||
(log-error "Topic demand error: ~a" (exn->string e))))]
|
||||
(match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path)
|
||||
(url->resource (string->url full-topic)))
|
||||
(define maybe-local-topic
|
||||
(match topic-path [`("topic" (,topic ())) topic] [_ #f]))
|
||||
(general-topic-main full-topic
|
||||
topic-host
|
||||
topic-port
|
||||
maybe-local-topic
|
||||
(set-member? local-hosts-snapshot
|
||||
(local-host topic-host topic-port))))))
|
||||
#:name (list 'topic-demand full-topic)
|
||||
|
||||
(define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?)
|
||||
(define (local-state)
|
||||
(react (stop-when (retracted (local-host topic-host topic-port))
|
||||
(remote-state))
|
||||
(assert (local-topic-demand maybe-local-topic))))
|
||||
(define/query-value topic-baseurl #f (canonical-baseurl $b)
|
||||
(canonical-url b `("topic" ("" ()))))
|
||||
|
||||
(define (remote-state)
|
||||
(react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port))
|
||||
(local-state))
|
||||
(remote-topic-main full-topic)))
|
||||
(define/dataflow state
|
||||
(cond
|
||||
[(not (topic-baseurl)) 'unknown]
|
||||
[(string-prefix? full-topic (topic-baseurl)) 'local]
|
||||
[else 'remote]))
|
||||
|
||||
(on-start
|
||||
(if (and maybe-local-topic start-as-local?)
|
||||
(local-state)
|
||||
(remote-state))))
|
||||
(begin/dataflow
|
||||
(log-info "Topic-demand for ~s is in state ~s." full-topic (state)))
|
||||
|
||||
(assert #:when (eq? (state) 'local)
|
||||
(local-topic-demand (substring full-topic (string-length (topic-baseurl)))))
|
||||
|
||||
(assert #:when (eq? (state) 'remote)
|
||||
(remote-topic-demand full-topic))))
|
||||
|
|
|
@ -13,21 +13,19 @@
|
|||
(require "../protocol.rkt")
|
||||
|
||||
(actor #:name 'websocket-hub
|
||||
(during (local-host $host-name $port)
|
||||
(during (canonical-local-host $canonical-host-name $cport)
|
||||
(during (http-listener $host-name $port)
|
||||
(during (canonical-baseurl $baseurl)
|
||||
(on (web-request-get (id req) (vh host-name port) ("hub" ()))
|
||||
(when (equal? (dict-ref (web-request-header-headers req) 'upgrade #f) "websocket")
|
||||
(websocket-subscription id req canonical-host-name cport))))))
|
||||
(websocket-subscription id req baseurl))))))
|
||||
|
||||
(define (websocket-subscription id req canonical-host-name cport)
|
||||
(define (websocket-subscription id req baseurl)
|
||||
(actor* #:name (list 'websocket-subscription id)
|
||||
(define params (web-request-header-query req))
|
||||
(define requested-topic (dict-ref params 'hub.topic))
|
||||
(define topic ;; TODO: abstract this expression out (see also subscription.rkt)
|
||||
(url->string
|
||||
(combine-url/relative (string->url (canonical-url canonical-host-name
|
||||
cport
|
||||
`("topic" ("" ()))))
|
||||
(combine-url/relative (string->url (canonical-url baseurl `("topic" ("" ()))))
|
||||
requested-topic)))
|
||||
(define poll-interval-seconds
|
||||
(match (dict-ref params
|
||||
|
|
|
@ -144,5 +144,7 @@
|
|||
|
||||
(define (vh host-name port) (web-virtual-host "http" host-name port))
|
||||
|
||||
(define (canonical-url canonical-host-name cport path)
|
||||
(url->string (resource->url (web-resource (vh canonical-host-name cport) path))))
|
||||
(define (canonical-url baseurl path)
|
||||
(define b (url->resource (string->url baseurl)))
|
||||
(url->string (resource->url (struct-copy web-resource b
|
||||
[path (append-url-path (web-resource-path b) path)]))))
|
||||
|
|
|
@ -8,8 +8,9 @@
|
|||
(struct-out local-topic-config)
|
||||
(struct-out topic-demand)
|
||||
(struct-out local-topic-demand)
|
||||
(struct-out local-host)
|
||||
(struct-out canonical-local-host))
|
||||
(struct-out remote-topic-demand)
|
||||
(struct-out http-listener)
|
||||
(struct-out canonical-baseurl))
|
||||
|
||||
;; A Topic is a URIString.
|
||||
|
||||
|
@ -53,8 +54,11 @@
|
|||
;; (local-topic-demand String)
|
||||
(struct local-topic-demand (name) #:prefab) ;; ASSERTION
|
||||
|
||||
;; (local-host String Number)
|
||||
(struct local-host (name port) #:prefab) ;; ASSERTION
|
||||
;; (remote-topic-demand Topic)
|
||||
(struct remote-topic-demand (topic-name) #:prefab) ;; ASSERTION
|
||||
|
||||
;; (canonical-local-host String Number)
|
||||
(struct canonical-local-host (name port) #:prefab) ;; ASSERTION
|
||||
;; (http-listener String Number)
|
||||
(struct http-listener (name port) #:prefab) ;; ASSERTION
|
||||
|
||||
;; (canonical-baseurl URLString)
|
||||
(struct canonical-baseurl (string) #:prefab) ;; ASSERTION
|
||||
|
|
Loading…
Reference in New Issue