#lang syndicate/actor
;; Module prelude: library dependencies, Syndicate drivers, and the
;; tunable configuration constants used throughout this file.

(provide)

;; Plain Racket libraries.
(require racket/dict
         racket/exn
         racket/format
         racket/set
         net/url
         net/uri-codec
         file/sha1)

;; Syndicate drivers: each is required and activated.
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)

;; Remaining Syndicate libraries and project-local helpers.
(require syndicate/protocol/advertise
         syndicate/functional-queue
         "private/util.rkt")

(module+ test
  (require rackunit))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Configuration
;;
;; TODO: sane defaults

;; Redirect budget passed to upstream topic fetches.
(define *max-upstream-redirects* 5)

;; Port used when building virtual hosts (see `vh`).
(define *server-port* 7827)

;; Host name used when constructing canonical hub/topic/callback URLs.
(define *canonical-host* "localhost")

;; Every host name in this list is asserted as a `local-host`.
(define *accepted-hosts* (list *canonical-host*))

;; Lower bound applied to requested poll intervals.
(define *min-poll-interval* 10) ;; seconds

;; Values assumed when a request omits hub.lease_seconds or
;; hub.poll_interval_seconds, respectively.
(define *default-lease* "unbounded")
(define *default-poll-interval* "none")

;; A subscription stops once its dead-letter queue grows beyond this length.
(define *max-dead-letters* 10)

;; Number of retries permitted per notification before it is dead-lettered.
(define *max-delivery-retries* 10)

