#lang syndicate/actor

(provide )

(require racket/dict)
(require racket/exn)
(require racket/format)
(require racket/set)
(require net/url)
(require net/uri-codec)
(require file/sha1)

(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require syndicate/protocol/advertise)
(require syndicate/functional-queue)

(require "private/util.rkt")

(module+ test (require rackunit))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; A Topic is a URIString.

;; A Deadline is an (Option Number), where #f indicates "unbounded",
;; and numbers indicate a moment in time as a Unix epoch timestamp as
;; might be returned from `current-seconds`.

;; (notification Topic Bytes (Option String))
(struct notification (topic-name content content-type) #:prefab)

(struct subscription (topic-name ;; Topic
                      expiry-deadline ;; Deadline
                      canonical-hub ;; URLString
                      callback ;; URLString
                      secret ;; Option Bytes
                      poll-interval-seconds) ;; Option Number
  #:prefab)

;; (local-topic-config Topic (Option Number) (Option Number))
(struct local-topic-config (name max-age max-count) #:prefab)

;; (topic-demand Topic (Option Number))
(struct topic-demand (topic-name poll-interval-seconds) #:prefab)

;; (local-topic-demand String)
(struct local-topic-demand (name) #:prefab)

;; (local-host String)
(struct local-host (name) #:prefab)

;; (canonical-local-host String)
(struct canonical-local-host (name) #:prefab)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; (define (number->path n)
;;   (let ((top (quotient n 256))
;;         (bot (remainder n 256)))
;;     (define bot-piece (~r bot #:base 16 #:min-width 2 #:pad-string "0"))
;;     (if (zero? top)
;;         bot-piece
;;         (string-append (number->path top) "-/" bot-piece))))

;; (module+ test
;;   (check-equal? (number->path 0) "00")
;;   (check-equal? (number->path 123) "7b")
;;   (check-equal? (number->path 12345) "30-/39")
;;   (check-equal? (number->path 123456789012345678901234567890)
;;                 "01-/8e-/e9-/0f-/f6-/c3-/73-/e0-/ee-/4e-/3f-/0a-/d2"))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; vh : String -> web-virtual-host
;; The virtual host record used for every endpoint in this file:
;; scheme "http", the given host name, port 7827.
(define (vh host-name)
  (web-virtual-host "http" host-name 7827))

;; Actor 'main: asserts "localhost" as both a local host and the
;; canonical local host. For every asserted local host it asserts the
;; matching virtual host and answers GET requests on the path pattern
;; ("" ()) with a small status page that echoes the requested URL.
(actor #:name 'main
       (assert (local-host "localhost"))
       (assert (canonical-local-host "localhost"))
       (during (local-host $host-name)
         (assert (vh host-name))
         (on (web-request-get (id req) (vh host-name) ("" ()))
             (actor*
              (web-respond/xexpr!
               id
               `(html
                 (body
                  (h1 "Status"
                      " "
                      ,(url->string
                        (resource->url (web-request-header-resource req)))))))))))

;; Actor 'local-topic-manager: tracks the set of locally-created topic
;; names in the `topics` field.
;;  - PUT on ("topic" (<name> ())) registers a not-yet-known name by
;;    asserting `local-topic-demand` and a fresh `local-topic-config`.
;;  - DELETE on the same path retracts both for a known name.
;; Both requests are answered with an empty body whether or not
;; anything changed. Each `local-topic-demand` is served by a
;; dedicated actor running `local-topic-main`.
(actor #:name 'local-topic-manager
       (field [topics (set)])
       (during (local-host $host-name)
         (on (web-request-incoming (id req) (vh host-name) 'put ("topic" (,$topic ())))
             (when (not (set-member? (topics) topic))
               (topics (set-add (topics) topic))
               (assert! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?))
               (assert! (local-topic-config topic #f #f))) ;; TODO: maximums
             (web-respond/bytes! id #""))
         (on (web-request-incoming (id req) (vh host-name) 'delete ("topic" (,$topic ())))
             (when (set-member? (topics) topic)
               (topics (set-remove (topics) topic))
               (retract! (local-topic-demand topic))
               (retract! (local-topic-config topic ? ?)))
             (web-respond/bytes! id #"")))
       (during/actor (local-topic-demand $topic)
         #:name (list 'local-topic topic)
         (local-topic-main topic)))

;; Actor 'topic-demand-analyzer: for every distinct topic URL that has
;; a `topic-demand`, spawns an actor that parses the URL into host and
;; path and hands over to `general-topic-main`. The set of local hosts
;; is snapshotted at spawn time (#:let) to pick the starting state.
;; Any exception raised while parsing/starting is logged, not
;; propagated.
(actor #:name 'topic-demand-analyzer
       (define/query-set local-hosts (local-host $host-name) host-name)
       (during/actor (topic-demand $full-topic _)
         #:name (list 'general-topic full-topic)
         #:let [(local-hosts-snapshot (local-hosts))]
         (with-handlers [(exn? (lambda (e)
                                 (log-error "Topic demand error: ~a" (exn->string e))))]
           (match-define (web-resource (web-virtual-host _ topic-host _) topic-path)
             (url->resource (string->url full-topic)))
           ;; Only paths of the shape ("topic" (<name> ())) can name a
           ;; topic hosted here; anything else is always remote.
           (define maybe-local-topic
             (match topic-path
               [`("topic" (,topic ())) topic]
               [_ #f]))
           (general-topic-main full-topic
                               topic-host
                               maybe-local-topic
                               (set-member? local-hosts-snapshot topic-host)))))

;; general-topic-main : Topic String (Option String) Boolean -> Void
;; Two-state machine for one demanded topic:
;;  - local-state: asserts `local-topic-demand` for the local topic
;;    name; switches to remote-state when `(local-host topic-host)` is
;;    retracted.
;;  - remote-state: runs `remote-topic-main`; switches to local-state
;;    when `(local-host topic-host)` is asserted, but only if the path
;;    could name a local topic at all (`maybe-local-topic` non-#f).
;; The starting state is local iff `maybe-local-topic` is non-#f and
;; `start-as-local?` is true.
(define (general-topic-main full-topic topic-host maybe-local-topic start-as-local?)
  (define (local-state)
    (react (stop-when (retracted (local-host topic-host))
                      (remote-state))
           (assert (local-topic-demand maybe-local-topic))))
  (define (remote-state)
    (react (stop-when #:when maybe-local-topic
                      (asserted (local-host topic-host))
                      (local-state))
           (remote-topic-main full-topic)))
  (on-start (if (and maybe-local-topic start-as-local?)
                (local-state)
                (remote-state))))

;; req-content-type : web-request-header -> (Option String)
;; Looks up the 'content-type entry of the request headers, #f if absent.
(define (req-content-type req)
  (dict-ref (web-request-header-headers req) 'content-type #f))

;; canonical-url : String Path -> URLString
;; Renders the URL of `path` on the virtual host built by `vh` for
;; `canonical-host-name`.
(define (canonical-url canonical-host-name path)
  (url->string (resource->url (web-resource (vh canonical-host-name) path))))

;; local-topic-main : Topic -> Void
;; Behaviour of one locally-hosted topic.
;;  - Mirrors `local-topic-config` into the `max-age`/`max-count`
;;    fields (they are only logged here).
;;  - GET on ("topic" (<topic> ())) returns the most recently posted
;;    content with hub/self `link` discovery headers, or 204 if nothing
;;    has been posted yet.
;;  - POST on the same path stores the body and content type, stamps
;;    `last-modified-seconds`, broadcasts a `notification` and replies
;;    201.
(define (local-topic-main topic)
  (field [max-age #f]
         [max-count #f])
  (field [current-content #f]
         [current-content-type #f]
         [last-modified-seconds #f])

  (on (asserted (local-topic-config topic $age $count))
      (max-age age)
      (max-count count))

  (on-start (log-info "Creating local topic ~v" topic))
  (begin/dataflow
    (log-info "Configured local topic ~v, max-age ~v, max-count ~v"
              topic
              (max-age)
              (max-count)))
  (on-stop (log-info "Terminating local topic ~v" topic))

  (during (local-host $host-name)
    (during (canonical-local-host $canonical-host-name)
      (define hub-url (canonical-url canonical-host-name `("hub" ())))
      (define self-url (canonical-url canonical-host-name `("topic" (,topic ()))))
      (define discovery-headers
        (list (cons 'link (format "<~a>; rel=hub" hub-url))
              (cons 'link (format "<~a>; rel=self" self-url))))
      (on (web-request-get (id req) (vh host-name) ("topic" (,topic ())))
          (if (current-content)
              (web-respond/bytes! id
                                  #:header (web-response-header
                                            #:last-modified-seconds (last-modified-seconds)
                                            #:mime-type (and (current-content-type)
                                                             (string->bytes/utf-8
                                                              (current-content-type)))
                                            #:headers discovery-headers)
                                  (current-content))
              (web-respond/bytes! id
                                  #:header (web-response-header
                                            #:code 204
                                            #:message #"No Content"
                                            #:mime-type #f
                                            #:headers discovery-headers)
                                  #""))))
    ;; MUST NOT include a response body for 204

    (on (web-request-incoming (id req) (vh host-name) 'post ("topic" (,topic ())) $body)
        (define content-type (req-content-type req))
        (log-info "Local topic ~a got ~v message ~v" topic content-type body)
        (current-content body)
        (current-content-type content-type)
        (last-modified-seconds (current-seconds))
        (actor*
         (define full-topic
           (url->string (resource->url (web-request-header-resource req))))
         (send! (notification full-topic body content-type))
         (web-respond/status! id 201 #"Created")))))

;; maintain-remote-subscription : Topic String String -> Void
;; Polls the remote topic `full-topic` and republishes changes.
;;  - The effective poll interval is the smallest non-#f interval among
;;    all current `topic-demand`s for the topic; #f means polling is
;;    disabled.
;;  - On each poll the response body is hashed (SHA-1); a
;;    `notification` is sent only when the hash or the content type
;;    differs from the previous successful poll.
(define (maintain-remote-subscription full-topic canonical-host-name sub-id)
  ;; NOTE(review): `callback` is computed but never used in this
  ;; function. It is the URL of the ("sub" (<sub-id> ())) endpoint that
  ;; `remote-topic-main` serves — presumably reserved for subscribing
  ;; at the upstream hub; confirm before removing.
  (define callback (canonical-url canonical-host-name `("sub" (,sub-id ()))))
  (react (field [last-upstream-check 0]
                [poll-interval-seconds #f])
         (field [current-content-hash #f]
                [current-content-type #f])

         (define/query-set poll-intervals (topic-demand full-topic $i) i)

         (begin/dataflow
           (poll-interval-seconds
            ;; Demands may carry #f ("no polling wanted"). Those must be
            ;; skipped: comparing #f with `<` raises, and which element
            ;; is seen first depends on set iteration order.
            (for/fold [(shortest #f)]
                      [(candidate (in-set (poll-intervals)))
                       #:when candidate]
              (if (and shortest (<= shortest candidate))
                  shortest
                  candidate))))

         (begin/dataflow
           (log-info "Poll interval for ~a is now ~a"
                     full-topic
                     (match (poll-interval-seconds)
                       [#f "disabled"]
                       [n (format "~a seconds" n)])))

         ;; Only poll while some demand actually asks for polling.
         ;; Without the guard a #f interval is treated as 0, so every
         ;; completed check immediately re-arms the next one and the
         ;; upstream is fetched in a tight loop.
         (on #:when (poll-interval-seconds)
             (asserted (later-than (+ (last-upstream-check)
                                      (* 1000 (or (poll-interval-seconds) 0)))))
             (log-info "Checking upstream ~a" full-topic)
             (define-values (resp response-body)
               (web-request! 'get full-topic #:redirect-budget 5))
             (last-upstream-check (current-inexact-milliseconds))
             (match (web-response-header-code-type resp)
               ['successful
                (define new-content-hash (sha1 (open-input-bytes response-body)))
                (define new-content-type
                  (dict-ref (web-response-header-headers resp) 'content-type #f))
                (when (not (and (equal? (current-content-hash) new-content-hash)
                                (equal? (current-content-type) new-content-type)))
                  (current-content-hash new-content-hash)
                  (current-content-type new-content-type)
                  (send! (notification full-topic response-body new-content-type)))]
               [other
                (log-warning "Upstream ~a yielded ~a code ~a"
                             full-topic
                             other
                             (web-response-header-code resp))]))))

;; remote-topic-main : Topic -> Void
;; Behaviour of a topic hosted elsewhere. Picks a random 16-character
;; hex endpoint id, and for each canonical local host:
;;  - starts `maintain-remote-subscription` (polling), and
;;  - accepts POSTs on ("sub" (<sub-id> ())), republishing each body as
;;    a `notification` for `full-topic` and replying 201.
(define (remote-topic-main full-topic)
  (define sub-id (random-hex-string 16))
  (log-info "Remote sub endpoint ~a" sub-id)
  (during (canonical-local-host $canonical-host-name)
    (on-start (maintain-remote-subscription full-topic canonical-host-name sub-id))
    (on (web-request-incoming (id req) (vh canonical-host-name) 'post ("sub" (,sub-id ())) $body)
        (actor*
         (define content-type (req-content-type req))
         (log-info "Remote topic ~a got ~v message ~v" full-topic content-type body)
         (send! (notification full-topic body content-type))
         (web-respond/status! id 201 #"Created")))))

;; parse-seconds-param : Symbol (Option String) Boolean -> Number
;; Parses a numeric form parameter supplied by an untrusted subscriber.
;; Raises (via `error`) unless the text denotes a finite real number
;; that is >= 0 (when `allow-zero?`) or > 0 (otherwise). The hub
;; handler turns the exception into a rejected request.
(define (parse-seconds-param name str allow-zero?)
  (define n (and (string? str) (string->number str)))
  (unless (and (rational? n)
               (if allow-zero? (>= n 0) (> n 0)))
    (error 'hub "Invalid ~a parameter: ~v" name str))
  n)

;; Actor 'hub: the subscription endpoint.
;; POST on ("hub" ()) takes a form-urlencoded body with hub.callback,
;; hub.mode ("subscribe" | "unsubscribe"), hub.topic and optional
;; hub.lease_seconds, hub.poll_interval_seconds, hub.secret. The change
;; is confirmed with the callback via `subscription-change-validate`;
;; on success the `subscription` assertion is replaced (subscribe) or
;; retracted (unsubscribe) and 202 is returned. Any failure, including
;; a malformed request, yields 403.
;; Each distinct (topic, callback) subscription is served by its own
;; actor running `subscription-main`; if that actor crashes, its
;; subscription assertions are retracted.
(actor #:name 'hub
       (during (local-host $host-name)
         (on (web-request-incoming (id req) (vh host-name) 'post ("hub" ()) $body)
             ;; Initially, I had an (actor* ...) form here for fault
             ;; isolation. However, this led to problems since I wanted
             ;; to use `assert!` and `retract!` to signal to the
             ;; `during/actor`, and the assertions were being lost as
             ;; the `actor*` terminated. So instead, I'm using Rackety
             ;; `with-handlers`.
             (define ok?
               ;; Catch anything raised while handling the request
               ;; except breaks (which must stay deliverable), and log
               ;; the reason instead of discarding it.
               (with-handlers [((lambda (e) (not (exn:break? e)))
                                (lambda (e)
                                  (log-warning "Hub request rejected: ~a" (exn->string e))
                                  #f))]
                 (define params
                   (make-immutable-hash
                    (form-urlencoded->alist (bytes->string/utf-8 body))))
                 (define callback (hash-ref params 'hub.callback))
                 (define mode
                   (match (hash-ref params 'hub.mode)
                     ["subscribe" 'subscribe]
                     ["unsubscribe" 'unsubscribe]))
                 (define topic (hash-ref params 'hub.topic))
                 (define lease-seconds
                   (match (hash-ref params 'hub.lease_seconds "unbounded")
                     ["unbounded" #f]
                     [n (parse-seconds-param 'hub.lease_seconds n #t)]))
                 ;; A zero interval is refused: it would make the
                 ;; poller re-check the upstream with no delay.
                 (define poll-interval-seconds
                   (match (hash-ref params 'hub.poll_interval_seconds "none")
                     ["none" #f]
                     [n (parse-seconds-param 'hub.poll_interval_seconds n #f)]))
                 (define secret-string (hash-ref params 'hub.secret #f))
                 (define secret-bytes
                   (and secret-string (string->bytes/utf-8 secret-string)))
                 (define expiry-deadline
                   (and lease-seconds (+ (current-seconds) lease-seconds)))
                 (define canonical-hub
                   (url->string (resource->url (web-request-header-resource req))))
                 (match mode
                   ['subscribe
                    (if (subscription-change-validate "subscribe"
                                                      (or lease-seconds "unbounded")
                                                      topic
                                                      callback)
                        (begin
                          ;; Replace any previous subscription for the
                          ;; same topic and callback.
                          (retract! (subscription topic ? ? callback ? ?))
                          (assert! (subscription topic
                                                 expiry-deadline
                                                 canonical-hub
                                                 callback
                                                 secret-bytes
                                                 poll-interval-seconds))
                          #t)
                        #f)]
                   ['unsubscribe
                    (if (subscription-change-validate "unsubscribe" #f topic callback)
                        (begin
                          (retract! (subscription topic ? ? callback ? ?))
                          #t)
                        #f)])))
             (if ok?
                 (web-respond/status! id 202 #"Accepted")
                 (web-respond/status! id 403 #"Forbidden" #"Validation failed"))))

       (during/actor (subscription $partial-topic _ _ $callback _ _)
         #:name (list 'subscription partial-topic callback)
         #:on-crash (retract! (subscription partial-topic ? ? callback ? ?))
         (subscription-main partial-topic callback)))

;; subscription-change-validate : String (Option Any) String URLString -> Boolean
;; Confirms a (un)subscription with the subscriber: issues a GET to
;; `callback` with hub.mode, hub.topic, a random hub.challenge and,
;; when `lease` is non-#f, hub.lease_seconds appended to the query
;; string. Returns true iff the response is successful and its body is
;; exactly the challenge.
(define (subscription-change-validate mode lease topic callback)
  (define challenge (random-hex-string 16))
  (define extra-query
    (list* (cons 'hub.mode mode)
           (cons 'hub.topic topic)
           (cons 'hub.challenge challenge)
           (if lease
               (list (cons 'hub.lease_seconds (~a lease)))
               (list))))
  (define-values (resp body)
    (web-request! 'get
                  (let* ((u (string->url callback)))
                    (url->string
                     (struct-copy url u [query (append (url-query u) extra-query)])))))
  (and (eq? (web-response-header-code-type resp) 'successful)
       (equal? body (string->bytes/utf-8 challenge))))

;; canonical-topic-base-url : URLString -> url
;; The "/topic/" URL on the same server as the given hub URL.
(define (canonical-topic-base-url canonical-hub-str)
  (combine-url/relative (string->url canonical-hub-str) "/topic/"))

;; subscription-main : String URLString -> Void
;; Behaviour of one (topic, callback) subscription.
;;  - While the `subscription` assertion is present, asserts the
;;    matching `topic-demand`, arms an expiry timer when a deadline was
;;    given, and queues every `notification` for the resolved topic.
;;  - Queued notifications are POSTed to `callback` one at a time; a
;;    failed delivery is retried once after a pause, then recorded as a
;;    dead letter.
;;  - The whole actor stops once more than one dead letter has
;;    accumulated.
(define (subscription-main partial-topic callback)
  (field [expiry-timer-id #f]
         [delivery-active? #f]
         [message-queue (make-queue)]
         [dead-letters (make-queue)])

  (stop-when (rising-edge (> (queue-length (dead-letters)) 1))
             (log-info "Too many dead letters for ~a" callback))

  ;; Drains `message-queue`, marking delivery as active for the
  ;; duration so that notifications arriving meanwhile are only queued.
  (define (deliver-queued-notifications canonical-hub)
    (delivery-active? #t)
    (let deliver-rest ()
      (when (not (queue-empty? (message-queue)))
        (define-values (n newq) (dequeue (message-queue)))
        (message-queue newq)
        (match-define (notification topic body content-type) n)
        (define link-headers
          (list (cons 'link (format "<~a>; rel=hub" canonical-hub))
                (cons 'link (format "<~a>; rel=self" topic))))
        (let deliver-one ((retries-remaining 1) (retry-delay 5.0))
          (define-values (resp _body)
            (web-request! 'post
                          callback
                          #:headers (if content-type
                                        (cons (cons 'content-type content-type)
                                              link-headers)
                                        link-headers)
                          #:body body))
          (cond
            [(eq? (web-response-header-code-type resp) 'successful)
             (deliver-rest)]
            [(zero? retries-remaining)
             (log-info "Dead letter for ~v" callback)
             (dead-letters (enqueue (dead-letters) n))
             (deliver-rest)]
            [else
             ;; The line break inside this message is carried over
             ;; unchanged from the original message text.
             (log-info "Delivery to ~v failed; pausing for ~a seconds; ~a retries remaining. \nResponse: ~v"
                       callback
                       retry-delay
                       retries-remaining
                       resp)
             (sleep retry-delay)
             (deliver-one (- retries-remaining 1)
                          (min (* retry-delay 1.618) 30))]))))
    (delivery-active? #f))

  (during (subscription partial-topic
                        $expiry-deadline
                        $canonical-hub
                        callback
                        $secret-bytes
                        $poll-interval-seconds)
    ;; The subscriber supplies a topic relative to this hub; resolve it
    ;; against the hub's "/topic/" base to get the full topic URL.
    (define topic
      (url->string (combine-url/relative (canonical-topic-base-url canonical-hub)
                                         partial-topic)))
    (assert (topic-demand topic poll-interval-seconds))
    (on-start
     (log-info "Subscription configured: ~v ~v ~v ~v ~v"
               topic
               expiry-deadline
               canonical-hub
               secret-bytes
               poll-interval-seconds)
     (cond
       [expiry-deadline
        (expiry-timer-id (gensym 'subscription-expiry))
        (log-info "Subscription will expire at ~a" expiry-deadline)
        ;; The deadline is scaled by 1000.0 before being handed to the
        ;; timer as an 'absolute time.
        (send! (set-timer (expiry-timer-id) (* expiry-deadline 1000.0) 'absolute))]
       [else
        (log-info "Subscription will not expire")
        (expiry-timer-id #f)]))
    (stop-when #:when (expiry-timer-id)
               (message (timer-expired (expiry-timer-id) _))
               (log-info "Subscription expired"))
    (on (message ($ n (notification topic _ _)))
        (message-queue (enqueue (message-queue) n))
        (when (not (delivery-active?))
          (deliver-queued-notifications canonical-hub)))))