Start on implementing W3C PubSub per 20 Oct 2016 draft
This commit is contained in:
parent
ea40d44579
commit
f2f8fda62e
221
rmq/main.rkt
221
rmq/main.rkt
|
@ -1,2 +1,221 @@
|
|||
#lang racket/base
|
||||
#lang syndicate/actor
|
||||
|
||||
(require racket/format)
|
||||
(require racket/set)
|
||||
(require racket/random)
|
||||
(require net/url)
|
||||
(require net/uri-codec)
|
||||
(require file/sha1)
|
||||
|
||||
(require/activate syndicate/drivers/timer)
|
||||
(require/activate syndicate/drivers/web)
|
||||
(require syndicate/protocol/advertise)
|
||||
|
||||
(module+ test (require rackunit))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; A Topic is a URIString.
|
||||
|
||||
;; A Deadline is an (Option Number), where #f indicates "unbounded",
|
||||
;; and numbers indicate a moment in time as a Unix epoch timestamp as
|
||||
;; might be returned from `current-seconds`.
|
||||
|
||||
;; (notification Topic Bytes String (Option URIString))
|
||||
(struct notification (topic-name content content-type) #:prefab)
|
||||
|
||||
;; (subscription Topic Deadline URLString URLString (Option Bytes))
|
||||
(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab)
|
||||
|
||||
;; (local-topic Topic (Option Number) (Option Number))
|
||||
(struct local-topic (name max-age max-count) #:prefab)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; (define (number->path n)
|
||||
;; (let ((top (quotient n 256))
|
||||
;; (bot (remainder n 256)))
|
||||
;; (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
|
||||
;; (if (zero? top)
|
||||
;; bot-piece
|
||||
;; (string-append (number->path top) "-/" bot-piece))))
|
||||
|
||||
;; (module+ test
|
||||
;; (check-equal? (number->path 0) "00")
|
||||
;; (check-equal? (number->path 123) "7b")
|
||||
;; (check-equal? (number->path 12345) "30-/39")
|
||||
;; (check-equal? (number->path 123456789012345678901234567890)
|
||||
;; "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(define vh (web-virtual-host "http" ? 7827))
|
||||
|
||||
(actor #:name 'main
|
||||
|
||||
(assert vh)
|
||||
|
||||
(on (web-request-get (id req) vh ("" ()))
|
||||
(actor*
|
||||
(web-respond/xexpr! id
|
||||
`(html
|
||||
(body
|
||||
(h1 "Status"
|
||||
" "
|
||||
,(url->string
|
||||
(resource->url
|
||||
(web-request-header-resource req))))))))))
|
||||
|
||||
(actor #:name 'local-topic-manager
|
||||
|
||||
(field [topics (set)])
|
||||
|
||||
(on (web-request-incoming (id req) vh 'put ("topic" (,$topic ())))
|
||||
(when (not (set-member? (topics) topic))
|
||||
(topics (set-add (topics) topic))
|
||||
(retract! (local-topic topic ? ?))
|
||||
(assert! (local-topic topic #f #f))) ;; TODO: maximums
|
||||
(web-respond/bytes! id #""))
|
||||
|
||||
(on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ())))
|
||||
(when (set-member? (topics) topic)
|
||||
(topics (set-remove (topics) topic))
|
||||
(retract! (local-topic topic ? ?)))
|
||||
(web-respond/bytes! id #""))
|
||||
|
||||
(during/actor (local-topic $topic _ _)
|
||||
#:name (list 'topic topic)
|
||||
(local-topic-main topic)))
|
||||
|
||||
(define (web-respond/status! id code message [body #""])
|
||||
(web-respond/bytes! id
|
||||
#:header (web-response-header
|
||||
#:code code
|
||||
#:message message
|
||||
#:headers '())
|
||||
body))
|
||||
|
||||
(define (local-topic-main topic)
|
||||
(field [max-age #f]
|
||||
[max-count #f])
|
||||
|
||||
(on (asserted (local-topic topic $age $count))
|
||||
(max-age age)
|
||||
(max-count count))
|
||||
|
||||
(on-start (log-info "Creating local topic ~v" topic))
|
||||
(begin/dataflow
|
||||
(log-info "Configured local topic ~v, max-age ~v, max-count ~v"
|
||||
topic
|
||||
(max-age)
|
||||
(max-count)))
|
||||
(on-stop (log-info "Terminating local topic ~v" topic))
|
||||
|
||||
(on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body)
|
||||
(actor*
|
||||
(define content-type
|
||||
(cdr (or (assq 'content-type (web-request-header-headers req))
|
||||
'(content-type . "application/octet-stream"))))
|
||||
(log-info "Got ~v message! ~v" content-type body)
|
||||
(send! (notification (url->string (resource->url (web-request-header-resource req)))
|
||||
body
|
||||
content-type))
|
||||
(web-respond/status! id 201 #"Created"))))
|
||||
|
||||
(actor #:name 'hub
|
||||
(on (web-request-incoming (id req) vh 'post ("hub" ()) $body)
|
||||
(actor*
|
||||
#:name (gensym 'hub-post)
|
||||
(define params (make-immutable-hash
|
||||
(form-urlencoded->alist (bytes->string/utf-8 body))))
|
||||
(define callback (hash-ref params 'hub.callback))
|
||||
(define mode
|
||||
(match (hash-ref params 'hub.mode)
|
||||
["subscribe" 'subscribe]
|
||||
["unsubscribe" 'unsubscribe]))
|
||||
(define topic (hash-ref params 'hub.topic))
|
||||
(define lease-seconds
|
||||
(match (hash-ref params 'hub.lease_seconds "unbounded")
|
||||
["unbounded" #f]
|
||||
[n (string->number n)]))
|
||||
(define secret-string (hash-ref params 'hub.secret #f))
|
||||
(define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
|
||||
(define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
|
||||
(define canonical-hub (url->string (resource->url (web-request-header-resource req))))
|
||||
(define ok?
|
||||
(match mode
|
||||
['subscribe
|
||||
(if (subscription-change-validate "subscribe"
|
||||
(or lease-seconds "unbounded")
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
(assert!
|
||||
(subscription topic expiry-deadline canonical-hub callback secret-bytes))
|
||||
#t)
|
||||
#f)]
|
||||
['unsubscribe
|
||||
(if (subscription-change-validate "unsubscribe"
|
||||
#f
|
||||
topic
|
||||
callback)
|
||||
(begin
|
||||
(retract! (subscription topic ? ? callback ?))
|
||||
#t)
|
||||
#f)]))
|
||||
(if ok?
|
||||
(web-respond/status! id 202 #"Accepted")
|
||||
(web-respond/status! id 403 #"Forbidden" #"Validation failed"))))
|
||||
|
||||
(during/actor (subscription $topic _ _ $callback _)
|
||||
#:name (list 'subscription topic callback)
|
||||
(subscription-main topic callback)))
|
||||
|
||||
(define (subscription-change-validate mode lease topic callback)
|
||||
(define u (string->url callback))
|
||||
(define res (url->resource u))
|
||||
(define challenge (bytes->hex-string (crypto-random-bytes 16)))
|
||||
(define id (gensym 'validation))
|
||||
(define extra-query (list* (cons 'hub.mode mode)
|
||||
(cons 'hub.topic topic)
|
||||
(cons 'hub.challenge challenge)
|
||||
(if lease
|
||||
(list (cons 'hub.lease_seconds (~a lease)))
|
||||
(list))))
|
||||
(react/suspend
|
||||
(k)
|
||||
(on-start (send! (web-request id
|
||||
'outbound
|
||||
(web-request-header 'get
|
||||
res
|
||||
'()
|
||||
(append (url-query u) extra-query))
|
||||
'())))
|
||||
(on (message (web-response-complete id $resp $body))
|
||||
(k (and (<= 200 (web-response-header-code resp) 299)
|
||||
(equal? body (string->bytes/utf-8 challenge)))))))
|
||||
|
||||
(define (subscription-main topic callback)
|
||||
(field [expiry-timer-id #f])
|
||||
(during (subscription topic $expiry-deadline $canonical-hub callback $secret-bytes)
|
||||
(on-start (log-info "Subscription configured: ~v ~v ~v"
|
||||
expiry-deadline
|
||||
canonical-hub
|
||||
secret-bytes)
|
||||
(cond
|
||||
[expiry-deadline
|
||||
(expiry-timer-id (gensym 'subscription-expiry))
|
||||
(log-info "Subscription will expire at ~a" expiry-deadline)
|
||||
(send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))]
|
||||
[else
|
||||
(log-info "Subscription will not expire")
|
||||
(expiry-timer-id #f)]))
|
||||
(stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _))
|
||||
(log-info "Subscription expired"))))
|
||||
|
||||
(actor #:name 'sink
|
||||
(on (web-request-get (id req) vh ("sink" ()))
|
||||
(define challenge (cdr (or (assq 'hub.challenge (web-request-header-query req))
|
||||
(cons 'hub.challenge ""))))
|
||||
(web-respond/bytes! id (string->bytes/utf-8 challenge))))
|
||||
|
|
Loading…
Reference in New Issue