This is running on syndicate's own\n" + "\n" + "TCP/IP stack.
\n" + "There have been ~a requests prior to this one.
\n") + counter))) + (send! (outbound (tcp-out them response))) + (for [(i 4)] + (define buf (make-bytes 1024 (+ #x30 (modulo i 10)))) + (send! (outbound (tcp-out them buf)))) + (stop-facet (current-facet))))))) diff --git a/syndicate/examples/netstack/port-allocator.rkt b/syndicate/examples/netstack/port-allocator.rkt new file mode 100644 index 0000000..dfd392f --- /dev/null +++ b/syndicate/examples/netstack/port-allocator.rkt @@ -0,0 +1,36 @@ +#lang imperative-syndicate +;; UDP/TCP port allocator + +(provide spawn-port-allocator + allocate-port! + (struct-out port-allocation-request) + (struct-out port-allocation-reply)) + +(require racket/set) +(require "ip.rkt") + +(struct port-allocation-request (reqid type) #:prefab) +(struct port-allocation-reply (reqid port) #:prefab) + +(define (spawn-port-allocator allocator-type query-used-ports) + (spawn #:name (list 'port-allocator allocator-type) + (define local-ips (query-local-ip-addresses)) + (define used-ports (query-used-ports)) + + (begin/dataflow + (log-info "port-allocator ~v used ports: ~v" allocator-type (used-ports))) + + (on (message (port-allocation-request $reqid allocator-type)) + (define currently-used-ports (used-ports)) + (let randomly-allocate-until-unused () + (define p (+ 1024 (random 64512))) + (if (set-member? currently-used-ports p) + (randomly-allocate-until-unused) + (begin (used-ports (set-add currently-used-ports p)) + (send! (port-allocation-reply reqid p)))))))) + +(define (allocate-port! type) + (define reqid (gensym 'allocate-port!)) + (react/suspend (done) + (stop-when (message (port-allocation-reply reqid $port)) (done port)) + (on-start (send! (port-allocation-request reqid type))))) diff --git a/syndicate/examples/netstack/tcp.rkt b/syndicate/examples/netstack/tcp.rkt new file mode 100644 index 0000000..a44e76c --- /dev/null +++ b/syndicate/examples/netstack/tcp.rkt @@ -0,0 +1,755 @@ +#lang imperative-syndicate + +(provide (struct-out tcp-connection) + (struct-out tcp-accepted) + (struct-out tcp-out) + (struct-out tcp-in) + (struct-out tcp-in-line) + + (struct-out tcp-address) + (struct-out tcp-listener) + + spawn-tcp-driver) + +(require racket/set) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") + +(require/activate imperative-syndicate/drivers/timer) +(require "ip.rkt") +(require "port-allocator.rkt") + +(module+ test (require rackunit)) + +(define-logger netstack/tcp) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(assertion-struct tcp-connection (id spec)) +(assertion-struct tcp-accepted (id)) +(message-struct tcp-out (id bytes)) +(message-struct tcp-in (id bytes)) +(message-struct tcp-in-line (id bytes)) + +(assertion-struct tcp-address (host port)) +(assertion-struct tcp-listener (port)) + +(assertion-struct tcp-quad (remote-ip remote-port local-ip local-port)) + +(message-struct tcp-packet (from-wire? + quad + sequence-number + ack-number + flags + window-size + options + data)) + +;; (tcp-port-allocation Number (U TcpAddress TcpListener)) +(assertion-struct tcp-port-allocation (port handle)) + +(define (tcp-quad->string from-wire? q) + (match-define (tcp-quad ri rp li lp) q) + (if from-wire? + (format "(I) ~a:~a -> ~a:~a" (ip-address->hostname ri) rp (ip-address->hostname li) lp) + (format "(O) ~a:~a -> ~a:~a" (ip-address->hostname li) lp (ip-address->hostname ri) rp))) + +(define (summarize-tcp-packet packet) + (format "~a (seq ~a, ack ~a, flags ~a, window ~a, payload ~a)" + (tcp-quad->string (tcp-packet-from-wire? packet) (tcp-packet-quad packet)) + (tcp-packet-sequence-number packet) + (tcp-packet-ack-number packet) + (tcp-packet-flags packet) + (tcp-packet-window-size packet) + (bit-string-byte-count (tcp-packet-data packet)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver startup + +(define PROTOCOL-TCP 6) + +(define (spawn-tcp-driver) + (spawn-port-allocator 'tcp (lambda () (query-set tcp-ports (tcp-port-allocation $p _) p))) + + (spawn #:name 'kernel-tcp-driver + (define local-ips (query-local-ip-addresses)) + + (define/query-set active-state-vectors ($ q (tcp-quad _ _ _ _)) q) + + (define (state-vector-active? statevec) + (set-member? (active-state-vectors) statevec)) + + (define (analyze-incoming-packet src-ip dst-ip body) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + (data-offset :: integer bits 4) + (reserved :: integer bits 3) + (ns :: integer bits 1) + (cwr :: integer bits 1) + (ece :: integer bits 1) + (urg :: integer bits 1) + (ack :: integer bits 1) + (psh :: integer bits 1) + (rst :: integer bits 1) + (syn :: integer bits 1) + (fin :: integer bits 1) + (window-size :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (urgent-pointer :: integer bytes 2) + (rest :: binary) ] + (let* ((flags (set)) + (statevec (tcp-quad src-ip src-port dst-ip dst-port)) + (old-active-state-vectors (active-state-vectors)) + (spawn-needed? (and (not (state-vector-active? statevec)) + (zero? rst)))) ;; don't bother spawning if it's a rst + (define-syntax-rule (set-flags! v ...) + (begin (unless (zero? v) (set! flags (set-add flags 'v))) ...)) + (set-flags! ns cwr ece urg ack psh rst syn fin) + (bit-string-case rest + ([ (opts :: binary bytes (- (* data-offset 4) 20)) + (data :: binary) ] + (let ((packet (tcp-packet #t + statevec + sequence-number + ack-number + flags + window-size + (bit-string->bytes opts) + (bit-string->bytes data)))) + (log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet packet)) + (when spawn-needed? + (log-netstack/tcp-debug " - spawn needed!") + (active-state-vectors (set-add (active-state-vectors) statevec)) + (spawn-state-vector #f (tcp-address (ip-address->hostname src-ip) src-port) statevec)) + (send! packet))) + (else #f)))) + (else #f))) + + (begin/dataflow + (log-netstack/tcp-debug "SCN yielded statevecs ~v and local-ips ~v" + (active-state-vectors) + (local-ips))) + + (define (deliver-outbound-packet p) + (match-define (tcp-packet #f + (tcp-quad dst-ip ;; \__ remote + dst-port ;; / + src-ip ;; \__ local + src-port) ;; / + sequence-number + ack-number + flags + window-size + options + data) + p) + (log-netstack/tcp-debug "TCP ~a" (summarize-tcp-packet p)) + (define (flag-bit sym) (if (set-member? flags sym) 1 0)) + (define payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + ((+ 5 (quotient (bit-string-byte-count options) 4)) + :: integer bits 4) ;; TODO: enforce 4-byte alignment + (0 :: integer bits 3) + ((flag-bit 'ns) :: integer bits 1) + ((flag-bit 'cwr) :: integer bits 1) + ((flag-bit 'ece) :: integer bits 1) + ((flag-bit 'urg) :: integer bits 1) + ((flag-bit 'ack) :: integer bits 1) + ((flag-bit 'psh) :: integer bits 1) + ((flag-bit 'rst) :: integer bits 1) + ((flag-bit 'syn) :: integer bits 1) + ((flag-bit 'fin) :: integer bits 1) + (window-size :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (0 :: integer bytes 2) ;; TODO: urgent pointer + (data :: binary))) + (define pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-TCP + ((bit-string-byte-count payload) :: integer bytes 2))) + (send! (ip-packet #f src-ip dst-ip PROTOCOL-TCP #"" + (ip-checksum 16 payload #:pseudo-header pseudo-header)))) + + (on (message (ip-packet $source-if $src $dst PROTOCOL-TCP _ $body)) + (when (and source-if ;; source-if == #f iff packet originates locally + (set-member? (local-ips) dst)) + (analyze-incoming-packet src dst body))) + + (on (message ($ p (tcp-packet #f _ _ _ _ _ _ _))) + (deliver-outbound-packet p)) + + (during (observe (tcp-connection _ (tcp-listener $port))) + (assert (tcp-port-allocation port (tcp-listener port)))) + + (on (asserted (tcp-connection $id (tcp-address $remote-host $remote-port))) + (define port (allocate-port! 'tcp)) + ;; TODO: Choose a sensible IP address for the outbound + ;; connection. We don't have enough information to do this + ;; well at the moment, so just pick some available local IP + ;; address. + ;; + ;; Interesting note: In some sense, the right answer is a + ;; *wildcard*. This would give us a form of mobility, where IP + ;; addresses only route to a given bucket-of-state and ONLY the + ;; port number selects a substate therein. That's not how TCP + ;; is defined however so we can't do that. + (define appropriate-ip (set-first (local-ips))) + (define appropriate-host (ip-address->hostname appropriate-ip)) + (define remote-ip (ip-string->ip-address remote-host)) + (define q (tcp-quad remote-ip remote-port appropriate-ip port)) + (active-state-vectors (set-add (active-state-vectors) q)) + (spawn-state-vector #t id q)) + + (during/spawn (observe (tcp-in-line $id _)) + #:name (list 'drivers/tcp 'line-reader id) + (local-require (only-in syndicate/support/bytes bytes-index)) + (field [buffer #""]) + (on (message (tcp-in id $bs)) (buffer (bytes-append (buffer) bs))) + (begin/dataflow + (define newline-pos (bytes-index (buffer) (char->integer #\newline))) + (when newline-pos + (define line (subbytes (buffer) 0 newline-pos)) + (buffer (subbytes (buffer) (+ newline-pos 1))) + (send! (tcp-in-line id line))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Per-connection state vector process + +;;--------------------------------------------------------------------------- +;; From the RFC: +;; +;; Send Sequence Variables +;; +;; SND.UNA - send unacknowledged +;; SND.NXT - send next +;; SND.WND - send window +;; SND.UP - send urgent pointer +;; SND.WL1 - segment sequence number used for last window update +;; SND.WL2 - segment acknowledgment number used for last window +;; update +;; ISS - initial send sequence number +;; +;; Receive Sequence Variables +;; +;; RCV.NXT - receive next +;; RCV.WND - receive window +;; RCV.UP - receive urgent pointer +;; IRS - initial receive sequence number +;; +;; The following diagrams may help to relate some of these variables to +;; the sequence space. +;; +;; Send Sequence Space +;; +;; 1 2 3 4 +;; ----------|----------|----------|---------- +;; SND.UNA SND.NXT SND.UNA +;; +SND.WND +;; +;; 1 - old sequence numbers which have been acknowledged +;; 2 - sequence numbers of unacknowledged data +;; 3 - sequence numbers allowed for new data transmission +;; 4 - future sequence numbers which are not yet allowed +;; +;; Send Sequence Space +;; +;; Figure 4. +;; +;; The send window is the portion of the sequence space labeled 3 in +;; figure 4. +;; +;; Receive Sequence Space +;; +;; 1 2 3 +;; ----------|----------|---------- +;; RCV.NXT RCV.NXT +;; +RCV.WND +;; +;; 1 - old sequence numbers which have been acknowledged +;; 2 - sequence numbers allowed for new reception +;; 3 - future sequence numbers which are not yet allowed +;; +;; Receive Sequence Space +;; +;; Figure 5. +;; +;; The receive window is the portion of the sequence space labeled 2 in +;; figure 5. +;; +;; There are also some variables used frequently in the discussion that +;; take their values from the fields of the current segment. +;; +;; Current Segment Variables +;; +;; SEG.SEQ - segment sequence number +;; SEG.ACK - segment acknowledgment number +;; SEG.LEN - segment length +;; SEG.WND - segment window +;; SEG.UP - segment urgent pointer +;; SEG.PRC - segment precedence value +;; +;;--------------------------------------------------------------------------- + +(struct buffer (data ;; bit-string + seqn ;; names leftmost byte in data + window ;; counts bytes from leftmost byte in data + finished?) ;; boolean: true after FIN + #:transparent) + +;; Regarding acks: +;; +;; - we send an ack number that is (buffer-seqn (inbound)) plus the +;; number of buffered bytes. +;; +;; - acks received allow us to advance (buffer-seqn (outbound)) (that +;; is, SND.UNA) to that point, discarding buffered data to do so. + +;; Regarding windows: +;; +;; - (buffer-window (outbound)) is the size of the peer's receive +;; window. Do not allow more than this many bytes to be +;; unacknowledged on the wire. +;; +;; - (buffer-window (inbound)) is the size of our receive window. The +;; peer should not exceed this; we should ignore data received that +;; extends beyond this. Once we implement flow control locally +;; (ahem) we should move this around, but at present it is fixed. + +;; TODO: Zero receive window probe when we have something to say. + +(define (buffer-push b data) + (struct-copy buffer b [data (bit-string-append (buffer-data b) data)])) + +(define inbound-buffer-limit 65535) +(define maximum-segment-size 536) ;; bytes +(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout +(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I + ;; cheat; RFC 793 says "the present global default is five minutes", which is + ;; reasonable to be getting on with + +(define (seq+ a b) (bitwise-and #xffffffff (+ a b))) + +;; Always positive +(define (seq- larger smaller) + (if (< larger smaller) ;; wraparound has occurred + (+ (- larger smaller) #x100000000) + (- larger smaller))) + +(define (seq> a b) + (not (seq>= b a))) + +(define (seq>= a b) + (< (seq- a b) #x80000000)) + +(define (seq-min a b) (if (seq> a b) b a)) +(define (seq-max a b) (if (seq> a b) a b)) + +(module+ test + (check-equal? (seq+ 41724780 1) 41724781) + (check-equal? (seq+ 0 1) 1) + (check-equal? (seq+ #x80000000 1) #x80000001) + (check-equal? (seq+ #xffffffff 1) #x00000000) + + (check-equal? (seq> 41724780 41724780) #f) + (check-equal? (seq> 41724781 41724780) #t) + (check-equal? (seq> 41724780 41724781) #f) + + (check-equal? (seq> 0 0) #f) + (check-equal? (seq> 1 0) #t) + (check-equal? (seq> 0 1) #f) + + (check-equal? (seq> #x80000000 #x80000000) #f) + (check-equal? (seq> #x80000001 #x80000000) #t) + (check-equal? (seq> #x80000000 #x80000001) #f) + + (check-equal? (seq> #xffffffff #xffffffff) #f) + (check-equal? (seq> #x00000000 #xffffffff) #t) + (check-equal? (seq> #xffffffff #x00000000) #f) + + (check-equal? (seq>= 41724780 41724780) #t) + (check-equal? (seq>= 41724781 41724780) #t) + (check-equal? (seq>= 41724780 41724781) #f) + + (check-equal? (seq>= 0 0) #t) + (check-equal? (seq>= 1 0) #t) + (check-equal? (seq>= 0 1) #f) + + (check-equal? (seq>= #x80000000 #x80000000) #t) + (check-equal? (seq>= #x80000001 #x80000000) #t) + (check-equal? (seq>= #x80000000 #x80000001) #f) + + (check-equal? (seq>= #xffffffff #xffffffff) #t) + (check-equal? (seq>= #x00000000 #xffffffff) #t) + (check-equal? (seq>= #xffffffff #x00000000) #f)) + +(define (spawn-state-vector outbound? connection-id q) + (match-define (tcp-quad remote-ip remote-port local-ip local-port) q) + + (spawn #:name (list 'tcp-state-vector + (ip-address->hostname remote-ip) + remote-port + (ip-address->hostname local-ip) + local-port) + (define root-facet (current-facet)) + + (assert (tcp-port-allocation local-port + (tcp-address (ip-address->hostname remote-ip) remote-port))) + + (define initial-outbound-seqn + ;; Yuck + (inexact->exact (truncate (* #x100000000 (random))))) + + (field [outbound (buffer #"!" initial-outbound-seqn 0 #f)] ;; dummy data at SYN position + [send-next initial-outbound-seqn] ;; SND.NXT + [high-water-mark initial-outbound-seqn] + + [inbound (buffer #"" #f inbound-buffer-limit #f)] + [transmission-needed? #f] + [syn-acked? #f] + [fin-seen? #f] + [unblocked? #f] + + [latest-peer-activity-time (current-inexact-milliseconds)] + ;; ^ the most recent time we heard from our peer + [user-timeout-base-time (current-inexact-milliseconds)] + ;; ^ when the index of the first outbound unacknowledged byte changed + + ;; RFC 6298 + [rtt-estimate #f] ;; milliseconds; "SRTT" + [rtt-mean-deviation #f] ;; milliseconds; "RTTVAR" + [retransmission-timeout 1000] ;; milliseconds + [retransmission-deadline #f] + [rtt-estimate-seqn-target #f] + [rtt-estimate-start-time #f] + ) + + (define (next-expected-seqn) + (define b (inbound)) + (define v (buffer-seqn b)) + (and v (seq+ v (bit-string-byte-count (buffer-data b))))) + + (define (set-inbound-seqn! seqn) + (inbound (struct-copy buffer (inbound) [seqn seqn]))) + + (define (incorporate-segment! data) + (when (not (buffer-finished? (inbound))) + (inbound (buffer-push (inbound) data)))) + + (define (deliver-inbound-locally!) + (define b (inbound)) + (when (not (bit-string-empty? (buffer-data b))) + (define chunk (bit-string->bytes (buffer-data b))) + (send! (tcp-in connection-id chunk)) + (inbound (struct-copy buffer b + [data #""] + [seqn (seq+ (buffer-seqn b) (bytes-length chunk))])))) + + ;; -> Void + (define (check-fin!) + (define b (inbound)) + (when (not (buffer-finished? b)) + (unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally + (error 'check-fin "Nonempty inbound buffer")) + (when (fin-seen?) + (log-netstack/tcp-debug "Closing inbound stream.") + (inbound (struct-copy buffer b + [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte + [finished? #t])) + (transmission-needed? #t)))) ;; we must send an ack + + (define (connected?) + (and (syn-acked?) ;; the SYN we sent has been acked by the remote peer + (not (buffer-finished? (inbound))))) ;; the remote peer hasn't sent a FIN + + (on (asserted (tcp-accepted connection-id)) + (unblocked? #t)) + + (begin/dataflow + (when (and (connected?) (unblocked?)) + (deliver-inbound-locally!) + (check-fin!))) + + ;; -> Void + (define (arm-retransmission-timer!) + (log-netstack/tcp-debug "Arming retransmission timer (~a ms)" (retransmission-timeout)) + (retransmission-deadline (+ (current-inexact-milliseconds) (retransmission-timeout)))) + + ;; Timestamp -> Void + (define (start-rtt-estimate! now) + (define target (send-next)) + (when (seq>= target (high-water-mark)) + (log-netstack/tcp-debug "Starting RTT estimation; target seqn is ~a" target) + (rtt-estimate-start-time now) + (rtt-estimate-seqn-target target))) + + ;; -> Void + (define (reset-rtt-estimate!) + (rtt-estimate-start-time #f) + (rtt-estimate-seqn-target #f)) + + ;; Timestamp -> Void + (define (finish-rtt-estimate! now) + (define rtt-measurement (- now (rtt-estimate-start-time))) + (reset-rtt-estimate!) + (log-netstack/tcp-debug "RTT measurement: ~a ms" rtt-measurement) + ;; RFC 6298 Section 2. + (cond [(rtt-estimate) => ;; we have a previous estimate, RFC 6298 rule (2.3) + (lambda (prev-estimate) + (rtt-mean-deviation (+ (* 0.75 (rtt-mean-deviation)) + (* 0.25 (abs (- rtt-measurement prev-estimate))))) + (rtt-estimate (+ (* 0.875 prev-estimate) + (* 0.125 rtt-measurement))))] + [else ;; no previous estimate, RFC 6298 rule (2.2) applies + (rtt-estimate rtt-measurement) + (rtt-mean-deviation (/ rtt-measurement 2))]) + (default-retransmission-timeout!) + (log-netstack/tcp-debug "RTT measurement ~a ms; estimate ~a ms; mean deviation ~a ms; RTO ~a ms" + rtt-measurement + (rtt-estimate) + (rtt-mean-deviation) + (retransmission-timeout))) + + (define (default-retransmission-timeout!) + (retransmission-timeout + (max 200 ;; RFC 6298 rule (2.4), but cribbing from Linux's 200ms minimum + (min 60000 ;; (2.5) + (+ (rtt-estimate) (* 4 (rtt-mean-deviation))))))) ;; (2.2), (2.3) + + ;; Boolean SeqNum -> Void + (define (discard-acknowledged-outbound! ack? ackn) + (when ack? + (let* ((b (outbound)) + (base (buffer-seqn b)) + (ackn (seq-min ackn (high-water-mark))) + (ackn (seq-max ackn base)) + (dist (seq- ackn base))) + (user-timeout-base-time (current-inexact-milliseconds)) + (when (positive? dist) + (when (not (syn-acked?)) (syn-acked? #t)) + (log-netstack/tcp-debug "******** ackn ~a; send-next ~a; high-water-mark ~a" + ackn + (send-next) + (high-water-mark)) + (when (seq> ackn (send-next)) (send-next ackn)) + (when (and (rtt-estimate-seqn-target) (seq>= ackn (rtt-estimate-seqn-target))) + (finish-rtt-estimate! (current-inexact-milliseconds))) + + (define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset! + (outbound (struct-copy buffer b [data remaining-data] [seqn ackn])) + + (default-retransmission-timeout!) + (log-netstack/tcp-debug "Positive distance moved by ack, RTO now ~a" + (retransmission-timeout)) + (arm-retransmission-timer!))))) + + ;; Nat -> Void + (define (update-outbound-window! peer-window) + (log-netstack/tcp-debug "Peer's receive-window is now ~a" peer-window) + (outbound (struct-copy buffer (outbound) [window peer-window]))) + + ;; True iff there is no queued-up data waiting either for + ;; transmission or (if transmitted already) for acknowledgement. + (define (all-output-acknowledged?) + (bit-string-empty? (buffer-data (outbound)))) + + (define (close-outbound-stream!) + (log-netstack/tcp-debug "Closing outbound stream.") + (define b (outbound)) + (when (not (buffer-finished? b)) + (outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte + [finished? #t])) + (transmission-needed? #t))) ;; the FIN machinery is awkwardly + ;; different from the usual + ;; advance-based decision on + ;; whether to send a packet or not + + ;; SeqNum Boolean Boolean Bytes -> TcpPacket + (define (build-outbound-packet seqn mention-syn? mention-fin? payload) + (define ackn (next-expected-seqn)) + (define window (min 65535 ;; limit of field width + (max 0 ;; can't be negative + (- (buffer-window (inbound)) + (bit-string-byte-count (buffer-data (inbound))))))) + + (define flags (set)) + (when ackn (set! flags (set-add flags 'ack))) + (when mention-syn? (set! flags (set-add flags 'syn))) + (when mention-fin? (set! flags (set-add flags 'fin))) + (tcp-packet #f q seqn (or ackn 0) flags window #"" payload)) + + (define (outbound-data-chunk offset length) + (bit-string-take (bit-string-drop (buffer-data (outbound)) (* offset 8)) (* length 8))) + + ;; Transmit acknowledgements and outbound data. + (begin/dataflow + (define in-flight-count (seq- (send-next) (buffer-seqn (outbound)))) + + (define-values (mention-syn? ;; whether to mention SYN + payload-size ;; how many bytes of payload data to include + mention-fin? ;; whether to mention FIN + advance) ;; how far to advance send-next + (if (syn-acked?) + (let* ((effective-window (max 0 (- (buffer-window (outbound)) in-flight-count))) + (stream-ended? (buffer-finished? (outbound))) + (max-advance (- (bit-string-byte-count (buffer-data (outbound))) in-flight-count)) + (payload-size (min maximum-segment-size effective-window max-advance))) + (if (and stream-ended? ;; there's a FIN enqueued, + (positive? payload-size) ;; we aren't sending nothing at all, + (= payload-size max-advance)) ;; and our payload would cover the FIN + (values #f (- payload-size 1) #t payload-size) + (values #f payload-size #f payload-size))) + (cond [(= in-flight-count 0) (values #t 0 #f 1)] + [(= in-flight-count 1) (values #t 0 #f 0)] + [else (error 'send-outbound! + "Invalid state: send-next had advanced too far before SYN")]))) + + (when (and (or (next-expected-seqn) outbound?) + ;; ^ Talk only either if: we know the peer's seqn, or + ;; we don't, but we're an outbound connection rather + ;; than a listener. + (or (transmission-needed?) + (positive? advance)) + ;; ^ ... and we have something to say. Something to + ;; ack, or something to send. + ) + (define packet-seqn (if mention-syn? (buffer-seqn (outbound)) (send-next))) + (define packet (build-outbound-packet packet-seqn + mention-syn? + mention-fin? + (outbound-data-chunk in-flight-count payload-size))) + (when (positive? advance) + (define new-send-next (seq+ (send-next) advance)) + (send-next new-send-next) + (when (seq> new-send-next (high-water-mark)) + (high-water-mark new-send-next))) + (when (transmission-needed?) + (transmission-needed? #f)) + + ;; (log-netstack/tcp-debug " sending ~v" packet) + (send! packet) + ;; (if (> (random) 0.5) + ;; (begin (log-netstack/tcp-debug "Send ~a" (summarize-tcp-packet packet)) + ;; (send! packet)) + ;; (log-netstack/tcp-debug "Drop ~a" (summarize-tcp-packet packet))) + + (when (or mention-syn? mention-fin? (positive? advance)) + (when (not (retransmission-deadline)) + (arm-retransmission-timer!)) + (when (not (rtt-estimate-start-time)) + (start-rtt-estimate! (current-inexact-milliseconds)))))) + + (begin/dataflow + (when (and (retransmission-deadline) (all-output-acknowledged?)) + (log-netstack/tcp-debug "All output acknowledged; disarming retransmission timer") + (retransmission-deadline #f))) + + (on #:when (retransmission-deadline) (asserted (later-than (retransmission-deadline))) + (send-next (buffer-seqn (outbound))) + (log-netstack/tcp-debug "Retransmission deadline fired, RTO was ~a; reset to ~a" + (retransmission-timeout) + (send-next)) + (update-outbound-window! maximum-segment-size) ;; temporary. Will reopen on next ack + (transmission-needed? #t) + (retransmission-deadline #f) + (reset-rtt-estimate!) ;; give up on current RTT estimation + (retransmission-timeout (min 64000 (* 2 (retransmission-timeout)))) + (log-netstack/tcp-debug " RTO now ~a" (retransmission-timeout))) + + (define (reset! seqn ackn) + (define reset-packet (tcp-packet #f q seqn ackn (set 'ack 'rst) 0 #"" #"")) + (log-netstack/tcp-warning "Reset ~a" (summarize-tcp-packet reset-packet)) + (stop-facet root-facet) + (send! reset-packet)) + + (assert q) ;; Declare that this state vector exists + (on-start (log-netstack/tcp-info "Starting ~a" (tcp-quad->string #t q))) + (on-stop (log-netstack/tcp-info "Stopping ~a" (tcp-quad->string #t q))) + + (stop-when #:when (and (buffer-finished? (outbound)) + (buffer-finished? (inbound)) + (all-output-acknowledged?)) + (asserted (later-than (+ (latest-peer-activity-time) + (* 2 1000 maximum-segment-lifetime-sec)))) + ;; Everything is cleanly shut down, and we just need to wait a while for unexpected + ;; packets before we release the state vector. + ) + + (stop-when #:when (not (all-output-acknowledged?)) + (asserted (later-than (+ (user-timeout-base-time) user-timeout-msec))) + ;; We've been plaintively retransmitting for user-timeout-msec without hearing anything + ;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but + ;; it will do for now? TODO + (log-netstack/tcp-warning "TCP_USER_TIMEOUT fired.")) + + (define/query-value listener-listening? + #f (observe (tcp-connection _ (tcp-listener local-port))) #t) + + (define (trigger-ack!) + (transmission-needed? #t)) + + (on (message (tcp-packet #t q $seqn $ackn $flags $window $options $data)) + (define expected (next-expected-seqn)) + (define is-syn? (set-member? flags 'syn)) + (define is-fin? (set-member? flags 'fin)) + (cond + [(set-member? flags 'rst) (stop-facet root-facet)] + [(and (not expected) ;; no syn yet + (or (not is-syn?) ;; and this isn't it + (and (not (listener-listening?)) ;; or it is, but no listener... + (not outbound?)))) ;; ...and we're not an outbound connection + (reset! ackn ;; this is *our* seqn + (seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0))) + ;; ^^ this is what we should acknowledge... + )] + [else + (cond + [(not expected) ;; haven't seen syn yet, but we know this is it + (set-inbound-seqn! (seq+ seqn 1)) + (incorporate-segment! data) + (trigger-ack!)] + [(= expected seqn) + (incorporate-segment! data) + (when (positive? (bit-string-byte-count data)) (trigger-ack!))] + [else + (trigger-ack!)]) + (when is-fin? (fin-seen? #t)) + (discard-acknowledged-outbound! (set-member? flags 'ack) ackn) + (update-outbound-window! window) + (latest-peer-activity-time (current-inexact-milliseconds))])) + + (on (message (tcp-out connection-id $bs)) + ;; (log-netstack/tcp-debug "GOT MORE STUFF TO DELIVER ~v" bs) + + (when (all-output-acknowledged?) + ;; Only move user-timeout-base-time if there wasn't + ;; already some outstanding output. + (user-timeout-base-time (current-inexact-milliseconds))) + + (outbound (buffer-push (outbound) bs))) + + (if outbound? + (begin + (assert #:when (connected?) (tcp-accepted connection-id)) + (on (retracted (tcp-connection connection-id (tcp-address _ _))) + (close-outbound-stream!))) + (begin + (assert #:when (connected?) (tcp-connection connection-id (tcp-listener local-port))) + (on (retracted (tcp-accepted connection-id)) + (close-outbound-stream!)) + (on-start (sleep 5) + (when (not (unblocked?)) + (log-netstack/tcp-error "TCP relay process ~a timed out waiting for peer" q) + (stop-facet root-facet))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-tcp-driver) diff --git a/syndicate/examples/netstack/udp.rkt b/syndicate/examples/netstack/udp.rkt new file mode 100644 index 0000000..bfa553d --- /dev/null +++ b/syndicate/examples/netstack/udp.rkt @@ -0,0 +1,133 @@ +#lang imperative-syndicate + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + udp-address? + udp-local-address? + (struct-out udp-packet) + spawn-udp-driver) + +(require racket/set) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") +(require "configuration.rkt") +(require/activate "ip.rkt") +(require "port-allocator.rkt") + +;; udp-address/udp-address : "kernel" udp connection state machines +;; udp-handle/udp-address : "user" outbound connections +;; udp-listener/udp-address : "user" inbound connections + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; USER-level protocol +(struct udp-packet (source destination body) #:prefab) + +;; KERNEL-level protocol +(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab) +(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-udp-driver) + (spawn-port-allocator 'udp (lambda () (query-set udp-ports (udp-port-allocation $p _) p))) + (spawn-kernel-udp-driver) + (spawn #:name 'udp-driver + (on (asserted (observe (udp-packet _ ($ h (udp-listener _)) _))) + (spawn-udp-relay (udp-listener-port h) h)) + (on (asserted (observe (udp-packet _ ($ h (udp-handle _)) _))) + (spawn #:name (list 'udp-transient h) + (on-start (spawn-udp-relay (allocate-port! 'udp) h)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relaying + +(define (spawn-udp-relay local-port local-user-addr) + (spawn #:name (list 'udp-relay local-port local-user-addr) + (on-start (log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr)) + + (stop-when (retracted (observe (udp-packet _ local-user-addr _)))) + (assert (udp-port-allocation local-port local-user-addr)) + + (during (host-route $ip _ _) + (on (message (udp-datagram $source-ip $source-port ip local-port $bs)) + (send! + (udp-packet (udp-remote-address (ip-address->hostname source-ip) + source-port) + local-user-addr + bs)))) + + (define local-ips (query-local-ip-addresses)) + (on (message (udp-packet local-user-addr (udp-remote-address $other-host $other-port) $bs)) + ;; Choose arbitrary local IP address for outbound packet! + ;; TODO: what can be done? Must I examine the routing table? + (send! (udp-datagram (set-first (local-ips)) + local-port + (ip-string->ip-address other-host) + other-port + bs))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + +(define PROTOCOL-UDP 17) + +(define (spawn-kernel-udp-driver) + (spawn #:name 'kernel-udp-driver + (define local-ips (query-local-ip-addresses)) + + (on (message (ip-packet $source-if $src-ip $dst-ip PROTOCOL-UDP _ $body)) + (when (and source-if (set-member? (local-ips) dst-ip)) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (length :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (data :: binary) ] + (bit-string-case data + ([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes + (:: binary) ] + (send! (udp-datagram src-ip src-port dst-ip dst-port + (bit-string->bytes payload)))) + (else #f))) + (else #f)))) + + (on (message (udp-datagram $src-ip $src-port $dst-ip $dst-port $bs)) + (when (set-member? (local-ips) src-ip) + (let* ((payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + ((+ 8 (bit-string-byte-count bs)) + :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (bs :: binary))) + (pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-UDP + ((bit-string-byte-count payload) + :: integer bytes 2))) + (checksummed-payload (ip-checksum #:pseudo-header pseudo-header + 6 payload))) + (send! (ip-packet #f src-ip dst-ip PROTOCOL-UDP #"" + checksummed-payload))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-udp-driver)