Steps toward remote topics
This commit is contained in:
parent
06aab927bb
commit
9a98bcac0d
|
@ -10,6 +10,7 @@
|
|||
(require net/uri-codec)
|
||||
|
||||
(require/activate syndicate/drivers/timer)
|
||||
(require/activate syndicate/drivers/timestate)
|
||||
(require/activate syndicate/drivers/web)
|
||||
(require syndicate/protocol/advertise)
|
||||
(require syndicate/functional-queue)
|
||||
|
@ -44,6 +45,9 @@
|
|||
;; (local-host String)
|
||||
(struct local-host (name) #:prefab)
|
||||
|
||||
;; (canonical-local-host String)
|
||||
(struct canonical-local-host (name) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; (define (number->path n)
|
||||
|
@ -69,6 +73,8 @@
|
|||
|
||||
(assert (local-host "localhost"))
|
||||
|
||||
(assert (canonical-local-host "localhost"))
|
||||
|
||||
(during (local-host $host-name)
|
||||
(assert (vh host-name))
|
||||
|
||||
|
@ -109,31 +115,36 @@
|
|||
|
||||
(actor #:name 'topic-demand-analyzer
|
||||
(define/query-set local-hosts (local-host $host-name) host-name)
|
||||
(during (topic-demand $full-topic)
|
||||
(during/actor (topic-demand $full-topic)
|
||||
#:name (list 'general-topic full-topic)
|
||||
#:let [(local-hosts-snapshot (local-hosts))]
|
||||
(with-handlers [(exn? (lambda (e)
|
||||
(log-error "Topic demand error: ~a" (exn->string e))))]
|
||||
(match-define (web-resource (web-virtual-host _ topic-host _) topic-path)
|
||||
(url->resource (string->url full-topic)))
|
||||
(define maybe-local-topic
|
||||
(match topic-path [`("topic" (,topic ())) topic] [_ #f]))
|
||||
(spawn-general-topic full-topic
|
||||
(on-start
|
||||
(general-topic-main full-topic
|
||||
topic-host
|
||||
maybe-local-topic
|
||||
(set-member? (local-hosts) topic-host)))))
|
||||
(set-member? local-hosts-snapshot topic-host))))))
|
||||
|
||||
(define (spawn-general-topic full-topic topic-host maybe-local-topic maybe-start-as-local?)
|
||||
(actor* #:name (list 'general-topic full-topic)
|
||||
(define (local-state)
|
||||
(react (stop-when (retracted (local-host topic-host)) (remote-state))
|
||||
(assert (local-topic-demand maybe-local-topic))))
|
||||
(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?)
|
||||
(define (local-state)
|
||||
(react (stop-when (retracted (local-host topic-host)) (remote-state))
|
||||
(assert (local-topic-demand maybe-local-topic))))
|
||||
|
||||
(define (remote-state)
|
||||
(react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state))
|
||||
(remote-topic-main full-topic)))
|
||||
(define (remote-state)
|
||||
(react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state))
|
||||
(remote-topic-main full-topic)))
|
||||
|
||||
(if (and maybe-local-topic maybe-start-as-local?)
|
||||
(local-state)
|
||||
(remote-state))))
|
||||
(if (and maybe-local-topic start-as-local?)
|
||||
(local-state)
|
||||
(remote-state)))
|
||||
|
||||
(define (req-content-type req)
|
||||
(dict-ref (web-request-header-headers req) 'content-type "application/octet-stream"))
|
||||
|
||||
(define (local-topic-main topic)
|
||||
(field [max-age #f]
|
||||
|
@ -154,16 +165,35 @@
|
|||
(during (local-host $host-name)
|
||||
(on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body)
|
||||
(actor*
|
||||
(define content-type
|
||||
(dict-ref (web-request-header-headers req) 'content-type "application/octet-stream"))
|
||||
(log-info "Topic ~a got ~v message! ~v" topic content-type body)
|
||||
(send! (notification (url->string (resource->url (web-request-header-resource req)))
|
||||
body
|
||||
content-type))
|
||||
(define content-type (req-content-type req))
|
||||
(log-info "Local topic ~a got ~v message ~v" topic content-type body)
|
||||
(define full-topic (url->string (resource->url (web-request-header-resource req))))
|
||||
(send! (notification full-topic body content-type))
|
||||
(web-respond/status! id 201 #"Created")))))
|
||||
|
||||
(define (maintain-remote-subscription full-topic canonical-host-name sub-id)
|
||||
(define callback (url->string
|
||||
(resource->url
|
||||
(web-resource (vh canonical-host-name) `("sub" (,sub-id ()))))))
|
||||
(react
|
||||
(field [next-upstream-check (current-inexact-milliseconds)])
|
||||
(on (asserted (later-than (next-upstream-check)))
|
||||
(log-info "Rechecking upstream for ~a" full-topic)
|
||||
(define-values (resp response-body)
|
||||
(web-request! 'get full-topic #:redirect-budget 5))
|
||||
(log-info "Resource: ~v" resp))))
|
||||
|
||||
(define (remote-topic-main full-topic)
|
||||
(log-info "TODO: remote-topic-main"))
|
||||
(define sub-id (random-hex-string 16))
|
||||
(log-info "Remote sub endpoint ~a" sub-id)
|
||||
(during (canonical-local-host $canonical-host-name)
|
||||
(on-start (maintain-remote-subscription full-topic canonical-host-name sub-id))
|
||||
(on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body)
|
||||
(actor*
|
||||
(define content-type (req-content-type req))
|
||||
(log-info "Remote topic ~a got ~v message ~v" full-topic content-type body)
|
||||
(send! (notification full-topic body content-type))
|
||||
(web-respond/status! id 201 #"Created")))))
|
||||
|
||||
(actor #:name 'hub
|
||||
(during (local-host $host-name)
|
||||
|
|
Loading…
Reference in New Issue