#lang syndicate/actor
;; A small publish/subscribe hub served over HTTP on the virtual host `vh`.
;;
;; Routes handled in this file:
;;   GET    /             - status page (actor 'main)
;;   PUT    /topic/<name> - create a local topic (actor 'local-topic-manager)
;;   DELETE /topic/<name> - remove a local topic (actor 'local-topic-manager)
;;   POST   /topic/<name> - publish a notification to a local topic
;;   POST   /hub          - subscribe/unsubscribe request (actor 'hub)
;;   GET    /sink         - echoes `hub.challenge` back (actor 'sink)

(provide)

(require racket/format)
(require racket/set)
(require net/url)
(require net/uri-codec)

(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/web)
(require syndicate/protocol/advertise)

(require "private/util.rkt")

(module+ test (require rackunit))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; A Topic is a URIString.
;; A Deadline is an (Option Number), where #f indicates "unbounded",
;; and numbers indicate a moment in time as a Unix epoch timestamp as
;; might be returned from `current-seconds`.

;; (notification Topic Bytes String)
(struct notification (topic-name content content-type) #:prefab)

;; (subscription Topic Deadline URLString URLString (Option Bytes))
(struct subscription (topic-name expiry-deadline canonical-hub callback secret) #:prefab)

;; (local-topic Topic (Option Number) (Option Number))
(struct local-topic (name max-age max-count) #:prefab)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; (define (number->path n)
;;   (let ((top (quotient n 256))
;;         (bot (remainder n 256)))
;;     (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
;;     (if (zero? top)
;;         bot-piece
;;         (string-append (number->path top) "-/" bot-piece))))

;; (module+ test
;;   (check-equal? (number->path 0) "00")
;;   (check-equal? (number->path 123) "7b")
;;   (check-equal? (number->path 12345) "30-/39")
;;   (check-equal? (number->path 123456789012345678901234567890)
;;                 "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; The virtual host every route in this file is attached to.
(define vh (web-virtual-host "http" ? 7827))

;; Asserts the virtual host and serves a status page at the root path
;; showing the URL that was requested.
(actor #:name 'main
       (assert vh)
       (on (web-request-get (id req) vh ("" ()))
           (actor*
            (web-respond/xexpr!
             id
             `(html
               (body
                (h1 "Status"
                    " "
                    ,(url->string (resource->url (web-request-header-resource req))))))))))

;; Tracks the set of known local topic names and mirrors it as
;; `local-topic` assertions; each such assertion gets its own actor
;; running `local-topic-main`.
(actor #:name 'local-topic-manager
       (field [topics (set)])

       ;; PUT /topic/<name>: create the topic unless it is already known.
       (on (web-request-incoming (id req) vh 'put ("topic" (,$topic ())))
           (when (not (set-member? (topics) topic))
             (topics (set-add (topics) topic))
             (retract! (local-topic topic ? ?))
             (assert! (local-topic topic #f #f))) ;; TODO: maximums
           (web-respond/bytes! id #""))

       ;; DELETE /topic/<name>: forget the topic if it is known.
       (on (web-request-incoming (id req) vh 'delete ("topic" (,$topic ())))
           (when (set-member? (topics) topic)
             (topics (set-remove (topics) topic))
             (retract! (local-topic topic ? ?)))
           (web-respond/bytes! id #""))

       (during/actor (local-topic $topic _ _)
                     #:name (list 'topic topic)
                     (local-topic-main topic)))

;; web-respond/status! : id Number Bytes [Bytes] -> Void
;; Replies to request `id` with the given status `code` and `message`,
;; no extra headers, and an optional `body` (empty by default).
(define (web-respond/status! id code message [body #""])
  (web-respond/bytes! id
                      #:header (web-response-header #:code code
                                                    #:message message
                                                    #:headers '())
                      body))

;; local-topic-main : Topic -> Void
;; Behaviour of the actor for a single local topic: follows the topic's
;; configured limits, logs lifecycle events, and turns each POST to
;; /topic/<topic> into a `notification` message.
(define (local-topic-main topic)
  (field [max-age #f]
         [max-count #f])

  (on (asserted (local-topic topic $age $count))
      (max-age age)
      (max-count count))

  (on-start (log-info "Creating local topic ~v" topic))
  (begin/dataflow
    (log-info "Configured local topic ~v, max-age ~v, max-count ~v"
              topic
              (max-age)
              (max-count)))
  (on-stop (log-info "Terminating local topic ~v" topic))

  (on (web-request-incoming (id req) vh 'post ("topic" (,topic ())) $body)
      (actor*
       ;; Fall back to a generic binary type when the request carries
       ;; no content-type header.
       (define content-type
         (cdr (or (assq 'content-type (web-request-header-headers req))
                  '(content-type . "application/octet-stream"))))
       (log-info "Got ~v message! ~v" content-type body)
       (send! (notification (url->string (resource->url (web-request-header-resource req)))
                            body
                            content-type))
       (web-respond/status! id 201 #"Created"))))

;; parse-lease-seconds : Any -> Number
;; Converts the textual `hub.lease_seconds` parameter into a finite,
;; non-negative number. Raises an error for anything else (missing
;; value, non-numeric text, negative or non-finite numbers), so that a
;; malformed lease is rejected rather than being mistaken for #f,
;; which the hub uses to mean "unbounded".
(define (parse-lease-seconds str)
  (define n (and (string? str) (string->number str)))
  (unless (and (rational? n) (not (negative? n)))
    (error 'parse-lease-seconds "invalid hub.lease_seconds: ~v" str))
  n)

;; Handles subscribe/unsubscribe requests POSTed to /hub, and runs one
;; actor per (topic, callback) pair that currently has a subscription.
(actor #:name 'hub
       (on (web-request-incoming (id req) vh 'post ("hub" ()) $body)
           ;; Initially, I had an (actor* ...) form here for fault
           ;; isolation. However, this led to problems since I wanted
           ;; to use `assert!` and `retract!` to signal to the
           ;; `during/actor`, and the assertions were being lost as
           ;; the `actor*` terminated. So instead, I'm using Rackety
           ;; `with-handlers`.
           ;;
           ;; Anything raised while parsing or validating the request
           ;; makes `ok?` false, which is answered with 403 below.
           (define ok?
             (with-handlers [(values (lambda (e) #f))]
               (define params
                 (make-immutable-hash
                  (form-urlencoded->alist (bytes->string/utf-8 body))))
               (define callback (hash-ref params 'hub.callback))
               (define mode (match (hash-ref params 'hub.mode)
                              ["subscribe" 'subscribe]
                              ["unsubscribe" 'unsubscribe]))
               (define topic (hash-ref params 'hub.topic))
               ;; #f means "unbounded"; a malformed value raises and
               ;; is therefore rejected instead of becoming unbounded.
               (define lease-seconds
                 (match (hash-ref params 'hub.lease_seconds "unbounded")
                   ["unbounded" #f]
                   [n (parse-lease-seconds n)]))
               (define secret-string (hash-ref params 'hub.secret #f))
               (define secret-bytes
                 (and secret-string (string->bytes/utf-8 secret-string)))
               (define expiry-deadline
                 (and lease-seconds (+ (current-seconds) lease-seconds)))
               (define canonical-hub
                 (url->string (resource->url (web-request-header-resource req))))
               (match mode
                 ['subscribe
                  (if (subscription-change-validate "subscribe"
                                                    (or lease-seconds "unbounded")
                                                    topic
                                                    callback)
                      (begin
                        ;; Replace any existing subscription for this
                        ;; (topic, callback) pair.
                        (retract! (subscription topic ? ? callback ?))
                        (assert! (subscription topic
                                               expiry-deadline
                                               canonical-hub
                                               callback
                                               secret-bytes))
                        #t)
                      #f)]
                 ['unsubscribe
                  (if (subscription-change-validate "unsubscribe" #f topic callback)
                      (begin
                        (retract! (subscription topic ? ? callback ?))
                        #t)
                      #f)])))
           (if ok?
               (web-respond/status! id 202 #"Accepted")
               (web-respond/status! id 403 #"Forbidden" #"Validation failed")))

       (during/actor (subscription $topic _ _ $callback _)
                     #:name (list 'subscription topic callback)
                     (subscription-main topic callback)))

;; subscription-change-validate : String (U String Number #f) Topic URLString -> Boolean
;; Confirms a (un)subscription request with the subscriber: issues a
;; GET to `callback` carrying hub.mode, hub.topic, a fresh random
;; hub.challenge and, when `lease` is not #f, hub.lease_seconds. The
;; result is true only if the response status is in 200-299 and the
;; response body is exactly the challenge. Suspends the calling
;; handler until the response message arrives.
(define (subscription-change-validate mode lease topic callback)
  (define u (string->url callback))
  (define res (url->resource u))
  (define challenge (random-hex-string 16))
  (define id (gensym 'validation))
  (define extra-query
    (list* (cons 'hub.mode mode)
           (cons 'hub.topic topic)
           (cons 'hub.challenge challenge)
           (if lease
               (list (cons 'hub.lease_seconds (~a lease)))
               (list))))
  (react/suspend (k)
    (on-start
     ;; Keep whatever query parameters the callback URL already had.
     (send! (web-request id
                         'outbound
                         (web-request-header 'get
                                             res
                                             '()
                                             (append (url-query u) extra-query))
                         '())))
    (on (message (web-response-complete id $resp $body))
        (k (and (<= 200 (web-response-header-code resp) 299)
                (equal? body (string->bytes/utf-8 challenge)))))))

;; subscription-main : Topic URLString -> Void
;; Behaviour of the actor for one (topic, callback) subscription. While
;; a matching `subscription` assertion exists, logs its configuration
;; and, if it has an expiry deadline, arms an absolute timer for it
;; (the deadline is in seconds and is multiplied by 1000.0 for the
;; timer) and stops when that timer expires.
(define (subscription-main topic callback)
  (field [expiry-timer-id #f])
  (during (subscription topic $expiry-deadline $canonical-hub callback $secret-bytes)
    (on-start
     (log-info "Subscription configured: ~v ~v ~v" expiry-deadline canonical-hub secret-bytes)
     (cond
       [expiry-deadline
        (expiry-timer-id (gensym 'subscription-expiry))
        (log-info "Subscription will expire at ~a" expiry-deadline)
        (send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))]
       [else
        (log-info "Subscription will not expire")
        (expiry-timer-id #f)]))
    (stop-when #:when (expiry-timer-id)
               (message (timer-expired (expiry-timer-id) _))
               (log-info "Subscription expired"))))

;; A test subscriber endpoint: GET /sink replies with the value of the
;; `hub.challenge` query parameter, or an empty body when it is absent.
(actor #:name 'sink
       (on (web-request-get (id req) vh ("sink" ()))
           ;; A key given without a value (e.g. "?hub.challenge") is
           ;; presumably represented with #f as its cdr, as in
           ;; `url-query` results -- confirm against the web driver.
           ;; Treat that like an absent parameter rather than passing
           ;; #f to `string->bytes/utf-8`.
           (define challenge
             (or (cdr (or (assq 'hub.challenge (web-request-header-query req))
                          (cons 'hub.challenge "")))
                 ""))
           (web-respond/bytes! id (string->bytes/utf-8 challenge))))