diff --git a/racketmq/poke.rkt b/racketmq/poke.rkt index 5a33b6b..1c28157 100644 --- a/racketmq/poke.rkt +++ b/racketmq/poke.rkt @@ -45,8 +45,10 @@ #:body (string->bytes/utf-8 (string-join strs)))] [(list "sub" topic) (spawn-subscriber topic)] + [(list "sub" topic sub-id-str) + (spawn-subscriber topic (string->symbol sub-id-str))] [(list "unsub" topic) - (unsubscribe-from topic)] + (send! (list 'unsub topic))] [(list) (void)] [_ @@ -69,13 +71,13 @@ (h1 "Poke running"))))))) (actor #:name 'sink - (on (web-request-incoming (id req) vh 'post ("sink" ()) $body) - (printf "SINK POST: ~v >>> ~v\n" req body) + (on (web-request-incoming (id req) vh 'post ("sink" (,$sub-id ())) $body) + (printf "SINK POST ~a: ~v >>> ~v\n" sub-id req body) (if (equal? body #"fail") (web-respond/status! id 500 #"Deliberate failure") (web-respond/bytes! id #""))) - (on (web-request-get (id req) vh ("sink" ())) - (printf "SINK GET: ~v\n" req) + (on (web-request-get (id req) vh ("sink" (,$sub-id ()))) + (printf "SINK GET ~a: ~v\n" sub-id req) (define challenge (dict-ref (web-request-header-query req) 'hub.challenge "")) (web-respond/bytes! id (string->bytes/utf-8 challenge)))) @@ -88,22 +90,30 @@ (stop-when-timeout 1000 (printf "~a <== timeout\n" id))))) -(define (spawn-subscriber topic) - (request! 'post `("hub" ()) - #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . "http://localhost:7000/sink") - (hub.poll_interval_seconds . "10") - ;; (hub.lease_seconds . "5") - (hub.mode . "subscribe") - (hub.topic . ,topic)))))) +(define (spawn-subscriber topic [sub-id (gensym 'sub)]) + (actor #:name (list 'subscriber sub-id) -(define (unsubscribe-from topic) - (request! 'post `("hub" ()) - #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) - #:body (string->bytes/utf-8 - (alist->form-urlencoded - `((hub.callback . "http://localhost:7000/sink") - (hub.mode . "unsubscribe") - (hub.topic . ,topic)))))) + (on-start + (request! 'post `("hub" ()) + #:headers (list (cons 'content-type "application/x-www-form-urlencoded")) + #:body (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id)) + (hub.poll_interval_seconds . "10") + ;; (hub.lease_seconds . "5") + (hub.mode . "subscribe") + (hub.topic . ,topic))))) + (log-info "Subscriber ~a starting" sub-id)) + + (stop-when (message (list 'unsub topic)) + (log-info "Subscriber ~a stopping" sub-id) + (request! 'post `("hub" ()) + #:headers + (list (cons 'content-type "application/x-www-form-urlencoded")) + #:body + (string->bytes/utf-8 + (alist->form-urlencoded + `((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id)) + (hub.mode . "unsubscribe") + (hub.topic . ,topic))))) + (log-info "Subscriber ~a stopped" sub-id))))