From b2d5a3f74d5bf56347104d1018cfbc356b3acd6a Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 21 Jul 2016 17:02:34 -0400 Subject: [PATCH] Copy monolithic netstack implementation to subdir, for future reference --- .../netstack/monolithic-lowlevel/Makefile | 7 + .../netstack/monolithic-lowlevel/README.md | 16 + examples/netstack/monolithic-lowlevel/TODO.md | 18 + examples/netstack/monolithic-lowlevel/arp.rkt | 235 +++++++ .../netstack/monolithic-lowlevel/checksum.rkt | 52 ++ .../monolithic-lowlevel/configuration.rkt | 21 + .../monolithic-lowlevel/demo-config.rkt | 25 + .../monolithic-lowlevel/dump-bytes.rkt | 80 +++ .../netstack/monolithic-lowlevel/ethernet.rkt | 134 ++++ .../netstack/monolithic-lowlevel/fetchurl.rkt | 48 ++ examples/netstack/monolithic-lowlevel/ip.rkt | 327 +++++++++ .../netstack/monolithic-lowlevel/main.rkt | 121 ++++ .../netstack/monolithic-lowlevel/on-claim.rkt | 47 ++ .../monolithic-lowlevel/port-allocator.rkt | 38 + examples/netstack/monolithic-lowlevel/tcp.rkt | 665 ++++++++++++++++++ examples/netstack/monolithic-lowlevel/udp.rkt | 175 +++++ 16 files changed, 2009 insertions(+) create mode 100644 examples/netstack/monolithic-lowlevel/Makefile create mode 100644 examples/netstack/monolithic-lowlevel/README.md create mode 100644 examples/netstack/monolithic-lowlevel/TODO.md create mode 100644 examples/netstack/monolithic-lowlevel/arp.rkt create mode 100644 examples/netstack/monolithic-lowlevel/checksum.rkt create mode 100644 examples/netstack/monolithic-lowlevel/configuration.rkt create mode 100644 examples/netstack/monolithic-lowlevel/demo-config.rkt create mode 100644 examples/netstack/monolithic-lowlevel/dump-bytes.rkt create mode 100644 examples/netstack/monolithic-lowlevel/ethernet.rkt create mode 100644 examples/netstack/monolithic-lowlevel/fetchurl.rkt create mode 100644 examples/netstack/monolithic-lowlevel/ip.rkt create mode 100644 examples/netstack/monolithic-lowlevel/main.rkt create mode 100644 examples/netstack/monolithic-lowlevel/on-claim.rkt create mode 100644 examples/netstack/monolithic-lowlevel/port-allocator.rkt create mode 100644 examples/netstack/monolithic-lowlevel/tcp.rkt create mode 100644 examples/netstack/monolithic-lowlevel/udp.rkt diff --git a/examples/netstack/monolithic-lowlevel/Makefile b/examples/netstack/monolithic-lowlevel/Makefile new file mode 100644 index 0000000..d7ba69b --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/Makefile @@ -0,0 +1,7 @@ +all: + +run: + raco make main.rkt && racket main.rkt + +clean: + find . -name compiled -type d | xargs rm -rf diff --git a/examples/netstack/monolithic-lowlevel/README.md b/examples/netstack/monolithic-lowlevel/README.md new file mode 100644 index 0000000..bbf36a5 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/README.md @@ -0,0 +1,16 @@ +# TCP/IP Stack + +## Linux Firewall Configuration + +Imagine a setup where the machine you are running this code has IP +192.168.1.10. This code claims 192.168.1.222 for itself. Now, pinging +192.168.1.222 from some other machine, say 192.168.1.99, will cause +the local kernel to receive the pings and then *forward them on to +192.168.1.222*, which because of the gratuitous ARP announcement, it +knows to be on its own Ethernet MAC address. This causes the ping +requests to repeat endlessly, each time with one lower TTL. + +One approach to solving the problem is to prevent the kernel from +forwarding packets addressed to 192.168.1.222. To do this, + + sudo iptables -I FORWARD -d 192.168.1.222 -j DROP diff --git a/examples/netstack/monolithic-lowlevel/TODO.md b/examples/netstack/monolithic-lowlevel/TODO.md new file mode 100644 index 0000000..8d674d5 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/TODO.md @@ -0,0 +1,18 @@ +Ideas on TCP unit testing: + + +Check behaviour around TCP zero-window probing. Is the correct +behaviour already a consequence of the way `send-outbound` works? + +Do something smarter with TCP timers and RTT estimation than the +nothing that's already being done. + +TCP options negotiation. + - SACK + - Window scaling + +Bugs: + - RST kills a connection even if its sequence number is bogus. Check + to make sure it's in the window. (See + http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41848.pdf + and RFC 5961) diff --git a/examples/netstack/monolithic-lowlevel/arp.rkt b/examples/netstack/monolithic-lowlevel/arp.rkt new file mode 100644 index 0000000..97cf773 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/arp.rkt @@ -0,0 +1,235 @@ +#lang racket/base +;; ARP protocol, http://tools.ietf.org/html/rfc826 +;; Only does ARP-over-ethernet. + +(provide (struct-out arp-query) + (struct-out arp-assertion) + (struct-out arp-interface) + spawn-arp-driver) + +(require racket/set) +(require racket/match) +(require syndicate/monolithic) +(require syndicate/drivers/timer) +(require syndicate/demand-matcher) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "configuration.rkt") +(require "ethernet.rkt") + +(struct arp-query (protocol protocol-address interface link-address) #:prefab) +(struct arp-assertion (protocol protocol-address interface-name) #:prefab) +(struct arp-interface (interface-name) #:prefab) + +(struct arp-interface-up (interface-name) #:prefab) + +(define ARP-ethertype #x0806) +(define cache-entry-lifetime-msec (* 14400 1000)) +(define wakeup-interval 5000) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (spawn-arp-driver) + (spawn-demand-matcher (arp-interface (?!)) + (arp-interface-up (?!)) + spawn-arp-interface)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(struct cache-key (protocol address) #:transparent) +(struct cache-value (expiry interface address) #:transparent) + +(struct state (cache queries assertions) #:transparent) + +(define (spawn-arp-interface interface-name) + (log-info "spawn-arp-interface ~v" interface-name) + (lookup-ethernet-hwaddr (assertion (arp-interface-up interface-name)) + interface-name + (lambda (hwaddr) (spawn-arp-interface* interface-name hwaddr)))) + +(define (spawn-arp-interface* interface-name hwaddr) + (log-info "spawn-arp-interface* ~v ~v" interface-name hwaddr) + (define interface (ethernet-interface interface-name hwaddr)) + + (define (expire-cache cache) + (define now (current-inexact-milliseconds)) + (define (not-expired? v) (< now (cache-value-expiry v))) + (for/hash [((k v) (in-hash cache)) #:when (not-expired? v)] + (values k v))) + + (define timer-key (list 'arp interface-name)) + + (define (set-wakeup-alarm) + (message (set-timer timer-key wakeup-interval 'relative))) + + (define (compute-gestalt cache) + (scn/union (subscription (timer-expired timer-key ?)) + (subscription interface) + (subscription (ethernet-packet-pattern interface-name #t ARP-ethertype)) + (assertion (arp-interface-up interface-name)) + (subscription (arp-assertion ? ? interface-name)) + (subscription (observe (arp-query ? ? interface ?))) + (for/fold [(g trie-empty)] [((k v) (in-hash cache))] + (assertion-set-union g (assertion (arp-query (cache-key-protocol k) + (cache-key-address k) + (cache-value-interface v) + (cache-value-address v))))))) + + (define (build-packet dest-mac ptype oper sender-ha sender-pa target-ha target-pa) + (define hlen (bytes-length target-ha)) + (define plen (bytes-length target-pa)) + (define packet (bit-string->bytes + (bit-string (1 :: integer bytes 2) + (ptype :: integer bytes 2) + hlen + plen + (oper :: integer bytes 2) + (sender-ha :: binary bytes hlen) + (sender-pa :: binary bytes plen) + (target-ha :: binary bytes hlen) + (target-pa :: binary bytes plen)))) + (ethernet-packet interface + #f + hwaddr + dest-mac + ARP-ethertype + packet)) + + (define (analyze-incoming-packet source destination body s) + (bit-string-case body + ([ (= 1 :: integer bytes 2) + (ptype :: integer bytes 2) + hlen + plen + (oper :: integer bytes 2) + (sender-hardware-address0 :: binary bytes hlen) + (sender-protocol-address0 :: binary bytes plen) + (target-hardware-address0 :: binary bytes hlen) + (target-protocol-address0 :: binary bytes plen) + (:: binary) ;; The extra zeros exist because ethernet packets + ;; have a minimum size. This is, in part, why + ;; IPv4 headers have a total-length field, so + ;; that the zero padding can be removed. + ] + (let () + (define sender-protocol-address (bit-string->bytes sender-protocol-address0)) + (define sender-hardware-address (bit-string->bytes sender-hardware-address0)) + (define target-protocol-address (bit-string->bytes target-protocol-address0)) + (define learned-key (cache-key ptype sender-protocol-address)) + (when (and (set-member? (state-queries s) learned-key) ;; it is relevant to our interests + (not (equal? sender-hardware-address + (cache-value-address (hash-ref (state-cache s) + learned-key + (lambda () + (cache-value #f #f #f))))))) + (log-info "~a ARP Adding ~a = ~a to cache" + interface-name + (pretty-bytes sender-protocol-address) + (pretty-bytes sender-hardware-address))) + (define cache (hash-set (expire-cache (state-cache s)) + learned-key + (cache-value (+ (current-inexact-milliseconds) + cache-entry-lifetime-msec) + interface + sender-hardware-address))) + (transition (struct-copy state s [cache cache]) + (list + (case oper + [(1) ;; request + (if (set-member? (state-assertions s) + (cache-key ptype target-protocol-address)) + (begin + (log-info "~a ARP answering request for ~a/~a" + interface-name + ptype + (pretty-bytes target-protocol-address)) + (message (build-packet sender-hardware-address + ptype + 2 ;; reply + hwaddr + target-protocol-address + sender-hardware-address + sender-protocol-address))) + '())] + [(2) '()] ;; reply + [else '()]) + (compute-gestalt cache))))) + (else #f))) + + (define queries-projection (observe (arp-query (?!) (?!) ? ?))) + (define (gestalt->queries g) + (for/set [(e (in-set (trie-project/set #:take 2 g queries-projection)))] + (match-define (list ptype pa) e) + (cache-key ptype pa))) + + (define assertions-projection (arp-assertion (?!) (?!) ?)) + (define (gestalt->assertions g) + (for/set [(e (in-set (trie-project/set #:take 2 g assertions-projection)))] + (match-define (list ptype pa) e) + (cache-key ptype pa))) + + (define (analyze-gestalt g s) + (define new-assertions (gestalt->assertions g)) + (define added-assertions (set-subtract new-assertions (state-assertions s))) + (define new-s (struct-copy state s [queries (gestalt->queries g)] [assertions new-assertions])) + (if (trie-empty? (project-assertions g (arp-interface interface-name))) + (quit) + (transition new-s + (list + (for/list [(a (in-set added-assertions))] + (log-info "~a ARP Announcing ~a as ~a" + interface-name + (pretty-bytes (cache-key-address a)) + (pretty-bytes hwaddr)) + (message (build-packet broadcast-ethernet-address + (cache-key-protocol a) + 2 ;; reply -- gratuitous announcement + hwaddr + (cache-key-address a) + hwaddr + (cache-key-address a)))))))) + + (define (send-questions s) + (define unanswered-queries + (set-subtract (state-queries s) (list->set (hash-keys (state-cache s))))) + (define (some-asserted-pa ptype) + (match (filter (lambda (k) (equal? (cache-key-protocol k) ptype)) + (set->list (state-assertions s))) + ['() #f] + [(list* k _) (cache-key-address k)])) + (transition s + (for/list [(q (in-set unanswered-queries))] + (define pa (some-asserted-pa (cache-key-protocol q))) + (log-info "~a ARP Asking for ~a from ~a" + interface-name + (pretty-bytes (cache-key-address q)) + (and pa (pretty-bytes pa))) + (when pa + (message (build-packet broadcast-ethernet-address + (cache-key-protocol q) + 1 ;; request + hwaddr + pa + zero-ethernet-address + (cache-key-address q))))))) + + (list (set-wakeup-alarm) + (spawn (lambda (e s) + ;; (log-info "ARP ~a ~a: ~v // ~v" interface-name (pretty-bytes hwaddr) e s) + (match e + [(scn g) + (sequence-transitions (analyze-gestalt g s) + send-questions)] + [(message (ethernet-packet _ _ source destination _ body)) + (analyze-incoming-packet source destination body s)] + [(message (timer-expired _ _)) + (define new-s (struct-copy state s + [cache (expire-cache (state-cache s))])) + (sequence-transitions (transition new-s + (list (set-wakeup-alarm) + (compute-gestalt (state-cache new-s)))) + send-questions)] + [_ #f])) + (state (hash) (set) (set)) + (compute-gestalt (hash))))) diff --git a/examples/netstack/monolithic-lowlevel/checksum.rkt b/examples/netstack/monolithic-lowlevel/checksum.rkt new file mode 100644 index 0000000..e34fc13 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/checksum.rkt @@ -0,0 +1,52 @@ +#lang racket/base + +(provide ones-complement-sum16 ip-checksum) + +(require bitsyntax) +(require "dump-bytes.rkt") + +(define (ones-complement-+16 a b) + (define c (+ a b)) + (bitwise-and #xffff (+ (arithmetic-shift c -16) c))) + +(define (ones-complement-sum16 bs) + (bit-string-case bs + ([ (n :: integer bytes 2) (rest :: binary) ] + (ones-complement-+16 n (ones-complement-sum16 rest))) + ([ odd-byte ] + (arithmetic-shift odd-byte 8)) + ([ ] + 0))) + +(define (ones-complement-negate16-safely x) + (define r (bitwise-and #xffff (bitwise-not x))) + (if (= r 0) #xffff r)) + +(define (ip-checksum offset blob #:pseudo-header [pseudo-header #""]) + (bit-string-case blob + ([ (prefix :: binary bytes offset) + (:: binary bytes 2) + (suffix :: binary) ] + ;; (log-info "Packet pre checksum:\n~a" (dump-bytes->string blob)) + (define result (ones-complement-+16 + (ones-complement-sum16 pseudo-header) + (ones-complement-+16 (ones-complement-sum16 prefix) + (ones-complement-sum16 suffix)))) + ;; (log-info "result: ~a" (number->string result 16)) + (define checksum (ones-complement-negate16-safely result)) + ;; (log-info "Checksum ~a" (number->string checksum 16)) + (define final-packet (bit-string (prefix :: binary) + (checksum :: integer bytes 2) + (suffix :: binary))) + ;; (log-info "Packet with checksum:\n~a" (dump-bytes->string final-packet)) + final-packet))) + +(module+ test + (require rackunit) + (check-equal? (ones-complement-negate16-safely + (ones-complement-sum16 (bytes #x45 #x00 #x00 #x54 + #x00 #x00 #x00 #x00 + #x40 #x01 #x00 #x00 + #xc0 #xa8 #x01 #xde + #xc0 #xa8 #x01 #x8f))) + #xf5eb)) diff --git a/examples/netstack/monolithic-lowlevel/configuration.rkt b/examples/netstack/monolithic-lowlevel/configuration.rkt new file mode 100644 index 0000000..01c8a86 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/configuration.rkt @@ -0,0 +1,21 @@ +#lang racket/base + +(provide (struct-out ethernet-interface) + (struct-out host-route) + (struct-out gateway-route) + (struct-out net-route) + + (struct-out route-up)) + +(struct ethernet-interface (name hwaddr) #:prefab) + +;; A Route is one of +;; - (host-route IpAddrBytes NetmaskNat InterfaceName), an own-IP route +;; - (gateway-route NetAddrBytes NetmaskNat IpAddrBytes InterfaceName), a gateway for a subnet +;; - (net-route NetAddrBytes NetmaskNat InterfaceName), an ethernet route for a subnet +;; NetmaskNat in a net-route is a default route. +(struct host-route (ip-addr netmask interface-name) #:prefab) +(struct gateway-route (network-addr netmask gateway-addr interface-name) #:prefab) +(struct net-route (network-addr netmask link) #:prefab) + +(struct route-up (route) #:prefab) ;; assertion: the given Route is running diff --git a/examples/netstack/monolithic-lowlevel/demo-config.rkt b/examples/netstack/monolithic-lowlevel/demo-config.rkt new file mode 100644 index 0000000..329a1aa --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/demo-config.rkt @@ -0,0 +1,25 @@ +#lang racket/base +;; Demonstration stack configuration for various hosts. + +(require racket/match) +(require syndicate/monolithic) +(require (only-in mzlib/os gethostname)) +(require "configuration.rkt") + +(provide spawn-demo-config) + +(define (spawn-demo-config) + (spawn (lambda (e s) #f) + (void) + (match (gethostname) + ["skip" + (scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "en0")) + (assertion (host-route (bytes 192 168 1 222) 24 "en0")))] + [(or "hop" "walk") + (scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "wlan0")) + (assertion (host-route (bytes 192 168 1 222) 24 "wlan0")))] + ["stockholm.ccs.neu.edu" + (scn/union (assertion (host-route (bytes 129 10 115 94) 24 "eth0")) + (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 129 10 115 1) "eth0")))] + [else + (error 'spawn-demo-config "No setup for hostname ~a" (gethostname))]))) diff --git a/examples/netstack/monolithic-lowlevel/dump-bytes.rkt b/examples/netstack/monolithic-lowlevel/dump-bytes.rkt new file mode 100644 index 0000000..365e59d --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/dump-bytes.rkt @@ -0,0 +1,80 @@ +#lang racket/base +;; Copyright (C) 2012 Tony Garnock-Jones +;; +;; dump-bytes.rkt is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published +;; by the Free Software Foundation, either version 3 of the License, +;; or (at your option) any later version. +;; +;; dump-bytes.rkt is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with dump-bytes.rkt. If not, see . + +;; Pretty hex dump output of a Bytes. + +(provide dump-bytes! + dump-bytes->string + pretty-bytes) + +(require (only-in bitsyntax bit-string->bytes)) +(require (only-in file/sha1 bytes->hex-string)) + +(define (pretty-bytes bs) + (bytes->hex-string (bit-string->bytes bs))) + +;; Exact Exact -> String +;; Returns the "0"-padded, width-digit hex representation of n +(define (hex width n) + (define s (number->string n 16)) + (define slen (string-length s)) + (cond + ((< slen width) (string-append (make-string (- width slen) #\0) s)) + ((= slen width) s) + ((> slen width) (substring s 0 width)))) + +;; Bytes Exact -> Void +;; Prints a pretty hex/ASCII dump of bs on (current-output-port). +(define (dump-bytes! bs0 [requested-count #f] #:base [baseaddr 0]) + (define bs (bit-string->bytes bs0)) + (define count (if requested-count (min requested-count (bytes-length bs)) (bytes-length bs))) + (define clipped (subbytes bs 0 count)) + (define (dump-hex i) + (if (< i count) + (display (hex 2 (bytes-ref clipped i))) + (display " ")) + (display #\space)) + (define (dump-char i) + (if (< i count) + (let ((ch (bytes-ref clipped i))) + (if (<= 32 ch 127) + (display (integer->char ch)) + (display #\.))) + (display #\space))) + (define (for-each-between f low high) + (do ((i low (+ i 1))) + ((= i high)) + (f i))) + (define (dump-line i) + (display (hex 8 (+ i baseaddr))) + (display #\space) + (for-each-between dump-hex i (+ i 8)) + (display ": ") + (for-each-between dump-hex (+ i 8) (+ i 16)) + (display #\space) + (for-each-between dump-char i (+ i 8)) + (display " : ") + (for-each-between dump-char (+ i 8) (+ i 16)) + (newline)) + (do ((i 0 (+ i 16))) + ((>= i count)) + (dump-line i))) + +(define (dump-bytes->string bs [requested-count #f] #:base [baseaddr 0]) + (define s (open-output-string)) + (parameterize ((current-output-port s)) + (dump-bytes! bs requested-count #:base baseaddr)) + (get-output-string s)) diff --git a/examples/netstack/monolithic-lowlevel/ethernet.rkt b/examples/netstack/monolithic-lowlevel/ethernet.rkt new file mode 100644 index 0000000..c5ecef9 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/ethernet.rkt @@ -0,0 +1,134 @@ +#lang racket/base +;; Ethernet driver + +(provide (struct-out ethernet-packet) + zero-ethernet-address + broadcast-ethernet-address + interface-names + spawn-ethernet-driver + ethernet-packet-pattern + lookup-ethernet-hwaddr) + +(require racket/set) +(require racket/match) +(require racket/async-channel) + +(require syndicate/monolithic) +(require syndicate/demand-matcher) +(require "on-claim.rkt") + +(require packet-socket) +(require bitsyntax) + +(require "configuration.rkt") +(require "dump-bytes.rkt") + +(struct ethernet-packet (interface from-wire? source destination ethertype body) #:prefab) + +(define zero-ethernet-address (bytes 0 0 0 0 0 0)) +(define broadcast-ethernet-address (bytes 255 255 255 255 255 255)) + +(define interface-names (raw-interface-names)) +(log-info "Device names: ~a" interface-names) + +(define (spawn-ethernet-driver) + (spawn-demand-matcher (observe (ethernet-packet (ethernet-interface (?!) ?) #t ? ? ? ?)) + (ethernet-interface (?!) ?) + spawn-interface-tap)) + +(define (spawn-interface-tap interface-name) + (define h (raw-interface-open interface-name)) + (define interface (ethernet-interface interface-name (raw-interface-hwaddr h))) + (cond + [(not h) + (log-error "ethernet: Couldn't open interface ~v" interface-name) + '()] + [else + (log-info "Opened interface ~a, yielding handle ~v" interface-name h) + (define control-ch (make-async-channel)) + (thread (lambda () (interface-packet-read-loop interface h control-ch))) + (spawn (lambda (e h) + (match e + [(scn g) + (if (trie-empty? g) + (begin (async-channel-put control-ch 'quit) + (quit)) + (begin (async-channel-put control-ch 'unblock) + #f))] + [(message (at-meta (? ethernet-packet? p))) + ;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)" + ;; (ethernet-interface-name (ethernet-packet-interface p)) + ;; (pretty-bytes (ethernet-packet-source p)) + ;; (pretty-bytes (ethernet-packet-destination p)) + ;; (number->string (ethernet-packet-ethertype p) 16)) + ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) + (transition h (message p))] + [(message (? ethernet-packet? p)) + ;; (log-info "Interface ~a OUTBOUND packet ~a -> ~a (type 0x~a)" + ;; (ethernet-interface-name (ethernet-packet-interface p)) + ;; (pretty-bytes (ethernet-packet-source p)) + ;; (pretty-bytes (ethernet-packet-destination p)) + ;; (number->string (ethernet-packet-ethertype p) 16)) + ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) + (raw-interface-write h (encode-ethernet-packet p)) + #f] + [_ #f])) + h + (scn/union (assertion interface) + (subscription (ethernet-packet interface #f ? ? ? ?)) + (subscription (observe (ethernet-packet interface #t ? ? ? ?))) + (subscription (ethernet-packet interface #t ? ? ? ?) #:meta-level 1)))])) + +(define (interface-packet-read-loop interface h control-ch) + (define (blocked) + (match (async-channel-get control-ch) + ['unblock (unblocked)] + ['quit (void)])) + (define (unblocked) + (match (async-channel-try-get control-ch) + ['unblock (unblocked)] + ['quit (void)] + [#f + (define p (raw-interface-read h)) + (define decoded (decode-ethernet-packet interface p)) + (when decoded (send-ground-message decoded)) + (unblocked)])) + (blocked) + (raw-interface-close h)) + +(define (decode-ethernet-packet interface p) + (bit-string-case p + ([ (destination :: binary bytes 6) + (source :: binary bytes 6) + (ethertype :: integer bytes 2) + (body :: binary) ] + (ethernet-packet interface + #t + (bit-string->bytes source) + (bit-string->bytes destination) + ethertype + (bit-string->bytes body))) + (else #f))) + +(define (encode-ethernet-packet p) + (match-define (ethernet-packet _ _ source destination ethertype body) p) + (bit-string->bytes + (bit-string (destination :: binary bytes 6) + (source :: binary bytes 6) + (ethertype :: integer bytes 2) + (body :: binary)))) + +(define (ethernet-packet-pattern interface-name from-wire? ethertype) + (ethernet-packet (ethernet-interface interface-name ?) from-wire? ? ? ethertype ?)) + +(define (lookup-ethernet-hwaddr base-interests interface-name k) + (on-claim #:timeout-msec 5000 + #:on-timeout (lambda () + (log-info "Lookup of ethernet interface ~v failed" interface-name) + '()) + (lambda (_g hwaddrss) + (and (not (set-empty? hwaddrss)) + (let ((hwaddr (car (set-first hwaddrss)))) + (k hwaddr)))) + base-interests + (ethernet-interface interface-name (?!)))) diff --git a/examples/netstack/monolithic-lowlevel/fetchurl.rkt b/examples/netstack/monolithic-lowlevel/fetchurl.rkt new file mode 100644 index 0000000..a88c12f --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/fetchurl.rkt @@ -0,0 +1,48 @@ +#lang syndicate/monolithic + +(require syndicate/drivers/timer) +(require "demo-config.rkt") +(require "ethernet.rkt") +(require "arp.rkt") +(require "ip.rkt") +(require "tcp.rkt") +(require "udp.rkt") + +;;(log-events-and-actions? #t) + +(spawn-timer-driver) +(spawn-ethernet-driver) +(spawn-arp-driver) +(spawn-ip-driver) +(spawn-tcp-driver) +(spawn-udp-driver) +(spawn-demo-config) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(let () + (define local-handle (tcp-handle 'httpclient)) + (define remote-handle (tcp-address "129.10.115.92" 80)) + + (spawn (lambda (e seen-peer?) + (match e + [(scn g) + (define peer-present? (trie-non-empty? g)) + (if (and (not peer-present?) seen-peer?) + (begin (printf "URL fetcher exiting.\n") + (quit)) + (transition (or seen-peer? peer-present?) + (message + (tcp-channel + local-handle + remote-handle + #"GET / HTTP/1.0\r\nHost: stockholm.ccs.neu.edu\r\n\r\n"))))] + [(message (tcp-channel _ _ bs)) + (printf "----------------------------------------\n~a\n" bs) + (printf "----------------------------------------\n") + #f] + [_ #f])) + #f + (scn/union (advertisement (tcp-channel local-handle remote-handle ?)) + (subscription (tcp-channel remote-handle local-handle ?)) + (subscription (advertise (tcp-channel remote-handle local-handle ?)))))) diff --git a/examples/netstack/monolithic-lowlevel/ip.rkt b/examples/netstack/monolithic-lowlevel/ip.rkt new file mode 100644 index 0000000..6f2e0b3 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/ip.rkt @@ -0,0 +1,327 @@ +#lang racket/base + +(provide (struct-out ip-packet) + ip-address->hostname + ip-string->ip-address + apply-netmask + ip-address-in-subnet? + gestalt->local-ip-addresses + observe-local-ip-addresses-gestalt + broadcast-ip-address + spawn-ip-driver) + +(require racket/set) +(require racket/match) +(require (only-in racket/string string-split)) +(require syndicate/monolithic) +(require syndicate/drivers/timer) +(require syndicate/demand-matcher) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "configuration.rkt") +(require "checksum.rkt") +(require "ethernet.rkt") +(require "arp.rkt") +(require "on-claim.rkt") + +(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces + source + destination + protocol + options + body) + #:prefab) ;; TODO: more fields + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (ip-address->hostname bs) + (bit-string-case bs + ([ a b c d ] (format "~a.~a.~a.~a" a b c d)))) + +(define (ip-string->ip-address str) + (list->bytes (map string->number (string-split str ".")))) + +(define (apply-netmask addr netmask) + (bit-string-case addr + ([ (n :: integer bytes 4) ] + (bit-string ((bitwise-and n (arithmetic-shift #x-100000000 (- netmask))) + :: integer bytes 4))))) + +(define (ip-address-in-subnet? addr network netmask) + (equal? (apply-netmask network netmask) + (apply-netmask addr netmask))) + +(define broadcast-ip-address (bytes 255 255 255 255)) + +(define local-ip-address-projector (host-route (?!) ? ?)) +(define (gestalt->local-ip-addresses g) (trie-project/set/single g local-ip-address-projector)) +(define observe-local-ip-addresses-gestalt (subscription (host-route ? ? ?))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (spawn-ip-driver) + (list + (spawn-demand-matcher (host-route (?!) (?!) (?!)) + (route-up (host-route (?!) (?!) (?!))) + spawn-host-route) + (spawn-demand-matcher (gateway-route (?!) (?!) (?!) (?!)) + (route-up (gateway-route (?!) (?!) (?!) (?!))) + spawn-gateway-route) + (spawn-demand-matcher (net-route (?!) (?!) (?!)) + (route-up (net-route (?!) (?!) (?!))) + spawn-net-route))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Local IP route + +(define (spawn-host-route my-address netmask interface-name) + (list + (let ((network-addr (apply-netmask my-address netmask))) + (spawn-normal-ip-route (host-route my-address netmask interface-name) + network-addr + netmask + interface-name)) + (spawn (lambda (e s) + (match e + [(scn (? trie-empty?)) (quit)] + [(message (ip-packet _ peer-address _ _ _ body)) + (bit-string-case body + ([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum + (case type + [(8) ;; ECHO (0 is ECHO-REPLY) + (log-info "Ping of ~a from ~a" + (pretty-bytes my-address) + (pretty-bytes peer-address)) + (define reply-data0 (bit-string 0 + code + (0 :: integer bytes 2) ;; TODO + (rest :: binary))) + (transition s (message (ip-packet #f + my-address + peer-address + PROTOCOL-ICMP + #"" + (ip-checksum 2 reply-data0))))] + [else + (log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a" + type + code + checksum + (pretty-bytes my-address) + (pretty-bytes peer-address) + (dump-bytes->string rest)) + #f])) + (else #f))] + [_ #f])) + (void) + (scn/union (advertisement (ip-packet ? my-address ? PROTOCOL-ICMP ? ?)) + (subscription (ip-packet ? ? my-address PROTOCOL-ICMP ? ?)) + (assertion (arp-assertion IPv4-ethertype my-address interface-name)) + (subscription (host-route my-address netmask interface-name)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Gateway IP route + +(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent) + +(define (spawn-gateway-route network netmask gateway-addr interface-name) + (define the-route (gateway-route network netmask gateway-addr interface-name)) + + (define host-route-projector (host-route (?!) (?!) ?)) + (define gateway-route-projector (gateway-route (?!) (?!) ? ?)) + (define net-route-projector (net-route (?!) (?!) ?)) + (define gateway-arp-projector (arp-query IPv4-ethertype + gateway-addr + (?! (ethernet-interface interface-name ?)) + (?!))) + + (define (covered-by-some-other-route? addr routes) + (for/or ([r (in-set routes)]) + (match-define (list net msk) r) + (and (positive? msk) + (ip-address-in-subnet? addr net msk)))) + + (spawn (lambda (e s) + (match e + [(scn g) + (define host-ips+netmasks (trie-project/set #:take 2 g host-route-projector)) + (define gw-nets+netmasks (trie-project/set #:take 2 g gateway-route-projector)) + (define net-nets+netmasks (trie-project/set #:take 2 g net-route-projector)) + (define gw-ip+hwaddr + (let ((vs (trie-project/set #:take 2 g gateway-arp-projector))) + (and vs (not (set-empty? vs)) (set-first vs)))) + (when (and gw-ip+hwaddr (not (gateway-route-state-gateway-hwaddr s))) + (log-info "Discovered gateway ~a at ~a on interface ~a." + (ip-address->hostname gateway-addr) + (ethernet-interface-name (car gw-ip+hwaddr)) + (pretty-bytes (cadr gw-ip+hwaddr)))) + (if (trie-empty? (project-assertions g (?! the-route))) + (quit) + (transition (gateway-route-state + (set-union host-ips+netmasks + gw-nets+netmasks + net-nets+netmasks) + (and gw-ip+hwaddr (car gw-ip+hwaddr)) + (and gw-ip+hwaddr (cadr gw-ip+hwaddr))) + '()))] + [(message (? ip-packet? p)) + (define gw-if (gateway-route-state-gateway-interface s)) + (when (not gw-if) + (log-warning "Gateway hwaddr for ~a not known, packet dropped." + (ip-address->hostname gateway-addr))) + (and gw-if + (not (equal? (ip-packet-source-interface p) (ethernet-interface-name gw-if))) + (not (covered-by-some-other-route? (ip-packet-destination p) + (gateway-route-state-routes s))) + (transition s + (message (ethernet-packet gw-if + #f + (ethernet-interface-hwaddr gw-if) + (gateway-route-state-gateway-hwaddr s) + IPv4-ethertype + (format-ip-packet p)))))] + [_ #f])) + (gateway-route-state (set) #f #f) + (scn/union (subscription the-route) + (assertion (route-up the-route)) + (subscription (ip-packet ? ? ? ? ? ?)) + observe-local-ip-addresses-gestalt + (subscription (net-route ? ? ?)) + (subscription (gateway-route ? ? ? ?)) + (subscription (projection->pattern gateway-arp-projector))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; General net route + +(define (spawn-net-route network-addr netmask link) + (spawn-normal-ip-route (net-route network-addr netmask link) network-addr netmask link)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Normal IP route + +(define (spawn-normal-ip-route the-route network netmask interface-name) + (spawn (lambda (e s) + (match e + [(scn (? trie-empty?)) (quit)] + [(message (ethernet-packet _ _ _ _ _ body)) + (define p (parse-ip-packet interface-name body)) + (and p (transition s (message p)))] + [(message (? ip-packet? p)) + (define destination (ip-packet-destination p)) + (and (not (equal? (ip-packet-source-interface p) interface-name)) + (ip-address-in-subnet? destination network netmask) + (transition + s + (lookup-arp destination + (ethernet-interface interface-name ?) + trie-empty + (lambda (interface destination-hwaddr) + (message (ethernet-packet interface + #f + (ethernet-interface-hwaddr interface) + destination-hwaddr + IPv4-ethertype + (format-ip-packet p)))))))] + [_ #f])) + (void) + (scn/union (subscription the-route) + (assertion (route-up the-route)) + (subscription (ethernet-packet-pattern interface-name #t IPv4-ethertype)) + (assertion (arp-interface interface-name)) + (subscription (ip-packet ? ? ? ? ? ?))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define IPv4-ethertype #x0800) + +(define IP-VERSION 4) +(define IP-MINIMUM-HEADER-LENGTH 5) + +(define PROTOCOL-ICMP 1) + +(define default-ttl 64) + +(define (parse-ip-packet interface-name body) + ;; (log-info "IP ~a got body ~a" (pretty-bytes my-address) (pretty-bytes body)) + (bit-string-case body + ([ (= IP-VERSION :: bits 4) + (header-length :: bits 4) + service-type + (total-length :: bits 16) + (id :: bits 16) + (flags :: bits 3) + (fragment-offset :: bits 13) + ttl + protocol + (header-checksum :: bits 16) ;; TODO: check checksum + (source-ip0 :: binary bits 32) + (destination-ip0 :: binary bits 32) + (rest :: binary) ] + (let* ((source-ip (bit-string->bytes source-ip0)) + (destination-ip (bit-string->bytes destination-ip0)) + (options-length (* 4 (- header-length IP-MINIMUM-HEADER-LENGTH))) + (data-length (- total-length (* 4 header-length)))) + (if (and (>= header-length 5) + (>= (bit-string-byte-count body) (* header-length 4))) + (bit-string-case rest + ([ (opts :: binary bytes options-length) + (data :: binary bytes data-length) + (:: binary) ] ;; Very short ethernet packets have a trailer of zeros + (ip-packet interface-name + (bit-string->bytes source-ip) + (bit-string->bytes destination-ip) + protocol + (bit-string->bytes opts) + (bit-string->bytes data)))) + #f))) + (else #f))) + +(define (format-ip-packet p) + (match-define (ip-packet _ src dst protocol options body) p) + + (define header-length ;; TODO: ensure options is a multiple of 4 bytes + (+ IP-MINIMUM-HEADER-LENGTH (quotient (bit-string-byte-count options) 4))) + + (define header0 (bit-string (IP-VERSION :: bits 4) + (header-length :: bits 4) + 0 ;; TODO: service type + ((+ (* header-length 4) (bit-string-byte-count body)) + :: bits 16) + (0 :: bits 16) ;; TODO: identifier + (0 :: bits 3) ;; TODO: flags + (0 :: bits 13) ;; TODO: fragments + default-ttl + protocol + (0 :: bits 16) + (src :: binary bits 32) + (dst :: binary bits 32) + (options :: binary))) + (define full-packet (bit-string ((ip-checksum 10 header0) :: binary) (body :: binary))) + + full-packet) + +(define (lookup-arp ipaddr query-interface-pattern base-gestalt k) + (on-claim #:name (string->symbol (format "lookup-arp:~a" (ip-address->hostname ipaddr))) + (lambda (_g arp-results) + (if (not arp-results) + (error 'ip "Someone has published a wildcard arp result") + (and (not (set-empty? arp-results)) + (match (set-first arp-results) + [(list interface hwaddr) + (log-info "ARP lookup yielded ~a on ~a for ~a" + (pretty-bytes hwaddr) + (ethernet-interface-name interface) + (ip-address->hostname ipaddr)) + (when (> (set-count arp-results) 1) + (log-warning "Ambiguous ARP result for ~a: ~v" + (ip-address->hostname ipaddr) + arp-results)) + (k interface hwaddr)])))) + base-gestalt + (arp-query IPv4-ethertype ipaddr (?! query-interface-pattern) (?!)) + #:timeout-msec 5000 + #:on-timeout (lambda () + (log-warning "ARP lookup of ~a failed, packet dropped" + (ip-address->hostname ipaddr)) + '()))) diff --git a/examples/netstack/monolithic-lowlevel/main.rkt b/examples/netstack/monolithic-lowlevel/main.rkt new file mode 100644 index 0000000..9d694ac --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/main.rkt @@ -0,0 +1,121 @@ +#lang syndicate/monolithic + +(require syndicate/demand-matcher) +(require syndicate/drivers/timer) +(require "demo-config.rkt") +(require "ethernet.rkt") +(require "arp.rkt") +(require "ip.rkt") +(require "tcp.rkt") +(require "udp.rkt") + +;;(log-events-and-actions? #t) + +(spawn-timer-driver) +(spawn-ethernet-driver) +(spawn-arp-driver) +(spawn-ip-driver) +(spawn-tcp-driver) +(spawn-udp-driver) +(spawn-demo-config) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(let () + (local-require racket/set racket/string) + + (define (spawn-session them us) + (define user (gensym 'user)) + (define remote-detector (at-meta (?!))) + (define peer-detector (advertise `(,(?!) says ,?))) + (define (send-to-remote fmt . vs) + (message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + (list (send-to-remote "Welcome, ~a.\n" user) + (spawn + (lambda (e peers) + (match e + [(message (at-meta (tcp-channel _ _ bs))) + (transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] + [(message `(,who says ,what)) + (transition peers (say who "says: ~a" what))] + [(scn assertions) + (if (trie-empty? (trie-project assertions remote-detector)) + (quit (send-to-remote "Goodbye!\n")) + (let ((new-peers (trie-project/set/single assertions peer-detector))) + (define arrived (set-subtract new-peers peers)) + (define departed (set-subtract peers new-peers)) + (transition new-peers + (list (for/list [(who arrived)] (say who "arrived.")) + (for/list [(who departed)] (say who "departed."))))))] + [#f #f])) + (set) + (scn/union + (subscription `(,? says ,?)) ;; read actual chat messages + (subscription (advertise `(,? says ,?))) ;; observe peer presence + (advertisement `(,user says ,?)) ;; advertise our presence + (subscription (tcp-channel them us ?) #:meta-level 1) ;; read from remote client + (subscription (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client + (advertisement (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client + )))) + + (spawn-dataspace + (spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + #:meta-level 1 + spawn-session)) + ) + +(let () + (spawn (lambda (e s) + (match e + [(message (udp-packet src dst body)) + (log-info "Got packet from ~v: ~v" src body) + (transition s (message + (udp-packet dst + src + (string->bytes/utf-8 (format "You said: ~a" body)))))] + [_ #f])) + (void) + (scn (subscription (udp-packet ? (udp-listener 6667) ?))))) + +(let () + (define (spawn-session them us) + (list + (message 'bump) + (spawn (lambda (e s) + (match e + [(message `(counter ,counter)) + (define response + (string->bytes/utf-8 + (format (string-append + "HTTP/1.0 200 OK\r\n\r\n" + "

