syndicate-rkt/syndicate-examples/netstack/tcp.rkt

772 lines
32 KiB
Racket

#lang syndicate
(provide (struct-out tcp-connection)
(struct-out tcp-accepted)
(struct-out tcp-rejected)
(struct-out tcp-out)
(struct-out tcp-in)
(struct-out tcp-in-line)
(struct-out tcp-address)
(struct-out tcp-listener)
spawn-tcp-driver)
(require racket/set)
(require bitsyntax)
(require "dump-bytes.rkt")
(require "checksum.rkt")
(require/activate syndicate/drivers/timer)
(require "ip.rkt")
(require "port-allocator.rkt")
(module+ test (require rackunit))
(define-logger netstack/tcp)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(assertion-struct tcp-connection (id spec))
(assertion-struct tcp-accepted (id))
(assertion-struct tcp-rejected (id exn))
(message-struct tcp-out (id bytes))
(message-struct tcp-in (id bytes))
(message-struct tcp-in-line (id bytes))
(assertion-struct tcp-address (host port))
(assertion-struct tcp-listener (port))
(assertion-struct tcp-quad (remote-ip remote-port local-ip local-port))
(message-struct tcp-packet (from-wire?
quad
sequence-number
ack-number
flags
window-size
options
data))
;; (tcp-port-allocation Number (U TcpAddress TcpListener))
(assertion-struct tcp-port-allocation (port handle))
(define (tcp-quad->string from-wire? q)
(match-define (tcp-quad ri rp li lp) q)
(if from-wire?
(format "(I) ~a:~a -> ~a:~a" (ip-address->hostname ri) rp (ip-address->hostname li) lp)
(format "(O) ~a:~a -> ~a:~a" (ip-address->hostname li) lp (ip-address->hostname ri) rp)))
(define (summarize-tcp-packet packet)
(format "~a (seq ~a, ack ~a, flags ~a, window ~a, payload ~a)"
(tcp-quad->string (tcp-packet-from-wire? packet) (tcp-packet-quad packet))
(tcp-packet-sequence-number packet)
(tcp-packet-ack-number packet)
(tcp-packet-flags packet)
(tcp-packet-window-size packet)
(bit-string-byte-count (tcp-packet-data packet))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Driver startup
(define PROTOCOL-TCP 6)
(define (spawn-tcp-driver)
(spawn-port-allocator 'tcp (lambda () (query-set tcp-ports (tcp-port-allocation $p _) p)))
(spawn #:name 'kernel-tcp-driver
(define local-ips (query-local-ip-addresses))
(define/query-set active-state-vectors ($ q (tcp-quad _ _ _ _)) q)
(define (state-vector-active? statevec)
(set-member? (active-state-vectors) statevec))
(define (analyze-incoming-packet src-ip dst-ip body)
(bit-string-case body
([ (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
(sequence-number :: integer bytes 4)
(ack-number :: integer bytes 4)
(data-offset :: integer bits 4)
(reserved :: integer bits 3)
(ns :: integer bits 1)
(cwr :: integer bits 1)
(ece :: integer bits 1)
(urg :: integer bits 1)
(ack :: integer bits 1)
(psh :: integer bits 1)
(rst :: integer bits 1)
(syn :: integer bits 1)
(fin :: integer bits 1)
(window-size :: integer bytes 2)
(checksum :: integer bytes 2) ;; TODO: check checksum
(urgent-pointer :: integer bytes 2)
(rest :: binary) ]
(let* ((flags (set))
(statevec (tcp-quad src-ip src-port dst-ip dst-port))
(old-active-state-vectors (active-state-vectors))
(spawn-needed? (and (not (state-vector-active? statevec))
(zero? rst)))) ;; don't bother spawning if it's a rst
(define-syntax-rule (set-flags! v ...)
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
(set-flags! ns cwr ece urg ack psh rst syn fin)
(bit-string-case rest
([ (opts :: binary bytes (- (* data-offset 4) 20))
(data :: binary) ]
(let ((packet (tcp-packet #t
statevec
sequence-number
ack-number
flags
window-size
(bit-string->bytes opts)
(bit-string->bytes data))))
(log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet packet))
(when spawn-needed?
(log-netstack/tcp-debug " - spawn needed!")
(active-state-vectors (set-add (active-state-vectors) statevec))
(spawn-state-vector #f (tcp-address (ip-address->hostname src-ip) src-port) statevec))
(send! packet)))
(else #f))))
(else #f)))
(begin/dataflow
(log-netstack/tcp-debug "SCN yielded statevecs ~v and local-ips ~v"
(active-state-vectors)
(local-ips)))
(define (deliver-outbound-packet p)
(match-define (tcp-packet #f
(tcp-quad dst-ip ;; \__ remote
dst-port ;; /
src-ip ;; \__ local
src-port) ;; /
sequence-number
ack-number
flags
window-size
options
data)
p)
(log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet p))
(define (flag-bit sym) (if (set-member? flags sym) 1 0))
(define payload (bit-string (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
(sequence-number :: integer bytes 4)
(ack-number :: integer bytes 4)
((+ 5 (quotient (bit-string-byte-count options) 4))
:: integer bits 4) ;; TODO: enforce 4-byte alignment
(0 :: integer bits 3)
((flag-bit 'ns) :: integer bits 1)
((flag-bit 'cwr) :: integer bits 1)
((flag-bit 'ece) :: integer bits 1)
((flag-bit 'urg) :: integer bits 1)
((flag-bit 'ack) :: integer bits 1)
((flag-bit 'psh) :: integer bits 1)
((flag-bit 'rst) :: integer bits 1)
((flag-bit 'syn) :: integer bits 1)
((flag-bit 'fin) :: integer bits 1)
(window-size :: integer bytes 2)
(0 :: integer bytes 2) ;; checksum location
(0 :: integer bytes 2) ;; TODO: urgent pointer
(data :: binary)))
(define pseudo-header (bit-string (src-ip :: binary bytes 4)
(dst-ip :: binary bytes 4)
0
PROTOCOL-TCP
((bit-string-byte-count payload) :: integer bytes 2)))
(send! (ip-packet #f src-ip dst-ip PROTOCOL-TCP #""
(ip-checksum 16 payload #:pseudo-header pseudo-header))))
(on (message (ip-packet $source-if $src $dst PROTOCOL-TCP _ $body))
(when (and source-if ;; source-if == #f iff packet originates locally
(set-member? (local-ips) dst))
(analyze-incoming-packet src dst body)))
(on (message ($ p (tcp-packet #f _ _ _ _ _ _ _)))
(deliver-outbound-packet p))
(during (observe (tcp-connection _ (tcp-listener $port)))
(assert (tcp-port-allocation port (tcp-listener port))))
(on (asserted (tcp-connection $id (tcp-address $remote-host $remote-port)))
(define port (allocate-port! 'tcp))
;; TODO: Choose a sensible IP address for the outbound
;; connection. We don't have enough information to do this
;; well at the moment, so just pick some available local IP
;; address.
;;
;; Interesting note: In some sense, the right answer is a
;; *wildcard*. This would give us a form of mobility, where IP
;; addresses only route to a given bucket-of-state and ONLY the
;; port number selects a substate therein. That's not how TCP
;; is defined however so we can't do that.
(define appropriate-ip (set-first (local-ips)))
(define appropriate-host (ip-address->hostname appropriate-ip))
(define remote-ip (ip-string->ip-address remote-host))
(define q (tcp-quad remote-ip remote-port appropriate-ip port))
(active-state-vectors (set-add (active-state-vectors) q))
(spawn-state-vector #t id q))
(during/spawn (observe (tcp-in-line $id _))
#:name (list 'drivers/tcp 'line-reader id)
(local-require (only-in syndicate/support/bytes bytes-index))
(field [buffer #""])
(on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs)))
(begin/dataflow
(define newline-pos (bytes-index (buffer) (char->integer #\newline)))
(when newline-pos
(define line (subbytes (buffer) 0 newline-pos))
(buffer (subbytes (buffer) (+ newline-pos 1)))
(send! (tcp-in-line id line)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Per-connection state vector process
;;---------------------------------------------------------------------------
;; From the RFC:
;;
;; Send Sequence Variables
;;
;; SND.UNA - send unacknowledged
;; SND.NXT - send next
;; SND.WND - send window
;; SND.UP - send urgent pointer
;; SND.WL1 - segment sequence number used for last window update
;; SND.WL2 - segment acknowledgment number used for last window
;; update
;; ISS - initial send sequence number
;;
;; Receive Sequence Variables
;;
;; RCV.NXT - receive next
;; RCV.WND - receive window
;; RCV.UP - receive urgent pointer
;; IRS - initial receive sequence number
;;
;; The following diagrams may help to relate some of these variables to
;; the sequence space.
;;
;; Send Sequence Space
;;
;; 1 2 3 4
;; ----------|----------|----------|----------
;; SND.UNA SND.NXT SND.UNA
;; +SND.WND
;;
;; 1 - old sequence numbers which have been acknowledged
;; 2 - sequence numbers of unacknowledged data
;; 3 - sequence numbers allowed for new data transmission
;; 4 - future sequence numbers which are not yet allowed
;;
;; Send Sequence Space
;;
;; Figure 4.
;;
;; The send window is the portion of the sequence space labeled 3 in
;; figure 4.
;;
;; Receive Sequence Space
;;
;; 1 2 3
;; ----------|----------|----------
;; RCV.NXT RCV.NXT
;; +RCV.WND
;;
;; 1 - old sequence numbers which have been acknowledged
;; 2 - sequence numbers allowed for new reception
;; 3 - future sequence numbers which are not yet allowed
;;
;; Receive Sequence Space
;;
;; Figure 5.
;;
;; The receive window is the portion of the sequence space labeled 2 in
;; figure 5.
;;
;; There are also some variables used frequently in the discussion that
;; take their values from the fields of the current segment.
;;
;; Current Segment Variables
;;
;; SEG.SEQ - segment sequence number
;; SEG.ACK - segment acknowledgment number
;; SEG.LEN - segment length
;; SEG.WND - segment window
;; SEG.UP - segment urgent pointer
;; SEG.PRC - segment precedence value
;;
;;---------------------------------------------------------------------------
(struct buffer (data ;; bit-string
seqn ;; names leftmost byte in data
window ;; counts bytes from leftmost byte in data
finished?) ;; boolean: true after FIN
#:transparent)
;; Regarding acks:
;;
;; - we send an ack number that is (buffer-seqn (inbound)) plus the
;; number of buffered bytes.
;;
;; - acks received allow us to advance (buffer-seqn (outbound)) (that
;; is, SND.UNA) to that point, discarding buffered data to do so.
;; Regarding windows:
;;
;; - (buffer-window (outbound)) is the size of the peer's receive
;; window. Do not allow more than this many bytes to be
;; unacknowledged on the wire.
;;
;; - (buffer-window (inbound)) is the size of our receive window. The
;; peer should not exceed this; we should ignore data received that
;; extends beyond this. Once we implement flow control locally
;; (ahem) we should move this around, but at present it is fixed.
;; TODO: Zero receive window probe when we have something to say.
(define (buffer-push b data)
(struct-copy buffer b [data (bit-string-append (buffer-data b) data)]))
(define inbound-buffer-limit 65535)
(define maximum-segment-size 536) ;; bytes
(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout
(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I
;; cheat; RFC 793 says "the present global default is five minutes", which is
;; reasonable to be getting on with
(define (seq+ a b) (bitwise-and #xffffffff (+ a b)))
;; Always positive
(define (seq- larger smaller)
(if (< larger smaller) ;; wraparound has occurred
(+ (- larger smaller) #x100000000)
(- larger smaller)))
(define (seq> a b)
(not (seq>= b a)))
(define (seq>= a b)
(< (seq- a b) #x80000000))
(define (seq-min a b) (if (seq> a b) b a))
(define (seq-max a b) (if (seq> a b) a b))
(module+ test
(check-equal? (seq+ 41724780 1) 41724781)
(check-equal? (seq+ 0 1) 1)
(check-equal? (seq+ #x80000000 1) #x80000001)
(check-equal? (seq+ #xffffffff 1) #x00000000)
(check-equal? (seq> 41724780 41724780) #f)
(check-equal? (seq> 41724781 41724780) #t)
(check-equal? (seq> 41724780 41724781) #f)
(check-equal? (seq> 0 0) #f)
(check-equal? (seq> 1 0) #t)
(check-equal? (seq> 0 1) #f)
(check-equal? (seq> #x80000000 #x80000000) #f)
(check-equal? (seq> #x80000001 #x80000000) #t)
(check-equal? (seq> #x80000000 #x80000001) #f)
(check-equal? (seq> #xffffffff #xffffffff) #f)
(check-equal? (seq> #x00000000 #xffffffff) #t)
(check-equal? (seq> #xffffffff #x00000000) #f)
(check-equal? (seq>= 41724780 41724780) #t)
(check-equal? (seq>= 41724781 41724780) #t)
(check-equal? (seq>= 41724780 41724781) #f)
(check-equal? (seq>= 0 0) #t)
(check-equal? (seq>= 1 0) #t)
(check-equal? (seq>= 0 1) #f)
(check-equal? (seq>= #x80000000 #x80000000) #t)
(check-equal? (seq>= #x80000001 #x80000000) #t)
(check-equal? (seq>= #x80000000 #x80000001) #f)
(check-equal? (seq>= #xffffffff #xffffffff) #t)
(check-equal? (seq>= #x00000000 #xffffffff) #t)
(check-equal? (seq>= #xffffffff #x00000000) #f))
(define (spawn-state-vector outbound? connection-id q)
(match-define (tcp-quad remote-ip remote-port local-ip local-port) q)
(spawn #:name (list 'tcp-state-vector
(ip-address->hostname remote-ip)
remote-port
(ip-address->hostname local-ip)
local-port)
(define root-facet (current-facet))
(assert (tcp-port-allocation local-port
(tcp-address (ip-address->hostname remote-ip) remote-port)))
(define initial-outbound-seqn
;; Yuck
(inexact->exact (truncate (* #x100000000 (random)))))
(field [outbound (buffer #"!" initial-outbound-seqn 0 #f)] ;; dummy data at SYN position
[send-next initial-outbound-seqn] ;; SND.NXT
[high-water-mark initial-outbound-seqn]
[inbound (buffer #"" #f inbound-buffer-limit #f)]
[transmission-needed? #f]
[syn-acked? #f]
[fin-seen? #f]
[unblocked? #f]
[latest-peer-activity-time (current-inexact-milliseconds)]
;; ^ the most recent time we heard from our peer
[user-timeout-base-time (current-inexact-milliseconds)]
;; ^ when the index of the first outbound unacknowledged byte changed
;; RFC 6298
[rtt-estimate #f] ;; milliseconds; "SRTT"
[rtt-mean-deviation #f] ;; milliseconds; "RTTVAR"
[retransmission-timeout 1000] ;; milliseconds
[retransmission-deadline #f]
[rtt-estimate-seqn-target #f]
[rtt-estimate-start-time #f]
)
(define (next-expected-seqn)
(define b (inbound))
(define v (buffer-seqn b))
(and v (seq+ v (bit-string-byte-count (buffer-data b)))))
(define (set-inbound-seqn! seqn)
(inbound (struct-copy buffer (inbound) [seqn seqn])))
(define (incorporate-segment! data)
(when (not (buffer-finished? (inbound)))
(inbound (buffer-push (inbound) data))))
(define (deliver-inbound-locally!)
(define b (inbound))
(when (not (bit-string-empty? (buffer-data b)))
(define chunk (bit-string->bytes (buffer-data b)))
(send! (tcp-in connection-id chunk))
(inbound (struct-copy buffer b
[data #""]
[seqn (seq+ (buffer-seqn b) (bytes-length chunk))]))))
;; -> Void
(define (check-fin!)
(define b (inbound))
(when (not (buffer-finished? b))
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
(error 'check-fin "Nonempty inbound buffer"))
(when (fin-seen?)
(log-netstack/tcp-debug "Closing inbound stream.")
(inbound (struct-copy buffer b
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
[finished? #t]))
(transmission-needed? #t)))) ;; we must send an ack
(define (connected?)
(and (syn-acked?) ;; the SYN we sent has been acked by the remote peer
(not (buffer-finished? (inbound))))) ;; the remote peer hasn't sent a FIN
(on (asserted (tcp-accepted connection-id))
(unblocked? #t))
(begin/dataflow
(when (and (connected?) (unblocked?))
(deliver-inbound-locally!)
(check-fin!)))
;; -> Void
(define (arm-retransmission-timer!)
(log-netstack/tcp-debug "Arming retransmission timer (~a ms)" (retransmission-timeout))
(retransmission-deadline (+ (current-inexact-milliseconds) (retransmission-timeout))))
;; Timestamp -> Void
(define (start-rtt-estimate! now)
(define target (send-next))
(when (seq>= target (high-water-mark))
(log-netstack/tcp-debug "Starting RTT estimation; target seqn is ~a" target)
(rtt-estimate-start-time now)
(rtt-estimate-seqn-target target)))
;; -> Void
(define (reset-rtt-estimate!)
(rtt-estimate-start-time #f)
(rtt-estimate-seqn-target #f))
;; Timestamp -> Void
(define (finish-rtt-estimate! now)
(define rtt-measurement (- now (rtt-estimate-start-time)))
(reset-rtt-estimate!)
(log-netstack/tcp-debug "RTT measurement: ~a ms" rtt-measurement)
;; RFC 6298 Section 2.
(cond [(rtt-estimate) => ;; we have a previous estimate, RFC 6298 rule (2.3)
(lambda (prev-estimate)
(rtt-mean-deviation (+ (* 0.75 (rtt-mean-deviation))
(* 0.25 (abs (- rtt-measurement prev-estimate)))))
(rtt-estimate (+ (* 0.875 prev-estimate)
(* 0.125 rtt-measurement))))]
[else ;; no previous estimate, RFC 6298 rule (2.2) applies
(rtt-estimate rtt-measurement)
(rtt-mean-deviation (/ rtt-measurement 2))])
(default-retransmission-timeout!)
(log-netstack/tcp-debug "RTT measurement ~a ms; estimate ~a ms; mean deviation ~a ms; RTO ~a ms"
rtt-measurement
(rtt-estimate)
(rtt-mean-deviation)
(retransmission-timeout)))
(define (default-retransmission-timeout!)
(retransmission-timeout
(max 200 ;; RFC 6298 rule (2.4), but cribbing from Linux's 200ms minimum
(min 60000 ;; (2.5)
(+ (rtt-estimate) (* 4 (rtt-mean-deviation))))))) ;; (2.2), (2.3)
;; Boolean SeqNum -> Void
(define (discard-acknowledged-outbound! ack? ackn)
(when ack?
(let* ((b (outbound))
(base (buffer-seqn b))
(ackn (seq-min ackn (high-water-mark)))
(ackn (seq-max ackn base))
(dist (seq- ackn base)))
(user-timeout-base-time (current-inexact-milliseconds))
(when (positive? dist)
(when (not (syn-acked?)) (syn-acked? #t))
(log-netstack/tcp-debug "******** ackn ~a; send-next ~a; high-water-mark ~a"
ackn
(send-next)
(high-water-mark))
(when (seq> ackn (send-next)) (send-next ackn))
(when (and (rtt-estimate-seqn-target) (seq>= ackn (rtt-estimate-seqn-target)))
(finish-rtt-estimate! (current-inexact-milliseconds)))
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
(outbound (struct-copy buffer b [data remaining-data] [seqn ackn]))
(default-retransmission-timeout!)
(log-netstack/tcp-debug "Positive distance moved by ack, RTO now ~a"
(retransmission-timeout))
(arm-retransmission-timer!)))))
;; Nat -> Void
(define (update-outbound-window! peer-window)
(log-netstack/tcp-debug "Peer's receive-window is now ~a" peer-window)
(outbound (struct-copy buffer (outbound) [window peer-window])))
;; True iff there is no queued-up data waiting either for
;; transmission or (if transmitted already) for acknowledgement.
(define (all-output-acknowledged?)
(bit-string-empty? (buffer-data (outbound))))
(define (close-outbound-stream!)
(log-netstack/tcp-debug "Closing outbound stream.")
(define b (outbound))
(when (not (buffer-finished? b))
(outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte
[finished? #t]))
(transmission-needed? #t))) ;; the FIN machinery is awkwardly
;; different from the usual
;; advance-based decision on
;; whether to send a packet or not
;; SeqNum Boolean Boolean Bytes -> TcpPacket
(define (build-outbound-packet seqn mention-syn? mention-fin? payload)
(define ackn (next-expected-seqn))
(define window (min 65535 ;; limit of field width
(max 0 ;; can't be negative
(- (buffer-window (inbound))
(bit-string-byte-count (buffer-data (inbound)))))))
(define flags (set))
(when ackn (set! flags (set-add flags 'ack)))
(when mention-syn? (set! flags (set-add flags 'syn)))
(when mention-fin? (set! flags (set-add flags 'fin)))
(tcp-packet #f q seqn (or ackn 0) flags window #"" payload))
(define (outbound-data-chunk offset length)
(bit-string-take (bit-string-drop (buffer-data (outbound)) (* offset 8)) (* length 8)))
;; Transmit acknowledgements and outbound data.
(begin/dataflow
(define in-flight-count (seq- (send-next) (buffer-seqn (outbound))))
(define-values (mention-syn? ;; whether to mention SYN
payload-size ;; how many bytes of payload data to include
mention-fin? ;; whether to mention FIN
advance) ;; how far to advance send-next
(if (syn-acked?)
(let* ((effective-window (max 0 (- (buffer-window (outbound)) in-flight-count)))
(stream-ended? (buffer-finished? (outbound)))
(max-advance (- (bit-string-byte-count (buffer-data (outbound))) in-flight-count))
(payload-size (min maximum-segment-size effective-window max-advance)))
(if (and stream-ended? ;; there's a FIN enqueued,
(positive? payload-size) ;; we aren't sending nothing at all,
(= payload-size max-advance)) ;; and our payload would cover the FIN
(values #f (- payload-size 1) #t payload-size)
(values #f payload-size #f payload-size)))
(cond [(= in-flight-count 0) (values #t 0 #f 1)]
[(= in-flight-count 1) (values #t 0 #f 0)]
[else (error 'send-outbound!
"Invalid state: send-next had advanced too far before SYN")])))
(when (and (or (next-expected-seqn) outbound?)
;; ^ Talk only either if: we know the peer's seqn, or
;; we don't, but we're an outbound connection rather
;; than a listener.
(or (transmission-needed?)
(positive? advance))
;; ^ ... and we have something to say. Something to
;; ack, or something to send.
)
(define packet-seqn (if mention-syn? (buffer-seqn (outbound)) (send-next)))
(define packet (build-outbound-packet packet-seqn
mention-syn?
mention-fin?
(outbound-data-chunk in-flight-count payload-size)))
(when (positive? advance)
(define new-send-next (seq+ (send-next) advance))
(send-next new-send-next)
(when (seq> new-send-next (high-water-mark))
(high-water-mark new-send-next)))
(when (transmission-needed?)
(transmission-needed? #f))
;; (log-netstack/tcp-debug " sending ~v" packet)
(send! packet)
;; (if (> (random) 0.5)
;; (begin (log-netstack/tcp-debug "Send ~a" (summarize-tcp-packet packet))
;; (send! packet))
;; (log-netstack/tcp-debug "Drop ~a" (summarize-tcp-packet packet)))
(when (or mention-syn? mention-fin? (positive? advance))
(when (not (retransmission-deadline))
(arm-retransmission-timer!))
(when (not (rtt-estimate-start-time))
(start-rtt-estimate! (current-inexact-milliseconds))))))
(begin/dataflow
(when (and (retransmission-deadline) (all-output-acknowledged?))
(log-netstack/tcp-debug "All output acknowledged; disarming retransmission timer")
(retransmission-deadline #f)))
(on #:when (retransmission-deadline) (asserted (later-than (retransmission-deadline)))
(send-next (buffer-seqn (outbound)))
(log-netstack/tcp-debug "Retransmission deadline fired, RTO was ~a; reset to ~a"
(retransmission-timeout)
(send-next))
(update-outbound-window! maximum-segment-size) ;; temporary. Will reopen on next ack
(transmission-needed? #t)
(retransmission-deadline #f)
(reset-rtt-estimate!) ;; give up on current RTT estimation
(retransmission-timeout (min 64000 (* 2 (retransmission-timeout))))
(log-netstack/tcp-debug " RTO now ~a" (retransmission-timeout)))
(define (reset! seqn ackn)
(define reset-packet (tcp-packet #f q seqn ackn (set 'ack 'rst) 0 #"" #""))
(log-netstack/tcp-warning "Reset ~a" (summarize-tcp-packet reset-packet))
(stop-facet root-facet)
(send! reset-packet))
(assert q) ;; Declare that this state vector exists
(on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string (not outbound?) q)))
(on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string (not outbound?) q)))
(stop-when #:when (and (buffer-finished? (outbound))
(buffer-finished? (inbound))
(all-output-acknowledged?))
(asserted (later-than (+ (latest-peer-activity-time)
(* 2 1000 maximum-segment-lifetime-sec))))
;; Everything is cleanly shut down, and we just need to wait a while for unexpected
;; packets before we release the state vector.
)
(stop-when #:when (not (all-output-acknowledged?))
(asserted (later-than (+ (user-timeout-base-time) user-timeout-msec)))
;; We've been plaintively retransmitting for user-timeout-msec without hearing anything
;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but
;; it will do for now? TODO
(log-netstack/tcp-warning "TCP_USER_TIMEOUT fired."))
(define/query-value listener-listening?
#f (observe (tcp-connection _ (tcp-listener local-port))) #t)
(define (trigger-ack!)
(transmission-needed? #t))
(on (message (tcp-packet #t q $seqn $ackn $flags $window $options $data))
(define expected (next-expected-seqn))
(define is-syn? (set-member? flags 'syn))
(define is-fin? (set-member? flags 'fin))
(cond
[(set-member? flags 'rst)
(stop-facet root-facet
(when (not (connected?)) ;; --> rejected!
(define e (exn:fail:network
(format "~a: Connection rejected" (tcp-quad->string #f q))
(current-continuation-marks)))
(react (assert (tcp-rejected connection-id e))
(on-start (sleep 5)
(stop-current-facet)))))]
[(and (not expected) ;; no syn yet
(or (not is-syn?) ;; and this isn't it
(and (not (listener-listening?)) ;; or it is, but no listener...
(not outbound?)))) ;; ...and we're not an outbound connection
(reset! ackn ;; this is *our* seqn
(seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0)))
;; ^^ this is what we should acknowledge...
)]
[else
(cond
[(not expected) ;; haven't seen syn yet, but we know this is it
(set-inbound-seqn! (seq+ seqn 1))
(incorporate-segment! data)
(trigger-ack!)]
[(= expected seqn)
(incorporate-segment! data)
(when (positive? (bit-string-byte-count data)) (trigger-ack!))]
[else
(trigger-ack!)])
(when is-fin? (fin-seen? #t))
(discard-acknowledged-outbound! (set-member? flags 'ack) ackn)
(update-outbound-window! window)
(latest-peer-activity-time (current-inexact-milliseconds))]))
(on (message (tcp-out connection-id $bs))
;; (log-netstack/tcp-debug "GOT MORE STUFF TO DELIVER ~v" bs)
(when (all-output-acknowledged?)
;; Only move user-timeout-base-time if there wasn't
;; already some outstanding output.
(user-timeout-base-time (current-inexact-milliseconds)))
(outbound (buffer-push (outbound) bs)))
(if outbound?
(begin
(assert #:when (connected?) (tcp-accepted connection-id))
(on (retracted (tcp-connection connection-id (tcp-address _ _)))
(close-outbound-stream!)))
(begin
(assert #:when (connected?) (tcp-connection connection-id (tcp-listener local-port)))
(on (asserted (tcp-rejected connection-id _))
;; In principle, we have the flexibility to delay
;; replying to SYN until userland decides whether or not
;; to accept an incoming connection! We don't do that yet
;; though.
(close-outbound-stream!))
(on (retracted (tcp-accepted connection-id))
(close-outbound-stream!))
(on-start (sleep 5)
(when (not (unblocked?))
(log-netstack/tcp-error "TCP relay process ~a timed out waiting for peer" q)
(stop-facet root-facet)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(spawn-tcp-driver)