racketmq-2017/rmq/server.rkt

475 lines
20 KiB
Racket

#lang syndicate/actor
(provide )
(require racket/dict)
(require racket/exn)
(require racket/format)
(require racket/set)
(require net/url)
(require net/uri-codec)
(require file/sha1)
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require syndicate/protocol/advertise)
(require syndicate/functional-queue)
(require "private/util.rkt")
(module+ test (require rackunit))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TODO: sane defaults
;; Server-wide tunables. All of them are plain module-level constants.
;; Redirect budget handed to `web-request!` when fetching an upstream topic.
(define *max-upstream-redirects* 5)
;; Port number used for every virtual host constructed by `vh`.
(define *server-port* 7827)
;; Host name asserted as the `canonical-local-host`; canonical hub, topic
;; and callback URLs are built from it.
(define *canonical-host* "localhost")
;; Every name in this list is asserted as a `local-host`.
(define *accepted-hosts* (list *canonical-host*))
;; Lower bound applied to the effective upstream poll interval.
(define *min-poll-interval* 10) ;; seconds
;; Value assumed for `hub.lease_seconds` when a hub request omits it; the
;; string "unbounded" is parsed as "no expiry deadline".
(define *default-lease* "unbounded")
;; Value assumed for `hub.poll_interval_seconds` when a hub request omits
;; it; the string "none" is parsed as "no polling requested".
(define *default-poll-interval* "none")
;; A subscription stops once its dead-letter queue grows beyond this length.
(define *max-dead-letters* 10)
;; Number of redelivery attempts made after the first failed delivery of a
;; notification before it is counted as a dead letter.
(define *max-delivery-retries* 10)
;; Pause before the first redelivery attempt.
(define *initial-retry-delay* 5.0) ;; seconds
;; Factor by which the pause grows after each failed attempt.
(define *retry-delay-multiplier* 1.618)
;; Upper bound on the pause between redelivery attempts.
(define *max-retry-delay* 30) ;; seconds
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol vocabulary: prefab structs used as assertions and messages in
;; the dataspace.
;;
;; A Topic is a URIString.
;; A Deadline is an (Option Number), where #f indicates "unbounded",
;; and numbers indicate a moment in time as a Unix epoch timestamp as
;; might be returned from `current-seconds`.
;; (notification Topic Bytes (Option String))
;; Message announcing new content (and its optional content type) for a topic.
(struct notification (topic-name content content-type) #:prefab)
;; (update-subscription Topic URLString (Option SubscriptionSettings))
;; A settings value of #f requests removal of the subscription.
(struct update-subscription (topic callback settings) #:prefab) ;; message
;; (subscription Topic URLString SubscriptionSettings)
;; The trailing dash in `settings-` is presumably deliberate: a field named
;; `settings` would generate an accessor called `subscription-settings`,
;; clashing with the struct of that name declared just below.
(struct subscription (topic callback settings-) #:prefab) ;; assertion
(struct subscription-settings (expiry-deadline ;; Deadline
canonical-hub ;; URLString
secret ;; Option Bytes
poll-interval-seconds) ;; Option Number
#:prefab)
;; (local-topic-config String (Option Number) (Option Number))
;; `name` is the local topic's name (the path segment after "topic"), the
;; same value carried by `local-topic-demand`.
(struct local-topic-config (name max-age max-count) #:prefab)
;; (topic-demand Topic (Option Number))
;; Interest in a topic, together with the poll interval requested for it
;; (#f when no polling was requested).
(struct topic-demand (topic-name poll-interval-seconds) #:prefab)
;; (local-topic-demand String)
;; Interest in a locally hosted topic, identified by its local name.
(struct local-topic-demand (name) #:prefab)
;; (local-host String)
;; A host name this server answers requests for.
(struct local-host (name) #:prefab)
;; (canonical-local-host String)
;; The host name used when building canonical URLs.
(struct canonical-local-host (name) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; (define (number->path n)
;; (let ((top (quotient n 256))
;; (bot (remainder n 256)))
;; (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
;; (if (zero? top)
;; bot-piece
;; (string-append (number->path top) "-/" bot-piece))))
;; (module+ test
;; (check-equal? (number->path 0) "00")
;; (check-equal? (number->path 123) "7b")
;; (check-equal? (number->path 12345) "30-/39")
;; (check-equal? (number->path 123456789012345678901234567890)
;; "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; String -> web-virtual-host
;; Builds the plain-HTTP virtual host record for `host-name` on the
;; configured server port.
(define (vh host-name)
  (define scheme "http")
  (web-virtual-host scheme host-name *server-port*))
;; Root actor: publishes the accepted and canonical host names, and for
;; each local host serves a small status page at the site root.
(actor #:name 'main
       (for [(accepted-host *accepted-hosts*)]
         (assert (local-host accepted-host)))
       (assert (canonical-local-host *canonical-host*))
       (during (local-host $host-name)
         ;; Advertise the virtual host for as long as the host is local.
         (assert (vh host-name))
         (on (web-request-get (id req) (vh host-name) ("" ()))
             (actor*
              ;; The page simply echoes the URL that was requested.
              (define requested-url
                (url->string (resource->url (web-request-header-resource req))))
              (define status-page
                `(html
                  (body
                   (h1 "Status"
                       " "
                       ,requested-url))))
              (web-respond/xexpr! id status-page)))))
;; Tracks the set of locally hosted topics. PUT /topic/<name> creates a
;; topic, DELETE /topic/<name> removes it; both always answer with an empty
;; body. One `local-topic-main` actor runs per demanded local topic.
(actor #:name 'local-topic-manager
       (field [topics (set)])
       ;; Is `name` currently a registered local topic?
       (define (registered? name)
         (set-member? (topics) name))
       (during (local-host $host-name)
         (on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ())))
             (unless (registered? topic)
               (topics (set-add (topics) topic))
               (assert! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?))
               (assert! (local-topic-config topic #f #f))) ;; TODO: maximums
             (web-respond/bytes! id #""))
         (on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ())))
             (when (registered? topic)
               (topics (set-remove (topics) topic))
               (retract! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?)))
             (web-respond/bytes! id #"")))
       (during/actor (local-topic-demand $topic)
         #:name (list 'local-topic topic)
         (local-topic-main topic)))
;; Spawns one 'general-topic actor per demanded topic URL. The URL is
;; split into host and path; a path of the form /topic/<name> is a
;; candidate local topic. Failures while analysing the URL are logged.
(actor #:name 'topic-demand-analyzer
       (define/query-set local-hosts (local-host $host-name) host-name)
       (during/actor (topic-demand $full-topic _)
         #:name (list 'general-topic full-topic)
         #:let [(local-hosts-snapshot (local-hosts))]
         (with-handlers [(exn? (lambda (failure)
                                 (log-error "Topic demand error: ~a"
                                            (exn->string failure))))]
           (match-define (web-resource (web-virtual-host _ topic-host _) topic-path)
             (url->resource (string->url full-topic)))
           ;; Local topic name when the path is exactly /topic/<name>, else #f.
           (define maybe-local-topic
             (match topic-path
               [(list "topic" (list local-name (list))) local-name]
               [_ #f]))
           (define hosted-here? (set-member? local-hosts-snapshot topic-host))
           (general-topic-main full-topic
                               topic-host
                               maybe-local-topic
                               hosted-here?))))
;; Two-state machine for a demanded topic. In the local state it asserts
;; demand for the local topic until `topic-host` stops being a local host;
;; in the remote state it runs `remote-topic-main` until `topic-host`
;; becomes a local host (only possible when the path names a local topic).
(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?)
  (define (remote-state)
    (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state))
           (remote-topic-main full-topic)))
  (define (local-state)
    (react (stop-when (retracted (local-host topic-host)) (remote-state))
           (assert (local-topic-demand maybe-local-topic))))
  (on-start
   (cond
     [(and maybe-local-topic start-as-local?) (local-state)]
     [else (remote-state)])))
;; Returns the value stored under 'content-type in the request's headers,
;; or #f when the request carries no such header.
(define (req-content-type req)
  (define headers (web-request-header-headers req))
  (dict-ref headers 'content-type #f))
;; Renders, as a string, the URL of `path` on the virtual host for
;; `canonical-host-name`.
(define (canonical-url canonical-host-name path)
  (define resource (web-resource (vh canonical-host-name) path))
  (url->string (resource->url resource)))
;; URLString -> url
;; Resolves the absolute path "/topic/" against the hub URL, yielding the
;; base URL under which that hub's topics live.
(define (canonical-topic-base-url canonical-hub-str)
  (define hub-url (string->url canonical-hub-str))
  (combine-url/relative hub-url "/topic/"))
;; Body of the actor for one locally hosted topic named `topic`.
;; It remembers the most recently POSTed content, serves it on GET/HEAD
;; of /topic/<topic>, and sends a `notification` message for every POST.
(define (local-topic-main topic)
;; Limits taken from `local-topic-config`; at present they are only
;; recorded and logged, not enforced by this block.
(field [max-age #f]
[max-count #f])
;; Latest published content; all #f until the first POST arrives.
(field [current-content #f]
[current-content-type #f]
[last-modified-seconds #f])
(on (asserted (local-topic-config topic $age $count))
(max-age age)
(max-count count))
(on-start (log-info "Creating local topic ~v" topic))
;; Re-logs the configuration whenever max-age or max-count changes.
(begin/dataflow
(log-info "Configured local topic ~v, max-age ~v, max-count ~v"
topic
(max-age)
(max-count)))
(on-stop (log-info "Terminating local topic ~v" topic))
(during (local-host $host-name)
;; GET/HEAD handlers need the canonical host to build the hub and self
;; discovery links, so they only exist while one is asserted.
(during (canonical-local-host $canonical-host-name)
(define hub-url (canonical-url canonical-host-name `("hub" ())))
(define self-url (canonical-url canonical-host-name `("topic" (,topic ()))))
;; Link headers advertising the hub and the topic's own URL.
(define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url))
(cons 'link (format "<~a>; rel=self" self-url))))
;; Responds with the current content (body omitted when
;; `include-content?` is #f), or with 204 if nothing was published yet.
(define (topic-response id include-content?) ;; Used in both GET and HEAD requests
(if (current-content)
(web-respond/bytes! id
#:header (web-response-header
#:last-modified-seconds (last-modified-seconds)
#:mime-type (and (current-content-type)
(string->bytes/utf-8
(current-content-type)))
#:headers discovery-headers)
(if include-content? (current-content) #""))
(web-respond/bytes! id
#:header (web-response-header
#:code 204
#:message #"No Content"
#:mime-type #f
#:headers discovery-headers)
#""))) ;; MUST NOT include a response body for 204
(on (web-request-incoming (id req) (vh host-name) 'head ("topic" (,topic ())))
(topic-response id #f))
(on (web-request-get (id req) (vh host-name) ("topic" (,topic ())))
(topic-response id #t)))
;; Publishing: store the new content in this actor's fields, then spawn a
;; helper actor that broadcasts the notification and answers 201.
(on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body)
(define content-type (req-content-type req))
(log-info "Local topic ~a got ~v message ~v" topic content-type body)
(current-content body)
(current-content-type content-type)
(last-modified-seconds (current-seconds))
(actor*
;; The notification's topic is the URL of the request as received.
;; NOTE(review): for a POST via a non-canonical accepted host this may
;; differ from the canonical topic URL subscribers match on -- confirm.
(define full-topic (url->string (resource->url (web-request-header-resource req))))
(send! (notification full-topic body content-type))
(web-respond/status! id 201 #"Created")))))
;; Body of the facet that relays a topic hosted elsewhere (`full-topic` is
;; its URL). It polls the upstream resource, subscribes at the hub the
;; upstream advertises, accepts pushed content on a private callback
;; endpoint /sub/<sub-id>, and sends a `notification` whenever the content
;; or its content type changes.
(define (remote-topic-main full-topic)
  ;; Random path segment identifying this facet's callback endpoint.
  (define sub-id (random-hex-string 16))
  (log-info "Remote sub endpoint ~a" sub-id)
  (field [current-content-hash #f]
         [current-content-type #f]
         [current-upstream-hub #f]
         [established-upstream-hub #f])
  (field [last-upstream-check 0]
         [poll-interval-seconds #f])
  ;; All poll intervals currently demanded for this topic. A member may be
  ;; #f, since a `topic-demand` carries an (Option Number).
  (define/query-set poll-intervals (topic-demand full-topic $i) i)
  (begin/dataflow
    ;; Smallest numeric interval demanded, or #f if nobody asked for
    ;; polling. #f demands are skipped: comparing #f with `<` would raise
    ;; when numeric and #f demands coexist.
    (define candidate
      (for/fold [(best #f)] [(interval (in-set (poll-intervals)))]
        (cond [(not interval) best]
              [(not best) interval]
              [(< interval best) interval]
              [else best])))
    (poll-interval-seconds (and candidate (max candidate *min-poll-interval*))))
  (begin/dataflow
    (log-info "Poll interval for ~a is now ~a"
              full-topic
              (match (poll-interval-seconds)
                [#f "disabled"]
                [n (format "~a seconds" n)])))
  (during (canonical-local-host $canonical-host-name)
    ;; URL the upstream hub should deliver to.
    (define callback (canonical-url canonical-host-name `("sub" (,sub-id ()))))
    ;; (Re)subscribes at the currently advertised hub, if there is one.
    (define (refresh-subscription!)
      (when (current-upstream-hub)
        (web-request! 'post (current-upstream-hub)
                      #:body (string->bytes/utf-8
                              (alist->form-urlencoded
                               `((hub.callback . ,callback)
                                 (hub.mode . "subscribe")
                                 (hub.topic . ,full-topic)))))
        (established-upstream-hub (current-upstream-hub))))
    ;; Unsubscribes from the hub we last subscribed at, if any.
    (define (unsubscribe!)
      (when (established-upstream-hub)
        (web-request! 'post (established-upstream-hub)
                      #:body (string->bytes/utf-8
                              (alist->form-urlencoded
                               `((hub.callback . ,callback)
                                 (hub.mode . "unsubscribe")
                                 (hub.topic . ,full-topic)))))
        (established-upstream-hub #f)))
    ;; When the advertised hub changes, move the subscription over.
    (begin/dataflow
      (when (not (equal? (current-upstream-hub) (established-upstream-hub)))
        (unsubscribe!)
        (refresh-subscription!)))
    ;; Upstream check. The initial deadline is 0, so the first check runs
    ;; as soon as the timer driver answers.
    ;; NOTE(review): with polling disabled the interval term is 0, so the
    ;; next deadline equals the time of the previous check; this looks like
    ;; it re-polls continuously rather than not at all -- confirm intent.
    (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0)))))
      (log-info "Checking upstream ~a" full-topic)
      (define-values (resp response-body)
        (web-request! 'get full-topic #:redirect-budget *max-upstream-redirects*))
      (last-upstream-check (current-inexact-milliseconds))
      (match (web-response-header-code-type resp)
        ['successful
         (define new-content-hash (sha1 (open-input-bytes response-body)))
         (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
         ;; Only notify when the content or its type actually changed.
         (when (not (and (equal? (current-content-hash) new-content-hash)
                         (equal? (current-content-type) new-content-type)))
           (current-content-hash new-content-hash)
           (current-content-type new-content-type)
           (send! (notification full-topic response-body new-content-type)))
         ;; First hub link advertised by the upstream, or #f if none.
         (define upstream-hub
           (match (hash-ref (parse-link-headers (web-response-header-headers resp)) 'hub '())
             [(cons hub-url _) hub-url]
             ['() #f]))
         ;; Same hub: renew directly. Different hub: update the field and
         ;; let the dataflow block above switch the subscription.
         (if (equal? (current-upstream-hub) upstream-hub)
             (refresh-subscription!)
             (current-upstream-hub upstream-hub))]
        [other
         (log-warning "Upstream ~a yielded ~a code ~a"
                      full-topic
                      other
                      (web-response-header-code resp))]))
    ;; Verification of intent from the upstream hub: echo the challenge.
    ;; NOTE(review): hub.topic and hub.mode are not checked here.
    (on (web-request-get (id req) (vh canonical-host-name) ("sub" (,sub-id ())))
      (define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
      (log-info "Received verification-of-intent: ~v" (web-request-header-query req))
      (web-respond/bytes! id (string->bytes/utf-8 challenge)))
    ;; Content pushed by the upstream hub. The fields are updated in the
    ;; handler itself (as local-topic-main does for its POST handler) so the
    ;; writes apply to this actor's own fields; only the broadcast and the
    ;; HTTP reply are delegated to a spawned actor.
    (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body)
      (define content-type (req-content-type req))
      (log-info "Remote topic ~a got ~v message ~v" full-topic content-type body)
      (current-content-hash (sha1 (open-input-bytes body)))
      (current-content-type content-type)
      (actor*
        (send! (notification full-topic body content-type))
        (web-respond/status! id 201 #"Created")))))
;; Hub actor: accepts (un)subscription requests over HTTP, maintains the
;; `subscription` assertions, and runs one actor per subscription.
(actor #:name 'hub
(during (local-host $host-name)
;; POST /hub: verification of intent is started by
;; `asynchronous-verification-of-intent`, then the request is answered
;; with 202.
(on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body)
(asynchronous-verification-of-intent id req body)
(web-respond/status! id 202 #"Accepted")))
;; Apply a verified change: drop any existing assertion for the
;; topic/callback pair, then assert the new settings unless they are #f
;; (which means "unsubscribe").
(on (message (update-subscription $topic $callback $settings))
(retract! (subscription topic callback ?))
(when settings (assert! (subscription topic callback settings))))
;; One actor per (topic, callback) pair; if it crashes, its subscription
;; assertion is withdrawn.
(during/actor (subscription $topic $callback _)
#:name (list 'subscription topic callback)
#:on-crash (retract! (subscription topic callback ?))
(subscription-main topic callback)))
;; Handles one POST to the hub endpoint in a freshly spawned actor: parses
;; the form-encoded `body`, verifies the subscriber's intent by calling
;; back to `hub.callback`, and on success sends an `update-subscription`
;; message. `req` is the request header (used to derive the hub's own
;; URL); `id` is the request id, currently unused here.
;;
;; Malformed requests (body not valid UTF-8, missing hub.callback or
;; hub.topic, unknown hub.mode, or a lease / poll interval that is not a
;; non-negative finite number) are logged and ignored. In particular an
;; unparseable hub.lease_seconds is no longer treated as an unbounded lease.
(define (asynchronous-verification-of-intent id req body)
  ;; Returns #f when `text` equals `disabled-text`, the parsed value when
  ;; `text` is a non-negative finite real number, and 'invalid otherwise.
  (define (parse-seconds text disabled-text)
    (cond [(equal? text disabled-text) #f]
          [else
           (define n (and (string? text) (string->number text)))
           (if (and (rational? n) (>= n 0)) n 'invalid)]))
  (actor*
   ;; An undecodable body yields an empty parameter table, which is then
   ;; rejected below for lacking the mandatory parameters.
   (define params
     (with-handlers [(exn:fail? (lambda (e) (hash)))]
       (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))))
   (define callback (hash-ref params 'hub.callback #f))
   (define mode-string (hash-ref params 'hub.mode #f))
   (define mode (match mode-string
                  ["subscribe" 'subscribe]
                  ["unsubscribe" 'unsubscribe]
                  [_ #f]))
   (define topic (hash-ref params 'hub.topic #f))
   (define lease-seconds
     (parse-seconds (hash-ref params 'hub.lease_seconds *default-lease*) "unbounded"))
   (define poll-interval-seconds
     (parse-seconds (hash-ref params 'hub.poll_interval_seconds *default-poll-interval*)
                    "none"))
   (define secret-string (hash-ref params 'hub.secret #f))
   (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
   (cond
     [(not (and (string? callback)
                (string? topic)
                mode
                (not (eq? lease-seconds 'invalid))
                (not (eq? poll-interval-seconds 'invalid))))
      ;; The secret is deliberately left out of the log message.
      (log-warning "Ignoring malformed hub request: mode ~v, topic ~v, callback ~v"
                   mode-string
                   topic
                   callback)]
     [else
      (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
      (define canonical-hub (url->string (resource->url (web-request-header-resource req))))
      (match mode
        ['subscribe
         (when (subscription-change-validate "subscribe"
                                             (or lease-seconds "unbounded")
                                             topic
                                             callback)
           (send! (update-subscription topic
                                       callback
                                       (subscription-settings expiry-deadline
                                                              canonical-hub
                                                              secret-bytes
                                                              poll-interval-seconds))))]
        ['unsubscribe
         (when (subscription-change-validate "unsubscribe" #f topic callback)
           (send! (update-subscription topic callback #f)))])])))
;; Verification of intent: issues a GET to `callback` with hub.mode,
;; hub.topic, a random hub.challenge and (when `lease` is not #f)
;; hub.lease_seconds appended to the callback's existing query.
;; Returns #t only if the response is successful and its body is exactly
;; the challenge; otherwise #f.
;;   mode     : String, "subscribe" or "unsubscribe"
;;   lease    : lease value to report (rendered with `~a`), or #f to omit it
;;   topic    : String
;;   callback : URLString
(define (subscription-change-validate mode lease topic callback)
  (define challenge (random-hex-string 16))
  (define extra-query
    (list* (cons 'hub.mode mode)
           (cons 'hub.topic topic)
           (cons 'hub.challenge challenge)
           (if lease
               (list (cons 'hub.lease_seconds (~a lease)))
               (list))))
  ;; Callback URL with the verification parameters appended after any
  ;; query parameters it already carries.
  (define verification-url
    (let ((u (string->url callback)))
      (url->string
       (struct-copy url u [query (append (url-query u) extra-query)]))))
  (define-values (resp body) (web-request! 'get verification-url))
  (and (eq? (web-response-header-code-type resp) 'successful)
       (equal? body (string->bytes/utf-8 challenge))))
;; Body of the actor for one subscription (`partial-topic`, `callback`).
;; While the subscription assertion holds it asserts demand for the topic,
;; queues matching notifications and POSTs them to `callback` one at a
;; time, retrying failures with a growing delay. Notifications that
;; exhaust their retries become dead letters; once there are more than
;; *max-dead-letters* of them the actor stops.
(define (subscription-main partial-topic callback)
  (field [delivery-active? #f]
         [message-queue (make-queue)]
         [dead-letters (make-queue)])
  (stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*))
    (log-info "Too many dead letters for ~a" callback))
  ;; Drains the message queue. `delivery-active?` is set for the duration
  ;; so that notifications arriving meanwhile are only enqueued.
  (define (deliver-queued-notifications canonical-hub)
    (delivery-active? #t)
    (let deliver-rest ((retry-delay *initial-retry-delay*))
      (when (not (queue-empty? (message-queue)))
        (define-values (n newq) (dequeue (message-queue)))
        (message-queue newq)
        (match-define (notification topic body content-type) n)
        (define link-headers (list (cons 'link (format "<~a>; rel=hub" canonical-hub))
                                   (cons 'link (format "<~a>; rel=self" topic))))
        ;; One initial attempt plus up to *max-delivery-retries* retries.
        (let deliver-one ((retries-remaining *max-delivery-retries*)
                          (retry-delay retry-delay))
          (define-values (resp _body)
            (web-request! 'post
                          callback
                          #:headers (if content-type
                                        (cons (cons 'content-type content-type) link-headers)
                                        link-headers)
                          #:body body))
          (cond
            [(eq? (web-response-header-code-type resp) 'successful)
             ;; Success resets the delay for the next notification.
             (deliver-rest *initial-retry-delay*)]
            [(zero? retries-remaining)
             (log-info "Dead letter for ~v" callback)
             (dead-letters (enqueue (dead-letters) n))
             ;; The next notification starts from the already-grown delay.
             (deliver-rest retry-delay)]
            [else
             (log-info
              "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
              callback
              retry-delay
              retries-remaining
              resp)
             (sleep retry-delay)
             (deliver-one (- retries-remaining 1)
                          (min (* retry-delay *retry-delay-multiplier*)
                               *max-retry-delay*))]))))
    (delivery-active? #f))
  (during (subscription partial-topic callback (subscription-settings
                                                 $expiry-deadline
                                                 $canonical-hub
                                                 $secret-bytes
                                                 $poll-interval-seconds))
    ;; Resolve the subscribed topic against the hub's /topic/ base URL.
    (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
                                                     partial-topic)))
    (assert (topic-demand topic poll-interval-seconds))
    ;; The shared secret is redacted so it never reaches the log.
    ;; NOTE(review): `secret-bytes` is not otherwise used in this block, so
    ;; deliveries appear to be sent unsigned -- confirm whether intended.
    (on-start (log-info "Subscription configured: ~v"
                        `((topic ,topic)
                          (expiry-deadline ,expiry-deadline)
                          (canonical-hub ,canonical-hub)
                          (callback ,callback)
                          (secret-bytes ,(and secret-bytes "<redacted>"))
                          (poll-interval-seconds ,poll-interval-seconds))))
    ;; The deadline is in epoch seconds; the timer works in milliseconds.
    (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0)))
      (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback))))
    (on (message ($ n (notification topic _ _)))
      (message-queue (enqueue (message-queue) n))
      (when (not (delivery-active?))
        (deliver-queued-notifications canonical-hub)))))