Steps toward remote topics
This commit is contained in:
parent
d9866843f2
commit
223033f664
236
rmq/server.rkt
236
rmq/server.rkt
|
@ -3,6 +3,7 @@
|
|||
(provide )
|
||||
|
||||
(require racket/dict)
|
||||
(require racket/exn)
|
||||
(require racket/format)
|
||||
(require racket/set)
|
||||
(require net/url)
|
||||
|
@ -31,8 +32,17 @@
|
|||
;; (subscription Topic Deadline URLString URLString (Option Bytes))
|
||||
(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab)
|
||||
|
||||
;; (local-topic Topic (Option Number) (Option Number))
|
||||
(struct local-topic (name max-age max-count) #:prefab)
|
||||
;; (local-topic-config Topic (Option Number) (Option Number))
|
||||
(struct local-topic-config (name max-age max-count) #:prefab)
|
||||
|
||||
;; (topic-demand Topic)
|
||||
(struct topic-demand (topic-name) #:prefab)
|
||||
|
||||
;; (local-topic-demand String)
|
||||
(struct local-topic-demand (name) #:prefab)
|
||||
|
||||
;; (local-host String)
|
||||
(struct local-host (name) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
|
@ -53,49 +63,83 @@
|
|||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define vh (web-virtual-host "http" ? 7827))
|
||||
(define (vh host-name) (web-virtual-host "http" host-name 7827))
|
||||
|
||||
(actor #:name 'main
|
||||
|
||||
(assert vh)
|
||||
(assert (local-host "localhost"))
|
||||
|
||||
(on (web-request-get (id req) vh ("" ()))
|
||||
(actor*
|
||||
(web-respond/xexpr! id
|
||||
`(html
|
||||
(body
|
||||
(h1 "Status"
|
||||
" "
|
||||
,(url->string
|
||||
(resource->url
|
||||
(web-request-header-resource req))))))))))
|
||||
(during (local-host $host-name)
|
||||
(assert (vh host-name))
|
||||
|
||||
(on (web-request-get (id req) (vh host-name) ("" ()))
|
||||
(actor*
|
||||
(web-respond/xexpr! id
|
||||
`(html
|
||||
(body
|
||||
(h1 "Status"
|
||||
" "
|
||||
,(url->string
|
||||
(resource->url
|
||||
(web-request-header-resource req)))))))))))
|
||||
|
||||
(actor #:name 'local-topic-manager
|
||||
|
||||
(field [topics (set)])
|
||||
|
||||
(on (web-request-incoming (id req) vh 'put ("topic" (,$topic ())))
|
||||
(when (not (set-member? (topics) topic))
|
||||
(topics (set-add (topics) topic))
|
||||
(retract! (local-topic topic ? ?))
|
||||
(assert! (local-topic topic #f #f))) ;; TODO: maximums
|
||||
(web-respond/bytes! id #""))
|
||||
(during (local-host $host-name)
|
||||
(on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ())))
|
||||
(when (not (set-member? (topics) topic))
|
||||
(topics (set-add (topics) topic))
|
||||
(assert! (local-topic-demand topic))
|
||||
(retract! (local-topic-config topic ? ?))
|
||||
(assert! (local-topic-config topic #f #f))) ;; TODO: maximums
|
||||
(web-respond/bytes! id #""))
|
||||
|
||||
(on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ())))
|
||||
(when (set-member? (topics) topic)
|
||||
(topics (set-remove (topics) topic))
|
||||
(retract! (local-topic topic ? ?)))
|
||||
(web-respond/bytes! id #""))
|
||||
(on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ())))
|
||||
(when (set-member? (topics) topic)
|
||||
(topics (set-remove (topics) topic))
|
||||
(retract! (local-topic-demand topic))
|
||||
(retract! (local-topic-config topic ? ?)))
|
||||
(web-respond/bytes! id #"")))
|
||||
|
||||
(during/actor (local-topic $topic _ _)
|
||||
#:name (list 'topic topic)
|
||||
(local-topic-main topic)))
|
||||
(during/actor (local-topic-demand $topic)
|
||||
#:name (list 'local-topic topic)
|
||||
(local-topic-main topic)))
|
||||
|
||||
(actor #:name 'topic-demand-analyzer
|
||||
(define/query-set local-hosts (local-host $host-name) host-name)
|
||||
(during (topic-demand $full-topic)
|
||||
(with-handlers [(exn? (lambda (e)
|
||||
(log-error "Topic demand error: ~a" (exn->string e))))]
|
||||
(match-define (web-resource (web-virtual-host _ topic-host _) topic-path)
|
||||
(url->resource (string->url full-topic)))
|
||||
(define maybe-local-topic
|
||||
(match topic-path [`("topic" (,topic ())) topic] [_ #f]))
|
||||
(spawn-general-topic full-topic
|
||||
topic-host
|
||||
maybe-local-topic
|
||||
(set-member? (local-hosts) topic-host)))))
|
||||
|
||||
(define (spawn-general-topic full-topic topic-host maybe-local-topic maybe-start-as-local?)
|
||||
(actor* #:name (list 'general-topic full-topic)
|
||||
(define (local-state)
|
||||
(react (stop-when (retracted (local-host topic-host)) (remote-state))
|
||||
(assert (local-topic-demand maybe-local-topic))))
|
||||
|
||||
(define (remote-state)
|
||||
(react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state))
|
||||
(remote-topic-main full-topic)))
|
||||
|
||||
(if (and maybe-local-topic maybe-start-as-local?)
|
||||
(local-state)
|
||||
(remote-state))))
|
||||
|
||||
(define (local-topic-main topic)
|
||||
(field [max-age #f]
|
||||
[max-count #f])
|
||||
|
||||
(on (asserted (local-topic topic $age $count))
|
||||
(on (asserted (local-topic-config topic $age $count))
|
||||
(max-age age)
|
||||
(max-count count))
|
||||
|
||||
|
@ -107,72 +151,77 @@
|
|||
(max-count)))
|
||||
(on-stop (log-info "Terminating local topic ~v" topic))
|
||||
|
||||
(on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body)
|
||||
(actor*
|
||||
(define content-type
|
||||
(dict-ref (web-request-header-headers req) 'content-type "application/octet-stream"))
|
||||
(log-info "Topic ~a got ~v message! ~v" topic content-type body)
|
||||
(send! (notification (url->string (resource->url (web-request-header-resource req)))
|
||||
body
|
||||
content-type))
|
||||
(web-respond/status! id 201 #"Created"))))
|
||||
(during (local-host $host-name)
|
||||
(on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body)
|
||||
(actor*
|
||||
(define content-type
|
||||
(dict-ref (web-request-header-headers req) 'content-type "application/octet-stream"))
|
||||
(log-info "Topic ~a got ~v message! ~v" topic content-type body)
|
||||
(send! (notification (url->string (resource->url (web-request-header-resource req)))
|
||||
body
|
||||
content-type))
|
||||
(web-respond/status! id 201 #"Created")))))
|
||||
|
||||
(define (remote-topic-main full-topic)
|
||||
(log-info "TODO: remote-topic-main"))
|
||||
|
||||
(actor #:name 'hub
|
||||
(on (web-request-incoming (id req) vh 'post ("hub" ()) $body)
|
||||
;; Initially, I had an (actor* ...) form here for fault
|
||||
;; isolation. However, this led to problems since I wanted
|
||||
;; to use `assert!` and `retract!` to signal to the
|
||||
;; `during/actor`, and the assertions were being lost as
|
||||
;; the `actor*` terminated. So instead, I'm using Rackety
|
||||
;; `with-handlers`.
|
||||
(define ok?
|
||||
(with-handlers [(values (lambda (e) #f))]
|
||||
(define params (make-immutable-hash
|
||||
(form-urlencoded->alist (bytes->string/utf-8 body))))
|
||||
(define callback (hash-ref params 'hub.callback))
|
||||
(define mode
|
||||
(match (hash-ref params 'hub.mode)
|
||||
["subscribe" 'subscribe]
|
||||
["unsubscribe" 'unsubscribe]))
|
||||
(define topic (hash-ref params 'hub.topic))
|
||||
(define lease-seconds
|
||||
(match (hash-ref params 'hub.lease_seconds "unbounded")
|
||||
["unbounded" #f]
|
||||
[n (string->number n)]))
|
||||
(define secret-string (hash-ref params 'hub.secret #f))
|
||||
(define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
|
||||
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
|
||||
(define canonical-hub
|
||||
(url->string (resource->url (web-request-header-resource req))))
|
||||
(match mode
|
||||
['subscribe
|
||||
(if (subscription-change-validate "subscribe"
|
||||
(or lease-seconds "unbounded")
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
(assert!
|
||||
(subscription topic expiry-deadline canonical-hub callback secret-bytes))
|
||||
#t)
|
||||
#f)]
|
||||
['unsubscribe
|
||||
(if (subscription-change-validate "unsubscribe"
|
||||
#f
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
#t)
|
||||
#f)])))
|
||||
(if ok?
|
||||
(web-respond/status! id 202 #"Accepted")
|
||||
(web-respond/status! id 403 #"Forbidden" #"Validation failed")))
|
||||
(during (local-host $host-name)
|
||||
(on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body)
|
||||
;; Initially, I had an (actor* ...) form here for fault
|
||||
;; isolation. However, this led to problems since I wanted
|
||||
;; to use `assert!` and `retract!` to signal to the
|
||||
;; `during/actor`, and the assertions were being lost as
|
||||
;; the `actor*` terminated. So instead, I'm using Rackety
|
||||
;; `with-handlers`.
|
||||
(define ok?
|
||||
(with-handlers [(values (lambda (e) #f))]
|
||||
(define params (make-immutable-hash
|
||||
(form-urlencoded->alist (bytes->string/utf-8 body))))
|
||||
(define callback (hash-ref params 'hub.callback))
|
||||
(define mode
|
||||
(match (hash-ref params 'hub.mode)
|
||||
["subscribe" 'subscribe]
|
||||
["unsubscribe" 'unsubscribe]))
|
||||
(define topic (hash-ref params 'hub.topic))
|
||||
(define lease-seconds
|
||||
(match (hash-ref params 'hub.lease_seconds "unbounded")
|
||||
["unbounded" #f]
|
||||
[n (string->number n)]))
|
||||
(define secret-string (hash-ref params 'hub.secret #f))
|
||||
(define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
|
||||
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
|
||||
(define canonical-hub
|
||||
(url->string (resource->url (web-request-header-resource req))))
|
||||
(match mode
|
||||
['subscribe
|
||||
(if (subscription-change-validate "subscribe"
|
||||
(or lease-seconds "unbounded")
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
(assert!
|
||||
(subscription topic expiry-deadline canonical-hub callback secret-bytes))
|
||||
#t)
|
||||
#f)]
|
||||
['unsubscribe
|
||||
(if (subscription-change-validate "unsubscribe"
|
||||
#f
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
#t)
|
||||
#f)])))
|
||||
(if ok?
|
||||
(web-respond/status! id 202 #"Accepted")
|
||||
(web-respond/status! id 403 #"Forbidden" #"Validation failed"))))
|
||||
|
||||
(during/actor (subscription $topic _ _ $callback _)
|
||||
#:name (list 'subscription topic callback)
|
||||
#:on-crash (retract! (subscription topic ? ? callback ?))
|
||||
(subscription-main topic callback)))
|
||||
(during/actor (subscription $partial-topic _ _ $callback _)
|
||||
#:name (list 'subscription partial-topic callback)
|
||||
#:on-crash (retract! (subscription partial-topic ? ? callback ?))
|
||||
(subscription-main partial-topic callback)))
|
||||
|
||||
(define (subscription-change-validate mode lease topic callback)
|
||||
(define challenge (random-hex-string 16))
|
||||
|
@ -241,6 +290,9 @@
|
|||
(during (subscription partial-topic $expiry-deadline $canonical-hub callback $secret-bytes)
|
||||
(define topic (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
|
||||
partial-topic)))
|
||||
|
||||
(assert (topic-demand topic))
|
||||
|
||||
(on-start (log-info "Subscription configured: ~v ~v ~v ~v"
|
||||
topic
|
||||
expiry-deadline
|
||||
|
|
Loading…
Reference in New Issue