#lang syndicate/actor (require racket/dict) (require racket/format) (require racket/set) (require racket/string) (require racket/port) (require net/url) (require net/uri-codec) (require/activate syndicate/drivers/timestate) (require/activate syndicate/drivers/web) (require (except-in "private/util.rkt" vh)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define vh (web-virtual-host "http" ? 7000)) ;; client (define server-res (url->resource (string->url "http://localhost:7827/"))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (request! verb path #:headers [headers '()] #:body [body #""]) (web-request! verb (url->string (resource->url (struct-copy web-resource server-res [path path]))) #:headers headers #:body body)) (let ((e (read-bytes-line-evt (current-input-port) 'any))) (define (print-prompt) (printf "> ") (flush-output)) (spawn (on-start (print-prompt)) (stop-when (message (inbound (external-event e (list (? eof-object? _)))))) (assert 'shell-running) (on (message (inbound (external-event e (list (? bytes? $bs))))) (match (string-split (string-trim (bytes->string/utf-8 bs))) [(list "topic" topic) (request! 'put `("topic" (,topic ())))] [(list "deltopic" topic) (request! 'delete `("topic" (,topic ())))] [(list* "pub" topic strs) (request! 'post `("topic" (,topic ())) #:body (string->bytes/utf-8 (string-join strs)))] [(list "sub" topic) (spawn-subscriber topic)] [(list "sub" topic sub-id-str) (spawn-subscriber topic (string->symbol sub-id-str))] [(list "unsub" topic) (send! (list 'unsub topic))] [(list) (void)] [_ (printf "Unexpected input\n")]) (sleep 0.1) (print-prompt)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (spawn #:name 'main (stop-when (retracted 'shell-running)) (assert vh) (on (web-request-get (id req) vh ("" ())) (spawn* (web-respond/xexpr! id `(html (body (h1 "Poke running"))))))) (spawn #:name 'sink (on (web-request-incoming (id req) vh 'post ("sink" (,$sub-id ())) $body) (printf "SINK POST ~a: ~v >>> ~v\n" sub-id req body) (if (equal? body #"fail") (web-respond/status! id 500 #"Deliberate failure") (web-respond/bytes! id #""))) (on (web-request-get (id req) vh ("sink" (,$sub-id ()))) (printf "SINK GET ~a: ~v\n" sub-id req) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (web-respond/bytes! id (string->bytes/utf-8 challenge)))) (spawn #:name 'incoming-tracer (assert (observe (web-response-complete _ _ _))) ;; :-( See journal for 30 Oct 2016 (on (web-request-incoming (id req) vh $verb ,$path $body) (printf "~a ==> ~a ~v ~v\n" id verb path body) (react (stop-when (message (web-response-complete id $resp $body)) (printf "~a <== ~v ~v\n" id resp body)) (stop-when-timeout 1000 (printf "~a <== timeout\n" id))))) (define (spawn-subscriber topic [sub-id (gensym 'sub)]) (spawn #:name (list 'subscriber sub-id) (on-start (request! 'post `("hub" ()) #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id)) (hub.poll_interval_seconds . "10") ;; (hub.lease_seconds . "5") (hub.mode . "subscribe") (hub.topic . ,topic))))) (log-info "Subscriber ~a starting" sub-id)) (stop-when (message (list 'unsub topic)) (log-info "Subscriber ~a stopping" sub-id) (request! 'post `("hub" ()) #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) #:body (string->bytes/utf-8 (alist->form-urlencoded `((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id)) (hub.mode . "unsubscribe") (hub.topic . ,topic))))) (log-info "Subscriber ~a stopped" sub-id))))