racketmq-2017/rmq/server.rkt

265 lines
11 KiB
Racket
Raw Normal View History

#lang syndicate/actor
2015-09-30 16:17:50 +00:00
2016-10-29 11:16:29 +00:00
(provide )
(require racket/dict)
(require racket/format)
(require racket/set)
(require net/url)
(require net/uri-codec)
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/web)
(require syndicate/protocol/advertise)
(require syndicate/functional-queue)
2016-10-29 11:16:29 +00:00
(require "private/util.rkt")
(module+ test (require rackunit))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; A Topic is a URIString.
;; A Deadline is an (Option Number), where #f indicates "unbounded",
;; and numbers indicate a moment in time as a Unix epoch timestamp as
;; might be returned from `current-seconds`.
;; (notification Topic Bytes String)
;; A message published to a topic: the topic's name, the payload, and
;; the payload's content type. (The struct has exactly these three
;; fields.)
(struct notification (topic-name content content-type) #:prefab)
;; (subscription Topic Deadline URLString URLString (Option Bytes))
;; A subscriber's registration for a topic: the moment it lapses, the
;; URL of the hub request that created it, the callback URL that
;; deliveries are POSTed to, and an optional subscriber-supplied secret.
(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab)
;; (local-topic Topic (Option Number) (Option Number))
;; A topic hosted by this server, with optional age/count limits. In
;; this file both limits are only ever asserted as #f.
(struct local-topic (name max-age max-count) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; (define (number->path n)
;; (let ((top (quotient n 256))
;; (bot (remainder n 256)))
;; (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
;; (if (zero? top)
;; bot-piece
;; (string-append (number->path top) "-/" bot-piece))))
;; (module+ test
;; (check-equal? (number->path 0) "00")
;; (check-equal? (number->path 123) "7b")
;; (check-equal? (number->path 12345) "30-/39")
;; (check-equal? (number->path 123456789012345678901234567890)
;; "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The virtual host that every HTTP endpoint in this file is matched
;; against. NOTE(review): the arguments look like scheme "http", a
;; wildcard (`?`) host name, and port 7827 -- confirm against the
;; syndicate web driver's `web-virtual-host` definition.
(define vh (web-virtual-host "http" ? 7827))
;; Root actor: asserts the virtual host and answers GET requests that
;; match the path pattern ("" ()) with a minimal HTML status page
;; echoing the URL that was requested.
(actor #:name 'main
       (assert vh)
       (on (web-request-get (id req) vh ("" ()))
           ;; The reply is produced from a separate short-lived actor.
           (actor*
            (define requested-url
              (url->string
               (resource->url
                (web-request-header-resource req))))
            (define status-page
              `(html
                (body
                 (h1 "Status"
                     " "
                     ,requested-url))))
            (web-respond/xexpr! id status-page))))
;; Keeps the set of local topics. A PUT under "topic" creates the named
;; topic if it is not already known, and a DELETE removes it if it is;
;; both always answer with an empty body. Every asserted `local-topic`
;; record is given its own actor running `local-topic-main`.
(actor #:name 'local-topic-manager
       (field [known-topics (set)])

       ;; Create: only touch the assertions when the topic is new.
       (on (web-request-incoming (id req) vh 'put ("topic" (,$topic ())))
           (unless (set-member? (known-topics) topic)
             (known-topics (set-add (known-topics) topic))
             (retract! (local-topic topic ? ?))
             (assert! (local-topic topic #f #f))) ;; TODO: maximums
           (web-respond/bytes! id #""))

       ;; Delete: only touch the assertions when the topic exists.
       (on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ())))
           (when (set-member? (known-topics) topic)
             (known-topics (set-remove (known-topics) topic))
             (retract! (local-topic topic ? ?)))
           (web-respond/bytes! id #""))

       ;; One actor per topic, living as long as its assertion does.
       (during/actor (local-topic $topic _ _)
                     #:name (list 'topic topic)
                     (local-topic-main topic)))
;; Behaviour of the actor that represents one local topic.
;;
;; topic -- the topic's name, as carried by its `local-topic` assertion.
;;
;; Tracks the topic's max-age / max-count settings, logs its lifecycle,
;; and turns each POST to the topic into a `notification` message,
;; answering the publisher with 201.
(define (local-topic-main topic)
  (field [max-age #f]
         [max-count #f])
  ;; Copy the limits out of the topic's assertion into the fields.
  (on (asserted (local-topic topic $asserted-age $asserted-count))
      (max-age asserted-age)
      (max-count asserted-count))
  (on-start (log-info "Creating local topic ~v" topic))
  (begin/dataflow
    (log-info "Configured local topic ~v, max-age ~v, max-count ~v"
              topic
              (max-age)
              (max-count)))
  (on-stop (log-info "Terminating local topic ~v" topic))
  (on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body)
      (actor*
       (define request-headers (web-request-header-headers req))
       ;; Publishers that send no content type get a generic binary one.
       (define content-type
         (dict-ref request-headers 'content-type "application/octet-stream"))
       (log-info "Topic ~a got ~v message! ~v" topic content-type body)
       ;; The notification is labelled with the full URL that was posted to.
       (let ([topic-url (url->string
                         (resource->url (web-request-header-resource req)))])
         (send! (notification topic-url body content-type)))
       (web-respond/status! id 201 #"Created"))))
;; The hub endpoint. A POST matching the path pattern ("hub" ()) carries
;; a form-urlencoded (un)subscription request with `hub.callback`,
;; `hub.mode` ("subscribe" or "unsubscribe") and `hub.topic`, plus
;; optional `hub.lease_seconds` (a non-negative number, or the literal
;; "unbounded", which is also the default) and `hub.secret`.
;;
;; The request is confirmed with the callback through
;; `subscription-change-validate`. On success the matching
;; `subscription` assertion is replaced (subscribe) or retracted
;; (unsubscribe) and the reply is 202; on any failure -- a missing or
;; malformed parameter, or a failed confirmation -- the reply is 403.
;;
;; Every asserted subscription gets its own actor running
;; `subscription-main`; if that actor crashes the subscription is
;; retracted.
(actor #:name 'hub
       (on (web-request-incoming (id req) vh 'post ("hub" ()) $body)
           ;; Initially, I had an (actor* ...) form here for fault
           ;; isolation. However, this led to problems since I wanted
           ;; to use `assert!` and `retract!` to signal to the
           ;; `during/actor`, and the assertions were being lost as
           ;; the `actor*` terminated. So instead, I'm using Rackety
           ;; `with-handlers`.
           ;;
           ;; NOTE(review): the `values` predicate accepts every raised
           ;; value (other than #f), so all errors below -- missing
           ;; keys, failed matches, bad URLs -- collapse into ok? = #f.
           (define ok?
             (with-handlers [(values (lambda (e) #f))]
               (define params (make-immutable-hash
                               (form-urlencoded->alist (bytes->string/utf-8 body))))
               (define callback (hash-ref params 'hub.callback))
               (define mode
                 (match (hash-ref params 'hub.mode)
                   ["subscribe" 'subscribe]
                   ["unsubscribe" 'unsubscribe]))
               (define topic (hash-ref params 'hub.topic))
               (define lease-seconds
                 (match (hash-ref params 'hub.lease_seconds "unbounded")
                   ["unbounded" #f]
                   [n
                    ;; `string->number` returns #f for non-numeric text,
                    ;; and #f is also the "unbounded" sentinel, so a
                    ;; garbage lease must be rejected here rather than
                    ;; silently becoming a never-expiring subscription.
                    ;; Non-real and infinite/NaN values are refused too,
                    ;; since the deadline arithmetic needs a finite real.
                    (let ([secs (and (string? n) (string->number n))])
                      (unless (and secs (rational? secs) (not (negative? secs)))
                        (error 'hub "invalid hub.lease_seconds: ~v" n))
                      secs)]))
               (define secret-string (hash-ref params 'hub.secret #f))
               (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
               (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
               (define canonical-hub
                 (url->string (resource->url (web-request-header-resource req))))
               (match mode
                 ['subscribe
                  (if (subscription-change-validate "subscribe"
                                                    (or lease-seconds "unbounded")
                                                    topic
                                                    callback)
                      (begin
                        ;; Replace any existing record for this
                        ;; topic/callback pair with the new terms.
                        (retract! (subscription topic ? ? callback ?))
                        (assert!
                         (subscription topic expiry-deadline canonical-hub callback secret-bytes))
                        #t)
                      #f)]
                 ['unsubscribe
                  (if (subscription-change-validate "unsubscribe"
                                                    #f
                                                    topic
                                                    callback)
                      (begin
                        (retract! (subscription topic ? ? callback ?))
                        #t)
                      #f)])))
           (if ok?
               (web-respond/status! id 202 #"Accepted")
               (web-respond/status! id 403 #"Forbidden" #"Validation failed")))
       (during/actor (subscription $topic _ _ $callback _)
                     #:name (list 'subscription topic callback)
                     #:on-crash (retract! (subscription topic ? ? callback ?))
                     (subscription-main topic callback)))
;; Confirms a (un)subscription request with the subscriber by sending a
;; GET to its callback URL and checking that the challenge is echoed.
;;
;; mode     -- value sent as `hub.mode` (callers pass "subscribe" or
;;             "unsubscribe").
;; lease    -- value sent as `hub.lease_seconds` (rendered with `~a`),
;;             or #f to leave that parameter out.
;; topic    -- value sent as `hub.topic`.
;; callback -- the subscriber's callback URL, as a string; any query
;;             parameters it already has are kept and the hub.*
;;             parameters are appended after them.
;;
;; Returns a true value only when the response is classed 'successful
;; and its body is exactly the challenge string's UTF-8 bytes;
;; otherwise #f.
(define (subscription-change-validate mode lease topic callback)
  ;; Fresh random token for the subscriber to echo back.
  (define challenge (random-hex-string 16))
  (define extra-query
    (list* (cons 'hub.mode mode)
           (cons 'hub.topic topic)
           (cons 'hub.challenge challenge)
           (if lease
               (list (cons 'hub.lease_seconds (~a lease)))
               (list))))
  (define verification-url
    (let ([u (string->url callback)])
      (url->string
       (struct-copy url u [query (append (url-query u) extra-query)]))))
  (define-values (resp body)
    (web-request! 'get verification-url))
  (and (eq? (web-response-header-code-type resp) 'successful)
       (equal? body (string->bytes/utf-8 challenge))))
;; Given the hub's own URL as a string, returns (as a url struct) the
;; URL obtained by resolving the reference "/topic/" against it.
(define (canonical-topic-base-url canonical-hub-str)
  (let ([hub-url (string->url canonical-hub-str)])
    (combine-url/relative hub-url "/topic/")))
;; Behaviour of the actor that serves one subscription.
;;
;; partial-topic -- the topic as recorded in the `subscription`
;;                  assertion; it is resolved against the hub's
;;                  "/topic/" base URL to get the full topic URL.
;; callback      -- the subscriber's callback URL; deliveries are
;;                  POSTed to it.
;;
;; Notifications for the topic are queued and delivered one at a time.
;; The actor stops when the subscription's expiry timer fires or when
;; more than one notification has been dead-lettered.
(define (subscription-main partial-topic callback)
;; expiry-timer-id  : id of the pending expiry timer, or #f when none.
;; delivery-active? : #t while `deliver-queued-notifications` is running.
;; message-queue    : notifications waiting to be delivered.
;; dead-letters     : notifications whose delivery was abandoned.
(field [expiry-timer-id #f]
[delivery-active? #f]
[message-queue (make-queue)]
[dead-letters (make-queue)])
;; Give up on this subscriber once the dead-letter count exceeds 1.
(stop-when (rising-edge (> (queue-length (dead-letters)) 1))
(log-info "Too many dead letters for ~a" callback))
;; Drains `message-queue`, POSTing each notification to `callback`.
;; A failed delivery is retried while `retries-remaining` is non-zero
;; (it starts at 1, so one retry), after sleeping `retry-delay`; the
;; delay starts at 5.0 and is multiplied by 1.618, capped at 30, for
;; each further attempt. When retries run out the notification is moved
;; to `dead-letters` and draining continues with the next one.
(define (deliver-queued-notifications canonical-hub)
(delivery-active? #t)
(let deliver-rest ()
(when (not (queue-empty? (message-queue)))
(define-values (n newq) (dequeue (message-queue)))
(message-queue newq)
(match-define (notification topic body content-type) n)
(let deliver-one ((retries-remaining 1)
(retry-delay 5.0))
;; The request carries the notification's content type and two
;; `link` headers: rel=hub (the hub URL) and rel=self (the topic).
(define-values (resp _body)
(web-request! 'post
callback
#:headers (list (cons 'content-type content-type)
(cons 'link (format "<~a>; rel=hub" canonical-hub))
(cons 'link (format "<~a>; rel=self" topic)))
#:body body))
(cond
[(eq? (web-response-header-code-type resp) 'successful)
(deliver-rest)]
[(zero? retries-remaining)
(log-info "Dead letter for ~v" callback)
(dead-letters (enqueue (dead-letters) n))
(deliver-rest)]
[else
(log-info
"Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
callback
retry-delay
retries-remaining
resp)
(sleep retry-delay)
(deliver-one (- retries-remaining 1)
(min (* retry-delay 1.618) 30))]))))
(delivery-active? #f))
;; Active for as long as a subscription record for this topic/callback
;; pair is asserted; the record's deadline, hub URL and secret are
;; bound for use inside.
(during (subscription partial-topic $expiry-deadline $canonical-hub callback $secret-bytes)
;; Full topic URL: `partial-topic` resolved against the hub's topic base.
(define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
partial-topic)))
(on-start (log-info "Subscription configured: ~v ~v ~v ~v"
topic
expiry-deadline
canonical-hub
secret-bytes)
;; With a deadline, arm an absolute timer under a fresh id; the
;; deadline is scaled by 1000.0 (presumably seconds -> milliseconds
;; for the timer driver -- confirm). Without one, record that no
;; timer is pending.
(cond
[expiry-deadline
(expiry-timer-id (gensym 'subscription-expiry))
(log-info "Subscription will expire at ~a" expiry-deadline)
(send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))]
[else
(log-info "Subscription will not expire")
(expiry-timer-id #f)]))
;; Only listens for expiry while a timer id is set.
(stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _))
(log-info "Subscription expired"))
;; Queue each notification for this topic, and start a delivery run
;; unless one is already in progress.
(on (message ($ n (notification topic _ _)))
(message-queue (enqueue (message-queue) n))
(when (not (delivery-active?))
(deliver-queued-notifications canonical-hub)))))