From 30c007e0becfacd1b06ec54acec15460201bb6a8 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 12 Jun 2014 19:46:17 -0400 Subject: [PATCH] Introduce threads and send-ground-message to TCP driver. --- minimart/drivers/tcp.rkt | 120 +++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 42 deletions(-) diff --git a/minimart/drivers/tcp.rkt b/minimart/drivers/tcp.rkt index 4e0d89a..7ccf910 100644 --- a/minimart/drivers/tcp.rkt +++ b/minimart/drivers/tcp.rkt @@ -26,6 +26,12 @@ (struct tcp-channel (source destination subpacket) #:prefab) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab) +;; tcp-channel does double-duty as a ground-level message as well + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Driver @@ -41,33 +47,55 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Listener -(struct listener-state (listener server-addr) #:transparent) +(struct listener-state (control-ch server-addr) #:transparent) + +(define (tcp-listener-thread control-ch listener server-addr) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (tcp:tcp-accept-evt listener) + (lambda (cin+cout) + (match-define (list cin cout) cin+cout) + (define-values (local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t)) + (send-ground-message + (tcp-accepted (tcp-address remote-hostname remote-port) + server-addr + cin + cout)) + (loop blocked?)))))) + (tcp:tcp-close listener)) (define (tcp-listener-behavior e state) (match e [(routing-update g) - (match-define (listener-state listener server-addr) state) - (if (gestalt-empty? (gestalt-filter g (pub (tcp-channel ? server-addr ?) #:level 2))) - (begin (when listener (tcp:tcp-close listener)) - (transition (struct-copy listener-state state [listener #f]) (quit))) - #f)] - [(message (event _ (list (list cin cout))) 1 #f) - (define-values (local-hostname local-port remote-hostname remote-port) - (tcp:tcp-addresses cin #t)) - (transition state - (spawn-connection (listener-state-server-addr state) - (tcp-address remote-hostname remote-port) - cin - cout))] + (match-define (listener-state control-ch server-addr) state) + (and control-ch + (if (gestalt-empty? (gestalt-filter g (pub (tcp-channel ? server-addr ?) #:level 2))) + (begin (channel-put control-ch 'quit) + (transition (struct-copy listener-state state [control-ch #f]) (quit))) + (begin (channel-put control-ch 'unblock) + #f)))] + [(message (tcp-accepted remote-addr _ cin cout) 1 #f) + (transition state (spawn-connection (listener-state-server-addr state) + remote-addr + cin + cout))] [_ #f])) (define (spawn-tcp-listener server-addr) (match-define (tcp-listener port) server-addr) (define listener (tcp:tcp-listen port 4 #t)) + (define control-ch (make-channel)) + (thread (lambda () (tcp-listener-thread control-ch listener server-addr))) (spawn tcp-listener-behavior - (listener-state listener server-addr) + (listener-state control-ch server-addr) (gestalt-union (pub (tcp-channel ? server-addr ?) #:level 2) - (sub (event (tcp:tcp-accept-evt listener) ?) #:meta-level 1)))) + (sub (tcp-accepted ? server-addr ? ?) #:meta-level 1)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Outbound Connection @@ -91,14 +119,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Connection -(struct connection-state (seen-peer? local-addr remote-addr cin cout) #:transparent) - -(define (shutdown-connection state) - (define cin (connection-state-cin state)) - (define cout (connection-state-cout state)) - (when cin (close-input-port cin)) - (when cout (close-output-port cout)) - (transition (struct-copy connection-state state [cin #f] [cout #f]) (quit))) +(struct connection-state (seen-peer? control-ch cout) #:transparent) (define (read-bytes-avail-evt len input-port) (guard-evt @@ -111,18 +132,36 @@ (if (= v len) bstr (subbytes bstr 0 v)) v))))))) +(define (tcp-connection-thread remote-addr local-addr control-ch cin) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (read-bytes-avail-evt 32768 cin) + (lambda (eof-or-bs) + (send-ground-message (tcp-channel remote-addr local-addr eof-or-bs)) + (loop (or blocked? (eof-object? eof-or-bs)))))))) + (close-input-port cin)) + +(define (shutdown-connection state) + (match-define (connection-state _ control-ch cout) state) + (when control-ch (channel-put control-ch 'quit)) + (when cout (close-output-port cout)) + (transition (struct-copy connection-state state [control-ch #f] [cout #f]) (quit))) + (define (tcp-connection e state) (with-handlers [((lambda (exn) #t) (lambda (exn) (shutdown-connection state) (raise exn)))] (match e - [(message (event _ (list (? eof-object?))) 1 #f) + [(message (tcp-channel remote-addr local-addr (? eof-object?)) 1 #f) (shutdown-connection state)] - [(message (event _ (list (? bytes? bs))) 1 #f) - (transition state (send (tcp-channel (connection-state-remote-addr state) - (connection-state-local-addr state) - bs)))] + [(message (tcp-channel remote-addr local-addr (? bytes? bs)) 1 #f) + (transition state (send (tcp-channel remote-addr local-addr bs)))] [(message (tcp-channel _ _ bs) 0 #f) (write-bytes bs (connection-state-cout state)) (flush-output (connection-state-cout state)) @@ -132,21 +171,18 @@ [(and (connection-state-seen-peer? state) (gestalt-empty? g)) (shutdown-connection state)] [(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g))) - (define new-state (struct-copy connection-state state [seen-peer? #t])) - (transition new-state (routing-update (connection-gestalt new-state)))] + (channel-put (connection-state-control-ch state) 'unblock) + (transition (struct-copy connection-state state [seen-peer? #t]) '())] [else #f])] [#f #f]))) -(define (connection-gestalt state) - (match-define (connection-state seen-peer? local-addr remote-addr cin _) state) - (gestalt-union (pub (tcp-channel remote-addr local-addr ?)) - (sub (tcp-channel local-addr remote-addr ?)) - (pub (tcp-channel remote-addr local-addr ?) #:level 1) - (if seen-peer? - (sub (event (read-bytes-avail-evt 32768 cin) ?) #:meta-level 1) - (gestalt-empty)))) - (define (spawn-connection local-addr remote-addr cin cout) - (define state (connection-state #f local-addr remote-addr cin cout)) - (spawn tcp-connection state (connection-gestalt state))) + (define control-ch (make-channel)) + (thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin))) + (spawn tcp-connection + (connection-state #f control-ch cout) + (gestalt-union (pub (tcp-channel remote-addr local-addr ?)) + (sub (tcp-channel local-addr remote-addr ?)) + (pub (tcp-channel remote-addr local-addr ?) #:level 1) + (sub (tcp-channel remote-addr local-addr ?) #:meta-level 1))))