Introduce threads and send-ground-message to TCP driver.
This commit is contained in:
parent
3a5e6c8e29
commit
30c007e0be
|
@ -26,6 +26,12 @@
|
||||||
|
|
||||||
(struct tcp-channel (source destination subpacket) #:prefab)
|
(struct tcp-channel (source destination subpacket) #:prefab)
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Ground-level communication messages
|
||||||
|
|
||||||
|
(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab)
|
||||||
|
;; tcp-channel does double-duty as a ground-level message as well
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Driver
|
;; Driver
|
||||||
|
|
||||||
|
@ -41,33 +47,55 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Listener
|
;; Listener
|
||||||
|
|
||||||
(struct listener-state (listener server-addr) #:transparent)
|
(struct listener-state (control-ch server-addr) #:transparent)
|
||||||
|
|
||||||
|
(define (tcp-listener-thread control-ch listener server-addr)
|
||||||
|
(let loop ((blocked? #t))
|
||||||
|
(sync (handle-evt control-ch
|
||||||
|
(match-lambda
|
||||||
|
['unblock (loop #f)]
|
||||||
|
['quit (void)]))
|
||||||
|
(if blocked?
|
||||||
|
never-evt
|
||||||
|
(handle-evt (tcp:tcp-accept-evt listener)
|
||||||
|
(lambda (cin+cout)
|
||||||
|
(match-define (list cin cout) cin+cout)
|
||||||
|
(define-values (local-hostname local-port remote-hostname remote-port)
|
||||||
|
(tcp:tcp-addresses cin #t))
|
||||||
|
(send-ground-message
|
||||||
|
(tcp-accepted (tcp-address remote-hostname remote-port)
|
||||||
|
server-addr
|
||||||
|
cin
|
||||||
|
cout))
|
||||||
|
(loop blocked?))))))
|
||||||
|
(tcp:tcp-close listener))
|
||||||
|
|
||||||
(define (tcp-listener-behavior e state)
|
(define (tcp-listener-behavior e state)
|
||||||
(match e
|
(match e
|
||||||
[(routing-update g)
|
[(routing-update g)
|
||||||
(match-define (listener-state listener server-addr) state)
|
(match-define (listener-state control-ch server-addr) state)
|
||||||
(if (gestalt-empty? (gestalt-filter g (pub (tcp-channel ? server-addr ?) #:level 2)))
|
(and control-ch
|
||||||
(begin (when listener (tcp:tcp-close listener))
|
(if (gestalt-empty? (gestalt-filter g (pub (tcp-channel ? server-addr ?) #:level 2)))
|
||||||
(transition (struct-copy listener-state state [listener #f]) (quit)))
|
(begin (channel-put control-ch 'quit)
|
||||||
#f)]
|
(transition (struct-copy listener-state state [control-ch #f]) (quit)))
|
||||||
[(message (event _ (list (list cin cout))) 1 #f)
|
(begin (channel-put control-ch 'unblock)
|
||||||
(define-values (local-hostname local-port remote-hostname remote-port)
|
#f)))]
|
||||||
(tcp:tcp-addresses cin #t))
|
[(message (tcp-accepted remote-addr _ cin cout) 1 #f)
|
||||||
(transition state
|
(transition state (spawn-connection (listener-state-server-addr state)
|
||||||
(spawn-connection (listener-state-server-addr state)
|
remote-addr
|
||||||
(tcp-address remote-hostname remote-port)
|
cin
|
||||||
cin
|
cout))]
|
||||||
cout))]
|
|
||||||
[_ #f]))
|
[_ #f]))
|
||||||
|
|
||||||
(define (spawn-tcp-listener server-addr)
|
(define (spawn-tcp-listener server-addr)
|
||||||
(match-define (tcp-listener port) server-addr)
|
(match-define (tcp-listener port) server-addr)
|
||||||
(define listener (tcp:tcp-listen port 4 #t))
|
(define listener (tcp:tcp-listen port 4 #t))
|
||||||
|
(define control-ch (make-channel))
|
||||||
|
(thread (lambda () (tcp-listener-thread control-ch listener server-addr)))
|
||||||
(spawn tcp-listener-behavior
|
(spawn tcp-listener-behavior
|
||||||
(listener-state listener server-addr)
|
(listener-state control-ch server-addr)
|
||||||
(gestalt-union (pub (tcp-channel ? server-addr ?) #:level 2)
|
(gestalt-union (pub (tcp-channel ? server-addr ?) #:level 2)
|
||||||
(sub (event (tcp:tcp-accept-evt listener) ?) #:meta-level 1))))
|
(sub (tcp-accepted ? server-addr ? ?) #:meta-level 1))))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Outbound Connection
|
;; Outbound Connection
|
||||||
|
@ -91,14 +119,7 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Connection
|
;; Connection
|
||||||
|
|
||||||
(struct connection-state (seen-peer? local-addr remote-addr cin cout) #:transparent)
|
(struct connection-state (seen-peer? control-ch cout) #:transparent)
|
||||||
|
|
||||||
(define (shutdown-connection state)
|
|
||||||
(define cin (connection-state-cin state))
|
|
||||||
(define cout (connection-state-cout state))
|
|
||||||
(when cin (close-input-port cin))
|
|
||||||
(when cout (close-output-port cout))
|
|
||||||
(transition (struct-copy connection-state state [cin #f] [cout #f]) (quit)))
|
|
||||||
|
|
||||||
(define (read-bytes-avail-evt len input-port)
|
(define (read-bytes-avail-evt len input-port)
|
||||||
(guard-evt
|
(guard-evt
|
||||||
|
@ -111,18 +132,36 @@
|
||||||
(if (= v len) bstr (subbytes bstr 0 v))
|
(if (= v len) bstr (subbytes bstr 0 v))
|
||||||
v)))))))
|
v)))))))
|
||||||
|
|
||||||
|
(define (tcp-connection-thread remote-addr local-addr control-ch cin)
|
||||||
|
(let loop ((blocked? #t))
|
||||||
|
(sync (handle-evt control-ch
|
||||||
|
(match-lambda
|
||||||
|
['unblock (loop #f)]
|
||||||
|
['quit (void)]))
|
||||||
|
(if blocked?
|
||||||
|
never-evt
|
||||||
|
(handle-evt (read-bytes-avail-evt 32768 cin)
|
||||||
|
(lambda (eof-or-bs)
|
||||||
|
(send-ground-message (tcp-channel remote-addr local-addr eof-or-bs))
|
||||||
|
(loop (or blocked? (eof-object? eof-or-bs))))))))
|
||||||
|
(close-input-port cin))
|
||||||
|
|
||||||
|
(define (shutdown-connection state)
|
||||||
|
(match-define (connection-state _ control-ch cout) state)
|
||||||
|
(when control-ch (channel-put control-ch 'quit))
|
||||||
|
(when cout (close-output-port cout))
|
||||||
|
(transition (struct-copy connection-state state [control-ch #f] [cout #f]) (quit)))
|
||||||
|
|
||||||
(define (tcp-connection e state)
|
(define (tcp-connection e state)
|
||||||
(with-handlers [((lambda (exn) #t)
|
(with-handlers [((lambda (exn) #t)
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(shutdown-connection state)
|
(shutdown-connection state)
|
||||||
(raise exn)))]
|
(raise exn)))]
|
||||||
(match e
|
(match e
|
||||||
[(message (event _ (list (? eof-object?))) 1 #f)
|
[(message (tcp-channel remote-addr local-addr (? eof-object?)) 1 #f)
|
||||||
(shutdown-connection state)]
|
(shutdown-connection state)]
|
||||||
[(message (event _ (list (? bytes? bs))) 1 #f)
|
[(message (tcp-channel remote-addr local-addr (? bytes? bs)) 1 #f)
|
||||||
(transition state (send (tcp-channel (connection-state-remote-addr state)
|
(transition state (send (tcp-channel remote-addr local-addr bs)))]
|
||||||
(connection-state-local-addr state)
|
|
||||||
bs)))]
|
|
||||||
[(message (tcp-channel _ _ bs) 0 #f)
|
[(message (tcp-channel _ _ bs) 0 #f)
|
||||||
(write-bytes bs (connection-state-cout state))
|
(write-bytes bs (connection-state-cout state))
|
||||||
(flush-output (connection-state-cout state))
|
(flush-output (connection-state-cout state))
|
||||||
|
@ -132,21 +171,18 @@
|
||||||
[(and (connection-state-seen-peer? state) (gestalt-empty? g))
|
[(and (connection-state-seen-peer? state) (gestalt-empty? g))
|
||||||
(shutdown-connection state)]
|
(shutdown-connection state)]
|
||||||
[(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g)))
|
[(and (not (connection-state-seen-peer? state)) (not (gestalt-empty? g)))
|
||||||
(define new-state (struct-copy connection-state state [seen-peer? #t]))
|
(channel-put (connection-state-control-ch state) 'unblock)
|
||||||
(transition new-state (routing-update (connection-gestalt new-state)))]
|
(transition (struct-copy connection-state state [seen-peer? #t]) '())]
|
||||||
[else
|
[else
|
||||||
#f])]
|
#f])]
|
||||||
[#f #f])))
|
[#f #f])))
|
||||||
|
|
||||||
(define (connection-gestalt state)
|
|
||||||
(match-define (connection-state seen-peer? local-addr remote-addr cin _) state)
|
|
||||||
(gestalt-union (pub (tcp-channel remote-addr local-addr ?))
|
|
||||||
(sub (tcp-channel local-addr remote-addr ?))
|
|
||||||
(pub (tcp-channel remote-addr local-addr ?) #:level 1)
|
|
||||||
(if seen-peer?
|
|
||||||
(sub (event (read-bytes-avail-evt 32768 cin) ?) #:meta-level 1)
|
|
||||||
(gestalt-empty))))
|
|
||||||
|
|
||||||
(define (spawn-connection local-addr remote-addr cin cout)
|
(define (spawn-connection local-addr remote-addr cin cout)
|
||||||
(define state (connection-state #f local-addr remote-addr cin cout))
|
(define control-ch (make-channel))
|
||||||
(spawn tcp-connection state (connection-gestalt state)))
|
(thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin)))
|
||||||
|
(spawn tcp-connection
|
||||||
|
(connection-state #f control-ch cout)
|
||||||
|
(gestalt-union (pub (tcp-channel remote-addr local-addr ?))
|
||||||
|
(sub (tcp-channel local-addr remote-addr ?))
|
||||||
|
(pub (tcp-channel remote-addr local-addr ?) #:level 1)
|
||||||
|
(sub (tcp-channel remote-addr local-addr ?) #:meta-level 1))))
|
||||||
|
|
Loading…
Reference in New Issue