#lang syndicate/actor (provide ) (require racket/dict) (require racket/exn) (require racket/file) (require racket/format) (require racket/runtime-path) (require racket/set) (require net/url) (require net/uri-codec) (require file/sha1) (require web-server/dispatchers/filesystem-map) (require web-server/private/mime-types) (require/activate syndicate/drivers/timer) (require/activate syndicate/drivers/timestate) (require/activate syndicate/drivers/web) (require syndicate/protocol/advertise) (require syndicate/functional-queue) (require "private/util.rkt") (require "protocol.rkt") (require "config.rkt") (define (vh host-name port) (web-virtual-host "http" host-name port)) (spawn-configuration "defaults.rktd") (actor #:name 'main (during (config (list 'canonical-host $h $p)) (assert (canonical-local-host h p)) (assert (local-host h p))) (during (config (list 'accepted-host $h $p)) (assert (local-host h p))) (during (local-host $host-name $port) (assert (vh host-name port)) (on (web-request-get (id req) (vh host-name port) ("" ())) (actor* (web-respond/xexpr! id `(html (body (h1 "Status" " " ,(url->string (resource->url (web-request-header-resource req))))))))))) (begin-for-declarations (define-runtime-path htdocs-path "htdocs") (define path->mime-type (make-path->mime-type "/etc/mime.types"))) (actor #:name 'static-content-server (define url->path (make-url->path htdocs-path)) (during (local-host $host-name $port) (on (web-request-get (id req) (vh host-name port) ,_) (define-values (path path-pieces) (url->path (resource->url (web-request-header-resource req)))) (when (file-exists? path) (web-respond/bytes! id #:header (web-response-header #:mime-type (path->mime-type path)) (file->bytes path)))))) (actor #:name 'local-topic-manager (field [topics (set)]) (during (local-host $host-name $port) (on (web-request-incoming (id req) (vh host-name port) 'put ("topic" (,$topic ()))) (when (not (set-member? (topics) topic)) (topics (set-add (topics) topic)) (assert! (local-topic-demand topic)) (retract! (local-topic-config topic ? ?)) (assert! (local-topic-config topic #f #f))) ;; TODO: maximums (web-respond/bytes! id #"")) (on (web-request-incoming (id req) (vh host-name port) 'delete ("topic" (,$topic ()))) (when (set-member? (topics) topic) (topics (set-remove (topics) topic)) (retract! (local-topic-demand topic)) (retract! (local-topic-config topic ? ?))) (web-respond/bytes! id #""))) (during/actor (local-topic-demand $topic) #:name (list 'local-topic topic) (local-topic-main topic))) (actor #:name 'topic-demand-analyzer (define/query-set local-hosts ($ h (local-host _ _)) h) (during/actor (topic-demand $full-topic _) #:name (list 'general-topic full-topic) #:let [(local-hosts-snapshot (local-hosts))] (with-handlers [(exn? (lambda (e) (log-error "Topic demand error: ~a" (exn->string e))))] (match-define (web-resource (web-virtual-host _ topic-host topic-port) topic-path) (url->resource (string->url full-topic))) (define maybe-local-topic (match topic-path [`("topic" (,topic ())) topic] [_ #f])) (general-topic-main full-topic topic-host topic-port maybe-local-topic (set-member? local-hosts-snapshot (local-host topic-host topic-port)))))) (define (general-topic-main full-topic topic-host topic-port maybe-local-topic start-as-local?) (define (local-state) (react (stop-when (retracted (local-host topic-host topic-port)) (remote-state)) (assert (local-topic-demand maybe-local-topic)))) (define (remote-state) (react (stop-when #:when maybe-local-topic (asserted (local-host topic-host topic-port)) (local-state)) (remote-topic-main full-topic))) (on-start (if (and maybe-local-topic start-as-local?) (local-state) (remote-state)))) (define (req-content-type req) (dict-ref (web-request-header-headers req) 'content-type #f)) (define (canonical-url canonical-host-name cport path) (url->string (resource->url (web-resource (vh canonical-host-name cport) path)))) (define (local-topic-main topic) (field [max-age #f] [max-count #f]) (field [current-content #f] [current-content-type #f] [last-modified-seconds #f]) (on (asserted (local-topic-config topic $age $count)) (max-age age) (max-count count)) (on-start (log-info "Creating local topic ~v" topic)) (begin/dataflow (log-info "Configured local topic ~v, max-age ~v, max-count ~v" topic (max-age) (max-count))) (on-stop (log-info "Terminating local topic ~v" topic)) (during (local-host $host-name $port) (during (canonical-local-host $canonical-host-name $cport) (define hub-url (canonical-url canonical-host-name cport `("hub" ()))) (define self-url (canonical-url canonical-host-name cport `("topic" (,topic ())))) (define discovery-headers (list (cons 'link (format "<~a>; rel=hub" hub-url)) (cons 'link (format "<~a>; rel=self" self-url)))) (define (topic-response id include-content?) ;; Used in both GET and HEAD requests (if (current-content) (web-respond/bytes! id #:header (web-response-header #:last-modified-seconds (last-modified-seconds) #:mime-type (and (current-content-type) (string->bytes/utf-8 (current-content-type))) #:headers discovery-headers) (if include-content? (current-content) #"")) (web-respond/bytes! id #:header (web-response-header #:code 204 #:message #"No Content" #:mime-type #f #:headers discovery-headers) #""))) ;; MUST NOT include a response body for 204 (on (web-request-incoming (id req) (vh host-name port) 'head ("topic" (,topic ()))) (topic-response id #f)) (on (web-request-get (id req) (vh host-name port) ("topic" (,topic ()))) (topic-response id #t)) (on (web-request-incoming (id req) (vh host-name port) 'post ("topic" (,topic ())) $body) (define content-type (req-content-type req)) (log-info "Local topic ~a got ~v message ~v" topic content-type body) (current-content body) (current-content-type content-type) (last-modified-seconds (current-seconds)) (actor* (send! (notification self-url hub-url self-url body content-type)) (web-respond/status! id 201 #"Created")))))) (define (remote-topic-main full-topic) (define sub-id (random-hex-string 16)) (log-info "Remote sub endpoint ~a" sub-id) (field [current-content-hash #f] [current-content-type #f] [current-upstream-hub #f] [established-upstream-hub #f]) (field [last-upstream-check 0] [poll-interval-seconds #f]) (define/query-set poll-intervals (topic-demand full-topic $i) i) (define/query-config min-poll-interval 60) (begin/dataflow (define candidate (for/fold [(i #f)] [(candidate (in-set (poll-intervals)))] (cond [(not i) candidate] [(< candidate i) candidate] [else i]))) (poll-interval-seconds (and candidate (max candidate (min-poll-interval))))) (begin/dataflow (log-info "Poll interval for ~a is now ~a" full-topic (match (poll-interval-seconds) [#f "disabled"] [n (format "~a seconds" n)]))) (during (canonical-local-host $canonical-host-name $cport) (define callback (canonical-url canonical-host-name cport `("sub" (,sub-id ())))) (define (refresh-subscription!) ;; TODO: shared secret ;; TODO: listen to lease duration and use it to refresh ourselves more smarterly (when (current-upstream-hub) (web-request! 'post (current-upstream-hub) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . ,callback) (hub.mode . "subscribe") (hub.topic . ,full-topic))))) (established-upstream-hub (current-upstream-hub)))) (define (unsubscribe!) (when (established-upstream-hub) (web-request! 'post (established-upstream-hub) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . ,callback) (hub.mode . "unsubscribe") (hub.topic . ,full-topic))))) (established-upstream-hub #f))) (begin/dataflow (when (not (equal? (current-upstream-hub) (established-upstream-hub))) (unsubscribe!) (refresh-subscription!))) (define/query-config max-upstream-redirects 5) (on (asserted (later-than (+ (last-upstream-check) (* 1000 (or (poll-interval-seconds) 0))))) (log-info "Checking upstream ~a" full-topic) (define-values (resp response-body) (web-request! 'get full-topic #:redirect-budget (max-upstream-redirects))) (last-upstream-check (current-inexact-milliseconds)) (match (web-response-header-code-type resp) ['successful (define new-content-hash (sha1 (open-input-bytes response-body))) (define new-content-type (dict-ref (web-response-header-headers resp) 'content-type #f)) (define parsed-link-headers (parse-link-headers (web-response-header-headers resp))) (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) (when (not (and (equal? (current-content-hash) new-content-hash) (equal? (current-content-type) new-content-type))) (current-content-hash new-content-hash) (current-content-type new-content-type) (send! (notification full-topic upstream-hub upstream-topic response-body new-content-type))) (if (equal? (current-upstream-hub) upstream-hub) (refresh-subscription!) (current-upstream-hub upstream-hub))] [other (log-warning "Upstream ~a yielded ~a code ~a" full-topic other (web-response-header-code resp))])) (on (web-request-get (id req) (vh canonical-host-name cport) ("sub" (,sub-id ()))) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (log-info "Received verification-of-intent: ~v" (web-request-header-query req)) (web-respond/bytes! id (string->bytes/utf-8 challenge))) (on (web-request-incoming (id req) (vh canonical-host-name cport) 'post ("sub" (,sub-id ())) $body) ;; TODO: verify the use of the shared secret (actor* (define parsed-link-headers (parse-link-headers (web-request-header-headers req))) (define upstream-hub (link-header-ref parsed-link-headers 'hub #f)) (define upstream-topic (link-header-ref parsed-link-headers 'self #f)) (define content-type (req-content-type req)) (log-info "Remote topic ~a got ~v message ~v; upstream hub ~v, topic ~v" full-topic content-type body upstream-hub upstream-topic) (current-content-hash (sha1 (open-input-bytes body))) (current-content-type content-type) (current-upstream-hub upstream-hub) (send! (notification full-topic upstream-hub upstream-topic body content-type)) (web-respond/status! id 201 #"Created"))))) (actor #:name 'hub (during (local-host $host-name $port) (during (canonical-local-host $canonical-host-name $cport) (on (web-request-incoming (id req) (vh host-name port) 'post ("hub" ()) $body) (asynchronous-verification-of-intent id req body canonical-host-name cport) (web-respond/status! id 202 #"Accepted")))) (on (message (update-subscription $topic $callback $settings)) (retract! (subscription topic callback ?)) (when settings (assert! (subscription topic callback settings)))) (during/actor (subscription $topic $callback _) #:name (list 'subscription topic callback) #:on-crash (retract! (subscription topic callback ?)) (subscription-main topic callback))) (define (asynchronous-verification-of-intent id req body canonical-host-name cport) (actor* (define params (make-immutable-hash (form-urlencoded->alist (bytes->string/utf-8 body)))) (define callback (hash-ref params 'hub.callback)) (define mode (match (hash-ref params 'hub.mode) ["subscribe" 'subscribe] ["unsubscribe" 'unsubscribe])) (define requested-topic (hash-ref params 'hub.topic)) (define topic (url->string (combine-url/relative (string->url (canonical-url canonical-host-name cport `("topic" ("" ())))) requested-topic))) (define lease-seconds (match (hash-ref params 'hub.lease_seconds (config-ref 'default-lease 86400)) ["unbounded" #f] [n (string->number n)])) (define poll-interval-seconds (match (hash-ref params 'hub.poll_interval_seconds (config-ref 'default-poll-interval "none")) ["none" #f] [n (string->number n)])) (define secret-string (hash-ref params 'hub.secret #f)) (define secret-bytes (and secret-string (string->bytes/utf-8 secret-string))) (define expiry-deadline (and lease-seconds (+ (current-seconds) lease-seconds))) (match mode ['subscribe (when (subscription-change-validate "subscribe" (or lease-seconds "unbounded") requested-topic callback) (send! (update-subscription topic callback (subscription-settings expiry-deadline secret-bytes poll-interval-seconds))))] ['unsubscribe (when (subscription-change-validate "unsubscribe" #f requested-topic callback) (send! (update-subscription topic callback #f)))]))) (define (subscription-change-validate mode lease requested-topic callback) (define challenge (random-hex-string 16)) (define id (gensym 'validation)) (define extra-query (list* (cons 'hub.mode mode) (cons 'hub.topic requested-topic) (cons 'hub.challenge challenge) (if lease (list (cons 'hub.lease_seconds (~a lease))) (list)))) (define-values (resp body) (web-request! 'get (let* ((u (string->url callback))) (url->string (struct-copy url u [query (append (url-query u) extra-query)]))))) (and (eq? (web-response-header-code-type resp) 'successful) (equal? body (string->bytes/utf-8 challenge)))) (define (maybe-link-header urlstr rel) (if urlstr (list (cons 'link (format "<~a>; rel=~a" urlstr rel))) '())) (define (subscription-main topic callback) (field [delivery-active? #f] [message-queue (make-queue)] [dead-letters (make-queue)]) (define/query-config max-dead-letters 10) (define/query-config initial-retry-delay 5.0) (define/query-config max-delivery-retries 10) (define/query-config retry-delay-multiplier 1.618) (define/query-config max-retry-delay 30) (stop-when (rising-edge (> (queue-length (dead-letters)) (max-dead-letters))) (log-info "Too many dead letters for ~a" callback)) (define (deliver-queued-notifications) (delivery-active? #t) (let deliver-rest ((retry-delay (initial-retry-delay))) (when (not (queue-empty? (message-queue))) (define-values (n newq) (dequeue (message-queue))) (message-queue newq) (match-define (notification _ canonical-hub canonical-topic body content-type) n) (define link-headers (append (maybe-link-header canonical-hub 'hub) (maybe-link-header canonical-topic 'self))) (let deliver-one ((retries-remaining (max-delivery-retries)) (retry-delay retry-delay)) (define-values (resp _body) (web-request! 'post callback #:headers (if content-type (cons (cons 'content-type content-type) link-headers) link-headers) #:body body)) (cond [(eq? (web-response-header-code-type resp) 'successful) (deliver-rest (initial-retry-delay))] [(zero? retries-remaining) (log-info "Dead letter for ~v" callback) (dead-letters (enqueue (dead-letters) n)) (deliver-rest retry-delay)] [else (log-info "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. Response: ~v" callback retry-delay retries-remaining resp) (sleep retry-delay) (deliver-one (- retries-remaining 1) (min (* retry-delay (retry-delay-multiplier)) (max-retry-delay)))])))) (delivery-active? #f)) (during (subscription topic callback (subscription-settings $expiry-deadline $secret-bytes $poll-interval-seconds)) (assert (topic-demand topic poll-interval-seconds)) (on-start (log-info "Subscription configured: ~v" `((topic ,topic) (expiry-deadline ,expiry-deadline) (callback ,callback) (secret-bytes ,secret-bytes) (poll-interval-seconds ,poll-interval-seconds)))) (stop-when #:when expiry-deadline (asserted (later-than (* expiry-deadline 1000.0))) (log-info "Subscription expired: ~v" `((topic ,topic) (callback ,callback)))) (on (message ($ n (notification topic _ _ _ _))) (message-queue (enqueue (message-queue) n)) (when (not (delivery-active?)) (deliver-queued-notifications)))))