Hello world from syndicate-monolithic-netstack!

\n" + "

This is running on syndicate-monolithic's own\n" + "\n" + "TCP/IP stack.

\n" + "

There have been ~a requests prior to this one.

") + counter))) + (quit (message (at-meta (tcp-channel us them response))))] + [_ #f])) + (void) + (scn/union (subscription `(counter ,?)) + (subscription (tcp-channel them us ?) #:meta-level 1) + (subscription (advertise (tcp-channel them us ?)) #:meta-level 1) + (advertisement (tcp-channel us them ?) #:meta-level 1))))) + + (spawn-dataspace + (spawn (lambda (e counter) + (match e + [(message 'bump) + (transition (+ counter 1) (message `(counter ,counter)))] + [_ #f])) + 0 + (scn (subscription 'bump))) + (spawn-demand-matcher (advertise (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)) + (observe (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)) + #:meta-level 1 + spawn-session)) + + ) diff --git a/examples/netstack/monolithic-lowlevel/on-claim.rkt b/examples/netstack/monolithic-lowlevel/on-claim.rkt new file mode 100644 index 0000000..d517518 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/on-claim.rkt @@ -0,0 +1,47 @@ +#lang racket/base + +(provide on-claim) + +(require syndicate/monolithic) +(require syndicate/drivers/timer) + +;; (Trie (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action))) +;; Trie Projection ... +;; -> Action +;; Spawns a process that observes the given projections. Any time the +;; environment's interests change in a relevant way, calls +;; check-and-maybe-spawn-fn with the aggregate interests and the +;; projection results. If check-and-maybe-spawn-fn returns #f, +;; continues to wait; otherwise, takes the action(s) returned, and +;; quits. +(define (on-claim #:timeout-msec [timeout-msec #f] + #:on-timeout [timeout-handler (lambda () '())] + #:name [name #f] + check-and-maybe-spawn-fn + base-interests + . projections) + (define timer-id (gensym 'on-claim)) + (define (on-claim-handler e state) + (match e + [(scn new-aggregate) + (define projection-results + (map (lambda (p) (trie-project/set #:take (projection-arity p) new-aggregate p)) + projections)) + (define maybe-spawn (apply check-and-maybe-spawn-fn + new-aggregate + projection-results)) + (if maybe-spawn + (quit maybe-spawn) + #f)] + [(message (timer-expired (== timer-id) _)) + (quit (timeout-handler))] + [_ #f])) + (list + (when timeout-msec (message (set-timer timer-id timeout-msec 'relative))) + (spawn #:name name + on-claim-handler + (void) + (scn/union base-interests + (assertion-set-union* + (map (lambda (p) (subscription (projection->pattern p))) projections)) + (subscription (timer-expired timer-id ?)))))) diff --git a/examples/netstack/monolithic-lowlevel/port-allocator.rkt b/examples/netstack/monolithic-lowlevel/port-allocator.rkt new file mode 100644 index 0000000..5dc7229 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/port-allocator.rkt @@ -0,0 +1,38 @@ +#lang racket/base +;; UDP/TCP port allocator + +(provide spawn-port-allocator + (struct-out port-allocation-request)) + +(require racket/set) +(require racket/match) +(require syndicate/monolithic) +(require "ip.rkt") + +(struct port-allocation-request (type k) #:prefab) + +(struct port-allocator-state (used-ports local-ips) #:transparent) + +(define (spawn-port-allocator allocator-type observer-gestalt compute-used-ports) + (spawn #:name (string->symbol (format "port-allocator:~a" allocator-type)) + (lambda (e s) + (match e + [(scn g) + (define local-ips (or (gestalt->local-ip-addresses g) (set))) + (define new-used-ports (compute-used-ports g local-ips)) + (log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports) + (transition (port-allocator-state new-used-ports local-ips) '())] + [(message (port-allocation-request _ k)) + (define currently-used-ports (port-allocator-state-used-ports s)) + (let randomly-allocate-until-unused () + (define p (+ 1024 (random 64512))) + (if (set-member? currently-used-ports p) + (randomly-allocate-until-unused) + (transition (struct-copy port-allocator-state s + [used-ports (set-add currently-used-ports p)]) + (k p (port-allocator-state-local-ips s)))))] + [_ #f])) + (port-allocator-state (set) (set)) + (scn/union (subscription (port-allocation-request allocator-type ?)) + observe-local-ip-addresses-gestalt + observer-gestalt))) diff --git a/examples/netstack/monolithic-lowlevel/tcp.rkt b/examples/netstack/monolithic-lowlevel/tcp.rkt new file mode 100644 index 0000000..66ee0ea --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/tcp.rkt @@ -0,0 +1,665 @@ +#lang racket/base + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + spawn-tcp-driver) + +(require racket/set) +(require racket/match) +(require syndicate/monolithic) +(require syndicate/drivers/timer) +(require syndicate/demand-matcher) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") +(require "ip.rkt") +(require "port-allocator.rkt") + +;; tcp-address/tcp-address : "kernel" tcp connection state machines +;; tcp-handle/tcp-address : "user" outbound connections +;; tcp-listener/tcp-address : "user" inbound connections + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +(struct tcp-packet (from-wire? + source-ip + source-port + destination-ip + destination-port + sequence-number + ack-number + flags + window-size + options + data) + #:prefab) + +;; (tcp-port-allocation Number (U TcpHandle TcpListener)) +(struct tcp-port-allocation (port handle) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-tcp-driver) + (list (spawn-demand-matcher #:name 'tcp-inbound-driver + (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?))) + (advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?))) + (lambda (server-addr) + (match-define (tcp-listener port) server-addr) + ;; TODO: have listener shut down once user-level listener does + (list + (spawn #:name (string->symbol + (format "tcp-listener-port-reservation:~a" port)) + (lambda (e s) #f) + (void) + (scn (assertion (tcp-port-allocation port server-addr)))) + (spawn-demand-matcher + #:name (string->symbol (format "tcp-listener:~a" port)) + (advertise (tcp-channel (?! (tcp-address ? ?)) + (?! (tcp-address ? port)) + ?)) + (observe (tcp-channel (?! (tcp-address ? ?)) + (?! (tcp-address ? port)) + ?)) + (spawn-relay server-addr))))) + (spawn-demand-matcher #:name 'tcp-outbound-driver + (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + (observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + allocate-port-and-spawn-socket) + (spawn-tcp-port-allocator) + (spawn-kernel-tcp-driver))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Port allocation + +(define (spawn-tcp-port-allocator) + (spawn-port-allocator 'tcp + (subscription (tcp-port-allocation ? ?)) + (lambda (g local-ips) + (project-assertions g (tcp-port-allocation (?!) ?))))) + +(define (allocate-port-and-spawn-socket local-addr remote-addr) + (message (port-allocation-request + 'tcp + (lambda (port local-ips) + ;; TODO: Choose a sensible IP address for the outbound + ;; connection. We don't have enough information to do this + ;; well at the moment, so just pick some available local IP + ;; address. + ;; + ;; Interesting note: In some sense, the right answer is + ;; "?". This would give us a form of mobility, where IP + ;; addresses only route to a given bucket-of-state and ONLY + ;; the port number selects a substate therein. That's not + ;; how TCP is defined however so we can't do that. + (define appropriate-ip (set-first local-ips)) + (define appropriate-host (ip-address->hostname appropriate-ip)) + (match-define (tcp-address remote-host remote-port) remote-addr) + (define remote-ip (ip-string->ip-address remote-host)) + (list + ((spawn-relay local-addr) remote-addr (tcp-address appropriate-host port)) + (spawn-state-vector remote-ip remote-port appropriate-ip port)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relay between kernel-level and user-level + +(define relay-peer-wait-time-msec 5000) + +(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr) + (define timer-name (list 'spawn-relay local-tcp-addr remote-addr)) + (define local-peer-traffic (?! (observe (tcp-channel remote-addr local-user-addr ?)))) + (define remote-peer-traffic (?! (advertise (tcp-channel remote-addr local-tcp-addr ?)))) + (list + (message (set-timer timer-name relay-peer-wait-time-msec 'relative)) + (spawn #:name (string->symbol (format "tcp-relay:~v:~v:~v" + local-user-addr + remote-addr + local-tcp-addr)) + (lambda (e state) + (match e + [(scn g) + (define local-peer-absent? (trie-empty? (trie-project g local-peer-traffic))) + (define remote-peer-absent? (trie-empty? (trie-project g remote-peer-traffic))) + (define new-state (+ (if local-peer-absent? 0 1) (if remote-peer-absent? 0 1))) + (if (< new-state state) + (quit) + (transition new-state '()))] + [(message (tcp-channel (== local-user-addr) (== remote-addr) bs)) + (transition state (message (tcp-channel local-tcp-addr remote-addr bs)))] + [(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs)) + (transition state (message (tcp-channel remote-addr local-user-addr bs)))] + [(message (timer-expired _ _)) + #:when (< state 2) ;; we only care if we're not fully connected + (error 'spawn-relay "TCP relay process timed out waiting for peer")] + [_ #f])) + 0 + (scn/union (subscription (projection->pattern local-peer-traffic)) + (subscription (projection->pattern remote-peer-traffic)) + (assertion (tcp-port-allocation (tcp-address-port local-tcp-addr) + local-user-addr)) + (subscription (tcp-channel remote-addr local-tcp-addr ?)) + (subscription (tcp-channel local-user-addr remote-addr ?)) + (advertisement (tcp-channel remote-addr local-user-addr ?)) + (advertisement (tcp-channel local-tcp-addr remote-addr ?)) + (subscription (timer-expired timer-name ?)))))) + + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + +(define PROTOCOL-TCP 6) + +(struct codec-state (local-ips active-state-vectors) #:transparent) + +(define (spawn-kernel-tcp-driver) + + (define (state-vector-active? statevec s) + (set-member? (codec-state-active-state-vectors s) statevec)) + + (define (analyze-incoming-packet src-ip dst-ip body s) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + (data-offset :: integer bits 4) + (reserved :: integer bits 3) + (ns :: integer bits 1) + (cwr :: integer bits 1) + (ece :: integer bits 1) + (urg :: integer bits 1) + (ack :: integer bits 1) + (psh :: integer bits 1) + (rst :: integer bits 1) + (syn :: integer bits 1) + (fin :: integer bits 1) + (window-size :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (urgent-pointer :: integer bytes 2) + (rest :: binary) ] + (let* ((flags (set)) + (statevec (list src-ip src-port dst-ip dst-port)) + (old-active-state-vectors (codec-state-active-state-vectors s)) + (spawn-needed? (and (not (state-vector-active? statevec s)) + (zero? rst)))) ;; don't bother spawning if it's a rst + (define-syntax-rule (set-flags! v ...) + (begin (unless (zero? v) (set! flags (set-add flags 'v))) ...)) + (set-flags! ns cwr ece urg ack psh rst syn fin) + (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port + sequence-number + ack-number + flags + window-size) + (when spawn-needed? (log-info " - spawn needed!")) + (bit-string-case rest + ([ (opts :: binary bytes (- (* data-offset 4) 20)) + (data :: binary) ] + (let ((packet (tcp-packet #t + src-ip + src-port + dst-ip + dst-port + sequence-number + ack-number + flags + window-size + (bit-string->bytes opts) + (bit-string->bytes data)))) + (transition (if spawn-needed? + (struct-copy codec-state s + [active-state-vectors + (set-add old-active-state-vectors statevec)]) + s) + (list + (when spawn-needed? (spawn-state-vector src-ip src-port + dst-ip dst-port)) + ;; TODO: get packet to the new state-vector process somehow + (message packet))))) + (else #f)))) + (else #f))) + + (define statevec-projection (observe (tcp-packet ? (?!) (?!) (?!) (?!) ? ? ? ? ? ?))) + + (define (analyze-gestalt g s) + (define local-ips (gestalt->local-ip-addresses g)) + (define statevecs (trie-project/set #:take 4 g statevec-projection)) + (log-info "gestalt yielded statevecs ~v and local-ips ~v" statevecs local-ips) + (transition (struct-copy codec-state s + [local-ips local-ips] + [active-state-vectors statevecs]) '())) + + (define (deliver-outbound-packet p s) + (match-define (tcp-packet #f + src-ip + src-port + dst-ip + dst-port + sequence-number + ack-number + flags + window-size + options + data) + p) + (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port + sequence-number + ack-number + flags + window-size) + (define (flag-bit sym) (if (set-member? flags sym) 1 0)) + (define payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + ((+ 5 (quotient (bit-string-byte-count options) 4)) + :: integer bits 4) ;; TODO: enforce 4-byte alignment + (0 :: integer bits 3) + ((flag-bit 'ns) :: integer bits 1) + ((flag-bit 'cwr) :: integer bits 1) + ((flag-bit 'ece) :: integer bits 1) + ((flag-bit 'urg) :: integer bits 1) + ((flag-bit 'ack) :: integer bits 1) + ((flag-bit 'psh) :: integer bits 1) + ((flag-bit 'rst) :: integer bits 1) + ((flag-bit 'syn) :: integer bits 1) + ((flag-bit 'fin) :: integer bits 1) + (window-size :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (0 :: integer bytes 2) ;; TODO: urgent pointer + (data :: binary))) + (define pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-TCP + ((bit-string-byte-count payload) :: integer bytes 2))) + (transition s (message (ip-packet #f src-ip dst-ip PROTOCOL-TCP #"" + (ip-checksum 16 payload #:pseudo-header pseudo-header))))) + + (spawn #:name 'kernel-tcp-driver + (lambda (e s) + (match e + [(scn g) + (analyze-gestalt g s)] + [(message (ip-packet source-if src dst _ _ body)) + #:when (and source-if ;; source-if == #f iff packet originates locally + (set-member? (codec-state-local-ips s) dst)) + (analyze-incoming-packet src dst body s)] + [(message (? tcp-packet? p)) + #:when (not (tcp-packet-from-wire? p)) + (deliver-outbound-packet p s)] + [_ #f])) + (codec-state (set) (set)) + (scn/union (subscription (ip-packet ? ? ? PROTOCOL-TCP ? ?)) + (subscription (tcp-packet #f ? ? ? ? ? ? ? ? ? ?)) + (subscription (observe (tcp-packet #t ? ? ? ? ? ? ? ? ? ?))) + observe-local-ip-addresses-gestalt))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Per-connection state vector process + +(struct buffer (data ;; bit-string + seqn ;; names leftmost byte in data + window ;; counts bytes from leftmost byte in data + finished?) ;; boolean: true after FIN + #:transparent) + +(struct conn-state (outbound ;; buffer + inbound ;; buffer + syn-acked? ;; boolean + latest-peer-activity-time ;; from current-inexact-milliseconds + ;; ^ the most recent time we heard from our peer + user-timeout-base-time ;; from current-inexact-milliseconds + ;; ^ when the index of the first outbound unacknowledged byte changed + local-peer-seen? ;; boolean + listener-listening?) ;; boolean + #:transparent) + +(define transmit-check-interval-msec 2000) +(define inbound-buffer-limit 65535) +(define maximum-segment-size 536) ;; bytes +(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout +(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I + ;; cheat; RFC 793 says "the present global default is five minutes", which is + ;; reasonable to be getting on with + +(define (spawn-state-vector src-ip src-port dst-ip dst-port) + (define src (tcp-address (ip-address->hostname src-ip) src-port)) + (define dst (tcp-address (ip-address->hostname dst-ip) dst-port)) + (define (timer-name kind) (list 'tcp-timer kind src dst)) + + (define (next-expected-seqn s) + (define b (conn-state-inbound s)) + (define v (buffer-seqn b)) + (and v (seq+ v (bit-string-byte-count (buffer-data b))))) + + (define (buffer-push b data) + (struct-copy buffer b [data (bit-string-append (buffer-data b) data)])) + + ;; ConnState -> ConnState + (define (set-inbound-seqn seqn s) + (struct-copy conn-state s + [inbound (struct-copy buffer (conn-state-inbound s) [seqn seqn])])) + + ;; Bitstring ConnState -> Transition + (define (incorporate-segment data s) + ;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data) + (transition + (if (buffer-finished? (conn-state-inbound s)) + s + (struct-copy conn-state s [inbound (buffer-push (conn-state-inbound s) data)])) + '())) + + (define (seq+ a b) (bitwise-and #xffffffff (+ a b))) + + ;; Always positive + (define (seq- larger smaller) + (if (< larger smaller) ;; wraparound has occurred + (+ (- larger smaller) #x100000000) + (- larger smaller))) + + (define (seq> a b) + (< (seq- a b) #x80000000)) + + (define local-peer-detector (?! (observe (tcp-channel src dst ?)))) + (define listener-detector (?! (observe (advertise (tcp-channel ? (tcp-listener dst-port) ?))))) + + ;; ConnState -> Gestalt + (define (compute-gestalt s) + (define worldward-facing-gestalt + (subscription (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?))) + (define appward-facing-gestalt + (assertion-set-union + (subscription (projection->pattern local-peer-detector)) + (subscription (projection->pattern listener-detector)) + (subscription (tcp-channel dst src ?)) + (if (and (conn-state-syn-acked? s) + (not (buffer-finished? (conn-state-inbound s)))) + (advertisement (tcp-channel src dst ?)) + trie-empty))) + (assertion-set-union (subscription (timer-expired (timer-name ?) ?)) + worldward-facing-gestalt + appward-facing-gestalt)) + + ;; ConnState -> Transition + (define (deliver-inbound-locally s) + (define b (conn-state-inbound s)) + (if (bit-string-empty? (buffer-data b)) + (transition s '()) + (let ((chunk (bit-string->bytes (buffer-data b)))) + (transition (struct-copy conn-state s + [inbound (struct-copy buffer b + [data #""] + [seqn (seq+ (buffer-seqn b) (bytes-length chunk))])]) + (message (tcp-channel src dst chunk)))))) + + ;; (Setof Symbol) -> ConnState -> Transition + (define ((check-fin flags) s) + (define b (conn-state-inbound s)) + (unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally + (error 'check-fin "Nonempty inbound buffer")) + (if (set-member? flags 'fin) + (let ((new-s (struct-copy conn-state s + [inbound (struct-copy buffer b + [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte + [finished? #t])]))) + (log-info "Closing inbound stream.") + (transition new-s (scn (compute-gestalt new-s)))) + (transition s '()))) + + ;; Boolean SeqNum -> ConnState -> Transition + (define ((discard-acknowledged-outbound ack? ackn) s) + (if (not ack?) + (transition s '()) + (let* ((b (conn-state-outbound s)) + (base (buffer-seqn b)) + (limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b)))) + (ackn (if (seq> ackn limit) limit ackn)) + (ackn (if (seq> base ackn) base ackn)) + (dist (seq- ackn base))) + (define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset! + (define new-s (struct-copy conn-state s + [user-timeout-base-time (current-inexact-milliseconds)] + [outbound (struct-copy buffer b [data remaining-data] [seqn ackn])] + [syn-acked? (or (conn-state-syn-acked? s) + (positive? dist))])) + (transition new-s + (when (and (not (conn-state-syn-acked? s)) (positive? dist)) + (scn (compute-gestalt new-s))))))) + + ;; Nat -> ConnState -> Transition + (define ((update-outbound-window peer-window) s) + (transition (struct-copy conn-state s + [outbound (struct-copy buffer (conn-state-outbound s) + [window peer-window])]) + '())) + + ;; ConnState -> Boolean + (define (all-output-acknowledged? s) + (bit-string-empty? (buffer-data (conn-state-outbound s)))) + + ;; (Option SeqNum) -> ConnState -> Transition + (define ((send-outbound old-ackn) s) + (define b (conn-state-outbound s)) + (define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b)) + (if (buffer-finished? b) 1 0)))) + + (define segment-size (min maximum-segment-size + (if (conn-state-syn-acked? s) (buffer-window b) 1) + ;; ^ can only send SYN until SYN is acked + pending-byte-count)) + (define segment-offset (if (conn-state-syn-acked? s) 0 1)) + (define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset! + (define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset! + (define ackn (next-expected-seqn s)) + (define flags (set)) + (when ackn + (set! flags (set-add flags 'ack))) + (when (not (conn-state-syn-acked? s)) + (set! flags (set-add flags 'syn))) + (when (and (buffer-finished? b) + (conn-state-syn-acked? s) + (= segment-size pending-byte-count) + (not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy + (set! flags (set-add flags 'fin))) + (define window (min 65535 ;; limit of field width + (max 0 ;; can't be negative + (- (buffer-window (conn-state-inbound s)) + (bit-string-byte-count + (buffer-data (conn-state-inbound s))))))) + (transition s + (unless (and (equal? ackn old-ackn) + (conn-state-syn-acked? s) + (not (set-member? flags 'fin)) + (zero? (bit-string-byte-count chunk))) + (local-require racket/pretty) + (pretty-write `(send-outbound (old-ackn ,old-ackn) + (s ,s) + (flags ,flags))) + (flush-output) + (message (tcp-packet #f dst-ip dst-port src-ip src-port + (buffer-seqn b) + (or ackn 0) + flags + window + #"" + chunk))))) + + ;; ConnState -> Transition + (define (bump-peer-activity-time s) + (transition (struct-copy conn-state s + [latest-peer-activity-time (current-inexact-milliseconds)]) + '())) + + ;; ConnState Number -> Boolean + (define (heard-from-peer-within-msec? s msec) + (<= (- (current-inexact-milliseconds) (conn-state-latest-peer-activity-time s)) msec)) + + ;; ConnState -> Boolean + (define (user-timeout-expired? s) + (and (not (all-output-acknowledged? s)) + (> (- (current-inexact-milliseconds) (conn-state-user-timeout-base-time s)) + user-timeout-msec))) + + ;; ConnState -> Transition + (define (quit-when-done s) + (cond + [(and (buffer-finished? (conn-state-outbound s)) + (buffer-finished? (conn-state-inbound s)) + (all-output-acknowledged? s) + (not (heard-from-peer-within-msec? s (* 2 1000 maximum-segment-lifetime-sec)))) + ;; Everything is cleanly shut down, and we just need to wait a while for unexpected + ;; packets before we release the state vector. + (quit)] + [(user-timeout-expired? s) + ;; We've been plaintively retransmitting for user-timeout-msec without hearing anything + ;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but + ;; it will do for now? TODO + (log-info "TCP_USER_TIMEOUT fired.") + (quit)] + [else #f])) + + ;; Action + (define send-set-transmit-check-timer + (message (set-timer (timer-name 'transmit-check) + transmit-check-interval-msec + 'relative))) + + ;; SeqNum SeqNum ConnState -> Transition + (define (reset seqn ackn s) + (log-warning "Sending RST from ~a:~a to ~a:~a" + (ip-address->hostname dst-ip) + dst-port + (ip-address->hostname src-ip) + src-port) + (quit (message (tcp-packet #f dst-ip dst-port src-ip src-port + seqn + ackn + (set 'ack 'rst) + 0 + #"" + #"")))) + + ;; ConnState -> Transition + (define (close-outbound-stream s) + (define b (conn-state-outbound s)) + (transition + (if (buffer-finished? b) + s + (struct-copy conn-state s + [outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte + [finished? #t])])) + '())) + + (define (state-vector-behavior e s) + (define old-ackn (buffer-seqn (conn-state-inbound s))) + (match e + [(scn g) + (log-info "State vector routing-update:\n~a" (trie->pretty-string g)) + (define local-peer-present? (trie-non-empty? (trie-project g local-peer-detector))) + (define listening? (trie-non-empty? (trie-project g listener-detector))) + (define new-s (struct-copy conn-state s [listener-listening? listening?])) + (cond + [(and local-peer-present? (not (conn-state-local-peer-seen? s))) + (transition (struct-copy conn-state new-s [local-peer-seen? #t]) '())] + [(and (not local-peer-present?) (conn-state-local-peer-seen? s)) + (log-info "Closing outbound stream.") + (sequence-transitions (close-outbound-stream new-s) + (send-outbound old-ackn) + quit-when-done)] + [else (transition new-s '())])] + [(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data)) + (define expected (next-expected-seqn s)) + (define is-syn? (set-member? flags 'syn)) + (define is-fin? (set-member? flags 'fin)) + (cond + [(set-member? flags 'rst) (quit)] + [(and (not expected) ;; no syn yet + (or (not is-syn?) ;; and this isn't it + (and (not (conn-state-listener-listening? s)) ;; or it is, but no listener... + (not (conn-state-local-peer-seen? s))))) ;; ...and no outbound client + (reset ackn ;; this is *our* seqn + (seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0))) + ;; ^^ this is what we should acknowledge... + s)] + [else + (sequence-transitions (cond + [(not expected) ;; haven't seen syn yet, but we know this is it + (incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))] + [(= expected seqn) + (incorporate-segment data s)] + [else + (transition s '())]) + deliver-inbound-locally + (check-fin flags) + (discard-acknowledged-outbound (set-member? flags 'ack) ackn) + (update-outbound-window window) + (send-outbound old-ackn) + bump-peer-activity-time + quit-when-done)])] + [(message (tcp-channel _ _ bs)) + ;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs) + (sequence-transitions (transition (struct-copy conn-state s + [user-timeout-base-time + ;; Only move user-timeout-base-time if there wasn't + ;; already some outstanding output. + (if (all-output-acknowledged? s) + (current-inexact-milliseconds) + (conn-state-user-timeout-base-time s))] + [outbound (buffer-push (conn-state-outbound s) bs)]) + '()) + (send-outbound old-ackn) + quit-when-done)] + [(message (timer-expired (== (timer-name 'transmit-check)) _)) + ;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of + ;; retransmitting outbound data as well as a means of checking for an expired + ;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained + ;; approach. + (sequence-transitions (transition s send-set-transmit-check-timer) + (send-outbound old-ackn) + quit-when-done)] + [_ #f])) + + ;; (local-require racket/trace) + ;; (trace state-vector-behavior) + + (define initial-outbound-seqn + ;; Yuck + (inexact->exact (truncate (* #x100000000 (random))))) + + ;; TODO accept input from user process + (list + send-set-transmit-check-timer + (let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position + (buffer #"" #f inbound-buffer-limit #f) + #f + (current-inexact-milliseconds) + (current-inexact-milliseconds) + #f + #f))) + (spawn #:name + (string->symbol (format "tcp-state-vector:~a:~a:~a:~a" + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port)) + state-vector-behavior + state0 + (scn (compute-gestalt state0)))))) diff --git a/examples/netstack/monolithic-lowlevel/udp.rkt b/examples/netstack/monolithic-lowlevel/udp.rkt new file mode 100644 index 0000000..7cc2726 --- /dev/null +++ b/examples/netstack/monolithic-lowlevel/udp.rkt @@ -0,0 +1,175 @@ +#lang racket/base + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + udp-address? + udp-local-address? + (struct-out udp-packet) + spawn-udp-driver) + +(require racket/set) +(require racket/match) +(require syndicate/monolithic) +(require syndicate/demand-matcher) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") +(require "ip.rkt") +(require "port-allocator.rkt") + +;; udp-address/udp-address : "kernel" udp connection state machines +;; udp-handle/udp-address : "user" outbound connections +;; udp-listener/udp-address : "user" inbound connections + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; USER-level protocol +(struct udp-packet (source destination body) #:prefab) + +;; KERNEL-level protocol +(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab) +(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress) + +(define any-remote (udp-remote-address ? ?)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-udp-driver) + (list + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?)) + (advertise (udp-packet ? (?! (udp-listener ?)) ?)) + (lambda (handle) (spawn-udp-relay (udp-listener-port handle) handle))) + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-handle ?)) ?)) + (advertise (udp-packet ? (?! (udp-handle ?)) ?)) + (lambda (handle) + (message (port-allocation-request + 'udp + (lambda (port local-ips) (spawn-udp-relay port handle)))))) + (spawn-udp-port-allocator) + (spawn-kernel-udp-driver))) + +(define (spawn-udp-port-allocator) + (define udp-projector (udp-port-allocation (?!) ?)) + (spawn-port-allocator 'udp + (subscription (projection->pattern udp-projector)) + (lambda (g local-ips) + (project-assertions g udp-projector)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relaying + +(define (spawn-udp-relay local-port local-user-addr) + (log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr) + + (define local-peer-detector (?! (observe (udp-packet any-remote local-user-addr ?)))) + + (define (compute-gestalt local-ips) + (for/fold [(g (assertion-set-union + (subscription (projection->pattern local-peer-detector)) + (advertisement (udp-packet any-remote local-user-addr ?)) + observe-local-ip-addresses-gestalt + (subscription (udp-packet local-user-addr any-remote ?)) + (assertion (udp-port-allocation local-port local-user-addr))))] + [(ip (in-set local-ips))] + (assertion-set-union g + (subscription (udp-datagram ? ? ip local-port ?)) + (advertisement (udp-datagram ip local-port ? ? ?))))) + + (spawn (lambda (e local-ips) + (match e + [(scn g) + (define new-local-ips (gestalt->local-ip-addresses g)) + (if (trie-empty? (trie-project g local-peer-detector)) + (quit) + (transition new-local-ips (scn (compute-gestalt new-local-ips))))] + [(message (udp-packet (== local-user-addr) remote-addr bs)) + ;; Choose arbitrary local IP address for outbound packet! + ;; TODO: what can be done? Must I examine the routing table? + (match-define (udp-remote-address remote-host remote-port) remote-addr) + (define remote-ip (ip-string->ip-address remote-host)) + (transition local-ips (message (udp-datagram (set-first local-ips) + local-port + remote-ip + remote-port + bs)))] + [(message (udp-datagram si sp _ _ bs)) + (transition local-ips + (message (udp-packet (udp-remote-address (ip-address->hostname si) sp) + local-user-addr + bs)))] + [_ #f])) + (set) + (scn (compute-gestalt (set))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + +(define PROTOCOL-UDP 17) + +(define (spawn-kernel-udp-driver) + (spawn (lambda (e local-ips) + (match e + [(scn g) + (transition (gestalt->local-ip-addresses g) '())] + [(message (ip-packet source-if src-ip dst-ip _ _ body)) + #:when (and source-if (set-member? local-ips dst-ip)) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (length :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (data :: binary) ] + (bit-string-case data + ([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes + (:: binary) ] + (transition local-ips (message (udp-datagram src-ip + src-port + dst-ip + dst-port + (bit-string->bytes payload))))) + (else #f))) + (else #f))] + [(message (udp-datagram src-ip src-port dst-ip dst-port bs)) + #:when (set-member? local-ips src-ip) + (let* ((payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + ((+ 8 (bit-string-byte-count bs)) + :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (bs :: binary))) + (pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-UDP + ((bit-string-byte-count payload) + :: integer bytes 2))) + (checksummed-payload (ip-checksum #:pseudo-header pseudo-header + 6 payload))) + (transition local-ips (message (ip-packet #f + src-ip + dst-ip + PROTOCOL-UDP + #"" + checksummed-payload))))] + [_ #f])) + (set) + (scn/union (advertisement (ip-packet #f ? ? PROTOCOL-UDP ? ?)) + (subscription (ip-packet ? ? ? PROTOCOL-UDP ? ?)) + (subscription (udp-datagram ? ? ? ? ?)) + observe-local-ip-addresses-gestalt)))