Actual somewhat-working TCP server -- the chat example from minimart
This commit is contained in:
parent
ccc5775f00
commit
4de4180c67
65
main.rkt
65
main.rkt
|
@ -1,12 +1,13 @@
|
||||||
#lang minimart
|
#lang minimart
|
||||||
|
|
||||||
|
(require minimart/demand-matcher)
|
||||||
(require minimart/drivers/timer)
|
(require minimart/drivers/timer)
|
||||||
(require "ethernet.rkt")
|
(require "ethernet.rkt")
|
||||||
(require "arp.rkt")
|
(require "arp.rkt")
|
||||||
(require "ip.rkt")
|
(require "ip.rkt")
|
||||||
(require "tcp.rkt")
|
(require "tcp.rkt")
|
||||||
|
|
||||||
(define interface "wlan0")
|
(define interface "vboxnet0")
|
||||||
|
|
||||||
;;(log-events-and-actions? #t)
|
;;(log-events-and-actions? #t)
|
||||||
|
|
||||||
|
@ -16,8 +17,66 @@
|
||||||
(spawn-ip-driver interface (bytes 192 168 56 222))
|
(spawn-ip-driver interface (bytes 192 168 56 222))
|
||||||
(spawn-tcp-driver)
|
(spawn-tcp-driver)
|
||||||
|
|
||||||
|
(let ()
|
||||||
|
(local-require racket/set racket/string)
|
||||||
|
|
||||||
|
(define (spawn-session them us)
|
||||||
|
(define user (gensym 'user))
|
||||||
|
(define remote-detector (compile-gestalt-projection (?!)))
|
||||||
|
(define peer-detector (compile-gestalt-projection `(,(?!) says ,?)))
|
||||||
|
(define (send-to-remote fmt . vs)
|
||||||
|
(send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))
|
||||||
|
(define (say who fmt . vs)
|
||||||
|
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
|
||||||
|
(list (send-to-remote "Welcome, ~a.\n" user)
|
||||||
|
(spawn (lambda (e old-peers)
|
||||||
|
(log-info "~a: ~v --> ~v" user e old-peers)
|
||||||
|
(match e
|
||||||
|
[(message (tcp-channel _ _ bs) 1 #f)
|
||||||
|
(transition old-peers
|
||||||
|
(send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
|
||||||
|
[(message `(,who says ,what) 0 #f)
|
||||||
|
(transition old-peers (say who "says: ~a" what))]
|
||||||
|
[(routing-update g)
|
||||||
|
(define new-peers
|
||||||
|
(matcher-key-set/single (gestalt-project g 0 0 #t peer-detector)))
|
||||||
|
(transition
|
||||||
|
new-peers
|
||||||
|
(list (when (matcher-empty? (gestalt-project g 1 0 #t remote-detector))
|
||||||
|
(quit))
|
||||||
|
(for/list [(who (set-subtract new-peers old-peers))]
|
||||||
|
(say who "arrived."))
|
||||||
|
(for/list [(who (set-subtract old-peers new-peers))]
|
||||||
|
(say who "departed."))))]
|
||||||
|
[#f #f]))
|
||||||
|
(set)
|
||||||
|
(gestalt-union (sub `(,? says ,?))
|
||||||
|
(sub `(,? says ,?) #:level 1)
|
||||||
|
(pub `(,user says ,?))
|
||||||
|
(sub (tcp-channel them us ?) #:meta-level 1)
|
||||||
|
(sub (tcp-channel them us ?) #:meta-level 1 #:level 1)
|
||||||
|
(pub (tcp-channel us them ?) #:meta-level 1)))))
|
||||||
|
|
||||||
|
(spawn-world
|
||||||
|
(spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?)
|
||||||
|
#:meta-level 1
|
||||||
|
spawn-session))
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
(spawn (lambda (e s)
|
(spawn (lambda (e s)
|
||||||
;; (log-info "SPY: ~v" e)
|
(local-require racket/pretty)
|
||||||
|
(match e
|
||||||
|
[(message m _ _)
|
||||||
|
(pretty-write `(MAIN ,m))]
|
||||||
|
[(routing-update g)
|
||||||
|
(printf "MAIN gestalt:\n")
|
||||||
|
(pretty-print-gestalt g)]
|
||||||
|
[_ (void)])
|
||||||
|
(flush-output)
|
||||||
#f)
|
#f)
|
||||||
(void)
|
(void)
|
||||||
(gestalt-union (sub ? #:level 5)))
|
(gestalt-union
|
||||||
|
;;(sub ? #:level 5)
|
||||||
|
(sub (tcp-channel ? ? ?) #:level 5)
|
||||||
|
))
|
||||||
|
|
284
tcp.rkt
284
tcp.rkt
|
@ -21,14 +21,15 @@
|
||||||
;; tcp-handle/tcp-address : "user" outbound connections
|
;; tcp-handle/tcp-address : "user" outbound connections
|
||||||
;; tcp-listener/tcp-address : "user" inbound connections
|
;; tcp-listener/tcp-address : "user" inbound connections
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Protocol messages
|
||||||
|
|
||||||
(struct tcp-address (host port) #:prefab)
|
(struct tcp-address (host port) #:prefab)
|
||||||
(struct tcp-handle (id) #:prefab)
|
(struct tcp-handle (id) #:prefab)
|
||||||
(struct tcp-listener (port) #:prefab)
|
(struct tcp-listener (port) #:prefab)
|
||||||
|
|
||||||
(struct tcp-channel (source destination subpacket) #:prefab)
|
(struct tcp-channel (source destination subpacket) #:prefab)
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(struct tcp-packet (from-wire?
|
(struct tcp-packet (from-wire?
|
||||||
source-ip
|
source-ip
|
||||||
source-port
|
source-port
|
||||||
|
@ -42,11 +43,125 @@
|
||||||
data)
|
data)
|
||||||
#:prefab)
|
#:prefab)
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; User-accessible driver startup
|
||||||
|
|
||||||
|
(define (spawn-tcp-driver)
|
||||||
|
(list (spawn-demand-matcher (tcp-channel ? (?! (tcp-listener ?)) ?)
|
||||||
|
#:demand-is-subscription? #t
|
||||||
|
#:demand-level 1
|
||||||
|
#:supply-level 2
|
||||||
|
(lambda (server-addr)
|
||||||
|
(match-define (tcp-listener port) server-addr)
|
||||||
|
(spawn-demand-matcher
|
||||||
|
(tcp-channel (?! (tcp-address ? ?)) (?! (tcp-address ? port)) ?)
|
||||||
|
(spawn-relay server-addr))))
|
||||||
|
(spawn-demand-matcher (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)
|
||||||
|
(lambda (local-addr remote-addr)
|
||||||
|
(send (tcp-port-allocation-request local-addr remote-addr))))
|
||||||
|
(spawn-port-allocator)
|
||||||
|
(spawn-kernel-tcp-driver)))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Port allocator
|
||||||
|
|
||||||
|
(struct tcp-port-allocation-request (local-addr remote-addr) #:prefab)
|
||||||
|
|
||||||
|
(struct port-allocator-state (used-ports local-ips) #:transparent)
|
||||||
|
|
||||||
|
(define (spawn-port-allocator)
|
||||||
|
(define port-projector
|
||||||
|
(compile-gestalt-projection (tcp-channel (tcp-address (?!) (?!)) (tcp-address (?!) (?!)) ?)))
|
||||||
|
(define ip-projector
|
||||||
|
(compile-gestalt-projection (ip-interface (?!) ?)))
|
||||||
|
|
||||||
|
;; TODO: Choose a sensible IP address for the outbound connection.
|
||||||
|
;; We don't have enough information to do this well at the moment,
|
||||||
|
;; so just pick some available local IP address.
|
||||||
|
;;
|
||||||
|
;; Interesting note: In some sense, the right answer is "?". This
|
||||||
|
;; would give us a form of mobility, where IP addresses only route
|
||||||
|
;; to a given bucket-of-state and ONLY the port number selects a
|
||||||
|
;; substate therein. That's not how TCP is defined however so we
|
||||||
|
;; can't do that.
|
||||||
|
(define (appropriate-ip s)
|
||||||
|
(set-first (port-allocator-state-local-ips s)))
|
||||||
|
|
||||||
|
(spawn (lambda (e s)
|
||||||
|
(match e
|
||||||
|
[(routing-update g)
|
||||||
|
(define extracted-ips (matcher-key-set (gestalt-project g 0 0 #t ip-projector)))
|
||||||
|
(define extracted-ports (matcher-key-set (gestalt-project g 0 0 #f port-projector)))
|
||||||
|
(if (or (not extracted-ports) (not extracted-ips))
|
||||||
|
(error 'tcp "Someone has published a wildcard TCP address or IP interface")
|
||||||
|
(transition (let ((local-ips (for/set [(e (in-set extracted-ips))] (car e))))
|
||||||
|
(port-allocator-state
|
||||||
|
(for/fold [(s (set))] [(e (in-set extracted-ports))]
|
||||||
|
(match-define (list si sp di dp) e)
|
||||||
|
(let* ((s (if (set-member? local-ips si) (set-add s sp) s))
|
||||||
|
(s (if (set-member? local-ips di) (set-add s dp) s)))
|
||||||
|
s))
|
||||||
|
local-ips))
|
||||||
|
'()))]
|
||||||
|
[(message (tcp-port-allocation-request local-addr remote-addr) _ _)
|
||||||
|
(define currently-used-ports (port-allocator-state-used-ports s))
|
||||||
|
(let randomly-allocate-until-unused ()
|
||||||
|
(define p (+ 1024 (random 64512)))
|
||||||
|
(if (set-member? currently-used-ports p)
|
||||||
|
(randomly-allocate-until-unused)
|
||||||
|
(transition (struct-copy port-allocator-state s
|
||||||
|
[used-ports (set-add currently-used-ports p)])
|
||||||
|
((spawn-relay local-addr)
|
||||||
|
remote-addr
|
||||||
|
(tcp-channel (appropriate-ip s) p)))))]
|
||||||
|
[_ #f]))
|
||||||
|
(port-allocator-state (set) (set))
|
||||||
|
(gestalt-union (sub (tcp-port-allocation-request ? ?))
|
||||||
|
(sub (projection->pattern ip-projector) #:level 1)
|
||||||
|
(pub (projection->pattern port-projector) #:level 1))))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Relay between kernel-level and user-level
|
||||||
|
|
||||||
|
(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr)
|
||||||
|
(define local-peer-traffic (pub (tcp-channel remote-addr local-user-addr ?) #:level 1))
|
||||||
|
(define remote-peer-traffic (sub (tcp-channel remote-addr local-tcp-addr ?) #:level 1))
|
||||||
|
(spawn (lambda (e seen-local-peer?)
|
||||||
|
(local-require racket/pretty)
|
||||||
|
(pretty-write `(RELAY (local-user-addr ,local-user-addr)
|
||||||
|
(remote-addr ,remote-addr)
|
||||||
|
(local-tcp-addr ,local-tcp-addr)
|
||||||
|
(seen-local-peer? ,seen-local-peer?)
|
||||||
|
(e ,e)))
|
||||||
|
(flush-output)
|
||||||
|
(match e
|
||||||
|
[(routing-update g)
|
||||||
|
(define local-peer-absent? (gestalt-empty? (gestalt-filter g local-peer-traffic)))
|
||||||
|
(transition (or seen-local-peer? (not local-peer-absent?))
|
||||||
|
(when (or (and seen-local-peer? local-peer-absent?)
|
||||||
|
(gestalt-empty? (gestalt-filter g remote-peer-traffic)))
|
||||||
|
(quit)))]
|
||||||
|
[(message (tcp-channel (== local-user-addr) (== remote-addr) bs) _ _)
|
||||||
|
(transition seen-local-peer? (send (tcp-channel local-tcp-addr remote-addr bs)))]
|
||||||
|
[(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs) _ _)
|
||||||
|
(transition seen-local-peer? (send (tcp-channel remote-addr local-user-addr bs)))]
|
||||||
|
[_ #f]))
|
||||||
|
#f
|
||||||
|
(gestalt-union local-peer-traffic
|
||||||
|
remote-peer-traffic
|
||||||
|
(sub (tcp-channel remote-addr local-tcp-addr ?))
|
||||||
|
(sub (tcp-channel local-user-addr remote-addr ?))
|
||||||
|
(pub (tcp-channel remote-addr local-user-addr ?))
|
||||||
|
(pub (tcp-channel local-tcp-addr remote-addr ?)))))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Codec & kernel-level driver
|
||||||
|
|
||||||
(define PROTOCOL-TCP 6)
|
(define PROTOCOL-TCP 6)
|
||||||
|
|
||||||
(struct codec-state (active-state-vectors) #:transparent)
|
(struct codec-state (active-state-vectors) #:transparent)
|
||||||
|
|
||||||
(define (spawn-tcp-driver)
|
(define (spawn-kernel-tcp-driver)
|
||||||
|
|
||||||
(define (flip-statevec statevec)
|
(define (flip-statevec statevec)
|
||||||
(match-define (list si sp di dp) statevec)
|
(match-define (list si sp di dp) statevec)
|
||||||
|
@ -80,7 +195,8 @@
|
||||||
(let* ((flags (set))
|
(let* ((flags (set))
|
||||||
(statevec (list src-ip src-port dst-ip dst-port))
|
(statevec (list src-ip src-port dst-ip dst-port))
|
||||||
(old-active-state-vectors (codec-state-active-state-vectors s))
|
(old-active-state-vectors (codec-state-active-state-vectors s))
|
||||||
(spawn-needed? (not (state-vector-active? statevec s))))
|
(spawn-needed? (and (not (state-vector-active? statevec s))
|
||||||
|
(zero? rst)))) ;; don't bother spawning if it's a rst
|
||||||
(define-syntax-rule (set-flags! v ...)
|
(define-syntax-rule (set-flags! v ...)
|
||||||
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
|
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
|
||||||
(set-flags! ns cwr ece urg ack psh rst syn fin)
|
(set-flags! ns cwr ece urg ack psh rst syn fin)
|
||||||
|
@ -200,6 +316,7 @@
|
||||||
#:level 1))))
|
#:level 1))))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Per-connection state vector process
|
||||||
|
|
||||||
(struct buffer (data ;; bit-string
|
(struct buffer (data ;; bit-string
|
||||||
seqn ;; names leftmost byte in data
|
seqn ;; names leftmost byte in data
|
||||||
|
@ -210,7 +327,8 @@
|
||||||
(struct conn-state (outbound ;; buffer
|
(struct conn-state (outbound ;; buffer
|
||||||
inbound ;; buffer
|
inbound ;; buffer
|
||||||
syn-acked? ;; boolean
|
syn-acked? ;; boolean
|
||||||
latest-activity-time) ;; from current-inexact-milliseconds
|
latest-activity-time ;; from current-inexact-milliseconds
|
||||||
|
local-peer-seen?) ;; boolean
|
||||||
#:transparent)
|
#:transparent)
|
||||||
|
|
||||||
(define transmit-check-interval-msec 100)
|
(define transmit-check-interval-msec 100)
|
||||||
|
@ -255,6 +373,17 @@
|
||||||
(define (seq> a b)
|
(define (seq> a b)
|
||||||
(< (seq- a b) #x80000000))
|
(< (seq- a b) #x80000000))
|
||||||
|
|
||||||
|
;; ConnState -> Gestalt
|
||||||
|
(define (compute-gestalt s)
|
||||||
|
(gestalt-union (sub (timer-expired (timer-name ?) ?))
|
||||||
|
(sub (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?))
|
||||||
|
(pub (tcp-packet #f dst-ip dst-port src-ip src-port ? ? ? ? ? ?))
|
||||||
|
(sub (tcp-channel dst src ?))
|
||||||
|
(if (not (buffer-finished? (conn-state-inbound s)))
|
||||||
|
(pub (tcp-channel src dst ?))
|
||||||
|
(gestalt-empty))
|
||||||
|
(pub (tcp-channel src dst ?) #:level 1)))
|
||||||
|
|
||||||
;; ConnState -> Transition
|
;; ConnState -> Transition
|
||||||
(define (deliver-inbound-locally s)
|
(define (deliver-inbound-locally s)
|
||||||
(define b (conn-state-inbound s))
|
(define b (conn-state-inbound s))
|
||||||
|
@ -272,16 +401,15 @@
|
||||||
(define b (conn-state-inbound s))
|
(define b (conn-state-inbound s))
|
||||||
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
||||||
(error 'check-fin "Nonempty inbound buffer"))
|
(error 'check-fin "Nonempty inbound buffer"))
|
||||||
(transition
|
(if (set-member? flags 'fin)
|
||||||
(if (set-member? flags 'fin)
|
(let ((new-s (struct-copy conn-state s
|
||||||
(struct-copy conn-state s
|
[inbound (struct-copy buffer b
|
||||||
[inbound (struct-copy buffer b
|
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
||||||
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
[finished? #t])])))
|
||||||
[finished? #t])])
|
(transition new-s (routing-update (compute-gestalt new-s))))
|
||||||
s)
|
(transition s '())))
|
||||||
'()))
|
|
||||||
|
|
||||||
;; Boolean Nat -> ConnState -> Transition
|
;; Boolean SeqNum -> ConnState -> Transition
|
||||||
(define ((discard-acknowledged-outbound ack? ackn) s)
|
(define ((discard-acknowledged-outbound ack? ackn) s)
|
||||||
(transition
|
(transition
|
||||||
(if (not ack?)
|
(if (not ack?)
|
||||||
|
@ -309,13 +437,15 @@
|
||||||
(define (all-output-acknowledged? s)
|
(define (all-output-acknowledged? s)
|
||||||
(bit-string-empty? (buffer-data (conn-state-outbound s))))
|
(bit-string-empty? (buffer-data (conn-state-outbound s))))
|
||||||
|
|
||||||
;; ConnState -> Transition
|
;; (Option SeqNum) -> ConnState -> Transition
|
||||||
(define ((send-outbound old-ackn) s)
|
(define ((send-outbound old-ackn) s)
|
||||||
(define b (conn-state-outbound s))
|
(define b (conn-state-outbound s))
|
||||||
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
|
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
|
||||||
(if (buffer-finished? b) 1 0))))
|
(if (buffer-finished? b) 1 0))))
|
||||||
|
|
||||||
(define segment-size (min maximum-segment-size
|
(define segment-size (min maximum-segment-size
|
||||||
(buffer-window b)
|
(if (conn-state-syn-acked? s) (buffer-window b) 1)
|
||||||
|
;; ^ can only send SYN until SYN is acked
|
||||||
pending-byte-count))
|
pending-byte-count))
|
||||||
(define segment-offset (if (conn-state-syn-acked? s) 0 1))
|
(define segment-offset (if (conn-state-syn-acked? s) 0 1))
|
||||||
(define-values (chunk0 remaining-data)
|
(define-values (chunk0 remaining-data)
|
||||||
|
@ -329,8 +459,9 @@
|
||||||
(when (not (conn-state-syn-acked? s))
|
(when (not (conn-state-syn-acked? s))
|
||||||
(set! flags (set-add flags 'syn)))
|
(set! flags (set-add flags 'syn)))
|
||||||
(when (and (buffer-finished? b)
|
(when (and (buffer-finished? b)
|
||||||
|
(conn-state-syn-acked? s)
|
||||||
(= segment-size pending-byte-count)
|
(= segment-size pending-byte-count)
|
||||||
(not (all-output-acknowledged? s)))
|
(not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy
|
||||||
(set! flags (set-add flags 'fin)))
|
(set! flags (set-add flags 'fin)))
|
||||||
(define window (min 65535 ;; limit of field width
|
(define window (min 65535 ;; limit of field width
|
||||||
(max 0 ;; can't be negative
|
(max 0 ;; can't be negative
|
||||||
|
@ -340,12 +471,14 @@
|
||||||
(transition s
|
(transition s
|
||||||
(unless (and (equal? ackn old-ackn)
|
(unless (and (equal? ackn old-ackn)
|
||||||
(conn-state-syn-acked? s)
|
(conn-state-syn-acked? s)
|
||||||
|
(not (set-member? flags 'fin))
|
||||||
(zero? (bit-string-byte-count chunk)))
|
(zero? (bit-string-byte-count chunk)))
|
||||||
(send (tcp-packet #f
|
(local-require racket/pretty)
|
||||||
dst-ip
|
(pretty-write `(send-outbound (old-ackn ,old-ackn)
|
||||||
dst-port
|
(s ,s)
|
||||||
src-ip
|
(flags ,flags)))
|
||||||
src-port
|
(flush-output)
|
||||||
|
(send (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||||
(buffer-seqn b)
|
(buffer-seqn b)
|
||||||
(or ackn 0)
|
(or ackn 0)
|
||||||
flags
|
flags
|
||||||
|
@ -361,13 +494,13 @@
|
||||||
|
|
||||||
;; ConnState -> Transition
|
;; ConnState -> Transition
|
||||||
(define (quit-when-done s)
|
(define (quit-when-done s)
|
||||||
(if (and (buffer-finished? (conn-state-outbound s))
|
(transition s (when (and (buffer-finished? (conn-state-outbound s))
|
||||||
(buffer-finished? (conn-state-inbound s))
|
(buffer-finished? (conn-state-inbound s))
|
||||||
(all-output-acknowledged? s)
|
(all-output-acknowledged? s)
|
||||||
(> (- (current-inexact-milliseconds) (conn-state-latest-activity-time s))
|
(> (- (current-inexact-milliseconds)
|
||||||
(* 2 1000 maximum-segment-lifetime-sec)))
|
(conn-state-latest-activity-time s))
|
||||||
(transition s (quit))
|
(* 2 1000 maximum-segment-lifetime-sec)))
|
||||||
(transition s '())))
|
(quit))))
|
||||||
|
|
||||||
;; Action
|
;; Action
|
||||||
(define send-set-transmit-check-timer
|
(define send-set-transmit-check-timer
|
||||||
|
@ -375,23 +508,74 @@
|
||||||
transmit-check-interval-msec
|
transmit-check-interval-msec
|
||||||
'relative)))
|
'relative)))
|
||||||
|
|
||||||
|
;; ConnState -> Transition
|
||||||
|
(define (reset seqn ackn is-fin? s)
|
||||||
|
(log-warning "Sending RST from ~a:~a to ~a:~a"
|
||||||
|
(ip-address->hostname dst-ip)
|
||||||
|
dst-port
|
||||||
|
(ip-address->hostname src-ip)
|
||||||
|
src-port)
|
||||||
|
(transition s
|
||||||
|
(list
|
||||||
|
(send (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||||
|
seqn
|
||||||
|
(seq+ ackn (if is-fin? 1 0))
|
||||||
|
(set 'ack 'rst)
|
||||||
|
0
|
||||||
|
#""
|
||||||
|
#""))
|
||||||
|
(quit))))
|
||||||
|
|
||||||
|
;; ConnState -> ConnState
|
||||||
|
(define (close-outbound-stream s)
|
||||||
|
(transition
|
||||||
|
(struct-copy conn-state s
|
||||||
|
[outbound (struct-copy buffer (buffer-push (conn-state-outbound s) #"!") ;; dummy FIN byte
|
||||||
|
[finished? #t])])
|
||||||
|
'()))
|
||||||
|
|
||||||
(define (state-vector-behavior e s)
|
(define (state-vector-behavior e s)
|
||||||
(define old-ackn (buffer-seqn (conn-state-inbound s)))
|
(define old-ackn (buffer-seqn (conn-state-inbound s)))
|
||||||
(match e
|
(match e
|
||||||
|
[(routing-update g)
|
||||||
|
(log-info "State vector routing-update:\n~a" (gestalt->pretty-string g))
|
||||||
|
(define local-peer-present? (not (gestalt-empty? g)))
|
||||||
|
(cond
|
||||||
|
[(and local-peer-present? (not (conn-state-local-peer-seen? s)))
|
||||||
|
(transition (struct-copy conn-state s [local-peer-seen? #t]) '())]
|
||||||
|
[(and (not local-peer-present?) (conn-state-local-peer-seen? s))
|
||||||
|
(log-info "Closing outbound stream.")
|
||||||
|
(sequence-transitions (close-outbound-stream s)
|
||||||
|
(send-outbound old-ackn)
|
||||||
|
bump-activity-time
|
||||||
|
quit-when-done)]
|
||||||
|
[else #f])]
|
||||||
[(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data) _ _)
|
[(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data) _ _)
|
||||||
(define expected (next-expected-seqn s))
|
(define expected (next-expected-seqn s))
|
||||||
(sequence-transitions (if (not expected) ;; haven't seen syn yet...
|
(if (and (not expected) ;; no syn yet
|
||||||
(if (set-member? flags 'syn) ;; ... and this is it
|
(not (set-member? flags 'syn))) ;; and this isn't it
|
||||||
(incorporate-segment data
|
(reset ackn ;; this is *our* seqn
|
||||||
(set-inbound-seqn (seq+ seqn 1) s))
|
seqn ;; this is what we should acknowledge...
|
||||||
(transition s '()))
|
(set-member? flags 'fin) ;; ... +1, if fin is set
|
||||||
(if (= expected seqn)
|
s)
|
||||||
(incorporate-segment data s)
|
(sequence-transitions (cond
|
||||||
(transition s '())))
|
[(not expected) ;; haven't seen syn yet, but we know this is it
|
||||||
deliver-inbound-locally
|
(incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))]
|
||||||
(check-fin flags)
|
[(= expected seqn)
|
||||||
(discard-acknowledged-outbound (set-member? flags 'ack) ackn)
|
(incorporate-segment data s)]
|
||||||
(update-outbound-window window)
|
[else
|
||||||
|
(transition s '())])
|
||||||
|
deliver-inbound-locally
|
||||||
|
(check-fin flags)
|
||||||
|
(discard-acknowledged-outbound (set-member? flags 'ack) ackn)
|
||||||
|
(update-outbound-window window)
|
||||||
|
(send-outbound old-ackn)
|
||||||
|
bump-activity-time
|
||||||
|
quit-when-done))]
|
||||||
|
[(message (tcp-channel _ _ bs) _ _)
|
||||||
|
(sequence-transitions (transition (struct-copy conn-state s
|
||||||
|
[outbound (buffer-push (conn-state-outbound s) bs)])
|
||||||
|
'())
|
||||||
(send-outbound old-ackn)
|
(send-outbound old-ackn)
|
||||||
bump-activity-time
|
bump-activity-time
|
||||||
quit-when-done)]
|
quit-when-done)]
|
||||||
|
@ -412,13 +596,11 @@
|
||||||
;; TODO append a dummy byte at FIN position in outbound buffer
|
;; TODO append a dummy byte at FIN position in outbound buffer
|
||||||
(list
|
(list
|
||||||
send-set-transmit-check-timer
|
send-set-transmit-check-timer
|
||||||
(spawn state-vector-behavior
|
(let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position
|
||||||
(conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position
|
(buffer #"" #f inbound-buffer-limit #f)
|
||||||
(buffer #"" #f inbound-buffer-limit #f)
|
#f
|
||||||
#f
|
(current-inexact-milliseconds)
|
||||||
(current-inexact-milliseconds))
|
#f)))
|
||||||
(gestalt-union (sub (timer-expired (timer-name ?) ?))
|
(spawn state-vector-behavior
|
||||||
(sub (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?))
|
state0
|
||||||
(pub (tcp-packet #f dst-ip dst-port src-ip src-port ? ? ? ? ? ?))
|
(compute-gestalt state0)))))
|
||||||
(sub (tcp-channel dst src ?))
|
|
||||||
(pub (tcp-channel src dst ?))))))
|
|
||||||
|
|
Loading…
Reference in New Issue