diff --git a/rmq/server.rkt b/rmq/server.rkt index cda879c..4e4932d 100644 --- a/rmq/server.rkt +++ b/rmq/server.rkt @@ -10,6 +10,7 @@ (require net/uri-codec) (require/activate syndicate/drivers/timer) +(require/activate syndicate/drivers/timestate) (require/activate syndicate/drivers/web) (require syndicate/protocol/advertise) (require syndicate/functional-queue) @@ -44,6 +45,9 @@ ;; (local-host String) (struct local-host (name) #:prefab) +;; (canonical-local-host String) +(struct canonical-local-host (name) #:prefab) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; (define (number->path n) @@ -69,6 +73,8 @@ (assert (local-host "localhost")) + (assert (canonical-local-host "localhost")) + (during (local-host $host-name) (assert (vh host-name)) @@ -109,31 +115,36 @@ (actor #:name 'topic-demand-analyzer (define/query-set local-hosts (local-host $host-name) host-name) - (during (topic-demand $full-topic) + (during/actor (topic-demand $full-topic) + #:name (list 'general-topic full-topic) + #:let [(local-hosts-snapshot (local-hosts))] (with-handlers [(exn? (lambda (e) (log-error "Topic demand error: ~a" (exn->string e))))] (match-define (web-resource (web-virtual-host _ topic-host _) topic-path) (url->resource (string->url full-topic))) (define maybe-local-topic (match topic-path [`("topic" (,topic ())) topic] [_ #f])) - (spawn-general-topic full-topic + (on-start + (general-topic-main full-topic topic-host maybe-local-topic - (set-member? (local-hosts) topic-host))))) + (set-member? local-hosts-snapshot topic-host)))))) -(define (spawn-general-topic full-topic topic-host maybe-local-topic maybe-start-as-local?) - (actor* #:name (list 'general-topic full-topic) - (define (local-state) - (react (stop-when (retracted (local-host topic-host)) (remote-state)) - (assert (local-topic-demand maybe-local-topic)))) +(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?) + (define (local-state) + (react (stop-when (retracted (local-host topic-host)) (remote-state)) + (assert (local-topic-demand maybe-local-topic)))) - (define (remote-state) - (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state)) - (remote-topic-main full-topic))) + (define (remote-state) + (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state)) + (remote-topic-main full-topic))) - (if (and maybe-local-topic maybe-start-as-local?) - (local-state) - (remote-state)))) + (if (and maybe-local-topic start-as-local?) + (local-state) + (remote-state))) + +(define (req-content-type req) + (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) (define (local-topic-main topic) (field [max-age #f] @@ -154,16 +165,35 @@ (during (local-host $host-name) (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body) (actor* - (define content-type - (dict-ref (web-request-header-headers req) 'content-type "application/octet-stream")) - (log-info "Topic ~a got ~v message! ~v" topic content-type body) - (send! (notification (url->string (resource->url (web-request-header-resource req))) - body - content-type)) + (define content-type (req-content-type req)) + (log-info "Local topic ~a got ~v message ~v" topic content-type body) + (define full-topic (url->string (resource->url (web-request-header-resource req)))) + (send! (notification full-topic body content-type)) (web-respond/status! id 201 #"Created"))))) +(define (maintain-remote-subscription full-topic canonical-host-name sub-id) + (define callback (url->string + (resource->url + (web-resource (vh canonical-host-name) `("sub" (,sub-id ())))))) + (react + (field [next-upstream-check (current-inexact-milliseconds)]) + (on (asserted (later-than (next-upstream-check))) + (log-info "Rechecking upstream for ~a" full-topic) + (define-values (resp response-body) + (web-request! 'get full-topic #:redirect-budget 5)) + (log-info "Resource: ~v" resp)))) + (define (remote-topic-main full-topic) - (log-info "TODO: remote-topic-main")) + (define sub-id (random-hex-string 16)) + (log-info "Remote sub endpoint ~a" sub-id) + (during (canonical-local-host $canonical-host-name) + (on-start (maintain-remote-subscription full-topic canonical-host-name sub-id)) + (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body) + (actor* + (define content-type (req-content-type req)) + (log-info "Remote topic ~a got ~v message ~v" full-topic content-type body) + (send! (notification full-topic body content-type)) + (web-respond/status! id 201 #"Created"))))) (actor #:name 'hub (during (local-host $host-name)