From 223033f664358e514c77e5e7ecdcc57da8a30d2e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 31 Oct 2016 14:49:33 -0400 Subject: [PATCH] Steps toward remote topics --- rmq/server.rkt | 236 ++++++++++++++++++++++++++++++------------------- 1 file changed, 144 insertions(+), 92 deletions(-) diff --git a/rmq/server.rkt b/rmq/server.rkt index ebb995d..cda879c 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -3,6 +3,7 @@ (provide ) (require racket/dict) +(require racket/exn) (require racket/format) (require racket/set) (require net/url) @@ -31,8 +32,17 @@ ;; (subscription Topic Deadline URLString URLString (Option Bytes)) (struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab) -;; (local-topic Topic (Option Number) (Option Number)) -(struct local-topic (name max-age max-count) #:prefab) +;; (local-topic-config Topic (Option Number) (Option Number)) +(struct local-topic-config (name max-age max-count) #:prefab) + +;; (topic-demand Topic) +(struct topic-demand (topic-name) #:prefab) + +;; (local-topic-demand String) +(struct local-topic-demand (name) #:prefab) + +;; (local-host String) +(struct local-host (name) #:prefab) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -53,49 +63,83 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(define vh (web-virtual-host "http" ? 7827)) +(define (vh host-name) (web-virtual-host "http" host-name 7827)) (actor #:name 'main - (assert vh) + (assert (local-host "localhost")) - (on (web-request-get (id req) vh ("" ())) - (actor* - (web-respond/xexpr! id - `(html - (body - (h1 "Status" - " " - ,(url->string - (resource->url - (web-request-header-resource req)))))))))) + (during (local-host $host-name) + (assert (vh host-name)) + + (on (web-request-get (id req) (vh host-name) ("" ())) + (actor* + (web-respond/xexpr! id + `(html + (body + (h1 "Status" + " " + ,(url->string + (resource->url + (web-request-header-resource req))))))))))) (actor #:name 'local-topic-manager (field [topics (set)]) - (on (web-request-incoming (id req) vh 'put ("topic" (,$topic ()))) - (when (not (set-member? (topics) topic)) - (topics (set-add (topics) topic)) - (retract! (local-topic topic ? ?)) - (assert! (local-topic topic #f #f))) ;; TODO: maximums - (web-respond/bytes! id #"")) + (during (local-host $host-name) + (on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ()))) + (when (not (set-member? (topics) topic)) + (topics (set-add (topics) topic)) + (assert! (local-topic-demand topic)) + (retract! (local-topic-config topic ? ?)) + (assert! (local-topic-config topic #f #f))) ;; TODO: maximums + (web-respond/bytes! id #"")) - (on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ()))) - (when (set-member? (topics) topic) - (topics (set-remove (topics) topic)) - (retract! (local-topic topic ? ?))) - (web-respond/bytes! id #"")) + (on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ()))) + (when (set-member? (topics) topic) + (topics (set-remove (topics) topic)) + (retract! (local-topic-demand topic)) + (retract! (local-topic-config topic ? ?))) + (web-respond/bytes! id #""))) - (during/actor (local-topic $topic _ _) - #:name (list 'topic topic) - (local-topic-main topic))) + (during/actor (local-topic-demand $topic) + #:name (list 'local-topic topic) + (local-topic-main topic))) + +(actor #:name 'topic-demand-analyzer + (define/query-set local-hosts (local-host $host-name) host-name) + (during (topic-demand $full-topic) + (with-handlers [(exn? (lambda (e) + (log-error "Topic demand error: ~a" (exn->string e))))] + (match-define (web-resource (web-virtual-host _ topic-host _) topic-path) + (url->resource (string->url full-topic))) + (define maybe-local-topic + (match topic-path [`("topic" (,topic ())) topic] [_ #f])) + (spawn-general-topic full-topic + topic-host + maybe-local-topic + (set-member? (local-hosts) topic-host))))) + +(define (spawn-general-topic full-topic topic-host maybe-local-topic maybe-start-as-local?) + (actor* #:name (list 'general-topic full-topic) + (define (local-state) + (react (stop-when (retracted (local-host topic-host)) (remote-state)) + (assert (local-topic-demand maybe-local-topic)))) + + (define (remote-state) + (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state)) + (remote-topic-main full-topic))) + + (if (and maybe-local-topic maybe-start-as-local?) + (local-state) + (remote-state)))) (define (local-topic-main topic) (field [max-age #f] [max-count #f]) - (on (asserted (local-topic topic $age $count)) + (on (asserted (local-topic-config topic $age $count)) (max-age age) (max-count count)) @@ -107,72 +151,77 @@ (max-count))) (on-stop (log-info "Terminating local topic ~v" topic)) - (on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body) - (actor* - (define content-type - (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) - (log-info "Topic ~a got ~v message! ~v" topic content-type body) - (send! (notification (url->string (resource->url (web-request-header-resource req))) - body - content-type)) - (web-respond/status! id 201 #"Created")))) + (during (local-host $host-name) + (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) + (actor* + (define content-type + (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) + (log-info "Topic ~a got ~v message! ~v" topic content-type body) + (send! (notification (url->string (resource->url (web-request-header-resource req))) + body + content-type)) + (web-respond/status! id 201 #"Created"))))) + +(define (remote-topic-main full-topic) + (log-info "TODO: remote-topic-main")) (actor #:name 'hub - (on (web-request-incoming (id req) vh 'post ("hub" ()) $body) - ;; Initially, I had an (actor* ...) form here for fault - ;; isolation. However, this led to problems since I wanted - ;; to use `assert!` and `retract!` to signal to the - ;; `during/actor`, and the assertions were being lost as - ;; the `actor*` terminated. So instead, I'm using Rackety - ;; `with-handlers`. - (define ok? - (with-handlers [(values (lambda (e) #f))] - (define params (make-immutable-hash - (form-urlencoded->alist (bytes->string/utf-8 body)))) - (define callback (hash-ref params 'hub.callback)) - (define mode - (match (hash-ref params 'hub.mode) - ["subscribe" 'subscribe] - ["unsubscribe" 'unsubscribe])) - (define topic (hash-ref params 'hub.topic)) - (define lease-seconds - (match (hash-ref params 'hub.lease_seconds "unbounded") - ["unbounded" #f] - [n (string->number n)])) - (define secret-string (hash-ref params 'hub.secret #f)) - (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) - (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) - (define canonical-hub - (url->string (resource->url (web-request-header-resource req)))) - (match mode - ['subscribe - (if (subscription-change-validate "subscribe" - (or lease-seconds "unbounded") - topic - callback) - (begin - (retract! (subscription topic ? ? callback ?)) - (assert! - (subscription topic expiry-deadline canonical-hub callback secret-bytes)) - #t) - #f)] - ['unsubscribe - (if (subscription-change-validate "unsubscribe" - #f - topic - callback) - (begin - (retract! (subscription topic ? ? callback ?)) - #t) - #f)]))) - (if ok? - (web-respond/status! id 202 #"Accepted") - (web-respond/status! id 403 #"Forbidden" #"Validation failed"))) + (during (local-host $host-name) + (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body) + ;; Initially, I had an (actor* ...) form here for fault + ;; isolation. However, this led to problems since I wanted + ;; to use `assert!` and `retract!` to signal to the + ;; `during/actor`, and the assertions were being lost as + ;; the `actor*` terminated. So instead, I'm using Rackety + ;; `with-handlers`. + (define ok? + (with-handlers [(values (lambda (e) #f))] + (define params (make-immutable-hash + (form-urlencoded->alist (bytes->string/utf-8 body)))) + (define callback (hash-ref params 'hub.callback)) + (define mode + (match (hash-ref params 'hub.mode) + ["subscribe" 'subscribe] + ["unsubscribe" 'unsubscribe])) + (define topic (hash-ref params 'hub.topic)) + (define lease-seconds + (match (hash-ref params 'hub.lease_seconds "unbounded") + ["unbounded" #f] + [n (string->number n)])) + (define secret-string (hash-ref params 'hub.secret #f)) + (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) + (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) + (define canonical-hub + (url->string (resource->url (web-request-header-resource req)))) + (match mode + ['subscribe + (if (subscription-change-validate "subscribe" + (or lease-seconds "unbounded") + topic + callback) + (begin + (retract! (subscription topic ? ? callback ?)) + (assert! + (subscription topic expiry-deadline canonical-hub callback secret-bytes)) + #t) + #f)] + ['unsubscribe + (if (subscription-change-validate "unsubscribe" + #f + topic + callback) + (begin + (retract! (subscription topic ? ? callback ?)) + #t) + #f)]))) + (if ok? + (web-respond/status! id 202 #"Accepted") + (web-respond/status! id 403 #"Forbidden" #"Validation failed")))) - (during/actor (subscription $topic _ _ $callback _) - #:name (list 'subscription topic callback) - #:on-crash (retract! (subscription topic ? ? callback ?)) - (subscription-main topic callback))) + (during/actor (subscription $partial-topic _ _ $callback _) + #:name (list 'subscription partial-topic callback) + #:on-crash (retract! (subscription partial-topic ? ? callback ?)) + (subscription-main partial-topic callback))) (define (subscription-change-validate mode lease topic callback) (define challenge (random-hex-string 16)) @@ -241,6 +290,9 @@ (during (subscription partial-topic $expiry-deadline $canonical-hub callback $secret-bytes) (define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub) partial-topic))) + + (assert (topic-demand topic)) + (on-start (log-info "Subscription configured: ~v ~v ~v ~v" topic expiry-deadline