Many improvements to the netstack TCP implementation.
- New timestate `on-timeout` complementing `stop-when-timeout` - IP layer avoids spurious reordering - Demo HTTP server sends 4kB responses, for testing - TCP now has something closer to proper sliding-window behavior - TCP RTT estimator - TCP now uses timestate driver rather than raw timer driver - Many small TCP bugs found and fixed
This commit is contained in:
parent
87495bdc37
commit
53e26b08a1
|
@ -176,20 +176,22 @@
|
|||
(when (and (not (equal? (ip-packet-source-interface p) interface-name))
|
||||
(ip-address-in-subnet? destination network netmask))
|
||||
(define timer-id (gensym 'ippkt))
|
||||
(react (on-start (send! (set-timer timer-id 5000 'relative)))
|
||||
;; v Use `spawn` instead of `react` to avoid gratuitous packet
|
||||
;; reordering.
|
||||
(spawn (on-start (send! (set-timer timer-id 5000 'relative)))
|
||||
(stop-when (message (timer-expired timer-id _))
|
||||
(log-warning "ARP lookup of ~a failed, packet dropped"
|
||||
(ip-address->hostname destination)))
|
||||
(log-warning "ARP lookup of ~a failed, packet dropped"
|
||||
(ip-address->hostname destination)))
|
||||
(stop-when (asserted (arp-query IPv4-ethertype
|
||||
destination
|
||||
($ interface (ethernet-interface interface-name _))
|
||||
$destination-hwaddr))
|
||||
(send! (ethernet-packet interface
|
||||
#f
|
||||
(ethernet-interface-hwaddr interface)
|
||||
destination-hwaddr
|
||||
IPv4-ethertype
|
||||
(format-ip-packet p))))))))
|
||||
(send! (ethernet-packet interface
|
||||
#f
|
||||
(ethernet-interface-hwaddr interface)
|
||||
destination-hwaddr
|
||||
IPv4-ethertype
|
||||
(format-ip-packet p))))))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
|
|
|
@ -83,4 +83,7 @@
|
|||
"<p>There have been ~a requests prior to this one.</p>\n")
|
||||
counter)))
|
||||
(send! (outbound (tcp-channel us them response)))
|
||||
(for [(i 4)]
|
||||
(define buf (make-bytes 1024 (+ #x30 i)))
|
||||
(send! (outbound (tcp-channel us them buf))))
|
||||
(stop-facet (current-facet-id)))))))
|
||||
|
|
|
@ -13,10 +13,14 @@
|
|||
(require "dump-bytes.rkt")
|
||||
(require "checksum.rkt")
|
||||
|
||||
(require/activate syndicate/drivers/timer)
|
||||
(require/activate syndicate/drivers/timestate)
|
||||
(require "ip.rkt")
|
||||
(require "port-allocator.rkt")
|
||||
|
||||
(module+ test (require rackunit))
|
||||
|
||||
(define-logger netstack/tcp)
|
||||
|
||||
;; tcp-address/tcp-address : "kernel" tcp connection state machines
|
||||
;; tcp-handle/tcp-address : "user" outbound connections
|
||||
;; tcp-listener/tcp-address : "user" inbound connections
|
||||
|
@ -46,6 +50,19 @@
|
|||
;; (tcp-port-allocation Number (U TcpHandle TcpListener))
|
||||
(struct tcp-port-allocation (port handle) #:prefab)
|
||||
|
||||
(define (summarize-tcp-packet packet)
|
||||
(format "(~a) ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a, payload ~a)"
|
||||
(if (tcp-packet-from-wire? packet) "I" "O")
|
||||
(ip-address->hostname (tcp-packet-source-ip packet))
|
||||
(tcp-packet-source-port packet)
|
||||
(ip-address->hostname (tcp-packet-destination-ip packet))
|
||||
(tcp-packet-destination-port packet)
|
||||
(tcp-packet-sequence-number packet)
|
||||
(tcp-packet-ack-number packet)
|
||||
(tcp-packet-flags packet)
|
||||
(tcp-packet-window-size packet)
|
||||
(bit-string-byte-count (tcp-packet-data packet))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; User-accessible driver startup
|
||||
|
||||
|
@ -100,10 +117,9 @@
|
|||
(field [local-peer-present? #f]
|
||||
[remote-peer-present? #f])
|
||||
|
||||
(on-start (send! (set-timer timer-name relay-peer-wait-time-msec 'relative)))
|
||||
(on (message (timer-expired timer-name _))
|
||||
(when (not (and (local-peer-present?) (remote-peer-present?)))
|
||||
(error 'spawn-relay "TCP relay process timed out waiting for peer")))
|
||||
(on-timeout relay-peer-wait-time-msec
|
||||
(when (not (and (local-peer-present?) (remote-peer-present?)))
|
||||
(error 'spawn-relay "TCP relay process timed out waiting for peer")))
|
||||
|
||||
(on (asserted (observe (tcp-channel remote-addr local-user-addr _)))
|
||||
(local-peer-present? #t))
|
||||
|
@ -165,16 +181,6 @@
|
|||
(define-syntax-rule (set-flags! v ...)
|
||||
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
|
||||
(set-flags! ns cwr ece urg ack psh rst syn fin)
|
||||
(log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)"
|
||||
(ip-address->hostname src-ip)
|
||||
src-port
|
||||
(ip-address->hostname dst-ip)
|
||||
dst-port
|
||||
sequence-number
|
||||
ack-number
|
||||
flags
|
||||
window-size)
|
||||
(when spawn-needed? (log-info " - spawn needed!"))
|
||||
(bit-string-case rest
|
||||
([ (opts :: binary bytes (- (* data-offset 4) 20))
|
||||
(data :: binary) ]
|
||||
|
@ -189,7 +195,9 @@
|
|||
window-size
|
||||
(bit-string->bytes opts)
|
||||
(bit-string->bytes data))))
|
||||
(log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet packet))
|
||||
(when spawn-needed?
|
||||
(log-netstack/tcp-debug " - spawn needed!")
|
||||
(active-state-vectors (set-add (active-state-vectors) statevec))
|
||||
(spawn-state-vector src-ip src-port dst-ip dst-port))
|
||||
(send! packet)))
|
||||
|
@ -197,9 +205,9 @@
|
|||
(else #f)))
|
||||
|
||||
(begin/dataflow
|
||||
(log-info "SCN yielded statevecs ~v and local-ips ~v"
|
||||
(active-state-vectors)
|
||||
(local-ips)))
|
||||
(log-netstack/tcp-debug "SCN yielded statevecs ~v and local-ips ~v"
|
||||
(active-state-vectors)
|
||||
(local-ips)))
|
||||
|
||||
(define (deliver-outbound-packet p)
|
||||
(match-define (tcp-packet #f
|
||||
|
@ -214,15 +222,7 @@
|
|||
options
|
||||
data)
|
||||
p)
|
||||
(log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)"
|
||||
(ip-address->hostname src-ip)
|
||||
src-port
|
||||
(ip-address->hostname dst-ip)
|
||||
dst-port
|
||||
sequence-number
|
||||
ack-number
|
||||
flags
|
||||
window-size)
|
||||
(log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet p))
|
||||
(define (flag-bit sym) (if (set-member? flags sym) 1 0))
|
||||
(define payload (bit-string (src-port :: integer bytes 2)
|
||||
(dst-port :: integer bytes 2)
|
||||
|
@ -263,12 +263,108 @@
|
|||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Per-connection state vector process
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; From the RFC:
|
||||
;;
|
||||
;; Send Sequence Variables
|
||||
;;
|
||||
;; SND.UNA - send unacknowledged
|
||||
;; SND.NXT - send next
|
||||
;; SND.WND - send window
|
||||
;; SND.UP - send urgent pointer
|
||||
;; SND.WL1 - segment sequence number used for last window update
|
||||
;; SND.WL2 - segment acknowledgment number used for last window
|
||||
;; update
|
||||
;; ISS - initial send sequence number
|
||||
;;
|
||||
;; Receive Sequence Variables
|
||||
;;
|
||||
;; RCV.NXT - receive next
|
||||
;; RCV.WND - receive window
|
||||
;; RCV.UP - receive urgent pointer
|
||||
;; IRS - initial receive sequence number
|
||||
;;
|
||||
;; The following diagrams may help to relate some of these variables to
|
||||
;; the sequence space.
|
||||
;;
|
||||
;; Send Sequence Space
|
||||
;;
|
||||
;; 1 2 3 4
|
||||
;; ----------|----------|----------|----------
|
||||
;; SND.UNA SND.NXT SND.UNA
|
||||
;; +SND.WND
|
||||
;;
|
||||
;; 1 - old sequence numbers which have been acknowledged
|
||||
;; 2 - sequence numbers of unacknowledged data
|
||||
;; 3 - sequence numbers allowed for new data transmission
|
||||
;; 4 - future sequence numbers which are not yet allowed
|
||||
;;
|
||||
;; Send Sequence Space
|
||||
;;
|
||||
;; Figure 4.
|
||||
;;
|
||||
;; The send window is the portion of the sequence space labeled 3 in
|
||||
;; figure 4.
|
||||
;;
|
||||
;; Receive Sequence Space
|
||||
;;
|
||||
;; 1 2 3
|
||||
;; ----------|----------|----------
|
||||
;; RCV.NXT RCV.NXT
|
||||
;; +RCV.WND
|
||||
;;
|
||||
;; 1 - old sequence numbers which have been acknowledged
|
||||
;; 2 - sequence numbers allowed for new reception
|
||||
;; 3 - future sequence numbers which are not yet allowed
|
||||
;;
|
||||
;; Receive Sequence Space
|
||||
;;
|
||||
;; Figure 5.
|
||||
;;
|
||||
;; The receive window is the portion of the sequence space labeled 2 in
|
||||
;; figure 5.
|
||||
;;
|
||||
;; There are also some variables used frequently in the discussion that
|
||||
;; take their values from the fields of the current segment.
|
||||
;;
|
||||
;; Current Segment Variables
|
||||
;;
|
||||
;; SEG.SEQ - segment sequence number
|
||||
;; SEG.ACK - segment acknowledgment number
|
||||
;; SEG.LEN - segment length
|
||||
;; SEG.WND - segment window
|
||||
;; SEG.UP - segment urgent pointer
|
||||
;; SEG.PRC - segment precedence value
|
||||
;;
|
||||
;;---------------------------------------------------------------------------
|
||||
|
||||
(struct buffer (data ;; bit-string
|
||||
seqn ;; names leftmost byte in data
|
||||
window ;; counts bytes from leftmost byte in data
|
||||
finished?) ;; boolean: true after FIN
|
||||
#:transparent)
|
||||
|
||||
;; Regarding acks:
|
||||
;;
|
||||
;; - we send an ack number that is (buffer-seqn (inbound)) plus the
|
||||
;; number of buffered bytes.
|
||||
;;
|
||||
;; - acks received allow us to advance (buffer-seqn (outbound)) (that
|
||||
;; is, SND.UNA) to that point, discarding buffered data to do so.
|
||||
|
||||
;; Regarding windows:
|
||||
;;
|
||||
;; - (buffer-window (outbound)) is the size of the peer's receive
|
||||
;; window. Do not allow more than this many bytes to be
|
||||
;; unacknowledged on the wire.
|
||||
;;
|
||||
;; - (buffer-window (inbound)) is the size of our receive window. The
|
||||
;; peer should not exceed this; we should ignore data received that
|
||||
;; extends beyond this. Once we implement flow control locally
|
||||
;; (ahem) we should move this around, but at present it is fixed.
|
||||
|
||||
;; TODO: Zero receive window probe when we have something to say.
|
||||
|
||||
(define (buffer-push b data)
|
||||
(struct-copy buffer b [data (bit-string-append (buffer-data b) data)]))
|
||||
|
||||
|
@ -289,12 +385,55 @@
|
|||
(- larger smaller)))
|
||||
|
||||
(define (seq> a b)
|
||||
(not (seq>= b a)))
|
||||
|
||||
(define (seq>= a b)
|
||||
(< (seq- a b) #x80000000))
|
||||
|
||||
(define (seq-min a b) (if (seq> a b) b a))
|
||||
(define (seq-max a b) (if (seq> a b) a b))
|
||||
|
||||
(module+ test
|
||||
(check-equal? (seq+ 41724780 1) 41724781)
|
||||
(check-equal? (seq+ 0 1) 1)
|
||||
(check-equal? (seq+ #x80000000 1) #x80000001)
|
||||
(check-equal? (seq+ #xffffffff 1) #x00000000)
|
||||
|
||||
(check-equal? (seq> 41724780 41724780) #f)
|
||||
(check-equal? (seq> 41724781 41724780) #t)
|
||||
(check-equal? (seq> 41724780 41724781) #f)
|
||||
|
||||
(check-equal? (seq> 0 0) #f)
|
||||
(check-equal? (seq> 1 0) #t)
|
||||
(check-equal? (seq> 0 1) #f)
|
||||
|
||||
(check-equal? (seq> #x80000000 #x80000000) #f)
|
||||
(check-equal? (seq> #x80000001 #x80000000) #t)
|
||||
(check-equal? (seq> #x80000000 #x80000001) #f)
|
||||
|
||||
(check-equal? (seq> #xffffffff #xffffffff) #f)
|
||||
(check-equal? (seq> #x00000000 #xffffffff) #t)
|
||||
(check-equal? (seq> #xffffffff #x00000000) #f)
|
||||
|
||||
(check-equal? (seq>= 41724780 41724780) #t)
|
||||
(check-equal? (seq>= 41724781 41724780) #t)
|
||||
(check-equal? (seq>= 41724780 41724781) #f)
|
||||
|
||||
(check-equal? (seq>= 0 0) #t)
|
||||
(check-equal? (seq>= 1 0) #t)
|
||||
(check-equal? (seq>= 0 1) #f)
|
||||
|
||||
(check-equal? (seq>= #x80000000 #x80000000) #t)
|
||||
(check-equal? (seq>= #x80000001 #x80000000) #t)
|
||||
(check-equal? (seq>= #x80000000 #x80000001) #f)
|
||||
|
||||
(check-equal? (seq>= #xffffffff #xffffffff) #t)
|
||||
(check-equal? (seq>= #x00000000 #xffffffff) #t)
|
||||
(check-equal? (seq>= #xffffffff #x00000000) #f))
|
||||
|
||||
(define (spawn-state-vector src-ip src-port dst-ip dst-port)
|
||||
(define src (tcp-address (ip-address->hostname src-ip) src-port))
|
||||
(define dst (tcp-address (ip-address->hostname dst-ip) dst-port))
|
||||
(define (timer-name kind) (list 'tcp-timer kind src dst))
|
||||
|
||||
(spawn
|
||||
#:name (list 'tcp-state-vector
|
||||
|
@ -317,20 +456,26 @@
|
|||
(inexact->exact (truncate (* #x100000000 (random)))))
|
||||
|
||||
(field [outbound (buffer #"!" initial-outbound-seqn 0 #f)] ;; dummy data at SYN position
|
||||
[send-next initial-outbound-seqn] ;; SND.NXT
|
||||
[high-water-mark initial-outbound-seqn]
|
||||
|
||||
[inbound (buffer #"" #f inbound-buffer-limit #f)]
|
||||
[transmission-needed? #f]
|
||||
[syn-acked? #f]
|
||||
|
||||
[latest-peer-activity-time (current-inexact-milliseconds)]
|
||||
;; ^ the most recent time we heard from our peer
|
||||
[user-timeout-base-time (current-inexact-milliseconds)]
|
||||
;; ^ when the index of the first outbound unacknowledged byte changed
|
||||
[most-recent-time (current-inexact-milliseconds)]
|
||||
;; ^ updated by timer expiry; a field, to trigger quit checks
|
||||
)
|
||||
|
||||
(let ()
|
||||
(local-require (submod syndicate/actor priorities))
|
||||
(on-event #:priority *query-priority*
|
||||
[_ (most-recent-time (current-inexact-milliseconds))]))
|
||||
;; RFC 6298
|
||||
[rtt-estimate #f] ;; milliseconds; "SRTT"
|
||||
[rtt-mean-deviation #f] ;; milliseconds; "RTTVAR"
|
||||
[retransmission-timeout 1000] ;; milliseconds
|
||||
[retransmission-deadline #f]
|
||||
[rtt-estimate-seqn-target #f]
|
||||
[rtt-estimate-start-time #f]
|
||||
)
|
||||
|
||||
(define (next-expected-seqn)
|
||||
(define b (inbound))
|
||||
|
@ -341,7 +486,6 @@
|
|||
(inbound (struct-copy buffer (inbound) [seqn seqn])))
|
||||
|
||||
(define (incorporate-segment! data)
|
||||
;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data)
|
||||
(when (not (buffer-finished? (inbound)))
|
||||
(inbound (buffer-push (inbound) data))))
|
||||
|
||||
|
@ -357,149 +501,258 @@
|
|||
;; (Setof Symbol) -> Void
|
||||
(define (check-fin! flags)
|
||||
(define b (inbound))
|
||||
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
||||
(error 'check-fin "Nonempty inbound buffer"))
|
||||
(when (set-member? flags 'fin)
|
||||
(log-info "Closing inbound stream.")
|
||||
(inbound (struct-copy buffer b
|
||||
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
||||
[finished? #t]))))
|
||||
(when (not (buffer-finished? b))
|
||||
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
||||
(error 'check-fin "Nonempty inbound buffer"))
|
||||
(when (set-member? flags 'fin)
|
||||
(log-netstack/tcp-debug "Closing inbound stream.")
|
||||
(inbound (struct-copy buffer b
|
||||
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
||||
[finished? #t]))
|
||||
(transmission-needed? #t)))) ;; we must send an ack
|
||||
|
||||
;; -> Void
|
||||
(define (arm-retransmission-timer!)
|
||||
(log-netstack/tcp-debug "Arming retransmission timer (~a ms)" (retransmission-timeout))
|
||||
(retransmission-deadline (+ (current-inexact-milliseconds) (retransmission-timeout))))
|
||||
|
||||
;; Timestamp -> Void
|
||||
(define (start-rtt-estimate! now)
|
||||
(define target (send-next))
|
||||
(when (seq>= target (high-water-mark))
|
||||
(log-netstack/tcp-debug "Starting RTT estimation; target seqn is ~a" target)
|
||||
(rtt-estimate-start-time now)
|
||||
(rtt-estimate-seqn-target target)))
|
||||
|
||||
;; -> Void
|
||||
(define (reset-rtt-estimate!)
|
||||
(rtt-estimate-start-time #f)
|
||||
(rtt-estimate-seqn-target #f))
|
||||
|
||||
;; Timestamp -> Void
|
||||
(define (finish-rtt-estimate! now)
|
||||
(define rtt-measurement (- now (rtt-estimate-start-time)))
|
||||
(reset-rtt-estimate!)
|
||||
(log-netstack/tcp-debug "RTT measurement: ~a ms" rtt-measurement)
|
||||
;; RFC 6298 Section 2.
|
||||
(cond [(rtt-estimate) => ;; we have a previous estimate, RFC 6298 rule (2.3)
|
||||
(lambda (prev-estimate)
|
||||
(rtt-mean-deviation (+ (* 0.75 (rtt-mean-deviation))
|
||||
(* 0.25 (abs (- rtt-measurement prev-estimate)))))
|
||||
(rtt-estimate (+ (* 0.875 prev-estimate)
|
||||
(* 0.125 rtt-measurement))))]
|
||||
[else ;; no previous estimate, RFC 6298 rule (2.2) applies
|
||||
(rtt-estimate rtt-measurement)
|
||||
(rtt-mean-deviation (/ rtt-measurement 2))])
|
||||
(default-retransmission-timeout!)
|
||||
(log-netstack/tcp-debug "RTT measurement ~a ms; estimate ~a ms; mean deviation ~a ms; RTO ~a ms"
|
||||
rtt-measurement
|
||||
(rtt-estimate)
|
||||
(rtt-mean-deviation)
|
||||
(retransmission-timeout)))
|
||||
|
||||
(define (default-retransmission-timeout!)
|
||||
(retransmission-timeout
|
||||
(max 200 ;; RFC 6298 rule (2.4), but cribbing from Linux's 200ms minimum
|
||||
(min 60000 ;; (2.5)
|
||||
(+ (rtt-estimate) (* 4 (rtt-mean-deviation))))))) ;; (2.2), (2.3)
|
||||
|
||||
;; Boolean SeqNum -> Void
|
||||
(define (discard-acknowledged-outbound! ack? ackn)
|
||||
(when ack?
|
||||
(let* ((b (outbound))
|
||||
(base (buffer-seqn b))
|
||||
(limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b))))
|
||||
(ackn (if (seq> ackn limit) limit ackn))
|
||||
(ackn (if (seq> base ackn) base ackn))
|
||||
(ackn (seq-min ackn (high-water-mark)))
|
||||
(ackn (seq-max ackn base))
|
||||
(dist (seq- ackn base)))
|
||||
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
|
||||
(user-timeout-base-time (current-inexact-milliseconds))
|
||||
(outbound (struct-copy buffer b [data remaining-data] [seqn ackn]))
|
||||
(syn-acked? (or (syn-acked?) (positive? dist))))))
|
||||
(when (positive? dist)
|
||||
(when (not (syn-acked?)) (syn-acked? #t))
|
||||
(log-netstack/tcp-debug "******** ackn ~a; send-next ~a; high-water-mark ~a"
|
||||
ackn
|
||||
(send-next)
|
||||
(high-water-mark))
|
||||
(when (seq> ackn (send-next)) (send-next ackn))
|
||||
(when (and (rtt-estimate-seqn-target) (seq>= ackn (rtt-estimate-seqn-target)))
|
||||
(finish-rtt-estimate! (current-inexact-milliseconds)))
|
||||
|
||||
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
|
||||
(outbound (struct-copy buffer b [data remaining-data] [seqn ackn]))
|
||||
|
||||
(default-retransmission-timeout!)
|
||||
(log-netstack/tcp-debug "Positive distance moved by ack, RTO now ~a"
|
||||
(retransmission-timeout))
|
||||
(arm-retransmission-timer!)))))
|
||||
|
||||
;; Nat -> Void
|
||||
(define (update-outbound-window! peer-window)
|
||||
(log-netstack/tcp-debug "Peer's receive-window is now ~a" peer-window)
|
||||
(outbound (struct-copy buffer (outbound) [window peer-window])))
|
||||
|
||||
;; True iff there is no queued-up data waiting either for
|
||||
;; transmission or (if transmitted already) for acknowledgement.
|
||||
(define (all-output-acknowledged?)
|
||||
(bit-string-empty? (buffer-data (outbound))))
|
||||
|
||||
;; (Option SeqNum) -> Void
|
||||
(define (send-outbound! old-ackn)
|
||||
(define b (outbound))
|
||||
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
|
||||
(if (buffer-finished? b) 1 0))))
|
||||
|
||||
(define segment-size (min maximum-segment-size
|
||||
(if (syn-acked?) (buffer-window b) 1)
|
||||
;; ^ can only send SYN until SYN is acked
|
||||
pending-byte-count))
|
||||
(define segment-offset (if (syn-acked?) 0 1))
|
||||
(define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset!
|
||||
(define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset!
|
||||
(define ackn (next-expected-seqn))
|
||||
(define flags (set))
|
||||
(when ackn
|
||||
(set! flags (set-add flags 'ack)))
|
||||
(when (not (syn-acked?))
|
||||
(set! flags (set-add flags 'syn)))
|
||||
(when (and (buffer-finished? b)
|
||||
(syn-acked?)
|
||||
(= segment-size pending-byte-count)
|
||||
(not (all-output-acknowledged?))) ;; TODO: reexamine. This looks fishy
|
||||
(set! flags (set-add flags 'fin)))
|
||||
(define window (min 65535 ;; limit of field width
|
||||
(max 0 ;; can't be negative
|
||||
(- (buffer-window (inbound))
|
||||
(bit-string-byte-count (buffer-data (inbound)))))))
|
||||
(unless (and (equal? ackn old-ackn)
|
||||
(syn-acked?)
|
||||
(not (set-member? flags 'fin))
|
||||
(zero? (bit-string-byte-count chunk)))
|
||||
(local-require racket/pretty)
|
||||
(pretty-write `(send-outbound (old-ackn ,old-ackn)
|
||||
(flags ,flags)))
|
||||
(flush-output)
|
||||
(send! (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||
(buffer-seqn b)
|
||||
(or ackn 0)
|
||||
flags
|
||||
window
|
||||
#""
|
||||
chunk))))
|
||||
|
||||
(define (bump-peer-activity-time!)
|
||||
(latest-peer-activity-time (current-inexact-milliseconds)))
|
||||
|
||||
;; Number -> Boolean
|
||||
(define (heard-from-peer-within-msec? msec)
|
||||
(<= (- (most-recent-time) (latest-peer-activity-time)) msec))
|
||||
|
||||
(define (user-timeout-expired?)
|
||||
(and (not (all-output-acknowledged?))
|
||||
(> (- (most-recent-time) (user-timeout-base-time))
|
||||
user-timeout-msec)))
|
||||
|
||||
(define (send-set-transmit-check-timer!)
|
||||
(send! (set-timer (timer-name 'transmit-check)
|
||||
transmit-check-interval-msec
|
||||
'relative)))
|
||||
|
||||
(define (reset! seqn ackn)
|
||||
(log-warning "Sending RST from ~a:~a to ~a:~a"
|
||||
(ip-address->hostname dst-ip)
|
||||
dst-port
|
||||
(ip-address->hostname src-ip)
|
||||
src-port)
|
||||
(stop-facet root-facet)
|
||||
(send! (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||
seqn
|
||||
ackn
|
||||
(set 'ack 'rst)
|
||||
0
|
||||
#""
|
||||
#"")))
|
||||
|
||||
(define (close-outbound-stream!)
|
||||
(define b (outbound))
|
||||
(when (not (buffer-finished? b))
|
||||
(outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte
|
||||
[finished? #t]))))
|
||||
[finished? #t]))
|
||||
(transmission-needed? #t))) ;; the FIN machinery is awkwardly
|
||||
;; different from the usual
|
||||
;; advance-based decision on
|
||||
;; whether to send a packet or not
|
||||
|
||||
;; SeqNum Boolean Boolean Bytes -> TcpPacket
|
||||
(define (build-outbound-packet seqn mention-syn? mention-fin? payload)
|
||||
(define ackn (next-expected-seqn))
|
||||
(define window (min 65535 ;; limit of field width
|
||||
(max 0 ;; can't be negative
|
||||
(- (buffer-window (inbound))
|
||||
(bit-string-byte-count (buffer-data (inbound)))))))
|
||||
|
||||
(define flags (set))
|
||||
(when ackn (set! flags (set-add flags 'ack)))
|
||||
(when mention-syn? (set! flags (set-add flags 'syn)))
|
||||
(when mention-fin? (set! flags (set-add flags 'fin)))
|
||||
|
||||
(tcp-packet #f dst-ip dst-port src-ip src-port
|
||||
seqn
|
||||
(or ackn 0)
|
||||
flags
|
||||
window
|
||||
#""
|
||||
payload))
|
||||
|
||||
(define (outbound-data-chunk offset length)
|
||||
(bit-string-take (bit-string-drop (buffer-data (outbound)) (* offset 8)) (* length 8)))
|
||||
|
||||
;; Transmit acknowledgements and outbound data.
|
||||
(begin/dataflow
|
||||
(define in-flight-count (seq- (send-next) (buffer-seqn (outbound))))
|
||||
|
||||
(define-values (mention-syn? ;; whether to mention SYN
|
||||
payload-size ;; how many bytes of payload data to include
|
||||
mention-fin? ;; whether to mention FIN
|
||||
advance) ;; how far to advance send-next
|
||||
(if (syn-acked?)
|
||||
(let* ((effective-window (max 0 (- (buffer-window (outbound)) in-flight-count)))
|
||||
(stream-ended? (buffer-finished? (outbound)))
|
||||
(max-advance (- (bit-string-byte-count (buffer-data (outbound))) in-flight-count))
|
||||
(payload-size (min maximum-segment-size effective-window max-advance)))
|
||||
(if (and stream-ended? ;; there's a FIN enqueued,
|
||||
(positive? payload-size) ;; we aren't sending nothing at all,
|
||||
(= payload-size max-advance)) ;; and our payload would cover the FIN
|
||||
(values #f (- payload-size 1) #t payload-size)
|
||||
(values #f payload-size #f payload-size)))
|
||||
(cond [(= in-flight-count 0) (values #t 0 #f 1)]
|
||||
[(= in-flight-count 1) (values #t 0 #f 0)]
|
||||
[else (error 'send-outbound!
|
||||
"Invalid state: send-next had advanced too far before SYN")])))
|
||||
|
||||
(when (and (or (next-expected-seqn) (local-peer-seen?))
|
||||
;; ^ Talk only either if: we know the peer's seqn, or
|
||||
;; we don't, but a local peer exists, which means
|
||||
;; we're an outbound connection rather than a
|
||||
;; listener.
|
||||
(or (transmission-needed?)
|
||||
(positive? advance))
|
||||
;; ^ ... and we have something to say. Something to
|
||||
;; ack, or something to send.
|
||||
)
|
||||
(define packet-seqn (if mention-syn? (buffer-seqn (outbound)) (send-next)))
|
||||
(define packet (build-outbound-packet packet-seqn
|
||||
mention-syn?
|
||||
mention-fin?
|
||||
(outbound-data-chunk in-flight-count payload-size)))
|
||||
(when (positive? advance)
|
||||
(define new-send-next (seq+ (send-next) advance))
|
||||
(send-next new-send-next)
|
||||
(when (seq> new-send-next (high-water-mark))
|
||||
(high-water-mark new-send-next)))
|
||||
(when (transmission-needed?)
|
||||
(transmission-needed? #f))
|
||||
|
||||
;; (log-netstack/tcp-debug " sending ~v" packet)
|
||||
(send! packet)
|
||||
;; (if (> (random) 0.5)
|
||||
;; (begin (log-netstack/tcp-debug "Send ~a" (summarize-tcp-packet packet))
|
||||
;; (send! packet))
|
||||
;; (log-netstack/tcp-debug "Drop ~a" (summarize-tcp-packet packet)))
|
||||
|
||||
(when (or mention-syn? mention-fin? (positive? advance))
|
||||
(when (not (retransmission-deadline))
|
||||
(arm-retransmission-timer!))
|
||||
(when (not (rtt-estimate-start-time))
|
||||
(start-rtt-estimate! (current-inexact-milliseconds))))))
|
||||
|
||||
(begin/dataflow
|
||||
(when (and (retransmission-deadline) (all-output-acknowledged?))
|
||||
(log-netstack/tcp-debug "All output acknowledged; disarming retransmission timer")
|
||||
(retransmission-deadline #f)))
|
||||
|
||||
(on #:when (retransmission-deadline) (asserted (later-than (retransmission-deadline)))
|
||||
(send-next (buffer-seqn (outbound)))
|
||||
(log-netstack/tcp-debug "Retransmission deadline fired, RTO was ~a; reset to ~a"
|
||||
(retransmission-timeout)
|
||||
(send-next))
|
||||
(update-outbound-window! maximum-segment-size) ;; temporary. Will reopen on next ack
|
||||
(transmission-needed? #t)
|
||||
(retransmission-deadline #f)
|
||||
(reset-rtt-estimate!) ;; give up on current RTT estimation
|
||||
(retransmission-timeout (min 64000 (* 2 (retransmission-timeout))))
|
||||
(log-netstack/tcp-debug " RTO now ~a" (retransmission-timeout)))
|
||||
|
||||
(define (reset! seqn ackn)
|
||||
(define reset-packet (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||
seqn
|
||||
ackn
|
||||
(set 'ack 'rst)
|
||||
0
|
||||
#""
|
||||
#""))
|
||||
(log-netstack/tcp-warning "Reset ~a" (summarize-tcp-packet reset-packet))
|
||||
(stop-facet root-facet)
|
||||
(send! reset-packet))
|
||||
|
||||
(assert #:when (and (syn-acked?) (not (buffer-finished? (inbound))))
|
||||
(advertise (tcp-channel src dst _)))
|
||||
|
||||
(stop-when-true
|
||||
(and (buffer-finished? (outbound))
|
||||
(buffer-finished? (inbound))
|
||||
(all-output-acknowledged?)
|
||||
(not (heard-from-peer-within-msec? (* 2 1000 maximum-segment-lifetime-sec))))
|
||||
(on-start (log-netstack/tcp-info "Starting state vector ~a-~a" src-port dst-port))
|
||||
(on-stop (log-netstack/tcp-info "Stopping state vector ~a-~a" src-port dst-port))
|
||||
|
||||
(stop-when #:when (and (buffer-finished? (outbound))
|
||||
(buffer-finished? (inbound))
|
||||
(all-output-acknowledged?))
|
||||
(asserted (later-than (+ (latest-peer-activity-time)
|
||||
(* 2 1000 maximum-segment-lifetime-sec))))
|
||||
;; Everything is cleanly shut down, and we just need to wait a while for unexpected
|
||||
;; packets before we release the state vector.
|
||||
)
|
||||
|
||||
(stop-when-true (user-timeout-expired?)
|
||||
(stop-when #:when (not (all-output-acknowledged?))
|
||||
(asserted (later-than (+ (user-timeout-base-time) user-timeout-msec)))
|
||||
;; We've been plaintively retransmitting for user-timeout-msec without hearing anything
|
||||
;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but
|
||||
;; it will do for now? TODO
|
||||
(log-info "TCP_USER_TIMEOUT fired."))
|
||||
(log-netstack/tcp-warning "TCP_USER_TIMEOUT fired."))
|
||||
|
||||
(define/query-value local-peer-seen? #f (observe (tcp-channel src dst _)) #t
|
||||
#:on-remove (begin
|
||||
(log-info "Closing outbound stream.")
|
||||
(close-outbound-stream!)
|
||||
(send-outbound! (buffer-seqn (inbound)))))
|
||||
(log-netstack/tcp-debug "Closing outbound stream.")
|
||||
(close-outbound-stream!)))
|
||||
|
||||
(define/query-value listener-listening?
|
||||
#f
|
||||
(observe (advertise (tcp-channel _ (tcp-listener dst-port) _)))
|
||||
#t)
|
||||
|
||||
(define (trigger-ack!)
|
||||
(transmission-needed? #t))
|
||||
|
||||
(on (message (tcp-packet #t src-ip src-port dst-ip dst-port
|
||||
$seqn $ackn $flags $window $options $data))
|
||||
(define old-ackn (buffer-seqn (inbound)))
|
||||
(define expected (next-expected-seqn))
|
||||
(define is-syn? (set-member? flags 'syn))
|
||||
(define is-fin? (set-member? flags 'fin))
|
||||
|
@ -517,38 +770,28 @@
|
|||
(cond
|
||||
[(not expected) ;; haven't seen syn yet, but we know this is it
|
||||
(set-inbound-seqn! (seq+ seqn 1))
|
||||
(incorporate-segment! data)]
|
||||
(incorporate-segment! data)
|
||||
(trigger-ack!)]
|
||||
[(= expected seqn)
|
||||
(incorporate-segment! data)]
|
||||
[else (void)])
|
||||
(incorporate-segment! data)
|
||||
(when (positive? (bit-string-byte-count data)) (trigger-ack!))]
|
||||
[else
|
||||
(trigger-ack!)])
|
||||
(deliver-inbound-locally!)
|
||||
(check-fin! flags)
|
||||
(discard-acknowledged-outbound! (set-member? flags 'ack) ackn)
|
||||
(update-outbound-window! window)
|
||||
(send-outbound! old-ackn)
|
||||
(bump-peer-activity-time!)]))
|
||||
(latest-peer-activity-time (current-inexact-milliseconds))]))
|
||||
|
||||
(on (message (tcp-channel dst src $bs))
|
||||
(define old-ackn (buffer-seqn (inbound)))
|
||||
;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs)
|
||||
;; (log-netstack/tcp-debug "GOT MORE STUFF TO DELIVER ~v" bs)
|
||||
|
||||
(when (all-output-acknowledged?)
|
||||
;; Only move user-timeout-base-time if there wasn't
|
||||
;; already some outstanding output.
|
||||
(user-timeout-base-time (current-inexact-milliseconds)))
|
||||
|
||||
(outbound (buffer-push (outbound) bs))
|
||||
(send-outbound! old-ackn))
|
||||
|
||||
(on-start (send-set-transmit-check-timer!))
|
||||
(on (message (timer-expired (timer-name 'transmit-check) _))
|
||||
(define old-ackn (buffer-seqn (inbound)))
|
||||
;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of
|
||||
;; retransmitting outbound data as well as a means of checking for an expired
|
||||
;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained
|
||||
;; approach.
|
||||
(send-set-transmit-check-timer!)
|
||||
(send-outbound! old-ackn))))
|
||||
(outbound (buffer-push (outbound) bs)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#lang syndicate
|
||||
|
||||
(provide (struct-out later-than)
|
||||
on-timeout
|
||||
stop-when-timeout
|
||||
sleep)
|
||||
|
||||
|
@ -15,10 +16,13 @@
|
|||
(on (message (timer-expired timer-id _))
|
||||
(react (assert (later-than msecs))))))
|
||||
|
||||
(define-syntax-rule (stop-when-timeout relative-msecs body ...)
|
||||
(define-syntax-rule (on-timeout relative-msecs body ...)
|
||||
(let ((timer-id (gensym 'timeout)))
|
||||
(on-start (send! (set-timer timer-id relative-msecs 'relative)))
|
||||
(stop-when (message (timer-expired timer-id _)) body ...)))
|
||||
(on (message (timer-expired timer-id _)) body ...)))
|
||||
|
||||
(define-syntax-rule (stop-when-timeout relative-msecs body ...)
|
||||
(on-timeout relative-msecs (stop-current-facet body ...)))
|
||||
|
||||
(define (sleep sec)
|
||||
(define timer-id (gensym 'sleep))
|
||||
|
|
Loading…
Reference in New Issue