;; Retry back-off: first delay, growth factor, and ceiling.
(define *initial-retry-delay* 5.0) ;; seconds
(define *retry-delay-multiplier* 1.618)
(define *max-retry-delay* 30) ;; seconds

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Data definitions
;;
;; A Topic is a URIString.
;;
;; A Deadline is an (Option Number), where #f indicates "unbounded",
;; and numbers indicate a moment in time as a Unix epoch timestamp as
;; might be returned from `current-seconds`.
;;
;; A Notification contains both `topic-name`, indicating the *local*
;; name for the topic that is being subscribed to, as well as
;; `canonical-topic`, which is the rel=self URL given by the upstream
;; topic itself. The former is our local key for finding resources;
;; the latter is a property of the topic content, not a property of
;; the subscription. The same is true of `canonical-hub`: it pertains
;; to the topic, not the subscription. The `topic-name` is the only
;; field pertaining to a subscription; all the others pertain to the
;; content.
;; Message announcing new content for a topic (sent with `send!`).
(struct notification (topic-name ;; Topic
                      canonical-hub ;; Option URLString
                      canonical-topic ;; Option Topic
                      content ;; Bytes
                      content-type) ;; Option String
  #:prefab)

;; (update-subscription Topic URLString (Option SubscriptionSettings))
;; A #f `settings` means "remove the subscription" (see the 'hub actor).
(struct update-subscription (topic callback settings) #:prefab) ;; message

;; (subscription Topic URLString SubscriptionSettings)
;; NOTE(review): the trailing dash on `settings-` presumably avoids
;; generating an accessor called `subscription-settings`, which would
;; collide with the struct of that name defined just below -- confirm.
(struct subscription (topic callback settings-) #:prefab) ;; assertion

(struct subscription-settings (expiry-deadline ;; Deadline
                               secret ;; Option Bytes
                               poll-interval-seconds) ;; Option Number
  #:prefab)

;; (local-topic-config Topic (Option Number) (Option Number))
(struct local-topic-config (name max-age max-count) #:prefab)

;; (topic-demand Topic (Option Number))
(struct topic-demand (topic-name poll-interval-seconds) #:prefab)

;; (local-topic-demand String)
(struct local-topic-demand (name) #:prefab)

;; (local-host String)
(struct local-host (name) #:prefab)

;; (canonical-local-host String)
(struct canonical-local-host (name) #:prefab)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; (define (number->path n)
;;   (let ((top (quotient n 256))
;;         (bot (remainder n 256)))
;;     (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
;;     (if (zero? top)
;;         bot-piece
;;         (string-append (number->path top) "-/" bot-piece))))

;; (module+ test
;;   (check-equal? (number->path 0) "00")
;;   (check-equal? (number->path 123) "7b")
;;   (check-equal? (number->path 12345) "30-/39")
;;   (check-equal? (number->path 123456789012345678901234567890)
;;                 "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; String -> web-virtual-host
;; The "http" virtual host for `host-name` on `*server-port*`.
(define (vh host-name)
  (web-virtual-host "http" host-name *server-port*))

;; Asserts the configured local hosts and the canonical host, and
;; serves a small status page at the root path of every local host.
(actor #:name 'main
       (for [(h *accepted-hosts*)] (assert (local-host h)))
       (assert (canonical-local-host *canonical-host*))
       (during (local-host $host-name)
         (assert (vh host-name))
         (on (web-request-get (id req) (vh host-name) ("" ()))
             (actor*
              (web-respond/xexpr! id
                                  `(html
                                    (body
                                     (h1 "Status"
                                         " "
                                         ,(url->string
                                           (resource->url
                                            (web-request-header-resource req)))))))))))

;; Tracks the set of locally hosted topics. PUT /topic/<name> creates
;; a topic and DELETE /topic/<name> removes it; one 'local-topic actor
;; runs for each asserted `local-topic-demand`.
(actor #:name 'local-topic-manager
       (field [topics (set)])

       (during (local-host $host-name)
         (on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ())))
             ;; Only a topic not already in `topics` is (re)configured.
             (when (not (set-member? (topics) topic))
               (topics (set-add (topics) topic))
               (assert! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?))
               (assert! (local-topic-config topic #f #f))) ;; TODO: maximums
             (web-respond/bytes! id #""))

         (on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ())))
             (when (set-member? (topics) topic)
               (topics (set-remove (topics) topic))
               (retract! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?)))
             (web-respond/bytes! id #"")))

       (during/actor (local-topic-demand $topic)
                     #:name (list 'local-topic topic)
                     (local-topic-main topic)))

;; For each topic with demand, parses the topic URL and starts a
;; 'general-topic actor, telling it whether the URL's host is
;; currently one of our local hosts.
(actor #:name 'topic-demand-analyzer
       (define/query-set local-hosts (local-host $host-name) host-name)
       (during/actor (topic-demand $full-topic _)
                     #:name (list 'general-topic full-topic)
                     #:let [(local-hosts-snapshot (local-hosts))]
                     ;; Any exception raised below (e.g. by the URL
                     ;; parsing or the match-define) is logged.
                     (with-handlers [(exn? (lambda (e)
                                             (log-error "Topic demand error: ~a"
                                                        (exn->string e))))]
                       (match-define (web-resource (web-virtual-host _ topic-host _) topic-path)
                         (url->resource (string->url full-topic)))
                       ;; The local topic name when the path has the
                       ;; shape /topic/<name>, otherwise #f.
                       (define maybe-local-topic
                         (match topic-path
                           [`("topic" (,topic ())) topic]
                           [_ #f]))
                       (general-topic-main full-topic
                                           topic-host
                                           maybe-local-topic
                                           (set-member? local-hosts-snapshot topic-host)))))

;; Topic String (Option String) Boolean -> Void
;; Switches between two states as `(local-host topic-host)` comes and
;; goes: in the local state it asserts demand for the local topic; in
;; the remote state it runs `remote-topic-main`. The switch to the
;; local state is only enabled when `maybe-local-topic` is non-#f.
(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?)
  (define (local-state)
    (react (stop-when (retracted (local-host topic-host)) (remote-state))
           (assert (local-topic-demand maybe-local-topic))))

  (define (remote-state)
    (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host)) (local-state))
           (remote-topic-main full-topic)))

  (on-start (if (and maybe-local-topic start-as-local?)
                (local-state)
                (remote-state))))

;; Returns the 'content-type entry of the request's headers, or #f.
(define (req-content-type req)
  (dict-ref (web-request-header-headers req) 'content-type #f))

;; String Path -> URLString
;; URL string for `path` on the virtual host of `canonical-host-name`.
(define (canonical-url canonical-host-name path)
  (url->string (resource->url (web-resource (vh canonical-host-name) path))))

;; Topic -> Void
;; Behaviour of one locally hosted topic: keeps the latest content,
;; answers HEAD/GET on /topic/<topic>, and on POST stores the body and
;; sends a `notification`.
(define (local-topic-main topic)
  (field [max-age #f]
         [max-count #f])
  (field [current-content #f]
         [current-content-type #f]
         [last-modified-seconds #f])

  (on (asserted (local-topic-config topic $age $count))
      (max-age age)
      (max-count count))

  (on-start (log-info "Creating local topic ~v" topic))
  (begin/dataflow
    (log-info "Configured local topic ~v, max-age ~v, max-count ~v"
              topic
              (max-age)
              (max-count)))
  (on-stop (log-info "Terminating local topic ~v" topic))

  (during (local-host $host-name)
    (during (canonical-local-host $canonical-host-name)
      (define hub-url (canonical-url canonical-host-name `("hub" ())))
      (define self-url (canonical-url canonical-host-name `("topic" (,topic ()))))
      ;; Link headers (rel=hub and rel=self) attached to every topic response.
      (define discovery-headers
        (list (cons 'link (format "<~a>; rel=hub" hub-url))
              (cons 'link (format "<~a>; rel=self" self-url))))

      (define (topic-response id include-content?) ;; Used in both GET and HEAD requests
        (if (current-content)
            (web-respond/bytes! id
                                #:header (web-response-header
                                          #:last-modified-seconds (last-modified-seconds)
                                          #:mime-type (and (current-content-type)
                                                           (string->bytes/utf-8
                                                            (current-content-type)))
                                          #:headers discovery-headers)
                                (if include-content? (current-content) #""))
            (web-respond/bytes! id
                                #:header (web-response-header
                                          #:code 204
                                          #:message #"No Content"
                                          #:mime-type #f
                                          #:headers discovery-headers)
                                #""))) ;; MUST NOT include a response body for 204

      (on (web-request-incoming (id req) (vh host-name) 'head ("topic" (,topic ())))
          (topic-response id #f))

      (on (web-request-get (id req) (vh host-name) ("topic" (,topic ())))
          (topic-response id #t))

      (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body)
          (define content-type (req-content-type req))
          (log-info "Local topic ~a got ~v message ~v" topic content-type body)
          (current-content body)
          (current-content-type content-type)
          (last-modified-seconds (current-seconds))
          ;; For a local topic the canonical URL doubles as the topic name.
          (actor*
           (send! (notification self-url hub-url self-url body content-type))
           (web-respond/status! id 201 #"Created"))))))

;; Topic -> Void
;; Behaviour of one remote (upstream) topic: fetches it, sends a
;; `notification` when the content hash or content type changes,
;; (un)subscribes at the hub advertised by the upstream, and accepts
;; pushed content on a private /sub/<sub-id> callback endpoint.
(define (remote-topic-main full-topic)
  (define sub-id (random-hex-string 16))
  (log-info "Remote sub endpoint ~a" sub-id)

  (field [current-content-hash #f]
         [current-content-type #f]
         [current-upstream-hub #f]
         [established-upstream-hub #f])
  (field [last-upstream-check 0]
         [poll-interval-seconds #f])

  (define/query-set poll-intervals (topic-demand full-topic $i) i)
  ;; Effective poll interval: the smallest requested interval, clamped
  ;; from below by *min-poll-interval*; #f when nobody asked for one.
  (begin/dataflow
    (define candidate
      (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))]
        (cond
          ;; `topic-demand` carries an (Option Number), so the set may
          ;; contain #f alongside real intervals. Skip anything that
          ;; is not a real number so that `<` is never applied to it.
          [(not (real? candidate)) i]
          [(not i) candidate]
          [(< candidate i) candidate]
          [else i])))
    (poll-interval-seconds (and candidate (max candidate *min-poll-interval*))))

  (begin/dataflow
    (log-info "Poll interval for ~a is now ~a"
              full-topic
              (match (poll-interval-seconds)
                [#f "disabled"]
                [n (format "~a seconds" n)])))

  (during (canonical-local-host $canonical-host-name)
    ;; URL on which the upstream hub is asked to reach us.
    (define callback (canonical-url canonical-host-name `("sub" (,sub-id ()))))

    ;; POSTs a "subscribe" request to the current upstream hub, if any,
    ;; and records that hub as the established one.
    (define (refresh-subscription!)
      ;; TODO: shared secret
      ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly
      (when (current-upstream-hub)
        (web-request! 'post
                      (current-upstream-hub)
                      #:body (string->bytes/utf-8
                              (alist->form-urlencoded
                               `((hub.callback . ,callback)
                                 (hub.mode . "subscribe")
                                 (hub.topic . ,full-topic)))))
        (established-upstream-hub (current-upstream-hub))))

    ;; POSTs an "unsubscribe" request to the established hub, if any,
    ;; and clears it.
    (define (unsubscribe!)
      (when (established-upstream-hub)
        (web-request! 'post
                      (established-upstream-hub)
                      #:body (string->bytes/utf-8
                              (alist->form-urlencoded
                               `((hub.callback . ,callback)
                                 (hub.mode . "unsubscribe")
                                 (hub.topic . ,full-topic)))))
        (established-upstream-hub #f)))

    ;; Whenever the advertised hub differs from the established one,
    ;; leave the old hub and join the new one.
    (begin/dataflow
      (when (not (equal? (current-upstream-hub) (established-upstream-hub)))
        (unsubscribe!)
        (refresh-subscription!)))

    ;; NOTE(review): when `poll-interval-seconds` is #f ("disabled") the
    ;; deadline below equals `last-upstream-check` itself, so this looks
    ;; like it re-fires right after every check instead of not polling
    ;; at all -- confirm against the timestate driver's semantics.
    (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0)))))
        (log-info "Checking upstream ~a" full-topic)
        (define-values (resp response-body)
          (web-request! 'get full-topic #:redirect-budget *max-upstream-redirects*))
        (last-upstream-check (current-inexact-milliseconds))
        (match (web-response-header-code-type resp)
          ['successful
           (define new-content-hash (sha1 (open-input-bytes response-body)))
           (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f))
           (define parsed-link-headers (parse-link-headers (web-response-header-headers resp)))
           (define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
           (define upstream-topic (link-header-ref parsed-link-headers 'self #f))
           ;; Notify only when the hash or the content type has changed.
           (when (not (and (equal? (current-content-hash) new-content-hash)
                           (equal? (current-content-type) new-content-type)))
             (current-content-hash new-content-hash)
             (current-content-type new-content-type)
             (send! (notification full-topic
                                  upstream-hub
                                  upstream-topic
                                  response-body
                                  new-content-type)))
           ;; Same hub: renew explicitly. Different hub: update the
           ;; field and let the dataflow block above resubscribe.
           (if (equal? (current-upstream-hub) upstream-hub)
               (refresh-subscription!)
               (current-upstream-hub upstream-hub))]
          [other
           (log-warning "Upstream ~a yielded ~a code ~a"
                        full-topic
                        other
                        (web-response-header-code resp))]))

    ;; Verification of intent: reply with the hub.challenge query
    ;; parameter (or an empty body when it is absent).
    (on (web-request-get (id req) (vh canonical-host-name) ("sub" (,sub-id ())))
        (define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
        (log-info "Received verification-of-intent: ~v" (web-request-header-query req))
        (web-respond/bytes! id (string->bytes/utf-8 challenge)))

    ;; Content pushed to our callback endpoint.
    (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body)
        ;; TODO: verify the use of the shared secret
        (actor*
         (define parsed-link-headers (parse-link-headers (web-request-header-headers req)))
         (define upstream-hub (link-header-ref parsed-link-headers 'hub #f))
         (define upstream-topic (link-header-ref parsed-link-headers 'self #f))
         (define content-type (req-content-type req))
         (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v"
                   full-topic
                   content-type
                   body
                   upstream-hub
                   upstream-topic)
         (current-content-hash (sha1 (open-input-bytes body)))
         (current-content-type content-type)
         (current-upstream-hub upstream-hub)
         (send! (notification full-topic upstream-hub upstream-topic body content-type))
         (web-respond/status! id 201 #"Created")))))

;; The hub endpoint: accepts POST /hub requests (answering 202 and
;; validating asynchronously), turns `update-subscription` messages
;; into `subscription` assertions, and runs one 'subscription actor
;; per (topic, callback) pair.
(actor #:name 'hub
       (during (local-host $host-name)
         (during (canonical-local-host $canonical-host-name)
           (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body)
               (asynchronous-verification-of-intent id req body canonical-host-name)
               (web-respond/status! id 202 #"Accepted"))))

       (on (message (update-subscription $topic $callback $settings))
           ;; Drop any existing settings first; #f settings means "remove".
           (retract! (subscription topic callback ?))
           (when settings (assert! (subscription topic callback settings))))

       (during/actor (subscription $topic $callback _)
                     #:name (list 'subscription topic callback)
                     #:on-crash (retract! (subscription topic callback ?))
                     (subscription-main topic callback)))

;; Parses a form-encoded hub request `body` and, inside an `actor*`
;; form, validates it against the subscriber's callback before sending
;; the matching `update-subscription` message. `id` and `req` are
;; accepted but not used here.
;;
;; A missing hub.callback, hub.mode or hub.topic, or a hub.mode other
;; than "subscribe"/"unsubscribe", raises (from `hash-ref` / `match`).
(define (asynchronous-verification-of-intent id req body canonical-host-name)
  (actor*
   (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body))))
   (define callback (hash-ref params 'hub.callback))
   (define mode (match (hash-ref params 'hub.mode)
                  ["subscribe" 'subscribe]
                  ["unsubscribe" 'unsubscribe]))
   (define requested-topic (hash-ref params 'hub.topic))
   ;; Resolve the requested topic relative to our canonical /topic/ URL.
   (define topic
     (url->string
      (combine-url/relative
       (string->url (canonical-url canonical-host-name `("topic" ("" ()))))
       requested-topic)))
   ;; NOTE(review): `string->number` yields #f for non-numeric text, so
   ;; a malformed lease is treated like "unbounded" -- confirm intended.
   (define lease-seconds
     (match (hash-ref params 'hub.lease_seconds *default-lease*)
       ["unbounded" #f]
       [n (string->number n)]))
   (define poll-interval-seconds
     (match (hash-ref params 'hub.poll_interval_seconds *default-poll-interval*)
       ["none" #f]
       [n (string->number n)]))
   (define secret-string (hash-ref params 'hub.secret #f))
   (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string)))
   (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds)))
   (match mode
     ['subscribe
      (when (subscription-change-validate "subscribe"
                                          (or lease-seconds "unbounded")
                                          requested-topic
                                          callback)
        (send! (update-subscription
                topic
                callback
                (subscription-settings expiry-deadline
                                       secret-bytes
                                       poll-interval-seconds))))]
     ['unsubscribe
      (when (subscription-change-validate "unsubscribe" #f requested-topic callback)
        (send! (update-subscription topic callback #f)))])))

;; String (U Number String #f) String URLString -> Boolean
;; Issues a GET to `callback` with hub.mode, hub.topic, a fresh random
;; hub.challenge and (when `lease` is non-#f) hub.lease_seconds
;; appended to its query. Returns #t only if the response is
;; successful and its body is exactly the challenge.
(define (subscription-change-validate mode lease requested-topic callback)
  (define challenge (random-hex-string 16))
  (define extra-query
    (list* (cons 'hub.mode mode)
           (cons 'hub.topic requested-topic)
           (cons 'hub.challenge challenge)
           (if lease
               (list (cons 'hub.lease_seconds (~a lease)))
               (list))))
  (define-values (resp body)
    (web-request! 'get
                  (let* ((u (string->url callback)))
                    (url->string
                     ;; Keep the callback's own query parameters.
                     (struct-copy url u [query (append (url-query u) extra-query)])))))
  (and (eq? (web-response-header-code-type resp) 'successful)
       (equal? body (string->bytes/utf-8 challenge))))

;; (Option URLString) Symbol -> (Listof (Pairof Symbol String))
;; A one-element list holding a 'link header for `urlstr` with the
;; given `rel`, or the empty list when `urlstr` is #f.
(define (maybe-link-header urlstr rel)
  (if urlstr
      (list (cons 'link (format "<~a>; rel=~a" urlstr rel)))
      '()))

;; Topic URLString -> Void
;; Behaviour of one subscription: queues notifications for `topic` and
;; POSTs them to `callback` one at a time, retrying with a growing
;; delay. Stops once more than *max-dead-letters* notifications have
;; been given up on.
(define (subscription-main topic callback)
  (field [delivery-active? #f]
         [message-queue (make-queue)]
         [dead-letters (make-queue)])

  (stop-when (rising-edge (> (queue-length (dead-letters)) *max-dead-letters*))
             (log-info "Too many dead letters for ~a" callback))

  ;; Drains `message-queue`. `delivery-active?` is set for the duration
  ;; so the message handler below does not start a second drain.
  (define (deliver-queued-notifications)
    (delivery-active? #t)
    (let deliver-rest ((retry-delay *initial-retry-delay*))
      (when (not (queue-empty? (message-queue)))
        (define-values (n newq) (dequeue (message-queue)))
        (message-queue newq)
        (match-define (notification _ canonical-hub canonical-topic body content-type) n)
        (define link-headers (append (maybe-link-header canonical-hub 'hub)
                                     (maybe-link-header canonical-topic 'self)))
        (let deliver-one ((retries-remaining *max-delivery-retries*)
                          (retry-delay retry-delay))
          (define-values (resp _body)
            (web-request! 'post
                          callback
                          #:headers (if content-type
                                        (cons (cons 'content-type content-type) link-headers)
                                        link-headers)
                          #:body body))
          (cond
            ;; Delivered: continue with the initial delay restored.
            [(eq? (web-response-header-code-type resp) 'successful)
             (deliver-rest *initial-retry-delay*)]
            ;; Out of retries: dead-letter it and keep the current delay.
            [(zero? retries-remaining)
             (log-info "Dead letter for ~v" callback)
             (dead-letters (enqueue (dead-letters) n))
             (deliver-rest retry-delay)]
            ;; Otherwise wait, then retry with a longer (capped) delay.
            [else
             (log-info "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v"
                       callback
                       retry-delay
                       retries-remaining
                       resp)
             (sleep retry-delay)
             (deliver-one (- retries-remaining 1)
                          (min (* retry-delay *retry-delay-multiplier*) *max-retry-delay*))]))))
    (delivery-active? #f))

  (during (subscription topic
                        callback
                        (subscription-settings $expiry-deadline
                                               $secret-bytes
                                               $poll-interval-seconds))
    ;; Holding a subscription creates demand for the topic itself.
    (assert (topic-demand topic poll-interval-seconds))
    (on-start (log-info "Subscription configured: ~v"
                        `((topic ,topic)
                          (expiry-deadline ,expiry-deadline)
                          (callback ,callback)
                          (secret-bytes ,secret-bytes)
                          (poll-interval-seconds ,poll-interval-seconds))))
    ;; `expiry-deadline` is in seconds; `later-than` is given milliseconds.
    (stop-when #:when expiry-deadline
               (asserted (later-than (* expiry-deadline 1000.0)))
               (log-info "Subscription expired: ~v"
                         `((topic ,topic)
                           (callback ,callback))))
    (on (message ($ n (notification topic _ _ _ _)))
        (message-queue (enqueue (message-queue) n))
        (when (not (delivery-active?))
          (deliver-queued-notifications)))))