From d9866843f2554327fb4d436e16afab55be38f03e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 30 Oct 2016 20:31:29 -0400 Subject: [PATCH] Flesh out server significantly; stub test utility --- rmq/poke.rkt | 93 +++++++++++++++++++++++++++++++++++++ rmq/private/util.rkt | 38 ++++++++++++++- rmq/server.rkt | 107 ++++++++++++++++++++++++++++--------------- 3 files changed, 200 insertions(+), 38 deletions(-) create mode 100644 rmq/poke.rkt diff --git a/rmq/poke.rkt b/rmq/poke.rkt new file mode 100644 index 0000000..29abe3e --- /dev/null +++ b/rmq/poke.rkt @@ -0,0 +1,93 @@ +#lang syndicate/actor + +(require racket/dict) +(require racket/format) +(require racket/set) +(require racket/string) +(require racket/port) +(require net/url) +(require net/uri-codec) + +(require/activate syndicate/drivers/timer) +(require/activate syndicate/drivers/web) + +(require "private/util.rkt") + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define vh (web-virtual-host "http" ? 7000)) ;; client + +(define server-res (url->resource (string->url "http://localhost:7827/"))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (request! verb path #:body [body #""]) + (web-request! verb + (url->string (resource->url (struct-copy web-resource server-res [path path]))) + #:body body)) + +(let ((e (read-bytes-line-evt (current-input-port) 'any))) + (define (print-prompt) + (printf "> ") + (flush-output)) + (actor (on-start (print-prompt)) + (stop-when (message (inbound (external-event e (list (? eof-object? _)))))) + (assert 'shell-running) + (on (message (inbound (external-event e (list (? bytes? $bs))))) + (match (string-split (string-trim (bytes->string/utf-8 bs))) + [(list "topic" topic) + (request! 'put `("topic" (,topic ())))] + [(list* "pub" topic strs) + (request! 'post `("topic" (,topic ())) + #:body (string->bytes/utf-8 (string-join strs)))] + [(list "sub" topic) + (spawn-subscriber topic)] + [(list) + (void)] + [_ + (printf "Unexpected input\n")]) + (sleep 0.1) + (print-prompt)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(actor #:name 'main + (stop-when (retracted 'shell-running)) + + (assert vh) + + (on (web-request-get (id req) vh ("" ())) + (actor* + (web-respond/xexpr! id + `(html + (body + (h1 "Poke running"))))))) + +(actor #:name 'sink + (on (web-request-incoming (id req) vh 'post ("sink" ()) $body) + (printf "SINK POST: ~v >>> ~v\n" req body) + (if (equal? body #"fail") + (web-respond/status! id 500 #"Deliberate failure") + (web-respond/bytes! id #""))) + (on (web-request-get (id req) vh ("sink" ())) + (printf "SINK GET: ~v\n" req) + (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) + (web-respond/bytes! id (string->bytes/utf-8 challenge)))) + +(actor #:name 'incoming-tracer + (assert (observe (web-response-complete _ _ _))) ;; :-( See journal for 30 Oct 2016 + (on (web-request-incoming (id req) vh $verb ,$path $body) + (printf "~a ==> ~a ~v ~v\n" id verb path body) + (react (stop-when (message (web-response-complete id $resp $body)) + (printf "~a <== ~v ~v\n" id resp body)) + (on-start (send! (set-timer (list 'incoming-tracer id) 1000 'relative))) + (stop-when (message (timer-expired (list 'incoming-tracer id) _)) + (printf "~a <== timeout\n" id))))) + +(define (spawn-subscriber topic) + (request! 'post `("hub" ()) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . "http://localhost:7000/sink") + (hub.mode . "subscribe") + (hub.topic . ,topic)))))) diff --git a/rmq/private/util.rkt b/rmq/private/util.rkt index 9d1b2f7..15e7828 100644 --- a/rmq/private/util.rkt +++ b/rmq/private/util.rkt @@ -1,14 +1,25 @@ #lang racket/base -(provide random-hex-string - extend-url-string-query) +(provide sleep + random-hex-string + extend-url-string-query + web-respond/status! + web-request!) (require (only-in file/sha1 bytes->hex-string)) (require (only-in racket/random crypto-random-bytes)) (require net/url) +(require syndicate/actor) +(require syndicate/drivers/timer) +(require syndicate/drivers/web) (module+ test (require rackunit)) +(define (sleep sec) + (define timer-id (gensym 'sleep)) + (until (message (timer-expired timer-id _)) + (on-start (send! (set-timer timer-id (* sec 1000.0) 'relative))))) + (define (random-hex-string half-length) (bytes->hex-string (crypto-random-bytes half-length))) @@ -16,6 +27,29 @@ (define u (string->url urlstr)) (url->string (struct-copy url u [query (append (url-query u) extension)]))) +(define (web-respond/status! id code message [body #""]) + (web-respond/bytes! id + #:header (web-response-header + #:code code + #:message message + #:headers '()) + body)) + +(define (web-request! verb urlstr #:body [body #""] #:headers [headers '()]) + (define req-id (gensym 'req)) + (define u (string->url urlstr)) + (define res (url->resource u)) + (react/suspend (k) + (on-start + (printf "~a --> ~a ~a ~v\n" req-id verb urlstr body) + (send! (web-request req-id + 'outbound + (web-request-header verb res headers (url-query u)) + body))) + (stop-when (message (web-response-complete req-id $resp $body)) + (printf "~a <-- ~v ~v\n" req-id resp body) + (k resp body)))) + (module+ test (check-equal? (extend-url-string-query "http://localhost/" '((a . "hi"))) "http://localhost/?a=hi") diff --git a/rmq/server.rkt b/rmq/server.rkt index 13b9072..ebb995d 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -2,6 +2,7 @@ (provide ) +(require racket/dict) (require racket/format) (require racket/set) (require net/url) @@ -10,6 +11,7 @@ (require/activate syndicate/drivers/timer) (require/activate syndicate/drivers/web) (require syndicate/protocol/advertise) +(require syndicate/functional-queue) (require "private/util.rkt") @@ -89,14 +91,6 @@ #:name (list 'topic topic) (local-topic-main topic))) -(define (web-respond/status! id code message [body #""]) - (web-respond/bytes! id - #:header (web-response-header - #:code code - #:message message - #:headers '()) - body)) - (define (local-topic-main topic) (field [max-age #f] [max-count #f]) @@ -116,9 +110,8 @@ (on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body) (actor* (define content-type - (cdr (or (assq 'content-type (web-request-header-headers req)) - '(content-type . "application/octet-stream")))) - (log-info "Got ~v message! ~v" content-type body) + (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) + (log-info "Topic ~a got ~v message! ~v" topic content-type body) (send! (notification (url->string (resource->url (web-request-header-resource req))) body content-type)) @@ -178,11 +171,10 @@ (during/actor (subscription $topic _ _ $callback _) #:name (list 'subscription topic callback) + #:on-crash (retract! (subscription topic ? ? callback ?)) (subscription-main topic callback))) (define (subscription-change-validate mode lease topic callback) - (define u (string->url callback)) - (define res (url->resource u)) (define challenge (random-hex-string 16)) (define id (gensym 'validation)) (define extra-query (list* (cons 'hub.mode mode) @@ -191,23 +183,66 @@ (if lease (list (cons 'hub.lease_seconds (~a lease))) (list)))) - (react/suspend - (k) - (on-start (send! (web-request id - 'outbound - (web-request-header 'get - res - '() - (append (url-query u) extra-query)) - '()))) - (on (message (web-response-complete id $resp $body)) - (k (and (<= 200 (web-response-header-code resp) 299) - (equal? body (string->bytes/utf-8 challenge))))))) + (define-values (resp body) + (web-request! 'get + (let* ((u (string->url callback))) + (url->string + (struct-copy url u [query (append (url-query u) extra-query)]))))) + (and (eq? (web-response-header-code-type resp) 'successful) + (equal? body (string->bytes/utf-8 challenge)))) -(define (subscription-main topic callback) - (field [expiry-timer-id #f]) - (during (subscription topic $expiry-deadline $canonical-hub callback $secret-bytes) - (on-start (log-info "Subscription configured: ~v ~v ~v" +(define (canonical-topic-base-url canonical-hub-str) + (combine-url/relative (string->url canonical-hub-str) "/topic/")) + +(define (subscription-main partial-topic callback) + (field [expiry-timer-id #f] + [delivery-active? #f] + [message-queue (make-queue)] + [dead-letters (make-queue)]) + + (stop-when (rising-edge (> (queue-length (dead-letters)) 1)) + (log-info "Too many dead letters for ~a" callback)) + + (define (deliver-queued-notifications canonical-hub) + (delivery-active? #t) + (let deliver-rest () + (when (not (queue-empty? (message-queue))) + (define-values (n newq) (dequeue (message-queue))) + (message-queue newq) + (match-define (notification topic body content-type) n) + (let deliver-one ((retries-remaining 1) + (retry-delay 5.0)) + (define-values (resp _body) + (web-request! 'post + callback + #:headers (list (cons 'content-type content-type) + (cons 'link (format "<~a>; rel=hub" canonical-hub)) + (cons 'link (format "<~a>; rel=self" topic))) + #:body body)) + (cond + [(eq? (web-response-header-code-type resp) 'successful) + (deliver-rest)] + [(zero? retries-remaining) + (log-info "Dead letter for ~v" callback) + (dead-letters (enqueue (dead-letters) n)) + (deliver-rest)] + [else + (log-info + "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v" + callback + retry-delay + retries-remaining + resp) + (sleep retry-delay) + (deliver-one (- retries-remaining 1) + (min (* retry-delay 1.618) 30))])))) + (delivery-active? #f)) + + (during (subscription partial-topic $expiry-deadline $canonical-hub callback $secret-bytes) + (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) + partial-topic))) + (on-start (log-info "Subscription configured: ~v ~v ~v ~v" + topic expiry-deadline canonical-hub secret-bytes) @@ -219,11 +254,11 @@ [else (log-info "Subscription will not expire") (expiry-timer-id #f)])) - (stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _)) - (log-info "Subscription expired")))) -(actor #:name 'sink - (on (web-request-get (id req) vh ("sink" ())) - (define challenge (cdr (or (assq 'hub.challenge (web-request-header-query req)) - (cons 'hub.challenge "")))) - (web-respond/bytes! id (string->bytes/utf-8 challenge)))) + (stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _)) + (log-info "Subscription expired")) + + (on (message ($ n (notification topic _ _))) + (message-queue (enqueue (message-queue) n)) + (when (not (delivery-active?)) + (deliver-queued-notifications canonical-hub)))))