336 lines
15 KiB
Racket
336 lines
15 KiB
Racket
#lang imperative-syndicate
|
|
|
|
(provide (struct-out http-server)
|
|
(struct-out http-resource)
|
|
(struct-out http-request)
|
|
(struct-out http-accepted)
|
|
(except-out (struct-out http-response) http-response)
|
|
(rename-out [make-http-response http-response]
|
|
[http-response <http-response>])
|
|
(except-out (struct-out http-response-websocket) http-response-websocket)
|
|
(rename-out [make-http-response-websocket http-response-websocket]
|
|
[http-response-websocket <http-response-websocket>])
|
|
(struct-out http-request-peer-details)
|
|
(struct-out http-request-cookie)
|
|
(struct-out http-response-chunk)
|
|
(struct-out websocket-out)
|
|
(struct-out websocket-in)
|
|
|
|
xexpr->bytes/utf-8)
|
|
|
|
(require racket/async-channel)
|
|
(require racket/exn)
|
|
(require (only-in racket/list flatten))
|
|
(require (only-in racket/string string-append*))
|
|
(require racket/tcp)
|
|
|
|
(require net/rfc6455)
|
|
(require net/rfc6455/conn-api)
|
|
(require net/rfc6455/dispatcher)
|
|
(require net/url)
|
|
|
|
(require struct-defaults)
|
|
|
|
(require web-server/http/bindings)
|
|
(require web-server/http/cookie)
|
|
(require web-server/http/cookie-parse)
|
|
(require web-server/http/request)
|
|
(require web-server/http/request-structs)
|
|
(require web-server/http/response)
|
|
(require web-server/http/response-structs)
|
|
(require web-server/private/connection-manager)
|
|
(require (only-in web-server/private/util lowercase-symbol!))
|
|
(require web-server/dispatchers/dispatch)
|
|
|
|
(require xml)
|
|
|
|
(module+ test (require rackunit))
|
|
|
|
;; Declares the logger for this driver. The `log-syndicate/drivers/web-info` and
;; `log-syndicate/drivers/web-error` forms used later in this file are generated
;; by this declaration.
(define-logger syndicate/drivers/web)
|
|
|
|
;; url-path->resource-path : (Listof path/param) -> nested list
;;
;; Converts the path component list of a parsed URL into the right-nested list
;; form used in `http-resource` assertions. Each level is the path segment,
;; followed by that segment's parameters, followed by the encoding of the rest
;; of the path; the empty path is '().
;;
;;   "/a/b;x"  ==>  '("a" ("b" "x" ()))
(define (url-path->resource-path up)
  (foldr (lambda (component nested-rest)
           (match-define (path/param segment segment-params) component)
           `(,segment ,@segment-params ,nested-rest))
         '()
         up))
|
|
|
|
;; build-headers : (Listof (Pairof Symbol String)) -> (Listof header)
;;
;; Converts an association list of response headers (symbol name, string value)
;; into web-server `header` structures, UTF-8 encoding both the name and the
;; value.
(define (build-headers hs)
  (define (utf8 str) (string->bytes/utf-8 str))
  (map (lambda (entry)
         (header (utf8 (symbol->string (car entry)))
                 (utf8 (cdr entry))))
       hs))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; API/Protocol
|
|
|
|
;; A served endpoint: `host` (or #f when a request carried no Host header),
;; `port`, and the `ssl?` flag. The driver starts one listener per port.
(assertion-struct http-server (host port ssl?))
|
|
;; A resource on a server. For incoming requests `path` is the nested list
;; produced by `url-path->resource-path`.
(assertion-struct http-resource (server path))
|
|
|
|
;; Asserted by the driver for each incoming request: a fresh `id`, the
;; lower-cased `method` symbol, the `resource`, the request `headers`, the URL
;; `query`, and the raw POST `body`.
(assertion-struct http-request (id method resource headers query body))
|
|
;; Asserted by a handler to claim request `id`. If nobody claims a request the
;; driver answers 404; if the claim is retracted before a response is asserted
;; the driver answers 500.
(assertion-struct http-accepted (id))
|
|
|
|
;; Asserted by a handler to answer request `id`. Exported under the name
;; `<http-response>`; the exported `http-response` is the defaulting
;; constructor `make-http-response`.
(assertion-struct http-response (id code message last-modified-seconds mime-type headers detail))
|
|
;; ^ detail = (U Bytes 'chunked)
;;   NOTE(review): the driver passes a non-'chunked detail through `flatten`,
;;   so a nested list of Bytes also appears to be accepted -- confirm.
|
|
;; Asserted by a handler to answer request `id` by upgrading to a websocket,
;; with the given reply `headers`.
(assertion-struct http-response-websocket (id headers))
|
|
|
|
;; Asserted by the driver alongside each request: local and remote address and
;; port of the underlying TCP connection.
(assertion-struct http-request-peer-details (id local-ip local-port remote-ip remote-port))
|
|
;; Asserted by the driver once per cookie carried by request `id`.
(assertion-struct http-request-cookie (id name value domain path))
|
|
|
|
;; Sent by a handler to emit `bytes` on a response whose detail is 'chunked.
(message-struct http-response-chunk (id bytes))
|
|
|
|
;; Sent by a handler to transmit `body` to the websocket peer of request `id`.
(message-struct websocket-out (id body))
|
|
;; Sent by the driver when a message `body` arrives from the websocket peer.
(message-struct websocket-in (id body))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; Ground messages
|
|
|
|
;; Sent from a connection thread to the dataspace for each request read off a
;; connection. `control-ch` is the channel on which the connection thread then
;; waits for a ('response ...) or ('websocket ...) instruction.
(message-struct web-raw-request (id port connection addresses req control-ch))
|
|
;; NOTE(review): not referenced anywhere else in the visible source -- possibly
;; unused; confirm before relying on it.
(message-struct web-raw-client-conn (id connection))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Passed as the second argument to `tcp-listen` when a listener is created.
(define web-server-max-waiting (make-parameter 511)) ;; sockets
|
|
;; Connection manager to use for accepted connections. When #f, each listener
;; calls `start-connection-manager` to obtain one.
(define web-server-connection-manager (make-parameter #f))
|
|
;; Timeout handed to `new-connection` for every accepted connection.
(define web-server-initial-connection-timeout (make-parameter 30)) ;; seconds
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Root actor of the web driver.
;;
;;  - Interest in an `http-request` for some resource causes the resource's
;;    `http-server` to be asserted.
;;  - Each port named by an `http-server` assertion gets a listener actor that
;;    owns a TCP listener and an accept thread.
;;  - Each raw request relayed by a connection thread gets its own actor, which
;;    publishes the request and relays the handler's response (plain, chunked
;;    or websocket) back to the connection thread over `control-ch`.
(spawn
 #:name 'http-server-factory

 (during (observe (http-request _ _ (http-resource $server _) _ _ _))
   (assert server))

 (during/spawn (http-server _ $port _)
   #:name (list 'http-listener port)

   (define ssl? #f)
   (on (asserted (http-server _ port #t))
       (error 'http-listener "SSL service not yet implemented")) ;; TODO

   (define cm (or (web-server-connection-manager) (start-connection-manager)))

   (define listener (tcp-listen port (web-server-max-waiting) #t))
   (define listener-control (make-channel))

   ;; Runs in its own thread: hands each accepted connection to a connection
   ;; thread, until told to quit via `listener-control`.
   (define (accept-loop)
     (sync (handle-evt (tcp-accept-evt listener)
                       (lambda (ports)
                         (connection-thread port cm ports)
                         (accept-loop)))
           (handle-evt listener-control
                       (match-lambda
                         [(list 'quit k-ch)
                          (tcp-close listener)
                          (signal-background-activity! -1)
                          (channel-put k-ch (void))]))))
   (thread accept-loop)
   (signal-background-activity! +1)

   (on-start (log-syndicate/drivers/web-info "Starting listener on port ~v" port))
   (on-stop (define k-ch (make-channel))
            (log-syndicate/drivers/web-info "Stopping listener on port ~v" port)
            ;; Synchronous handshake: wait until the accept thread has closed
            ;; the listener before reporting that we have stopped.
            (channel-put listener-control (list 'quit k-ch))
            (channel-get k-ch)
            (log-syndicate/drivers/web-info "Stopped listener on port ~v" port))

   (on (message (inbound (web-raw-request $id port $conn $addresses $lowlevel-req $control-ch)))
       (spawn #:name (list 'http-request id)
              (define root-facet (current-facet))

              (define uri (request-uri lowlevel-req))
              (define method
                (string->symbol
                 (string-downcase
                  (bytes->string/latin-1 (request-method lowlevel-req)))))
              (define resource
                (http-resource (req->http-server lowlevel-req port ssl?)
                               (url-path->resource-path (url-path uri))))

              (assert (http-request id
                                    method
                                    resource
                                    (request-headers lowlevel-req)
                                    (url-query uri)
                                    (request-post-data/raw lowlevel-req)))

              (for [(c (request-cookies lowlevel-req))]
                (match-define (client-cookie n v d p) c)
                (assert (http-request-cookie id n v d p)))

              (match-let ([(list Lip Lport Rip Rport) addresses])
                (assert (http-request-peer-details id Lip Lport Rip Rport)))

              ;; Sends a complete, non-streamed response to the connection thread.
              (define (respond! resp)
                (match-define (http-response _ c m lms mime-type headers body) resp)
                (define hs (build-headers headers))
                (channel-put control-ch
                             (list 'response
                                   (response/full c m lms mime-type hs (flatten body)))))

              ;; Sends a canned error page with the given status line.
              (define (respond/error! code message heading)
                (respond! (make-http-response #:code code
                                              #:message message
                                              id
                                              (xexpr->bytes/utf-8
                                               `(html (h1 ,heading))))))

              ;; Starts a streamed response. Chunks arriving as
              ;; `http-response-chunk` messages are queued on `stream-ch`; a #f
              ;; on the queue tells the writer that the response is finished.
              (define (respond/chunked! resp)
                (match-define (http-response _ c m lms mime-type headers _) resp)
                (define hs (build-headers headers))
                (define stream-ch (make-async-channel))
                (define (output-writer op)
                  (define bss (async-channel-get stream-ch))
                  (when bss
                    (for-each (lambda (bs) (write-bytes bs op)) bss)
                    ;; (flush-output op) ;; seemingly does nothing. TODO
                    (output-writer op)))
                (react (stop-when (retracted resp))
                       (on-stop (async-channel-put stream-ch #f)
                                (stop-facet root-facet))
                       (on (message (http-response-chunk id $chunk))
                           (async-channel-put stream-ch (flatten chunk)))
                       (on-start (channel-put control-ch
                                              (list 'response
                                                    (response c m lms mime-type hs output-writer))))))

              ;; Upgrades the connection to a websocket and relays messages in
              ;; both directions until the response assertion is retracted or
              ;; the peer disconnects (signalled by an eof body).
              (define (respond/websocket! headers)
                (define ws-ch (make-channel))
                (define hs (build-headers headers))
                (react (stop-when (retracted (http-response-websocket id headers)))
                       (on-start (channel-put control-ch (list 'websocket hs ws-ch)))
                       (on-stop (channel-put ws-ch 'quit)
                                (stop-facet root-facet))
                       (on (message (websocket-out id $body))
                           (channel-put ws-ch (list 'send (string-append* (flatten body)))))
                       (on (message (inbound (websocket-in id $body)))
                           (if (eof-object? body)
                               (stop-current-facet)
                               (send! (websocket-in id body))))))

              (field [respondent-exists? #f])
              ;; Give handlers a few turns to claim the request before
              ;; answering 404.
              (on-start (for [(i (in-range 3))] (flush!)) ;; TODO: UGHHHH
                        (unless (respondent-exists?)
                          (stop-facet root-facet
                                      (respond/error! 404 #"Not found" "Not found"))))

              (on (asserted (http-accepted id))
                  (respondent-exists? #t)
                  (react
                   ;; The handler went away without answering.
                   (stop-when (retracted (http-accepted id))
                              (stop-facet root-facet
                                          (respond/error! 500 #"Server error" "Server error")))
                   (stop-when (asserted ($ resp (http-response id _ _ _ _ _ $detail)))
                              (match detail
                                ['chunked (respond/chunked! resp)]
                                [_ (stop-facet root-facet (respond! resp))]))
                   (stop-when (asserted (http-response-websocket id $headers))
                              (respond/websocket! headers))))))))
|
|
|
|
;; req->http-server : request Nat Boolean -> http-server
;;
;; Builds the `http-server` record a request is addressed to, based on its Host
;; header:
;;  - no Host header:      host is #f and the listener's `port` is used;
;;  - "name:digits":       host is `name`, port is the number in the header;
;;  - anything else:       host is the header value, with the listener's `port`.
;;
;; The pattern is anchored so that only a trailing ":digits" counts as a port.
;; Unanchored, a port-less IPv6 literal such as "[::1]" was split into host
;; "[:" and port 1.
(define (req->http-server r port ssl?)
  (match (assq 'host (request-headers r))
    [#f
     (http-server #f port ssl?)]
    [(cons _ (regexp #px"^(.*):(\\d+)$" (list _ host host-port)))
     (http-server host (string->number host-port) ssl?)]
    [(cons _ host)
     (http-server host port ssl?)]))
|
|
|
|
;; connection-thread : Nat connection-manager (List InputPort OutputPort) -> Thread
;;
;; Starts a thread that serves one accepted TCP connection. For each request
;; read from the connection it relays a `web-raw-request` to the dataspace and
;; then blocks on `control-ch` until the request's actor supplies either
;;  - (list 'response resp): the response is written and, unless the request
;;    asked for the connection to be closed, the next request is read; or
;;  - (list 'websocket reply-headers ws-ch): the connection is handed to the
;;    websocket dispatcher; a failed handshake is answered with 400.
;; The loop ends when no further request can be read.
;;
;; The +1 background-activity signal sent here is always matched by a -1 when
;; the thread finishes, including when it finishes because of an exception
;; (for example a write to a peer that has gone away); `dynamic-wind` is used
;; for that. The exception itself still propagates as before.
(define (connection-thread listen-port cm connection-ports)
  (signal-background-activity! +1)
  (thread
   (lambda ()
     (dynamic-wind
      void
      (lambda ()
        (match-define (list i o) connection-ports)
        ;; Deliberately construct an empty custodian for the connection. Killing the connection
        ;; abruptly can cause deadlocks since the connection thread communicates with Syndicate
        ;; via synchronous channels.
        (define conn
          (new-connection cm (web-server-initial-connection-timeout) i o (make-custodian) #f))
        (define addresses
          (let-values (((Lip Lport Rip Rport) (tcp-addresses i #t)))
            (list Lip Lport Rip Rport)))
        (define control-ch (make-channel))
        (let do-request ()
          (define-values (req should-close?)
            ;; Any failure to read a request ends the loop.
            (with-handlers ([exn:fail? (lambda (e) (values #f #f))])
              (read-request conn listen-port tcp-addresses)))
          (when req
            (define id (gensym 'web))
            (ground-send! (inbound (web-raw-request id listen-port conn addresses req control-ch)))
            (sync (handle-evt control-ch
                              (match-lambda
                                [(list 'websocket reply-headers ws-ch)
                                 (with-handlers ((exn:dispatcher?
                                                  (lambda (_e)
                                                    (define resp
                                                      (response/full 400
                                                                     #"Bad request"
                                                                     (current-seconds)
                                                                     #"text/plain"
                                                                     (list)
                                                                     (list)))
                                                    (output-response/method conn
                                                                            resp
                                                                            (request-method req)))))
                                   ((make-general-websockets-dispatcher
                                     (websocket-connection-main id ws-ch)
                                     (lambda _args (values reply-headers (void))))
                                    conn req))]
                                [(list 'response resp)
                                 (output-response/method conn resp (request-method req))
                                 (when (not should-close?)
                                   (do-request))]))))))
      (lambda ()
        (signal-background-activity! -1))))))
|
|
|
|
;; websocket-connection-main : Symbol Channel -> (ws-conn Any -> Void)
;;
;; Curried: applied to the request `id` and the control channel `ws-ch`, yields
;; the connection procedure expected by the websocket dispatcher. That
;; procedure
;;  - relays every message received from the peer to the dataspace as
;;    (inbound (websocket-in id msg));
;;  - transmits (list 'send m) instructions arriving on `ws-ch`;
;;  - stops when 'quit arrives on `ws-ch`, when the peer closes the connection,
;;    or when an error occurs.
;; Before returning it always consumes a 'quit from `ws-ch`, because the
;; request's actor sends one with a synchronous `channel-put` when it stops.
;;
;; When `ws-recv` yields eof the peer has gone: the eof is relayed once and the
;; loop ends. Continuing to loop would keep syncing on a closed connection and
;; relay eof again and again until 'quit happened to be selected.
(define ((websocket-connection-main id ws-ch) wsc _ws-connection-state)
  (define quit-seen? #f)
  ;; Tells the dataspace the connection is gone, then closes it, logging rather
  ;; than propagating anything `ws-close!` raises.
  (define (shutdown!)
    (ground-send! (inbound (websocket-in id eof)))
    (with-handlers ([(lambda (e) #t)
                     (lambda (e) (log-syndicate/drivers/web-info
                                  "Unexpected ws-close! error: ~a"
                                  (if (exn? e)
                                      (exn->string e)
                                      (format "~v" e))))])
      (ws-close! wsc)))
  (with-handlers [(exn:fail:network? (lambda (e) (shutdown!)))
                  (exn:fail:port-is-closed? (lambda (e) (shutdown!)))
                  (exn:fail? (lambda (e)
                               (log-syndicate/drivers/web-error "Unexpected websocket error: ~a"
                                                                (exn->string e))
                               (shutdown!)))]
    (let loop ()
      (sync (handle-evt wsc (lambda _args
                              (define msg (ws-recv wsc #:payload-type 'text))
                              (ground-send! (inbound (websocket-in id msg)))
                              ;; eof means the peer closed: stop reading.
                              (unless (eof-object? msg)
                                (loop))))
            (handle-evt ws-ch (match-lambda
                                ['quit
                                 (set! quit-seen? #t)
                                 (void)]
                                [(list 'send m)
                                 (ws-send! wsc m)
                                 (loop)]))))
    (ws-close! wsc))
  ;; Drain `ws-ch` until the actor's 'quit arrives, discarding any pending
  ;; 'send instructions, so the actor's synchronous put cannot block forever.
  (unless quit-seen?
    (let loop ()
      (unless (equal? (channel-get ws-ch) 'quit)
        (loop)))))
|
|
|
|
;; exn:fail:port-is-closed? : Any -> (U #f (Listof String))
;;
;; Recognises "port is closed" failures by inspecting the exception message,
;; which is fragile. Yields a true value (the regexp match) for an `exn:fail`
;; whose message contains "port is closed", and #f for anything else.
;; TODO: something to fix this :-/
(define (exn:fail:port-is-closed? e)
  (cond
    [(exn:fail? e) (regexp-match #px"port is closed" (exn-message e))]
    [else #f]))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Defaulting constructors for the response structs. They are exported under
;; the names `http-response` and `http-response-websocket` (see the `provide`
;; form), while the plain struct constructors are exported as `<http-response>`
;; and `<http-response-websocket>`.
(begin-for-declarations
|
|
;; Every field except `id` and `detail` has a default: 200 / #"OK" / the
;; current time / #"text/html" / no extra headers.
(define-struct-defaults make-http-response http-response
|
|
(#:code [http-response-code 200]
|
|
#:message [http-response-message #"OK"]
|
|
#:last-modified-seconds [http-response-last-modified-seconds (current-seconds)]
|
|
#:mime-type [http-response-mime-type #"text/html"]
|
|
#:headers [http-response-headers '()]))
|
|
;; `headers` defaults to the empty list.
(define-struct-defaults make-http-response-websocket http-response-websocket
|
|
(#:headers [http-response-websocket-headers '()])))
|
|
|
|
;; xexpr->bytes/utf-8 : XExpr [#:preamble Bytes] -> Bytes
;;
;; Serialises `xexpr` to markup, encodes it as UTF-8, and prefixes the result
;; with `preamble` (an HTML5 doctype unless overridden).
(define (xexpr->bytes/utf-8 #:preamble [preamble #"<!DOCTYPE html>"] xexpr)
  (define markup (xexpr->string xexpr))
  (bytes-append preamble (string->bytes/utf-8 markup)))
|