From d5fe945addc53375e7bc8d0d62febb8c9d3c427e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 9 May 2012 16:24:36 -0400 Subject: [PATCH] TCP driver and test program --- os2-tcp-test.rkt | 49 ++++++++++ os2-tcp.rkt | 246 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) create mode 100644 os2-tcp-test.rkt create mode 100644 os2-tcp.rkt diff --git a/os2-tcp-test.rkt b/os2-tcp-test.rkt new file mode 100644 index 0000000..7a08c33 --- /dev/null +++ b/os2-tcp-test.rkt @@ -0,0 +1,49 @@ +#lang racket/base +;; Trivial example program demonstrating os2-tcp.rkt. + +(require racket/set) +(require racket/match) +(require "os2.rkt") +(require "os2-tcp.rkt") + +(define (connection-handler local-addr) + (transition (set) ;; of remote TcpAddresses + (role 'inbound-handler (topic-subscriber (tcp-channel (wild) local-addr (wild))) + #:state active-remotes + #:topic t + #:on-presence (match t + [(topic 'publisher (tcp-channel (== local-addr) _ _) #f) + ;; Ignore loopback flow. + active-remotes] + [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) + (write `(arrived ,remote-addr)) (newline) + (transition (set-add active-remotes remote-addr) + (send-tcp-mode remote-addr local-addr 'lines) + (send-tcp-credit remote-addr local-addr 1))]) + #:on-absence (match t + [(topic 'publisher (tcp-channel (== local-addr) _ _) #f) + ;; Ignore loopback flow. + active-remotes] + [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) + (write `(departed ,remote-addr)) (newline) + (set-remove active-remotes remote-addr)]) + [(tcp-channel remote-addr (== local-addr) (? bytes? bs)) + (transition active-remotes + (send-tcp-credit remote-addr local-addr 1) + (for/list ([remote (in-set active-remotes)]) + (send-message (tcp-channel local-addr remote + (string->bytes/utf-8 + (format "~a: ~a~n" + remote-addr + (bytes->string/utf-8 bs)))))))]) + (role 'outbound-handler (topic-publisher (tcp-channel local-addr (wild) (wild))) + #:state active-remotes))) + +(define (main port) + (ground-vm + (transition 'none + (spawn tcp-spy) + (spawn tcp-driver) + (spawn (connection-handler (tcp-listener port)))))) + +(main 5999) diff --git a/os2-tcp.rkt b/os2-tcp.rkt new file mode 100644 index 0000000..f4ed966 --- /dev/null +++ b/os2-tcp.rkt @@ -0,0 +1,246 @@ +#lang racket/base +;; TCP drivers for os2.rkt + +(require racket/set) +(require racket/match) +(require (prefix-in tcp: racket/tcp)) +(require racket/port) +(require "os2.rkt") +(require "dump-bytes.rkt") + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + + (struct-out tcp-channel) + (struct-out tcp-credit) + (struct-out tcp-mode) + send-tcp-credit + send-tcp-mode + + tcp-driver + tcp-spy) + +;; A TcpAddress is one of +;; -- a (tcp-address String Uint16), representing a remote socket +;; -- a (tcp-handle Any), representing a local socket on a kernel-assigned port +;; -- a (tcp-listener Uint16), representing a local socket on a user-assigned port +;; Note that tcp-handle-ids must be chosen carefully: they are scoped +;; to the local VM, i.e. shared between processes in that VM, so +;; processes must make sure not to accidentally clash in handle ID +;; selection. They are also used in TcpChannel to mean a specific +;; *instance* of a TCP connection, so if you are likely to want to +;; reconnect individual flows, use different tcp-handle-ids. +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +;; A TcpChannel is a (tcp-channel TcpAddress TcpAddress TcpSubPacket), +;; and represents a section of a unidirectional TCP flow appearing on +;; our local "subnet" of the full TCP network, complete with source, +;; destination and subpacket. +(struct tcp-channel (source destination subpacket) #:prefab) + +;; A TcpSubPacket is either +;; -- a Bytes, representing a section of the data carried by a +;; TcpChannel. In principle, this should also have a sequence +;; number field, but for simplicity we rely on os2.rkt's +;; preservation of ordering. +;; -- an EndOfFile, representing the end of a the channel's stream. +;; -- a (tcp-credit NonNegativeInteger), for flow control. +;; -- a (tcp-mode TcpModeName), for mode selection. +(struct tcp-credit (amount) #:prefab) +(struct tcp-mode (name) #:prefab) + +;; A TcpModeName is either +;; -- 'lines, for reading line-at-a-time, or +;; -- 'bytes, for reading chunks of bytes. + +;; TODO: BUG?: Routing packets between two local sockets won't work +;; because the patterns aren't set up to recognise that situation. + +;; A TcpConnectionState is a (tcp-connection-state TcpModeName +;; Integer), representing the current input mode and issued credit. +(struct tcp-connection-state (mode credit) #:prefab) + +;; TcpAddress TcpAddress NonNegativeInteger -> Preaction +;; Sends a credit message on a channel using the correct (subscriber) role. +(define (send-tcp-credit source-addr sink-addr amount) + (send-message (tcp-channel source-addr sink-addr (tcp-credit amount)) 'subscriber)) + +;; TcpAddress TcpAddress TcpModeName -> Preaction +;; Sends a mode selection message on a channel using the correct (subscriber) role. +(define (send-tcp-mode source-addr sink-addr mode-name) + (send-message (tcp-channel source-addr sink-addr (tcp-mode mode-name)) 'subscriber)) + +;; TcpAddresses; represents various wildcard addresses +(define any-remote (tcp-address (wild) (wild))) +(define any-handle (tcp-handle (wild))) +(define any-listener (tcp-listener (wild))) + +;; BootK +;; Process acting as a TCP socket factory. +(define tcp-driver + (transition (set) + (role 'tcp-listener-factory + (set (topic-subscriber (tcp-channel any-listener any-remote (wild)) #:virtual? #t) + (topic-publisher (tcp-channel any-remote any-listener (wild)) #:virtual? #t)) + #:state active-handles + #:topic t + #:on-presence (maybe-spawn-socket t active-handles tcp-listener-manager) + #:on-absence (maybe-forget-socket t active-handles)) + (role 'tcp-connection-factory + (set (topic-subscriber (tcp-channel any-handle any-remote (wild)) #:virtual? #t) + (topic-publisher (tcp-channel any-remote any-handle (wild)) #:virtual? #t)) + #:state active-handles + #:topic t + #:on-presence (maybe-spawn-socket t active-handles tcp-connection-manager) + #:on-absence (maybe-forget-socket t active-handles)))) + +;; Topic Set (TcpAddress TcpAddress -> BootK) -> Transition +(define (maybe-spawn-socket t active-handles driver-fun) + (match t + [(or (topic 'publisher (tcp-channel local-addr remote-addr _) counterparty-virtual?) + (topic 'subscriber (tcp-channel remote-addr local-addr _) counterparty-virtual?)) + (cond + [counterparty-virtual? active-handles] + [(set-member? active-handles (cons local-addr remote-addr)) active-handles] + [else + (transition (set-add active-handles (cons local-addr remote-addr)) + (spawn (driver-fun local-addr remote-addr) + #:debug-name (cons local-addr remote-addr)))])])) + +;; Topic Set -> Transition +(define (maybe-forget-socket t active-handles) + (match t + [(or (topic 'publisher (tcp-channel local-addr _ _) counterparty-virtual?) + (topic 'subscriber (tcp-channel _ local-addr _) counterparty-virtual?)) + (cond + [counterparty-virtual? active-handles] + [else (set-remove active-handles local-addr)])])) + +;; TcpAddress TcpAddress -> BootK +(define ((tcp-listener-manager local-addr dummy-remote-addr) self-pid) + (match-define (tcp-listener port) local-addr) + (define listener (tcp:tcp-listen port 4 #t)) + (transition 'listener-is-running + (role 'closer + (set (topic-subscriber (tcp-channel local-addr any-remote (wild)) #:virtual? #t) + (topic-publisher (tcp-channel any-remote local-addr (wild)) #:virtual? #t)) + #:state state + #:on-absence (transition 'listener-is-closed + (kill) + (when (eq? state 'listener-is-running) + (spawn (lambda (dummy-pid) + (tcp:tcp-close listener) + (transition 'dummy (kill))) + #:debug-name (list 'tcp-listener-closer local-addr))))) + (role 'accepter (topic-subscriber (cons (tcp:tcp-accept-evt listener) (wild))) + #:state state + [(cons _ (list cin cout)) + (define-values (local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t)) + (define remote-addr (tcp-address remote-hostname remote-port)) + (transition state + (spawn (tcp-connection-manager* local-addr remote-addr cin cout) + #:debug-name (cons local-addr remote-addr)))]))) + +;; TcpAddress TcpAddress -> BootK +(define ((tcp-connection-manager local-addr remote-addr) self-pid) + (match-define (tcp-address remote-hostname remote-port) remote-addr) + (define-values (cin cout) (tcp:tcp-connect remote-hostname remote-port)) + (tcp-connection-manager* local-addr remote-addr cin cout)) + +;; TcpAddress TcpAddress InputPort OutputPort -> Transition +;; +;; Our process state here is a Maybe, representing +;; a shutting-down state if #f. +(define (tcp-connection-manager* local-addr remote-addr cin cout) + (define (close-transition state send-eof?) + (transition #f + (kill) + (when (not (eq? state #f)) + (list (when send-eof? + (send-message (tcp-channel remote-addr local-addr eof))) + (spawn (lambda (dummy-pid) + (tcp:tcp-abandon-port cin) + (tcp:tcp-abandon-port cout) + (transition 'dummy (kill))) + #:debug-name (list 'tcp-connection-closer local-addr remote-addr)))))) + (define (adjust-credit state amount) + (let ((new-credit (+ (tcp-connection-state-credit state) amount))) + (transition (struct-copy tcp-connection-state state [credit new-credit]) + (delete-role 'inbound-relay) + (when (positive? new-credit) + (case (tcp-connection-state-mode state) + [(lines) + (role 'inbound-relay + (topic-subscriber (cons (read-bytes-line-evt cin 'any) (wild))) + #:state state + [(cons _ (? eof-object?)) + (close-transition state #t)] + [(cons _ (? bytes? bs)) + (extend-transition (adjust-credit state -1) + (send-message (tcp-channel remote-addr local-addr bs)))])] + [(bytes) + (role 'inbound-relay + (topic-subscriber (cons (read-bytes-evt new-credit cin) (wild))) + #:state state + [(cons _ (? eof-object?)) + (close-transition state #t)] + [(cons _ (? bytes? bs)) + (define len (bytes-length bs)) + (extend-transition (adjust-credit state (- len)) + (send-message (tcp-channel remote-addr local-addr bs)))])]))))) + (transition (tcp-connection-state 'bytes 0) + (role 'outbound-relay (topic-subscriber (tcp-channel local-addr remote-addr (wild))) + #:state state + #:on-absence (close-transition state #f) + [(tcp-channel (== local-addr) (== remote-addr) subpacket) + (match subpacket + [(? eof-object?) (close-transition state #f)] + [(? bytes? bs) + (define len (bytes-length bs)) + (write-bytes bs cout) + (flush-output cout) + (transition state (send-tcp-credit local-addr remote-addr len))] + [_ + (error 'tcp-connection-manager* + "Publisher on a channel isn't supposed to issue channel control messages")])]) + (role 'inbound-flow-control (topic-publisher (tcp-channel remote-addr local-addr (wild))) + #:state state + #:on-absence (close-transition state #f) + [(tcp-channel (== remote-addr) (== local-addr) subpacket) + (match subpacket + [(tcp-credit amount) + (and state (adjust-credit state amount))] + [(tcp-mode new-mode) + ;; Also resets credit to zero. + (and state (adjust-credit (tcp-connection-state new-mode 0) 0))] + [_ + (error 'tcp-connection-manager* + "Subscriber on a channel may only send channel control messages")])]))) + +;; BootK +;; Debugging aid: produces pretty hex dumps of TCP traffic sent on +;; this network. Also prints out other messages without special +;; formatting. +(define tcp-spy + (transition 'no-state + (role 'tcp-pretty-printer + (set (topic-subscriber (wild) #:virtual? #t) + (topic-publisher (wild) #:virtual? #t)) + #:state state + [(tcp-channel source dest (? bytes? body)) + (write `(TCPDATA ,source --> ,dest)) (newline) + (dump-bytes! body (bytes-length body)) + state] + [(tcp-channel source dest (? eof-object?)) + (write `(TCPEOF ,source --> ,dest)) (newline) + state] + [(tcp-channel source dest (tcp-credit amount)) + (write `(TCPCREDIT ,source --> ,dest ,amount)) (newline) + state] + [other + (write `(TCPOTHER ,other)) (newline) + state])))