From 061765041b9f9c841bdbc132f6cbd3515679ba73 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 14 Aug 2018 17:58:36 +0100 Subject: [PATCH] Initial quasi-port of Syndicate/rkt web driver --- syndicate/drivers/web.rkt | 304 ++++++++++++++++++++++++++++++++ syndicate/examples/web-core.rkt | 45 +++++ 2 files changed, 349 insertions(+) create mode 100644 syndicate/drivers/web.rkt create mode 100644 syndicate/examples/web-core.rkt diff --git a/syndicate/drivers/web.rkt b/syndicate/drivers/web.rkt new file mode 100644 index 0000000..1f83659 --- /dev/null +++ b/syndicate/drivers/web.rkt @@ -0,0 +1,304 @@ +#lang imperative-syndicate + +(provide (struct-out http-server) + (struct-out http-resource) + (struct-out http-request) + (struct-out http-accepted) + (struct-out http-response) + (struct-out http-response-websocket) + (struct-out http-request-peer-details) + (struct-out http-request-cookie) + (struct-out http-response-chunk) + (struct-out websocket-out) + (struct-out websocket-in)) + +(require racket/async-channel) +(require racket/exn) +(require (only-in racket/list flatten)) +(require (only-in racket/string string-append*)) +(require racket/tcp) + +(require net/rfc6455) +(require net/rfc6455/conn-api) +(require net/rfc6455/dispatcher) +(require net/url) + +(require web-server/http/bindings) +(require web-server/http/cookie) +(require web-server/http/cookie-parse) +(require web-server/http/request) +(require web-server/http/request-structs) +(require web-server/http/response) +(require web-server/http/response-structs) +(require web-server/private/connection-manager) +(require (only-in web-server/private/util lowercase-symbol!)) +(require web-server/dispatchers/dispatch) + +(module+ test (require rackunit)) + +(define-logger syndicate/drivers/web) + +(define (url-path->resource-path up) + (define elements (for/list [(p (in-list up))] + (match-define (path/param path-element params) p) + (list* path-element params))) + (foldr (lambda (e acc) (append e (list acc))) '() elements)) + +(define (build-headers hs) + (for/list ((h (in-list hs))) + (header (string->bytes/utf-8 (symbol->string (car h))) + (string->bytes/utf-8 (cdr h))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; API/Protocol + +(assertion-struct http-server (host port ssl?)) +(assertion-struct http-resource (server path)) + +(assertion-struct http-request (id method resource headers query body)) +(assertion-struct http-accepted (id)) + +(assertion-struct http-response (id code message last-modified-seconds mime-type headers detail)) +;; ^ detail = (U Bytes 'chunked) +(assertion-struct http-response-websocket (id headers)) + +(assertion-struct http-request-peer-details (id local-ip local-port remote-ip remote-port)) +(assertion-struct http-request-cookie (id name value domain path)) + +(message-struct http-response-chunk (id bytes)) + +(message-struct websocket-out (id body)) +(message-struct websocket-in (id body)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground messages + +(message-struct web-raw-request (id port connection addresses req control-ch)) +(message-struct web-raw-client-conn (id connection)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define web-server-max-waiting (make-parameter 511)) ;; sockets +(define web-server-connection-manager (make-parameter #f)) +(define web-server-initial-connection-timeout (make-parameter 30)) ;; seconds + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn + #:name 'http-server-factory + + (during (observe (http-request _ _ (http-resource $server _) _ _ _)) + (assert server)) + + (during/spawn (http-server _ $port _) + #:name (list 'http-listener port) + + (define ssl? #f) + (on (asserted (http-server _ port #t)) + (error 'http-listener "SSL service not yet implemented")) ;; TODO + + (define cm (or (web-server-connection-manager) (start-connection-manager))) + + (define listener (tcp-listen port (web-server-max-waiting) #t)) + (define listener-control (make-channel)) + (thread (lambda () + (let loop () + (sync (handle-evt (tcp-accept-evt listener) + (lambda (ports) + (connection-thread port cm ports) + (loop))) + (handle-evt listener-control + (match-lambda + [(list 'quit k-ch) + (tcp-close listener) + (signal-background-activity! -1) + (channel-put k-ch (void))])))))) + (signal-background-activity! +1) + + (on-start (log-syndicate/drivers/web-info "Starting listener on port ~v" port)) + (on-stop (define k-ch (make-channel)) + (log-syndicate/drivers/web-info "Stopping listener on port ~v" port) + (channel-put listener-control (list 'quit k-ch)) + (channel-get k-ch) + (log-syndicate/drivers/web-info "Stopped listener on port ~v" port)) + + (on (message (inbound (web-raw-request $id port $conn $addresses $lowlevel-req $control-ch))) + (spawn #:name (list 'http-request id) + (define root-facet (current-facet)) + + (define method + (string->symbol (string-downcase (bytes->string/latin-1 (request-method lowlevel-req))))) + (define resource (http-resource (req->http-server lowlevel-req port ssl?) + (url-path->resource-path + (url-path (request-uri lowlevel-req))))) + + (assert (http-request id + method + resource + (request-headers lowlevel-req) + (url-query (request-uri lowlevel-req)) + (request-post-data/raw lowlevel-req))) + + (for [(c (request-cookies lowlevel-req))] + (match-define (client-cookie n v d p) c) + (assert (http-request-cookie id n v d p))) + + (match-let ([(list Lip Lport Rip Rport) addresses]) + (assert (http-request-peer-details id Lip Lport Rip Rport))) + + (define (respond/error! code message-bytes) + (respond! + (http-response id code message-bytes (current-seconds) #"text/plain" '() message-bytes))) + + (define (respond! resp) + (match-define (http-response _ c m lms mime-type headers body) resp) + (define hs (build-headers headers)) + (channel-put control-ch + (list 'response + (response/full c m lms mime-type hs (flatten body))))) + + (define (respond/chunked! resp) + (match-define (http-response _ c m lms mime-type headers _) resp) + (define hs (build-headers headers)) + (define stream-ch (make-async-channel)) + (define (output-writer op) + (match (async-channel-get stream-ch) + [#f (void)] + [bss (for [(bs bss)] (write-bytes bs op)) + ;; (flush-output op) ;; seemingly does nothing. TODO + (output-writer op)])) + (react (stop-when (retracted resp)) + (on-stop (async-channel-put stream-ch #f) + (stop-facet root-facet)) + (on (message (http-response-chunk id $chunk)) + (async-channel-put stream-ch (flatten chunk))) + (on-start (channel-put control-ch + (list 'response + (response c m lms mime-type hs output-writer)))))) + + (define (respond/websocket! headers) + (define ws-ch (make-channel)) + (define hs (build-headers headers)) + (react (stop-when (retracted (http-response-websocket id headers))) + (on-start (channel-put control-ch (list 'websocket hs ws-ch))) + (on-stop (channel-put ws-ch 'quit) + (stop-facet root-facet)) + (on (message (websocket-out id $body)) + (channel-put ws-ch (list 'send (string-append* (flatten body))))) + (on (message (inbound (websocket-in id $body))) + (if (eof-object? body) + (stop-current-facet) + (send! (websocket-in id body)))))) + + (field [respondent-exists? #f]) + (on-start (for [(i 3)] (flush!)) ;; TODO: UGHHHH + (when (not (respondent-exists?)) + (stop-facet root-facet (respond/error! 404 #"Not found")))) + + (on (asserted (http-accepted id)) + (respondent-exists? #t) + (react + (stop-when (retracted (http-accepted id)) + (stop-facet root-facet (respond/error! 500 #"Server error"))) + (stop-when (asserted ($ resp (http-response id _ _ _ _ _ $detail))) + (match detail + ['chunked (respond/chunked! resp)] + [_ (stop-facet root-facet (respond! resp))])) + (stop-when (asserted (http-response-websocket id $headers)) + (respond/websocket! headers)))))))) + +(define (req->http-server r port ssl?) + (match (assq 'host (request-headers r)) + [#f + (http-server #f port ssl?)] + [(cons _ (regexp #px"(.*):(\\d+)" (list _ host port))) + (http-server host (string->number port) ssl?)] + [(cons _ host) + (http-server host port ssl?)])) + +(define (connection-thread listen-port cm connection-ports) + (signal-background-activity! +1) + (thread + (lambda () + (match-define (list i o) connection-ports) + ;; Deliberately construct an empty custodian for the connection. Killing the connection + ;; abruptly can cause deadlocks since the connection thread communicates with Syndicate + ;; via synchronous channels. + (define conn + (new-connection cm (web-server-initial-connection-timeout) i o (make-custodian) #f)) + (define addresses + (let-values (((Lip Lport Rip Rport) (tcp-addresses i #t))) + (list Lip Lport Rip Rport))) + (define control-ch (make-channel)) + (let do-request () + (define-values (req should-close?) + (with-handlers ([exn:fail? (lambda (e) (values #f #f))]) + (read-request conn listen-port tcp-addresses))) + (when req + (define id (gensym 'web)) + (ground-send! (inbound (web-raw-request id listen-port conn addresses req control-ch))) + (sync (handle-evt control-ch + (match-lambda + [(list 'websocket reply-headers ws-ch) + (with-handlers ((exn:dispatcher? + (lambda (_e) + (define resp + (response/full 400 + #"Bad request" + (current-seconds) + #"text/plain" + (list) + (list))) + (output-response/method conn + resp + (request-method req))))) + ((make-general-websockets-dispatcher + (websocket-connection-main id ws-ch) + (lambda _args (values reply-headers (void)))) + conn req))] + [(list 'response resp) + (output-response/method conn resp (request-method req)) + (when (not should-close?) + (do-request))]))))) + (signal-background-activity! -1)))) + +(define ((websocket-connection-main id ws-ch) wsc _ws-connection-state) + (define quit-seen? #f) + (define (shutdown!) + (ground-send! (inbound (websocket-in id eof))) + (with-handlers ([(lambda (e) #t) + (lambda (e) (log-syndicate/drivers/web-info + "Unexpected ws-close! error: ~a" + (if (exn? e) + (exn->string e) + (format "~v" e))))]) + (ws-close! wsc))) + (with-handlers [(exn:fail:network? (lambda (e) (shutdown!))) + (exn:fail:port-is-closed? (lambda (e) (shutdown!))) + (exn:fail? (lambda (e) + (log-syndicate/drivers/web-error "Unexpected websocket error: ~a" + (exn->string e)) + (shutdown!)))] + (let loop () + (sync (handle-evt wsc (lambda _args + (define msg (ws-recv wsc #:payload-type 'text)) + (ground-send! (inbound (websocket-in id msg))) + (loop))) + (handle-evt ws-ch (match-lambda + ['quit + (set! quit-seen? #t) + (void)] + [(list 'send m) + (ws-send! wsc m) + (loop)])))) + (ws-close! wsc)) + (when (not quit-seen?) + (let loop () + (when (not (equal? (channel-get ws-ch) 'quit)) + (loop))))) + +;; D-: uck barf +;; TODO: something to fix this :-/ +(define (exn:fail:port-is-closed? e) + (and (exn:fail? e) + (regexp-match #px"port is closed" (exn-message e)))) diff --git a/syndicate/examples/web-core.rkt b/syndicate/examples/web-core.rkt new file mode 100644 index 0000000..f02fd17 --- /dev/null +++ b/syndicate/examples/web-core.rkt @@ -0,0 +1,45 @@ +#lang imperative-syndicate + +(require/activate imperative-syndicate/drivers/web) +(require/activate imperative-syndicate/drivers/timer) + +(define server (http-server "localhost" 8081 #f)) + +(spawn + + (during (http-request $id $method $resource _ _ _) + (stop-when (asserted ($ details (http-request-peer-details id _ _ _ _))) + (log-info "~a: ~a ~v ~v" id method resource details))) + + (during/spawn (http-request $id 'get (http-resource server '("" ())) _ _ _) + (assert (http-accepted id)) + (assert (http-response id 200 #"OK" (current-seconds) + #"text/plain" + '() + #"Hi"))) + + (during/spawn (http-request $id 'get (http-resource server '("chunked" ())) _ _ _) + (assert (http-accepted id)) + (assert (http-response id 200 #"Chunked" (current-seconds) + #"text/plain" + '() + 'chunked)) + (on-start (sleep 1) + (send! (http-response-chunk id #"One\n")) + (sleep 1) + (send! (http-response-chunk id #"Two\n")) + (sleep 1) + (send! (http-response-chunk id #"Three\n")) + (stop-current-facet))) + + (during/spawn (http-request $id 'get (http-resource server '("ws-echo" ())) _ _ _) + (assert (http-accepted id)) + (assert (http-response-websocket id '())) + (on (message (websocket-in id $body)) + (log-info "~a sent: ~v" id body) + (send! (websocket-out id (format "You said: ~a" body)))) + (on (message (websocket-in id "quit")) + (stop-current-facet)) + (on-start (log-info "Starting websocket connection ~a" id)) + (on-stop (log-info "Stopping websocket connection ~a" id))) + )