New web driver

This commit is contained in:
Tony Garnock-Jones 2016-07-16 16:20:57 -04:00
parent 4f570fcd18
commit d36ccbb0c2
2 changed files with 497 additions and 0 deletions

View File

@ -0,0 +1,404 @@
#lang syndicate/actor
;; More general web driver: supports normal HTTP as well as websockets.
(provide (struct-out web-virtual-host)
(struct-out web-resource)
url->resource
(struct-out web-request)
(struct-out web-request-header)
(struct-out web-response-header)
(struct-out web-response-complete)
(struct-out web-response-chunked)
(struct-out web-response-websocket)
(struct-out web-response-chunk)
(struct-out websocket-message)
spawn-web-driver)
(require net/url)
(require net/rfc6455)
(require net/rfc6455/conn-api)
(require net/rfc6455/dispatcher)
(require net/http-client)
(require racket/exn)
(require racket/tcp)
(require racket/set)
(require racket/async-channel)
(require (only-in racket/bytes bytes-join))
(require (only-in racket/list flatten))
(require (only-in racket/port port->bytes))
(require web-server/http/bindings)
(require web-server/http/request)
(require web-server/http/request-structs)
(require web-server/http/response)
(require web-server/http/response-structs)
(require web-server/private/connection-manager)
(require (only-in web-server/private/util lowercase-symbol!))
(require web-server/dispatchers/dispatch)
(require/activate "timer.rkt")
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(struct web-virtual-host (scheme name port) #:prefab)
(struct web-resource (virtual-host path) #:prefab)
(struct web-request (id direction header* body) #:prefab)
(struct web-request-header (method resource headers query) #:prefab)
(struct web-response-header (code message last-modified-seconds mime-type headers) #:prefab)
(struct web-response-complete (id header body) #:prefab)
(struct web-response-chunked (id header) #:prefab)
(struct web-response-websocket (id headers) #:prefab)
(struct web-response-chunk (id bytes) #:prefab)
(struct websocket-message (id direction body) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground-level communication messages
(struct web-raw-request (id port connection req control-ch) #:prefab)
(struct web-raw-client-conn (id connection) #:prefab)
(struct web-incoming-message (id message) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define web-server-max-waiting (make-parameter 511)) ;; sockets
(define web-server-connection-manager (make-parameter #f))
(define web-server-initial-connection-timeout (make-parameter 30)) ;; seconds
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (url->resource u)
(web-resource (web-virtual-host (url-scheme u)
(url-host u)
(url-port u))
(format-url-path u)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (spawn-web-driver)
(actor #:name 'web-server-manager
(react
(during/actor (web-virtual-host "http" _ $port)
#:name (list 'web-server port)
(setup-web-server "http"
(or (web-server-connection-manager)
(start-connection-manager))
port))))
(actor #:name 'web-client-manager
(react
(on (message (web-request $id 'outbound $req $body))
(actor #:name (list 'web-client id)
(do-client-request id req body))))))
(define (setup-web-server scheme cm port)
(define listener (tcp-listen port (web-server-max-waiting) #t))
(define listener-control (make-channel))
(thread (lambda ()
(let loop ()
(sync (handle-evt (tcp-accept-evt listener)
(lambda (ports)
(handle-incoming-connection port cm ports)
(loop)))
(handle-evt listener-control
(match-lambda
['quit (void)]))))))
(on-stop (channel-put listener-control 'quit))
(on (message (web-raw-request $id port $conn $lowlevel-req $control-ch) #:meta-level 1)
(define web-req (web-request id
'inbound
(web-request-header
(string->symbol (string-downcase
(bytes->string/latin-1
(request-method lowlevel-req))))
(web-resource (req->virtual-host scheme lowlevel-req port)
(format-url-path (request-uri lowlevel-req)))
(request-headers lowlevel-req)
(url-query (request-uri lowlevel-req)))
(request-post-data/raw lowlevel-req)))
(actor #:name (list 'web-req id)
(react (on-start (send! (set-timer (list 'web-req id) 100 'relative))
(send! web-req))
(stop-when (message (timer-expired (list 'web-req id) _))
(do-response-complete control-ch
id
(web-response-header 404
#"Not found"
(current-seconds)
#"text/plain"
'())
'()))
(stop-when (message (web-response-complete id $rh $body))
(do-response-complete control-ch id rh body))
(stop-when (asserted (web-response-chunked id $rh))
(do-response-chunked control-ch id rh))
(stop-when (asserted (web-response-websocket id $headers))
(do-response-websocket control-ch id headers))))))
(define (do-response-complete control-ch id rh constree-of-bytes)
(match-define (web-response-header code resp-message last-modified-seconds mime-type headers) rh)
(channel-put control-ch
(list 'response
(response/full code
resp-message
last-modified-seconds
mime-type
(build-headers headers)
(flatten constree-of-bytes)))))
(define (do-response-chunked control-ch id rh)
(match-define (web-response-header code resp-message last-modified-seconds mime-type headers) rh)
(define stream-ch (make-async-channel))
(react (stop-when (retracted (web-response-chunked id rh)))
(on-stop (async-channel-put stream-ch #f))
(on (message (web-response-chunk id $chunk))
(async-channel-put stream-ch (flatten chunk)))
(on-start (channel-put control-ch
(list 'response
(response code
resp-message
last-modified-seconds
mime-type
(build-headers headers)
(lambda (output-port)
(let loop ()
(match (async-channel-get stream-ch)
[#f
(void)]
[bss (for [(bs (in-list bss))]
(write-bytes bs
output-port))
(loop)])))))))))
(define (do-response-websocket control-ch id headers)
(define ws-ch (make-channel))
(react (stop-when (retracted (web-response-websocket id headers)))
(on-start (channel-put control-ch (list 'websocket headers ws-ch)))
(run-websocket-connection id ws-ch)))
(define (run-websocket-connection id ws-ch)
(on-stop (channel-put ws-ch 'quit))
(on (message (websocket-message id 'outbound $body))
(channel-put ws-ch (list 'send body)))
(stop-when (message (web-incoming-message id (? eof-object? _)) #:meta-level 1))
(on (message (web-incoming-message id $body) #:meta-level 1)
(unless (eof-object? body) (send! (websocket-message id 'inbound body)))))
(define (req->virtual-host scheme r port)
(cond [(assq 'host (request-headers r)) =>
(lambda (h)
(match (cdr h)
[(regexp #px"(.*):(\\d+)" (list _ host port))
(web-virtual-host scheme host (string->number port))]
[host
(web-virtual-host scheme host port)]))]
[else
(web-virtual-host scheme #f port)]))
(define (format-url-path u)
(define elements (for/list [(p (in-list (url-path u)))]
(match-define (path/param path-element params) p)
(list* path-element params)))
(foldr (lambda (e acc) (append e (list acc))) '() elements))
(define (build-headers hs)
(for/list ((h (in-list hs)))
(header (string->bytes/utf-8 (symbol->string (car h)))
(string->bytes/utf-8 (cdr h)))))
(define (build-http-client-headers hs)
(for/list ((h (in-list hs)))
(format "~a: ~a" (car h) (cdr h))))
(define (handle-incoming-connection listen-port cm connection-ports)
(thread
(lambda ()
(match-define (list i o) connection-ports)
;; Deliberately construct an empty custodian for the connection. Killing the connection
;; abruptly can cause deadlocks since the connection thread communicates with Syndicate
;; via synchronous channels.
(define conn
(new-connection cm (web-server-initial-connection-timeout) i o (make-custodian) #f))
(define control-ch (make-channel))
(let do-request ()
(define-values (req initial-headers) ;; TODO initial-headers?!?!
(with-handlers ([exn:fail? (lambda (e) (values #f #f))])
(read-request conn listen-port tcp-addresses)))
(when req
(define id (gensym 'web))
(send-ground-message (web-raw-request id listen-port conn req control-ch))
(sync (handle-evt control-ch
(match-lambda
[(list 'websocket reply-headers ws-ch)
(with-handlers ((exn:dispatcher?
(lambda (_e) (bad-request conn req))))
((make-general-websockets-dispatcher
(websocket-connection-main id ws-ch)
(lambda _args (values reply-headers (void))))
conn req))]
[(list 'response resp)
(output-response/method conn resp (request-method req))])))
(do-request))))))
;; D-: uck barf
;; TODO: something to fix this :-/
(define (exn:fail:port-is-closed? e)
(and (exn:fail? e)
(regexp-match #px"port is closed" (exn-message e))))
(define ((websocket-connection-main id ws-ch) wsc _ws-connection-state)
(let loop ()
(sync (handle-evt wsc
(lambda _args
(define msg
(with-handlers ([exn:fail:network? (lambda (e) eof)]
[exn:fail:port-is-closed? (lambda (e) eof)]
[exn:fail? (lambda (e)
(log-error "Unexpected ws-recv error: ~a"
(exn->string e))
eof)])
(ws-recv wsc #:payload-type 'text)))
(send-ground-message (web-incoming-message id msg))
(loop)))
(handle-evt ws-ch
(match-lambda
['quit
(void)]
[(list 'send m)
(with-handlers [(exn:fail:port-is-closed?
(lambda (e)
(ws-close! wsc)))]
(ws-send! wsc m))
(loop)]))))
(ws-close! wsc))
(define (bad-request conn req)
(output-response/method conn
(response/full 400
#"Bad request"
(current-seconds)
#"text/plain"
(list)
(list))
(request-method req)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (do-client-request id req body)
(react
(stop-when (asserted (observe (web-response-websocket id _)))
(do-request-websocket id req))
(stop-when (asserted (observe (web-response-complete id _ _)))
(do-request-complete id req body))
(stop-when (asserted (observe (web-response-chunked id _)))
(do-request-chunked id req body))))
(define (analyze-outbound-request req)
(match-define (web-request-header method
(web-resource (web-virtual-host scheme host port) path)
headers
query)
req)
(values host
(or port (match scheme
["http" 80]
["https" 443]
[_ #f]))
method
(url->string (url scheme
#f
host
port
#t
(let loop ((p path))
(match p
['() '()]
[(list d par ... rest)
(cons (path/param d par) (loop rest))]))
query
#f))
headers))
(define (do-request-websocket id req)
(define-values (_host server-port method urlstr headers) (analyze-outbound-request req))
(define control-ch (make-channel))
(if (not server-port)
(send-ground-message (web-raw-client-conn id #f))
(thread
(lambda ()
(log-info "Connecting to ~a ~a" urlstr (current-inexact-milliseconds))
(define c (with-handlers [(exn? values)]
(ws-connect (string->url urlstr) #:headers headers)))
(when (exn? c)
(log-info "Connection to ~a failed: ~a" urlstr (exn->string c)))
(send-ground-message (web-raw-client-conn id c))
(when (not (exn? c))
(log-info "Connected to ~a ~a" url (current-inexact-milliseconds))
((websocket-connection-main id control-ch) c (void))))))
(react
(stop-when (message (web-raw-client-conn id $c) #:meta-level 1)
(react (stop-when (retracted (observe (web-response-websocket id _))))
(if (ws-conn? c)
(begin (assert (web-response-websocket id (ws-conn-headers c)))
(run-websocket-connection id control-ch))
(assert (web-response-websocket id #f)))))))
(define (do-request-complete id req body)
(define-values (host server-port method urlstr headers) (analyze-outbound-request req))
(thread
(lambda ()
(define response
(with-handlers [(exn? values)]
(when (not server-port)
(error 'http-sendrecv "No server port specified"))
(define-values (first-line header-lines body-port)
(http-sendrecv host
urlstr
#:headers (build-http-client-headers headers)
#:port server-port
#:method (string-upcase (symbol->string method))
#:data (bytes-join (flatten body) #"")))
(match first-line
[(regexp #px"\\S+\\s(\\d+)\\s(.*)" (list _ codebs msgbs))
(define code (string->number (bytes->string/latin-1 codebs)))
(define msg (bytes->string/utf-8 msgbs))
(define response-headers
(for/list ((h (in-list (read-headers (open-input-bytes
(bytes-join header-lines #"\r\n"))))))
(match-define (header k v) h)
(cons (lowercase-symbol! (bytes->string/utf-8 k))
(bytes->string/utf-8 v))))
(define response-body (port->bytes body-port))
(web-response-complete id
(web-response-header code
msg
#f ;; TODO: fill in from response-headers
(cond [(assq 'content-type response-headers)
=> cdr]
[else #f])
response-headers)
response-body)]
[_
(error 'http-sendrecv "Bad first line: ~v" first-line)])))
(send-ground-message (web-raw-client-conn id response))))
(react
(stop-when (message (web-raw-client-conn id $r) #:meta-level 1)
(react (stop-when (retracted (observe (web-response-complete id _ _))))
(if (exn? r)
(assert (web-response-websocket id #f #f))
(begin (assert r)))))))
(define (do-request-chunked id req body)
(log-error "do-request-chunked: unimplemented")
(react (stop-when (retracted (observe (web-response-chunked id _))))
(assert (web-response-chunked id #f))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(spawn-web-driver)

View File

@ -0,0 +1,93 @@
#lang syndicate/actor
;; Simple demo of web driver
(require/activate syndicate/drivers/timer)
(require/activate syndicate/drivers/web)
(require net/url)
(define vh (web-virtual-host "http" ? 9090))
(define (sleep sec)
(define timer-id (gensym 'sleep))
(until (message (timer-expired timer-id _))
(on-start (send! (set-timer timer-id (* sec 1000.0) 'relative)))))
(actor #:name 'server
(react
(field [counter 0])
(assert vh)
(on (message (web-request $id
'inbound
($ req (web-request-header _ (web-resource vh `("ws" ())) _ _))
_))
(actor (react
(assert (web-response-websocket id '()))
(stop-when (retracted (observe (websocket-message id 'outbound _)))
(log-info "Connection dropped"))
(stop-when (message (websocket-message id 'inbound "quit"))
(log-info "Received quit command"))
(on (message (websocket-message id 'inbound $str))
(log-info "Got ~v" str)
(define u (string->url str))
(when (url-scheme u)
(let ((r (gensym 'client)))
(react (on-start
(send! (web-request r
'outbound
(web-request-header 'get
(url->resource u)
'()
'())
#"")))
(stop-when (asserted (web-response-complete r $h $body))
(log-info "Got headers back: ~v" h)
(log-info "Got body back: ~v" body)))))
(send! (websocket-message id 'outbound str))))))
(on (message (web-request $id
'inbound
(web-request-header 'get (web-resource vh `("slow" ())) _ _)
_))
(react (field [done? #f])
(stop-when (rising-edge (done?)))
(assert (web-response-chunked id
(web-response-header 200
#"Slowly"
(current-seconds)
#"text/plain"
'())))
(on (asserted (observe (web-response-chunk id _)))
;;
;; TODO: output-response-body/chunked in web-server's response.rkt
;; doesn't flush each chunk as it appears. Should it?
;;
;; TODO: this kind of protocol pattern appears quite frequently. Perhaps
;; we want a general-purpose *stream* protocol? For use by TCP,
;; websockets, etc etc.
;;
(send! (web-response-chunk id #"first\n"))
(sleep 2)
(send! (web-response-chunk id #"second\n"))
(sleep 2)
(send! (web-response-chunk id #"third\n"))
(sleep 2)
(done? #t))))
(on (message (web-request $id
'inbound
(web-request-header 'get (web-resource vh `("foo" ,$path)) _ _)
_))
(define req-num (counter))
(counter (+ (counter) 1))
(send! (web-response-complete
id
(web-response-header 200
#"OK"
(current-seconds)
#"text/plain"
'())
(string->bytes/utf-8
(format "Hi there. Your path was ~v, and this is request ~a"
path
req-num)))))))