TCP driver and test program
This commit is contained in:
parent
3ec9abad45
commit
d5fe945add
|
@ -0,0 +1,49 @@
|
|||
#lang racket/base
|
||||
;; Trivial example program demonstrating os2-tcp.rkt.
|
||||
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
(require "os2.rkt")
|
||||
(require "os2-tcp.rkt")
|
||||
|
||||
(define (connection-handler local-addr)
|
||||
(transition (set) ;; of remote TcpAddresses
|
||||
(role 'inbound-handler (topic-subscriber (tcp-channel (wild) local-addr (wild)))
|
||||
#:state active-remotes
|
||||
#:topic t
|
||||
#:on-presence (match t
|
||||
[(topic 'publisher (tcp-channel (== local-addr) _ _) #f)
|
||||
;; Ignore loopback flow.
|
||||
active-remotes]
|
||||
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
|
||||
(write `(arrived ,remote-addr)) (newline)
|
||||
(transition (set-add active-remotes remote-addr)
|
||||
(send-tcp-mode remote-addr local-addr 'lines)
|
||||
(send-tcp-credit remote-addr local-addr 1))])
|
||||
#:on-absence (match t
|
||||
[(topic 'publisher (tcp-channel (== local-addr) _ _) #f)
|
||||
;; Ignore loopback flow.
|
||||
active-remotes]
|
||||
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
|
||||
(write `(departed ,remote-addr)) (newline)
|
||||
(set-remove active-remotes remote-addr)])
|
||||
[(tcp-channel remote-addr (== local-addr) (? bytes? bs))
|
||||
(transition active-remotes
|
||||
(send-tcp-credit remote-addr local-addr 1)
|
||||
(for/list ([remote (in-set active-remotes)])
|
||||
(send-message (tcp-channel local-addr remote
|
||||
(string->bytes/utf-8
|
||||
(format "~a: ~a~n"
|
||||
remote-addr
|
||||
(bytes->string/utf-8 bs)))))))])
|
||||
(role 'outbound-handler (topic-publisher (tcp-channel local-addr (wild) (wild)))
|
||||
#:state active-remotes)))
|
||||
|
||||
(define (main port)
|
||||
(ground-vm
|
||||
(transition 'none
|
||||
(spawn tcp-spy)
|
||||
(spawn tcp-driver)
|
||||
(spawn (connection-handler (tcp-listener port))))))
|
||||
|
||||
(main 5999)
|
|
@ -0,0 +1,246 @@
|
|||
#lang racket/base
|
||||
;; TCP drivers for os2.rkt
|
||||
|
||||
(require racket/set)
|
||||
(require racket/match)
|
||||
(require (prefix-in tcp: racket/tcp))
|
||||
(require racket/port)
|
||||
(require "os2.rkt")
|
||||
(require "dump-bytes.rkt")
|
||||
|
||||
(provide (struct-out tcp-address)
|
||||
(struct-out tcp-handle)
|
||||
(struct-out tcp-listener)
|
||||
|
||||
(struct-out tcp-channel)
|
||||
(struct-out tcp-credit)
|
||||
(struct-out tcp-mode)
|
||||
send-tcp-credit
|
||||
send-tcp-mode
|
||||
|
||||
tcp-driver
|
||||
tcp-spy)
|
||||
|
||||
;; A TcpAddress is one of
|
||||
;; -- a (tcp-address String Uint16), representing a remote socket
|
||||
;; -- a (tcp-handle Any), representing a local socket on a kernel-assigned port
|
||||
;; -- a (tcp-listener Uint16), representing a local socket on a user-assigned port
|
||||
;; Note that tcp-handle-ids must be chosen carefully: they are scoped
|
||||
;; to the local VM, i.e. shared between processes in that VM, so
|
||||
;; processes must make sure not to accidentally clash in handle ID
|
||||
;; selection. They are also used in TcpChannel to mean a specific
|
||||
;; *instance* of a TCP connection, so if you are likely to want to
|
||||
;; reconnect individual flows, use different tcp-handle-ids.
|
||||
(struct tcp-address (host port) #:prefab)
|
||||
(struct tcp-handle (id) #:prefab)
|
||||
(struct tcp-listener (port) #:prefab)
|
||||
|
||||
;; A TcpChannel is a (tcp-channel TcpAddress TcpAddress TcpSubPacket),
|
||||
;; and represents a section of a unidirectional TCP flow appearing on
|
||||
;; our local "subnet" of the full TCP network, complete with source,
|
||||
;; destination and subpacket.
|
||||
(struct tcp-channel (source destination subpacket) #:prefab)
|
||||
|
||||
;; A TcpSubPacket is either
|
||||
;; -- a Bytes, representing a section of the data carried by a
|
||||
;; TcpChannel. In principle, this should also have a sequence
|
||||
;; number field, but for simplicity we rely on os2.rkt's
|
||||
;; preservation of ordering.
|
||||
;; -- an EndOfFile, representing the end of a the channel's stream.
|
||||
;; -- a (tcp-credit NonNegativeInteger), for flow control.
|
||||
;; -- a (tcp-mode TcpModeName), for mode selection.
|
||||
(struct tcp-credit (amount) #:prefab)
|
||||
(struct tcp-mode (name) #:prefab)
|
||||
|
||||
;; A TcpModeName is either
|
||||
;; -- 'lines, for reading line-at-a-time, or
|
||||
;; -- 'bytes, for reading chunks of bytes.
|
||||
|
||||
;; TODO: BUG?: Routing packets between two local sockets won't work
|
||||
;; because the patterns aren't set up to recognise that situation.
|
||||
|
||||
;; A TcpConnectionState is a (tcp-connection-state TcpModeName
|
||||
;; Integer), representing the current input mode and issued credit.
|
||||
(struct tcp-connection-state (mode credit) #:prefab)
|
||||
|
||||
;; TcpAddress TcpAddress NonNegativeInteger -> Preaction
|
||||
;; Sends a credit message on a channel using the correct (subscriber) role.
|
||||
(define (send-tcp-credit source-addr sink-addr amount)
|
||||
(send-message (tcp-channel source-addr sink-addr (tcp-credit amount)) 'subscriber))
|
||||
|
||||
;; TcpAddress TcpAddress TcpModeName -> Preaction
|
||||
;; Sends a mode selection message on a channel using the correct (subscriber) role.
|
||||
(define (send-tcp-mode source-addr sink-addr mode-name)
|
||||
(send-message (tcp-channel source-addr sink-addr (tcp-mode mode-name)) 'subscriber))
|
||||
|
||||
;; TcpAddresses; represents various wildcard addresses
|
||||
(define any-remote (tcp-address (wild) (wild)))
|
||||
(define any-handle (tcp-handle (wild)))
|
||||
(define any-listener (tcp-listener (wild)))
|
||||
|
||||
;; BootK
|
||||
;; Process acting as a TCP socket factory.
|
||||
(define tcp-driver
|
||||
(transition (set)
|
||||
(role 'tcp-listener-factory
|
||||
(set (topic-subscriber (tcp-channel any-listener any-remote (wild)) #:virtual? #t)
|
||||
(topic-publisher (tcp-channel any-remote any-listener (wild)) #:virtual? #t))
|
||||
#:state active-handles
|
||||
#:topic t
|
||||
#:on-presence (maybe-spawn-socket t active-handles tcp-listener-manager)
|
||||
#:on-absence (maybe-forget-socket t active-handles))
|
||||
(role 'tcp-connection-factory
|
||||
(set (topic-subscriber (tcp-channel any-handle any-remote (wild)) #:virtual? #t)
|
||||
(topic-publisher (tcp-channel any-remote any-handle (wild)) #:virtual? #t))
|
||||
#:state active-handles
|
||||
#:topic t
|
||||
#:on-presence (maybe-spawn-socket t active-handles tcp-connection-manager)
|
||||
#:on-absence (maybe-forget-socket t active-handles))))
|
||||
|
||||
;; Topic Set<HandleMapping> (TcpAddress TcpAddress -> BootK) -> Transition
|
||||
(define (maybe-spawn-socket t active-handles driver-fun)
|
||||
(match t
|
||||
[(or (topic 'publisher (tcp-channel local-addr remote-addr _) counterparty-virtual?)
|
||||
(topic 'subscriber (tcp-channel remote-addr local-addr _) counterparty-virtual?))
|
||||
(cond
|
||||
[counterparty-virtual? active-handles]
|
||||
[(set-member? active-handles (cons local-addr remote-addr)) active-handles]
|
||||
[else
|
||||
(transition (set-add active-handles (cons local-addr remote-addr))
|
||||
(spawn (driver-fun local-addr remote-addr)
|
||||
#:debug-name (cons local-addr remote-addr)))])]))
|
||||
|
||||
;; Topic Set<HandleMapping> -> Transition
|
||||
(define (maybe-forget-socket t active-handles)
|
||||
(match t
|
||||
[(or (topic 'publisher (tcp-channel local-addr _ _) counterparty-virtual?)
|
||||
(topic 'subscriber (tcp-channel _ local-addr _) counterparty-virtual?))
|
||||
(cond
|
||||
[counterparty-virtual? active-handles]
|
||||
[else (set-remove active-handles local-addr)])]))
|
||||
|
||||
;; TcpAddress TcpAddress -> BootK
|
||||
(define ((tcp-listener-manager local-addr dummy-remote-addr) self-pid)
|
||||
(match-define (tcp-listener port) local-addr)
|
||||
(define listener (tcp:tcp-listen port 4 #t))
|
||||
(transition 'listener-is-running
|
||||
(role 'closer
|
||||
(set (topic-subscriber (tcp-channel local-addr any-remote (wild)) #:virtual? #t)
|
||||
(topic-publisher (tcp-channel any-remote local-addr (wild)) #:virtual? #t))
|
||||
#:state state
|
||||
#:on-absence (transition 'listener-is-closed
|
||||
(kill)
|
||||
(when (eq? state 'listener-is-running)
|
||||
(spawn (lambda (dummy-pid)
|
||||
(tcp:tcp-close listener)
|
||||
(transition 'dummy (kill)))
|
||||
#:debug-name (list 'tcp-listener-closer local-addr)))))
|
||||
(role 'accepter (topic-subscriber (cons (tcp:tcp-accept-evt listener) (wild)))
|
||||
#:state state
|
||||
[(cons _ (list cin cout))
|
||||
(define-values (local-hostname local-port remote-hostname remote-port)
|
||||
(tcp:tcp-addresses cin #t))
|
||||
(define remote-addr (tcp-address remote-hostname remote-port))
|
||||
(transition state
|
||||
(spawn (tcp-connection-manager* local-addr remote-addr cin cout)
|
||||
#:debug-name (cons local-addr remote-addr)))])))
|
||||
|
||||
;; TcpAddress TcpAddress -> BootK
|
||||
(define ((tcp-connection-manager local-addr remote-addr) self-pid)
|
||||
(match-define (tcp-address remote-hostname remote-port) remote-addr)
|
||||
(define-values (cin cout) (tcp:tcp-connect remote-hostname remote-port))
|
||||
(tcp-connection-manager* local-addr remote-addr cin cout))
|
||||
|
||||
;; TcpAddress TcpAddress InputPort OutputPort -> Transition
|
||||
;;
|
||||
;; Our process state here is a Maybe<TcpConnectionState>, representing
|
||||
;; a shutting-down state if #f.
|
||||
(define (tcp-connection-manager* local-addr remote-addr cin cout)
|
||||
(define (close-transition state send-eof?)
|
||||
(transition #f
|
||||
(kill)
|
||||
(when (not (eq? state #f))
|
||||
(list (when send-eof?
|
||||
(send-message (tcp-channel remote-addr local-addr eof)))
|
||||
(spawn (lambda (dummy-pid)
|
||||
(tcp:tcp-abandon-port cin)
|
||||
(tcp:tcp-abandon-port cout)
|
||||
(transition 'dummy (kill)))
|
||||
#:debug-name (list 'tcp-connection-closer local-addr remote-addr))))))
|
||||
(define (adjust-credit state amount)
|
||||
(let ((new-credit (+ (tcp-connection-state-credit state) amount)))
|
||||
(transition (struct-copy tcp-connection-state state [credit new-credit])
|
||||
(delete-role 'inbound-relay)
|
||||
(when (positive? new-credit)
|
||||
(case (tcp-connection-state-mode state)
|
||||
[(lines)
|
||||
(role 'inbound-relay
|
||||
(topic-subscriber (cons (read-bytes-line-evt cin 'any) (wild)))
|
||||
#:state state
|
||||
[(cons _ (? eof-object?))
|
||||
(close-transition state #t)]
|
||||
[(cons _ (? bytes? bs))
|
||||
(extend-transition (adjust-credit state -1)
|
||||
(send-message (tcp-channel remote-addr local-addr bs)))])]
|
||||
[(bytes)
|
||||
(role 'inbound-relay
|
||||
(topic-subscriber (cons (read-bytes-evt new-credit cin) (wild)))
|
||||
#:state state
|
||||
[(cons _ (? eof-object?))
|
||||
(close-transition state #t)]
|
||||
[(cons _ (? bytes? bs))
|
||||
(define len (bytes-length bs))
|
||||
(extend-transition (adjust-credit state (- len))
|
||||
(send-message (tcp-channel remote-addr local-addr bs)))])])))))
|
||||
(transition (tcp-connection-state 'bytes 0)
|
||||
(role 'outbound-relay (topic-subscriber (tcp-channel local-addr remote-addr (wild)))
|
||||
#:state state
|
||||
#:on-absence (close-transition state #f)
|
||||
[(tcp-channel (== local-addr) (== remote-addr) subpacket)
|
||||
(match subpacket
|
||||
[(? eof-object?) (close-transition state #f)]
|
||||
[(? bytes? bs)
|
||||
(define len (bytes-length bs))
|
||||
(write-bytes bs cout)
|
||||
(flush-output cout)
|
||||
(transition state (send-tcp-credit local-addr remote-addr len))]
|
||||
[_
|
||||
(error 'tcp-connection-manager*
|
||||
"Publisher on a channel isn't supposed to issue channel control messages")])])
|
||||
(role 'inbound-flow-control (topic-publisher (tcp-channel remote-addr local-addr (wild)))
|
||||
#:state state
|
||||
#:on-absence (close-transition state #f)
|
||||
[(tcp-channel (== remote-addr) (== local-addr) subpacket)
|
||||
(match subpacket
|
||||
[(tcp-credit amount)
|
||||
(and state (adjust-credit state amount))]
|
||||
[(tcp-mode new-mode)
|
||||
;; Also resets credit to zero.
|
||||
(and state (adjust-credit (tcp-connection-state new-mode 0) 0))]
|
||||
[_
|
||||
(error 'tcp-connection-manager*
|
||||
"Subscriber on a channel may only send channel control messages")])])))
|
||||
|
||||
;; BootK
|
||||
;; Debugging aid: produces pretty hex dumps of TCP traffic sent on
|
||||
;; this network. Also prints out other messages without special
|
||||
;; formatting.
|
||||
(define tcp-spy
|
||||
(transition 'no-state
|
||||
(role 'tcp-pretty-printer
|
||||
(set (topic-subscriber (wild) #:virtual? #t)
|
||||
(topic-publisher (wild) #:virtual? #t))
|
||||
#:state state
|
||||
[(tcp-channel source dest (? bytes? body))
|
||||
(write `(TCPDATA ,source --> ,dest)) (newline)
|
||||
(dump-bytes! body (bytes-length body))
|
||||
state]
|
||||
[(tcp-channel source dest (? eof-object?))
|
||||
(write `(TCPEOF ,source --> ,dest)) (newline)
|
||||
state]
|
||||
[(tcp-channel source dest (tcp-credit amount))
|
||||
(write `(TCPCREDIT ,source --> ,dest ,amount)) (newline)
|
||||
state]
|
||||
[other
|
||||
(write `(TCPOTHER ,other)) (newline)
|
||||
state])))
|
Loading…
Reference in New Issue