From f2f8fda62e4dcd7519e02de8c701b2c9f72b5588 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 25 Oct 2016 12:36:09 -0400 Subject: [PATCH] Start on implementing W3C PubSub per 20 Oct 2016 draft --- rmq/main.rkt | 221 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 220 insertions(+), 1 deletion(-) diff --git a/rmq/main.rkt b/rmq/main.rkt index 75fdd22..cc6682d 100644 --- a/rmq/main.rkt +++ b/rmq/main.rkt @@ -1,2 +1,221 @@ -#lang racket/base +#lang syndicate/actor +(require racket/format) +(require racket/set) +(require racket/random) +(require net/url) +(require net/uri-codec) +(require file/sha1) + +(require/activate syndicate/drivers/timer) +(require/activate syndicate/drivers/web) +(require syndicate/protocol/advertise) + +(module+ test (require rackunit)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; A Topic is a URIString. + +;; A Deadline is an (Option Number), where #f indicates "unbounded", +;; and numbers indicate a moment in time as a Unix epoch timestamp as +;; might be returned from `current-seconds`. + +;; (notification Topic Bytes String (Option URIString)) +(struct notification (topic-name content content-type) #:prefab) + +;; (subscription Topic Deadline URLString URLString (Option Bytes)) +(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab) + +;; (local-topic Topic (Option Number) (Option Number)) +(struct local-topic (name max-age max-count) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; (define (number->path n) +;; (let ((top (quotient n 256)) +;; (bot (remainder n 256))) +;; (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0")) +;; (if (zero? top) +;; bot-piece +;; (string-append (number->path top) "-/" bot-piece)))) + +;; (module+ test +;; (check-equal? (number->path 0) "00") +;; (check-equal? (number->path 123) "7b") +;; (check-equal? (number->path 12345) "30-/39") +;; (check-equal? (number->path 123456789012345678901234567890) +;; "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2")) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define vh (web-virtual-host "http" ? 7827)) + +(actor #:name 'main + + (assert vh) + + (on (web-request-get (id req) vh ("" ())) + (actor* + (web-respond/xexpr! id + `(html + (body + (h1 "Status" + " " + ,(url->string + (resource->url + (web-request-header-resource req)))))))))) + +(actor #:name 'local-topic-manager + + (field [topics (set)]) + + (on (web-request-incoming (id req) vh 'put ("topic" (,$topic ()))) + (when (not (set-member? (topics) topic)) + (topics (set-add (topics) topic)) + (retract! (local-topic topic ? ?)) + (assert! (local-topic topic #f #f))) ;; TODO: maximums + (web-respond/bytes! id #"")) + + (on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ()))) + (when (set-member? (topics) topic) + (topics (set-remove (topics) topic)) + (retract! (local-topic topic ? ?))) + (web-respond/bytes! id #"")) + + (during/actor (local-topic $topic _ _) + #:name (list 'topic topic) + (local-topic-main topic))) + +(define (web-respond/status! id code message [body #""]) + (web-respond/bytes! id + #:header (web-response-header + #:code code + #:message message + #:headers '()) + body)) + +(define (local-topic-main topic) + (field [max-age #f] + [max-count #f]) + + (on (asserted (local-topic topic $age $count)) + (max-age age) + (max-count count)) + + (on-start (log-info "Creating local topic ~v" topic)) + (begin/dataflow + (log-info "Configured local topic ~v, max-age ~v, max-count ~v" + topic + (max-age) + (max-count))) + (on-stop (log-info "Terminating local topic ~v" topic)) + + (on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body) + (actor* + (define content-type + (cdr (or (assq 'content-type (web-request-header-headers req)) + '(content-type . "application/octet-stream")))) + (log-info "Got ~v message! ~v" content-type body) + (send! (notification (url->string (resource->url (web-request-header-resource req))) + body + content-type)) + (web-respond/status! id 201 #"Created")))) + +(actor #:name 'hub + (on (web-request-incoming (id req) vh 'post ("hub" ()) $body) + (actor* + #:name (gensym 'hub-post) + (define params (make-immutable-hash + (form-urlencoded->alist (bytes->string/utf-8 body)))) + (define callback (hash-ref params 'hub.callback)) + (define mode + (match (hash-ref params 'hub.mode) + ["subscribe" 'subscribe] + ["unsubscribe" 'unsubscribe])) + (define topic (hash-ref params 'hub.topic)) + (define lease-seconds + (match (hash-ref params 'hub.lease_seconds "unbounded") + ["unbounded" #f] + [n (string->number n)])) + (define secret-string (hash-ref params 'hub.secret #f)) + (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) + (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) + (define canonical-hub (url->string (resource->url (web-request-header-resource req)))) + (define ok? + (match mode + ['subscribe + (if (subscription-change-validate "subscribe" + (or lease-seconds "unbounded") + topic + callback) + (begin + (retract! (subscription topic ? ? callback ?)) + (assert! + (subscription topic expiry-deadline canonical-hub callback secret-bytes)) + #t) + #f)] + ['unsubscribe + (if (subscription-change-validate "unsubscribe" + #f + topic + callback) + (begin + (retract! (subscription topic ? ? callback ?)) + #t) + #f)])) + (if ok? + (web-respond/status! id 202 #"Accepted") + (web-respond/status! id 403 #"Forbidden" #"Validation failed")))) + + (during/actor (subscription $topic _ _ $callback _) + #:name (list 'subscription topic callback) + (subscription-main topic callback))) + +(define (subscription-change-validate mode lease topic callback) + (define u (string->url callback)) + (define res (url->resource u)) + (define challenge (bytes->hex-string (crypto-random-bytes 16))) + (define id (gensym 'validation)) + (define extra-query (list* (cons 'hub.mode mode) + (cons 'hub.topic topic) + (cons 'hub.challenge challenge) + (if lease + (list (cons 'hub.lease_seconds (~a lease))) + (list)))) + (react/suspend + (k) + (on-start (send! (web-request id + 'outbound + (web-request-header 'get + res + '() + (append (url-query u) extra-query)) + '()))) + (on (message (web-response-complete id $resp $body)) + (k (and (<= 200 (web-response-header-code resp) 299) + (equal? body (string->bytes/utf-8 challenge))))))) + +(define (subscription-main topic callback) + (field [expiry-timer-id #f]) + (during (subscription topic $expiry-deadline $canonical-hub callback $secret-bytes) + (on-start (log-info "Subscription configured: ~v ~v ~v" + expiry-deadline + canonical-hub + secret-bytes) + (cond + [expiry-deadline + (expiry-timer-id (gensym 'subscription-expiry)) + (log-info "Subscription will expire at ~a" expiry-deadline) + (send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))] + [else + (log-info "Subscription will not expire") + (expiry-timer-id #f)])) + (stop-when #:when (expiry-timer-id) (message (timer-expired (expiry-timer-id) _)) + (log-info "Subscription expired")))) + +(actor #:name 'sink + (on (web-request-get (id req) vh ("sink" ())) + (define challenge (cdr (or (assq 'hub.challenge (web-request-header-query req)) + (cons 'hub.challenge "")))) + (web-respond/bytes! id (string->bytes/utf-8 challenge))))