Initial quasi-port of Syndicate/rkt web driver

This commit is contained in:
Tony Garnock-Jones 2018-08-14 17:58:36 +01:00
parent 76674c77b3
commit 061765041b
2 changed files with 349 additions and 0 deletions

304
syndicate/drivers/web.rkt Normal file
View File

@ -0,0 +1,304 @@
#lang imperative-syndicate
(provide (struct-out http-server)
(struct-out http-resource)
(struct-out http-request)
(struct-out http-accepted)
(struct-out http-response)
(struct-out http-response-websocket)
(struct-out http-request-peer-details)
(struct-out http-request-cookie)
(struct-out http-response-chunk)
(struct-out websocket-out)
(struct-out websocket-in))
(require racket/async-channel)
(require racket/exn)
(require (only-in racket/list flatten))
(require (only-in racket/string string-append*))
(require racket/tcp)
(require net/rfc6455)
(require net/rfc6455/conn-api)
(require net/rfc6455/dispatcher)
(require net/url)
(require web-server/http/bindings)
(require web-server/http/cookie)
(require web-server/http/cookie-parse)
(require web-server/http/request)
(require web-server/http/request-structs)
(require web-server/http/response)
(require web-server/http/response-structs)
(require web-server/private/connection-manager)
(require (only-in web-server/private/util lowercase-symbol!))
(require web-server/dispatchers/dispatch)
(module+ test (require rackunit))
(define-logger syndicate/drivers/web)
(define (url-path->resource-path up)
(define elements (for/list [(p (in-list up))]
(match-define (path/param path-element params) p)
(list* path-element params)))
(foldr (lambda (e acc) (append e (list acc))) '() elements))
(define (build-headers hs)
(for/list ((h (in-list hs)))
(header (string->bytes/utf-8 (symbol->string (car h)))
(string->bytes/utf-8 (cdr h)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API/Protocol
(assertion-struct http-server (host port ssl?))
(assertion-struct http-resource (server path))
(assertion-struct http-request (id method resource headers query body))
(assertion-struct http-accepted (id))
(assertion-struct http-response (id code message last-modified-seconds mime-type headers detail))
;; ^ detail = (U Bytes 'chunked)
(assertion-struct http-response-websocket (id headers))
(assertion-struct http-request-peer-details (id local-ip local-port remote-ip remote-port))
(assertion-struct http-request-cookie (id name value domain path))
(message-struct http-response-chunk (id bytes))
(message-struct websocket-out (id body))
(message-struct websocket-in (id body))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground messages
(message-struct web-raw-request (id port connection addresses req control-ch))
(message-struct web-raw-client-conn (id connection))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define web-server-max-waiting (make-parameter 511)) ;; sockets
(define web-server-connection-manager (make-parameter #f))
(define web-server-initial-connection-timeout (make-parameter 30)) ;; seconds
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(spawn
#:name 'http-server-factory
(during (observe (http-request _ _ (http-resource $server _) _ _ _))
(assert server))
(during/spawn (http-server _ $port _)
#:name (list 'http-listener port)
(define ssl? #f)
(on (asserted (http-server _ port #t))
(error 'http-listener "SSL service not yet implemented")) ;; TODO
(define cm (or (web-server-connection-manager) (start-connection-manager)))
(define listener (tcp-listen port (web-server-max-waiting) #t))
(define listener-control (make-channel))
(thread (lambda ()
(let loop ()
(sync (handle-evt (tcp-accept-evt listener)
(lambda (ports)
(connection-thread port cm ports)
(loop)))
(handle-evt listener-control
(match-lambda
[(list 'quit k-ch)
(tcp-close listener)
(signal-background-activity! -1)
(channel-put k-ch (void))]))))))
(signal-background-activity! +1)
(on-start (log-syndicate/drivers/web-info "Starting listener on port ~v" port))
(on-stop (define k-ch (make-channel))
(log-syndicate/drivers/web-info "Stopping listener on port ~v" port)
(channel-put listener-control (list 'quit k-ch))
(channel-get k-ch)
(log-syndicate/drivers/web-info "Stopped listener on port ~v" port))
(on (message (inbound (web-raw-request $id port $conn $addresses $lowlevel-req $control-ch)))
(spawn #:name (list 'http-request id)
(define root-facet (current-facet))
(define method
(string->symbol (string-downcase (bytes->string/latin-1 (request-method lowlevel-req)))))
(define resource (http-resource (req->http-server lowlevel-req port ssl?)
(url-path->resource-path
(url-path (request-uri lowlevel-req)))))
(assert (http-request id
method
resource
(request-headers lowlevel-req)
(url-query (request-uri lowlevel-req))
(request-post-data/raw lowlevel-req)))
(for [(c (request-cookies lowlevel-req))]
(match-define (client-cookie n v d p) c)
(assert (http-request-cookie id n v d p)))
(match-let ([(list Lip Lport Rip Rport) addresses])
(assert (http-request-peer-details id Lip Lport Rip Rport)))
(define (respond/error! code message-bytes)
(respond!
(http-response id code message-bytes (current-seconds) #"text/plain" '() message-bytes)))
(define (respond! resp)
(match-define (http-response _ c m lms mime-type headers body) resp)
(define hs (build-headers headers))
(channel-put control-ch
(list 'response
(response/full c m lms mime-type hs (flatten body)))))
(define (respond/chunked! resp)
(match-define (http-response _ c m lms mime-type headers _) resp)
(define hs (build-headers headers))
(define stream-ch (make-async-channel))
(define (output-writer op)
(match (async-channel-get stream-ch)
[#f (void)]
[bss (for [(bs bss)] (write-bytes bs op))
;; (flush-output op) ;; seemingly does nothing. TODO
(output-writer op)]))
(react (stop-when (retracted resp))
(on-stop (async-channel-put stream-ch #f)
(stop-facet root-facet))
(on (message (http-response-chunk id $chunk))
(async-channel-put stream-ch (flatten chunk)))
(on-start (channel-put control-ch
(list 'response
(response c m lms mime-type hs output-writer))))))
(define (respond/websocket! headers)
(define ws-ch (make-channel))
(define hs (build-headers headers))
(react (stop-when (retracted (http-response-websocket id headers)))
(on-start (channel-put control-ch (list 'websocket hs ws-ch)))
(on-stop (channel-put ws-ch 'quit)
(stop-facet root-facet))
(on (message (websocket-out id $body))
(channel-put ws-ch (list 'send (string-append* (flatten body)))))
(on (message (inbound (websocket-in id $body)))
(if (eof-object? body)
(stop-current-facet)
(send! (websocket-in id body))))))
(field [respondent-exists? #f])
(on-start (for [(i 3)] (flush!)) ;; TODO: UGHHHH
(when (not (respondent-exists?))
(stop-facet root-facet (respond/error! 404 #"Not found"))))
(on (asserted (http-accepted id))
(respondent-exists? #t)
(react
(stop-when (retracted (http-accepted id))
(stop-facet root-facet (respond/error! 500 #"Server error")))
(stop-when (asserted ($ resp (http-response id _ _ _ _ _ $detail)))
(match detail
['chunked (respond/chunked! resp)]
[_ (stop-facet root-facet (respond! resp))]))
(stop-when (asserted (http-response-websocket id $headers))
(respond/websocket! headers))))))))
(define (req->http-server r port ssl?)
(match (assq 'host (request-headers r))
[#f
(http-server #f port ssl?)]
[(cons _ (regexp #px"(.*):(\\d+)" (list _ host port)))
(http-server host (string->number port) ssl?)]
[(cons _ host)
(http-server host port ssl?)]))
(define (connection-thread listen-port cm connection-ports)
(signal-background-activity! +1)
(thread
(lambda ()
(match-define (list i o) connection-ports)
;; Deliberately construct an empty custodian for the connection. Killing the connection
;; abruptly can cause deadlocks since the connection thread communicates with Syndicate
;; via synchronous channels.
(define conn
(new-connection cm (web-server-initial-connection-timeout) i o (make-custodian) #f))
(define addresses
(let-values (((Lip Lport Rip Rport) (tcp-addresses i #t)))
(list Lip Lport Rip Rport)))
(define control-ch (make-channel))
(let do-request ()
(define-values (req should-close?)
(with-handlers ([exn:fail? (lambda (e) (values #f #f))])
(read-request conn listen-port tcp-addresses)))
(when req
(define id (gensym 'web))
(ground-send! (inbound (web-raw-request id listen-port conn addresses req control-ch)))
(sync (handle-evt control-ch
(match-lambda
[(list 'websocket reply-headers ws-ch)
(with-handlers ((exn:dispatcher?
(lambda (_e)
(define resp
(response/full 400
#"Bad request"
(current-seconds)
#"text/plain"
(list)
(list)))
(output-response/method conn
resp
(request-method req)))))
((make-general-websockets-dispatcher
(websocket-connection-main id ws-ch)
(lambda _args (values reply-headers (void))))
conn req))]
[(list 'response resp)
(output-response/method conn resp (request-method req))
(when (not should-close?)
(do-request))])))))
(signal-background-activity! -1))))
(define ((websocket-connection-main id ws-ch) wsc _ws-connection-state)
(define quit-seen? #f)
(define (shutdown!)
(ground-send! (inbound (websocket-in id eof)))
(with-handlers ([(lambda (e) #t)
(lambda (e) (log-syndicate/drivers/web-info
"Unexpected ws-close! error: ~a"
(if (exn? e)
(exn->string e)
(format "~v" e))))])
(ws-close! wsc)))
(with-handlers [(exn:fail:network? (lambda (e) (shutdown!)))
(exn:fail:port-is-closed? (lambda (e) (shutdown!)))
(exn:fail? (lambda (e)
(log-syndicate/drivers/web-error "Unexpected websocket error: ~a"
(exn->string e))
(shutdown!)))]
(let loop ()
(sync (handle-evt wsc (lambda _args
(define msg (ws-recv wsc #:payload-type 'text))
(ground-send! (inbound (websocket-in id msg)))
(loop)))
(handle-evt ws-ch (match-lambda
['quit
(set! quit-seen? #t)
(void)]
[(list 'send m)
(ws-send! wsc m)
(loop)]))))
(ws-close! wsc))
(when (not quit-seen?)
(let loop ()
(when (not (equal? (channel-get ws-ch) 'quit))
(loop)))))
;; D-: uck barf
;; TODO: something to fix this :-/
(define (exn:fail:port-is-closed? e)
(and (exn:fail? e)
(regexp-match #px"port is closed" (exn-message e))))

View File

@ -0,0 +1,45 @@
#lang imperative-syndicate
(require/activate imperative-syndicate/drivers/web)
(require/activate imperative-syndicate/drivers/timer)
(define server (http-server "localhost" 8081 #f))
(spawn
(during (http-request $id $method $resource _ _ _)
(stop-when (asserted ($ details (http-request-peer-details id _ _ _ _)))
(log-info "~a: ~a ~v ~v" id method resource details)))
(during/spawn (http-request $id 'get (http-resource server '("" ())) _ _ _)
(assert (http-accepted id))
(assert (http-response id 200 #"OK" (current-seconds)
#"text/plain"
'()
#"Hi")))
(during/spawn (http-request $id 'get (http-resource server '("chunked" ())) _ _ _)
(assert (http-accepted id))
(assert (http-response id 200 #"Chunked" (current-seconds)
#"text/plain"
'()
'chunked))
(on-start (sleep 1)
(send! (http-response-chunk id #"One\n"))
(sleep 1)
(send! (http-response-chunk id #"Two\n"))
(sleep 1)
(send! (http-response-chunk id #"Three\n"))
(stop-current-facet)))
(during/spawn (http-request $id 'get (http-resource server '("ws-echo" ())) _ _ _)
(assert (http-accepted id))
(assert (http-response-websocket id '()))
(on (message (websocket-in id $body))
(log-info "~a sent: ~v" id body)
(send! (websocket-out id (format "You said: ~a" body))))
(on (message (websocket-in id "quit"))
(stop-current-facet))
(on-start (log-info "Starting websocket connection ~a" id))
(on-stop (log-info "Stopping websocket connection ~a" id)))
)