Distinct subscribers in poke.rkt
This commit is contained in:
parent
709042bb46
commit
7dc47f53ff
|
@ -45,8 +45,10 @@
|
||||||
#:body (string->bytes/utf-8 (string-join strs)))]
|
#:body (string->bytes/utf-8 (string-join strs)))]
|
||||||
[(list "sub" topic)
|
[(list "sub" topic)
|
||||||
(spawn-subscriber topic)]
|
(spawn-subscriber topic)]
|
||||||
|
[(list "sub" topic sub-id-str)
|
||||||
|
(spawn-subscriber topic (string->symbol sub-id-str))]
|
||||||
[(list "unsub" topic)
|
[(list "unsub" topic)
|
||||||
(unsubscribe-from topic)]
|
(send! (list 'unsub topic))]
|
||||||
[(list)
|
[(list)
|
||||||
(void)]
|
(void)]
|
||||||
[_
|
[_
|
||||||
|
@ -69,13 +71,13 @@
|
||||||
(h1 "Poke running")))))))
|
(h1 "Poke running")))))))
|
||||||
|
|
||||||
(actor #:name 'sink
|
(actor #:name 'sink
|
||||||
(on (web-request-incoming (id req) vh 'post ("sink" ()) $body)
|
(on (web-request-incoming (id req) vh 'post ("sink" (,$sub-id ())) $body)
|
||||||
(printf "SINK POST: ~v >>> ~v\n" req body)
|
(printf "SINK POST ~a: ~v >>> ~v\n" sub-id req body)
|
||||||
(if (equal? body #"fail")
|
(if (equal? body #"fail")
|
||||||
(web-respond/status! id 500 #"Deliberate failure")
|
(web-respond/status! id 500 #"Deliberate failure")
|
||||||
(web-respond/bytes! id #"")))
|
(web-respond/bytes! id #"")))
|
||||||
(on (web-request-get (id req) vh ("sink" ()))
|
(on (web-request-get (id req) vh ("sink" (,$sub-id ())))
|
||||||
(printf "SINK GET: ~v\n" req)
|
(printf "SINK GET ~a: ~v\n" sub-id req)
|
||||||
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
||||||
(web-respond/bytes! id (string->bytes/utf-8 challenge))))
|
(web-respond/bytes! id (string->bytes/utf-8 challenge))))
|
||||||
|
|
||||||
|
@ -88,22 +90,30 @@
|
||||||
(stop-when-timeout 1000
|
(stop-when-timeout 1000
|
||||||
(printf "~a <== timeout\n" id)))))
|
(printf "~a <== timeout\n" id)))))
|
||||||
|
|
||||||
(define (spawn-subscriber topic)
|
(define (spawn-subscriber topic [sub-id (gensym 'sub)])
|
||||||
(request! 'post `("hub" ())
|
(actor #:name (list 'subscriber sub-id)
|
||||||
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
|
|
||||||
#:body (string->bytes/utf-8
|
|
||||||
(alist->form-urlencoded
|
|
||||||
`((hub.callback . "http://localhost:7000/sink")
|
|
||||||
(hub.poll_interval_seconds . "10")
|
|
||||||
;; (hub.lease_seconds . "5")
|
|
||||||
(hub.mode . "subscribe")
|
|
||||||
(hub.topic . ,topic))))))
|
|
||||||
|
|
||||||
(define (unsubscribe-from topic)
|
(on-start
|
||||||
(request! 'post `("hub" ())
|
(request! 'post `("hub" ())
|
||||||
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
|
#:headers (list (cons 'content-type "application/x-www-form-urlencoded"))
|
||||||
#:body (string->bytes/utf-8
|
#:body (string->bytes/utf-8
|
||||||
(alist->form-urlencoded
|
(alist->form-urlencoded
|
||||||
`((hub.callback . "http://localhost:7000/sink")
|
`((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id))
|
||||||
(hub.mode . "unsubscribe")
|
(hub.poll_interval_seconds . "10")
|
||||||
(hub.topic . ,topic))))))
|
;; (hub.lease_seconds . "5")
|
||||||
|
(hub.mode . "subscribe")
|
||||||
|
(hub.topic . ,topic)))))
|
||||||
|
(log-info "Subscriber ~a starting" sub-id))
|
||||||
|
|
||||||
|
(stop-when (message (list 'unsub topic))
|
||||||
|
(log-info "Subscriber ~a stopping" sub-id)
|
||||||
|
(request! 'post `("hub" ())
|
||||||
|
#:headers
|
||||||
|
(list (cons 'content-type "application/x-www-form-urlencoded"))
|
||||||
|
#:body
|
||||||
|
(string->bytes/utf-8
|
||||||
|
(alist->form-urlencoded
|
||||||
|
`((hub.callback . ,(format "http://localhost:7000/sink/~a" sub-id))
|
||||||
|
(hub.mode . "unsubscribe")
|
||||||
|
(hub.topic . ,topic)))))
|
||||||
|
(log-info "Subscriber ~a stopped" sub-id))))
|
||||||
|
|
Loading…
Reference in New Issue