96 lines
4.1 KiB
Racket
96 lines
4.1 KiB
Racket
#lang syndicate/actor
|
|
|
|
(require racket/set)
|
|
|
|
(require/activate syndicate/drivers/web)
|
|
|
|
(require "../private/util.rkt")
|
|
(require "../protocol.rkt")
|
|
|
|
;; Actor named 'local-topic-manager: keeps track of which local topics exist.
;;
;; While a `local-host` assertion is present, it handles two kinds of web
;; request on that host/port for the path ("topic" (<name> ())):
;;   - PUT:    if <name> is not yet in `topics`, adds it, asserts
;;             `local-topic-demand` for it, and asserts a `local-topic-config`
;;             with both settings #f.
;;   - DELETE: if <name> is in `topics`, removes it and retracts the
;;             `local-topic-demand` and any `local-topic-config` for it.
;; Both handlers reply with an empty body whether or not anything changed.
;;
;; Separately, `during/actor` runs `local-topic-main` for each
;; `local-topic-demand` assertion, under the name (list 'local-topic <name>).
(actor #:name 'local-topic-manager

       ;; Set of topic names added by PUT and not yet removed by DELETE.
       (field [topics (set)])

       (during (local-host $host-name $port)

         ;; PUT: create the topic unless it is already known.
         (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$new-topic ())))
             (unless (set-member? (topics) new-topic)
               (topics (set-add (topics) new-topic))
               (assert! (local-topic-demand new-topic))
               ;; Drop any config assertion for this topic before asserting the default one.
               (retract! (local-topic-config new-topic ? ?))
               (assert! (local-topic-config new-topic #f #f))) ;; TODO: maximums
             (web-respond/bytes! id #""))

         ;; DELETE: forget the topic if it is known.
         (on (web-request-incoming (id req) (vh host-name port) 'delete ("topic" (,$old-topic ())))
             (when (set-member? (topics) old-topic)
               (topics (set-remove (topics) old-topic))
               (retract! (local-topic-demand old-topic))
               (retract! (local-topic-config old-topic ? ?)))
             (web-respond/bytes! id #"")))

       ;; One `local-topic-main` instance per demanded topic.
       (during/actor (local-topic-demand $demanded-topic)
                     #:name (list 'local-topic demanded-topic)
                     (local-topic-main demanded-topic)))
|
|
|
|
;; Behaviour of a single local topic; invoked with the topic's name from the
;; `during/actor` clause of the topic manager.
;;
;; State kept in fields:
;;   - max-age / max-count: copied from `local-topic-config` assertions for
;;     this topic (only logged in this block).
;;   - current-content / current-content-type / last-modified-seconds: the
;;     body, content type and timestamp of the most recent POST, or #f if
;;     nothing has been posted yet.
;;
;; While both a `local-host` and a `canonical-local-host` assertion are
;; present, serves HEAD, GET and POST on ("topic" (<topic> ())).
(define (local-topic-main topic)
  (field [max-age #f]
         [max-count #f])

  (field [current-content #f]
         [current-content-type #f]
         [last-modified-seconds #f])

  ;; Track the asserted configuration for this topic.
  (on (asserted (local-topic-config topic $configured-age $configured-count))
      (max-age configured-age)
      (max-count configured-count))

  (on-start (log-info "Creating local topic ~v" topic))
  ;; Reads max-age and max-count, so it depends on both fields.
  (begin/dataflow
    (log-info "Configured local topic ~v, max-age ~v, max-count ~v"
              topic
              (max-age)
              (max-count)))
  (on-stop (log-info "Terminating local topic ~v" topic))

  (during (local-host $host-name $port)
    (during (canonical-local-host $canonical-host-name $cport)
      ;; URLs are built from the canonical host/port, not the local one.
      (define hub-url (canonical-url canonical-host-name cport `("hub" ())))
      (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ()))))
      ;; Link headers attached to every GET/HEAD response.
      (define discovery-headers
        (list (cons 'link (format "<~a>; rel=hub" hub-url))
              (cons 'link (format "<~a>; rel=self" self-url))))

      ;; Shared responder for GET and HEAD.  `with-body?` selects whether the
      ;; stored content is sent (GET) or an empty body is sent (HEAD).
      (define (topic-response request-id with-body?)
        (cond
          [(current-content)
           (let ([mime-type (and (current-content-type)
                                 (string->bytes/utf-8 (current-content-type)))]
                 [payload (if with-body? (current-content) #"")])
             (web-respond/bytes! request-id
                                 #:header (web-response-header
                                           #:last-modified-seconds (last-modified-seconds)
                                           #:mime-type mime-type
                                           #:headers discovery-headers)
                                 payload))]
          [else
           ;; Nothing posted yet: 204, which MUST NOT include a response body.
           (web-respond/bytes! request-id
                               #:header (web-response-header
                                         #:code 204
                                         #:message #"No Content"
                                         #:mime-type #f
                                         #:headers discovery-headers)
                               #"")]))

      (on (web-request-incoming (id req) (vh host-name port) 'head ("topic" (,topic ())))
          (topic-response id #f))

      (on (web-request-get (id req) (vh host-name port) ("topic" (,topic ())))
          (topic-response id #t))

      ;; POST: store the new content, emit a notification, reply 201.
      (on (web-request-incoming (id req) (vh host-name port) 'post ("topic" (,topic ())) $posted-body)
          (define posted-type (web-request-header-content-type req))
          (log-info "Local topic ~a got ~v message ~v" topic posted-type posted-body)
          (current-content posted-body)
          (current-content-type posted-type)
          (last-modified-seconds (current-seconds))
          ;; The notification is sent from within an `actor*` form.
          (actor*
           (send! (notification self-url
                                hub-url
                                self-url
                                posted-body
                                posted-type)))
          (web-respond/status! id 201 #"Created")))))
|