racketmq-2017/racketmq/poke.rkt

120 lines
4.9 KiB
Racket

#lang syndicate/actor
(require racket/dict)
(require racket/format)
(require racket/set)
(require racket/string)
(require racket/port)
(require net/url)
(require net/uri-codec)
(require/activate syndicate/drivers/timestate)
(require/activate syndicate/drivers/web)
(require (except-in "private/util.rkt" vh))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define vh (web-virtual-host "http" ? 7000)) ;; client
(define server-res (url->resource (string->url "http://localhost:7827/")))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (request! verb path #:headers [headers '()] #:body [body #""])
(web-request! verb
(url->string (resource->url (struct-copy web-resource server-res [path path])))
#:headers headers
#:body body))
(let ((e (read-bytes-line-evt (current-input-port) 'any)))
(define (print-prompt)
(printf "> ")
(flush-output))
(spawn (on-start (print-prompt))
(stop-when (message (inbound (external-event e (list (? eof-object? _))))))
(assert 'shell-running)
(on (message (inbound (external-event e (list (? bytes? $bs)))))
(match (string-split (string-trim (bytes->string/utf-8 bs)))
[(list "topic" topic)
(request! 'put `("topic" (,topic ())))]
[(list "deltopic" topic)
(request! 'delete `("topic" (,topic ())))]
[(list* "pub" topic strs)
(request! 'post `("topic" (,topic ()))
#:body (string->bytes/utf-8 (string-join strs)))]
[(list "sub" topic)
(spawn-subscriber topic)]
[(list "sub" topic sub-id-str)
(spawn-subscriber topic (string->symbol sub-id-str))]
[(list "unsub" topic)
(send! (list 'unsub topic))]
[(list)
(void)]
[_
(printf "Unexpected input\n")])
(sleep 0.1)
(print-prompt))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(spawn #:name 'main
(stop-when (retracted 'shell-running))
(assert vh)
(on (web-request-get (id req) vh ("" ()))
(spawn*
(web-respond/xexpr! id
`(html
(body
(h1 "Poke running")))))))
(spawn #:name 'sink
(on (web-request-incoming (id req) vh 'post ("sink" (,$sub-id ())) $body)
(printf "SINK POST ~a: ~v >>> ~v\n" sub-id req body)
(if (equal? body #"fail")
(web-respond/status! id 500 #"Deliberate failure")
(web-respond/bytes! id #"")))
(on (web-request-get (id req) vh ("sink" (,$sub-id ())))
(printf "SINK GET ~a: ~v\n" sub-id req)
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
(web-respond/bytes! id (string->bytes/utf-8 challenge))))
(spawn #:name 'incoming-tracer
(assert (observe (web-response-complete _ _ _))) ;; :-( See journal for 30 Oct 2016
(on (web-request-incoming (id req) vh $verb ,$path $body)
(printf "~a ==> ~a ~v ~v\n" id verb path body)
(react (stop-when (message (web-response-complete id $resp $body))
(printf "~a <== ~v ~v\n" id resp body))
(stop-when-timeout 1000
(printf "~a <== timeout\n" id)))))
(define (spawn-subscriber topic [sub-id (gensym 'sub)])
(spawn #:name (list 'subscriber sub-id)
(on-start
(request! 'post `("hub" ())
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
#:body (string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id))
(hub.poll_interval_seconds . "10")
;; (hub.lease_seconds . "5")
(hub.mode . "subscribe")
(hub.topic . ,topic)))))
(log-info "Subscriber ~a starting" sub-id))
(stop-when (message (list 'unsub topic))
(log-info "Subscriber ~a stopping" sub-id)
(request! 'post `("hub" ())
#:headers
(list (cons 'content-type "application/x-www-form-urlencoded"))
#:body
(string->bytes/utf-8
(alist->form-urlencoded
`((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id))
(hub.mode . "unsubscribe")
(hub.topic . ,topic)))))
(log-info "Subscriber ~a stopped" sub-id))))