108 lines
4.1 KiB
Racket
108 lines
4.1 KiB
Racket
#lang syndicate/actor
|
|
|
|
(require racket/dict)
|
|
(require racket/format)
|
|
(require racket/set)
|
|
(require racket/string)
|
|
(require racket/port)
|
|
(require net/url)
|
|
(require net/uri-codec)
|
|
|
|
(require/activate syndicate/drivers/timer)
|
|
(require/activate syndicate/drivers/web)
|
|
|
|
(require "private/util.rkt")
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(define vh (web-virtual-host "http" ? 7000)) ;; client
|
|
|
|
(define server-res (url->resource (string->url "http://localhost:7827/")))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(define (request! verb path #:body [body #""])
|
|
(web-request! verb
|
|
(url->string (resource->url (struct-copy web-resource server-res [path path])))
|
|
#:body body))
|
|
|
|
(let ((e (read-bytes-line-evt (current-input-port) 'any)))
|
|
(define (print-prompt)
|
|
(printf "> ")
|
|
(flush-output))
|
|
(actor (on-start (print-prompt))
|
|
(stop-when (message (inbound (external-event e (list (? eof-object? _))))))
|
|
(assert 'shell-running)
|
|
(on (message (inbound (external-event e (list (? bytes? $bs)))))
|
|
(match (string-split (string-trim (bytes->string/utf-8 bs)))
|
|
[(list "topic" topic)
|
|
(request! 'put `("topic" (,topic ())))]
|
|
[(list "deltopic" topic)
|
|
(request! 'delete `("topic" (,topic ())))]
|
|
[(list* "pub" topic strs)
|
|
(request! 'post `("topic" (,topic ()))
|
|
#:body (string->bytes/utf-8 (string-join strs)))]
|
|
[(list "sub" topic)
|
|
(spawn-subscriber topic)]
|
|
[(list "unsub" topic)
|
|
(unsubscribe-from topic)]
|
|
[(list)
|
|
(void)]
|
|
[_
|
|
(printf "Unexpected input\n")])
|
|
(sleep 0.1)
|
|
(print-prompt))))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(actor #:name 'main
|
|
(stop-when (retracted 'shell-running))
|
|
|
|
(assert vh)
|
|
|
|
(on (web-request-get (id req) vh ("" ()))
|
|
(actor*
|
|
(web-respond/xexpr! id
|
|
`(html
|
|
(body
|
|
(h1 "Poke running")))))))
|
|
|
|
(actor #:name 'sink
|
|
(on (web-request-incoming (id req) vh 'post ("sink" ()) $body)
|
|
(printf "SINK POST: ~v >>> ~v\n" req body)
|
|
(if (equal? body #"fail")
|
|
(web-respond/status! id 500 #"Deliberate failure")
|
|
(web-respond/bytes! id #"")))
|
|
(on (web-request-get (id req) vh ("sink" ()))
|
|
(printf "SINK GET: ~v\n" req)
|
|
(define challenge (dict-ref (web-request-header-query req) 'hub.challenge ""))
|
|
(web-respond/bytes! id (string->bytes/utf-8 challenge))))
|
|
|
|
(actor #:name 'incoming-tracer
|
|
(assert (observe (web-response-complete _ _ _))) ;; :-( See journal for 30 Oct 2016
|
|
(on (web-request-incoming (id req) vh $verb ,$path $body)
|
|
(printf "~a ==> ~a ~v ~v\n" id verb path body)
|
|
(react (stop-when (message (web-response-complete id $resp $body))
|
|
(printf "~a <== ~v ~v\n" id resp body))
|
|
(on-start (send! (set-timer (list 'incoming-tracer id) 1000 'relative)))
|
|
(stop-when (message (timer-expired (list 'incoming-tracer id) _))
|
|
(printf "~a <== timeout\n" id)))))
|
|
|
|
(define (spawn-subscriber topic)
|
|
(request! 'post `("hub" ())
|
|
#:body (string->bytes/utf-8
|
|
(alist->form-urlencoded
|
|
`((hub.callback . "http://localhost:7000/sink")
|
|
(hub.poll_interval_seconds . "10")
|
|
;; (hub.lease_seconds . "5")
|
|
(hub.mode . "subscribe")
|
|
(hub.topic . ,topic))))))
|
|
|
|
(define (unsubscribe-from topic)
|
|
(request! 'post `("hub" ())
|
|
#:body (string->bytes/utf-8
|
|
(alist->form-urlencoded
|
|
`((hub.callback . "http://localhost:7000/sink")
|
|
(hub.mode . "unsubscribe")
|
|
(hub.topic . ,topic))))))
|