diff --git a/examples/netstack/arp.rkt b/examples/netstack/arp.rkt deleted file mode 100644 index 97cf773..0000000 --- a/examples/netstack/arp.rkt +++ /dev/null @@ -1,235 +0,0 @@ -#lang racket/base -;; ARP protocol, http://tools.ietf.org/html/rfc826 -;; Only does ARP-over-ethernet. - -(provide (struct-out arp-query) - (struct-out arp-assertion) - (struct-out arp-interface) - spawn-arp-driver) - -(require racket/set) -(require racket/match) -(require syndicate/monolithic) -(require syndicate/drivers/timer) -(require syndicate/demand-matcher) -(require bitsyntax) - -(require "dump-bytes.rkt") -(require "configuration.rkt") -(require "ethernet.rkt") - -(struct arp-query (protocol protocol-address interface link-address) #:prefab) -(struct arp-assertion (protocol protocol-address interface-name) #:prefab) -(struct arp-interface (interface-name) #:prefab) - -(struct arp-interface-up (interface-name) #:prefab) - -(define ARP-ethertype #x0806) -(define cache-entry-lifetime-msec (* 14400 1000)) -(define wakeup-interval 5000) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define (spawn-arp-driver) - (spawn-demand-matcher (arp-interface (?!)) - (arp-interface-up (?!)) - spawn-arp-interface)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(struct cache-key (protocol address) #:transparent) -(struct cache-value (expiry interface address) #:transparent) - -(struct state (cache queries assertions) #:transparent) - -(define (spawn-arp-interface interface-name) - (log-info "spawn-arp-interface ~v" interface-name) - (lookup-ethernet-hwaddr (assertion (arp-interface-up interface-name)) - interface-name - (lambda (hwaddr) (spawn-arp-interface* interface-name hwaddr)))) - -(define (spawn-arp-interface* interface-name hwaddr) - (log-info "spawn-arp-interface* ~v ~v" interface-name hwaddr) - (define interface (ethernet-interface interface-name hwaddr)) - - (define (expire-cache cache) - (define now (current-inexact-milliseconds)) - (define (not-expired? v) (< now (cache-value-expiry v))) - (for/hash [((k v) (in-hash cache)) #:when (not-expired? v)] - (values k v))) - - (define timer-key (list 'arp interface-name)) - - (define (set-wakeup-alarm) - (message (set-timer timer-key wakeup-interval 'relative))) - - (define (compute-gestalt cache) - (scn/union (subscription (timer-expired timer-key ?)) - (subscription interface) - (subscription (ethernet-packet-pattern interface-name #t ARP-ethertype)) - (assertion (arp-interface-up interface-name)) - (subscription (arp-assertion ? ? interface-name)) - (subscription (observe (arp-query ? ? interface ?))) - (for/fold [(g trie-empty)] [((k v) (in-hash cache))] - (assertion-set-union g (assertion (arp-query (cache-key-protocol k) - (cache-key-address k) - (cache-value-interface v) - (cache-value-address v))))))) - - (define (build-packet dest-mac ptype oper sender-ha sender-pa target-ha target-pa) - (define hlen (bytes-length target-ha)) - (define plen (bytes-length target-pa)) - (define packet (bit-string->bytes - (bit-string (1 :: integer bytes 2) - (ptype :: integer bytes 2) - hlen - plen - (oper :: integer bytes 2) - (sender-ha :: binary bytes hlen) - (sender-pa :: binary bytes plen) - (target-ha :: binary bytes hlen) - (target-pa :: binary bytes plen)))) - (ethernet-packet interface - #f - hwaddr - dest-mac - ARP-ethertype - packet)) - - (define (analyze-incoming-packet source destination body s) - (bit-string-case body - ([ (= 1 :: integer bytes 2) - (ptype :: integer bytes 2) - hlen - plen - (oper :: integer bytes 2) - (sender-hardware-address0 :: binary bytes hlen) - (sender-protocol-address0 :: binary bytes plen) - (target-hardware-address0 :: binary bytes hlen) - (target-protocol-address0 :: binary bytes plen) - (:: binary) ;; The extra zeros exist because ethernet packets - ;; have a minimum size. This is, in part, why - ;; IPv4 headers have a total-length field, so - ;; that the zero padding can be removed. - ] - (let () - (define sender-protocol-address (bit-string->bytes sender-protocol-address0)) - (define sender-hardware-address (bit-string->bytes sender-hardware-address0)) - (define target-protocol-address (bit-string->bytes target-protocol-address0)) - (define learned-key (cache-key ptype sender-protocol-address)) - (when (and (set-member? (state-queries s) learned-key) ;; it is relevant to our interests - (not (equal? sender-hardware-address - (cache-value-address (hash-ref (state-cache s) - learned-key - (lambda () - (cache-value #f #f #f))))))) - (log-info "~a ARP Adding ~a = ~a to cache" - interface-name - (pretty-bytes sender-protocol-address) - (pretty-bytes sender-hardware-address))) - (define cache (hash-set (expire-cache (state-cache s)) - learned-key - (cache-value (+ (current-inexact-milliseconds) - cache-entry-lifetime-msec) - interface - sender-hardware-address))) - (transition (struct-copy state s [cache cache]) - (list - (case oper - [(1) ;; request - (if (set-member? (state-assertions s) - (cache-key ptype target-protocol-address)) - (begin - (log-info "~a ARP answering request for ~a/~a" - interface-name - ptype - (pretty-bytes target-protocol-address)) - (message (build-packet sender-hardware-address - ptype - 2 ;; reply - hwaddr - target-protocol-address - sender-hardware-address - sender-protocol-address))) - '())] - [(2) '()] ;; reply - [else '()]) - (compute-gestalt cache))))) - (else #f))) - - (define queries-projection (observe (arp-query (?!) (?!) ? ?))) - (define (gestalt->queries g) - (for/set [(e (in-set (trie-project/set #:take 2 g queries-projection)))] - (match-define (list ptype pa) e) - (cache-key ptype pa))) - - (define assertions-projection (arp-assertion (?!) (?!) ?)) - (define (gestalt->assertions g) - (for/set [(e (in-set (trie-project/set #:take 2 g assertions-projection)))] - (match-define (list ptype pa) e) - (cache-key ptype pa))) - - (define (analyze-gestalt g s) - (define new-assertions (gestalt->assertions g)) - (define added-assertions (set-subtract new-assertions (state-assertions s))) - (define new-s (struct-copy state s [queries (gestalt->queries g)] [assertions new-assertions])) - (if (trie-empty? (project-assertions g (arp-interface interface-name))) - (quit) - (transition new-s - (list - (for/list [(a (in-set added-assertions))] - (log-info "~a ARP Announcing ~a as ~a" - interface-name - (pretty-bytes (cache-key-address a)) - (pretty-bytes hwaddr)) - (message (build-packet broadcast-ethernet-address - (cache-key-protocol a) - 2 ;; reply -- gratuitous announcement - hwaddr - (cache-key-address a) - hwaddr - (cache-key-address a)))))))) - - (define (send-questions s) - (define unanswered-queries - (set-subtract (state-queries s) (list->set (hash-keys (state-cache s))))) - (define (some-asserted-pa ptype) - (match (filter (lambda (k) (equal? (cache-key-protocol k) ptype)) - (set->list (state-assertions s))) - ['() #f] - [(list* k _) (cache-key-address k)])) - (transition s - (for/list [(q (in-set unanswered-queries))] - (define pa (some-asserted-pa (cache-key-protocol q))) - (log-info "~a ARP Asking for ~a from ~a" - interface-name - (pretty-bytes (cache-key-address q)) - (and pa (pretty-bytes pa))) - (when pa - (message (build-packet broadcast-ethernet-address - (cache-key-protocol q) - 1 ;; request - hwaddr - pa - zero-ethernet-address - (cache-key-address q))))))) - - (list (set-wakeup-alarm) - (spawn (lambda (e s) - ;; (log-info "ARP ~a ~a: ~v // ~v" interface-name (pretty-bytes hwaddr) e s) - (match e - [(scn g) - (sequence-transitions (analyze-gestalt g s) - send-questions)] - [(message (ethernet-packet _ _ source destination _ body)) - (analyze-incoming-packet source destination body s)] - [(message (timer-expired _ _)) - (define new-s (struct-copy state s - [cache (expire-cache (state-cache s))])) - (sequence-transitions (transition new-s - (list (set-wakeup-alarm) - (compute-gestalt (state-cache new-s)))) - send-questions)] - [_ #f])) - (state (hash) (set) (set)) - (compute-gestalt (hash))))) diff --git a/examples/netstack/demo-config.rkt b/examples/netstack/demo-config.rkt deleted file mode 100644 index 329a1aa..0000000 --- a/examples/netstack/demo-config.rkt +++ /dev/null @@ -1,25 +0,0 @@ -#lang racket/base -;; Demonstration stack configuration for various hosts. - -(require racket/match) -(require syndicate/monolithic) -(require (only-in mzlib/os gethostname)) -(require "configuration.rkt") - -(provide spawn-demo-config) - -(define (spawn-demo-config) - (spawn (lambda (e s) #f) - (void) - (match (gethostname) - ["skip" - (scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "en0")) - (assertion (host-route (bytes 192 168 1 222) 24 "en0")))] - [(or "hop" "walk") - (scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "wlan0")) - (assertion (host-route (bytes 192 168 1 222) 24 "wlan0")))] - ["stockholm.ccs.neu.edu" - (scn/union (assertion (host-route (bytes 129 10 115 94) 24 "eth0")) - (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 129 10 115 1) "eth0")))] - [else - (error 'spawn-demo-config "No setup for hostname ~a" (gethostname))]))) diff --git a/examples/netstack/ethernet.rkt b/examples/netstack/ethernet.rkt deleted file mode 100644 index c5ecef9..0000000 --- a/examples/netstack/ethernet.rkt +++ /dev/null @@ -1,134 +0,0 @@ -#lang racket/base -;; Ethernet driver - -(provide (struct-out ethernet-packet) - zero-ethernet-address - broadcast-ethernet-address - interface-names - spawn-ethernet-driver - ethernet-packet-pattern - lookup-ethernet-hwaddr) - -(require racket/set) -(require racket/match) -(require racket/async-channel) - -(require syndicate/monolithic) -(require syndicate/demand-matcher) -(require "on-claim.rkt") - -(require packet-socket) -(require bitsyntax) - -(require "configuration.rkt") -(require "dump-bytes.rkt") - -(struct ethernet-packet (interface from-wire? source destination ethertype body) #:prefab) - -(define zero-ethernet-address (bytes 0 0 0 0 0 0)) -(define broadcast-ethernet-address (bytes 255 255 255 255 255 255)) - -(define interface-names (raw-interface-names)) -(log-info "Device names: ~a" interface-names) - -(define (spawn-ethernet-driver) - (spawn-demand-matcher (observe (ethernet-packet (ethernet-interface (?!) ?) #t ? ? ? ?)) - (ethernet-interface (?!) ?) - spawn-interface-tap)) - -(define (spawn-interface-tap interface-name) - (define h (raw-interface-open interface-name)) - (define interface (ethernet-interface interface-name (raw-interface-hwaddr h))) - (cond - [(not h) - (log-error "ethernet: Couldn't open interface ~v" interface-name) - '()] - [else - (log-info "Opened interface ~a, yielding handle ~v" interface-name h) - (define control-ch (make-async-channel)) - (thread (lambda () (interface-packet-read-loop interface h control-ch))) - (spawn (lambda (e h) - (match e - [(scn g) - (if (trie-empty? g) - (begin (async-channel-put control-ch 'quit) - (quit)) - (begin (async-channel-put control-ch 'unblock) - #f))] - [(message (at-meta (? ethernet-packet? p))) - ;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)" - ;; (ethernet-interface-name (ethernet-packet-interface p)) - ;; (pretty-bytes (ethernet-packet-source p)) - ;; (pretty-bytes (ethernet-packet-destination p)) - ;; (number->string (ethernet-packet-ethertype p) 16)) - ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) - (transition h (message p))] - [(message (? ethernet-packet? p)) - ;; (log-info "Interface ~a OUTBOUND packet ~a -> ~a (type 0x~a)" - ;; (ethernet-interface-name (ethernet-packet-interface p)) - ;; (pretty-bytes (ethernet-packet-source p)) - ;; (pretty-bytes (ethernet-packet-destination p)) - ;; (number->string (ethernet-packet-ethertype p) 16)) - ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) - (raw-interface-write h (encode-ethernet-packet p)) - #f] - [_ #f])) - h - (scn/union (assertion interface) - (subscription (ethernet-packet interface #f ? ? ? ?)) - (subscription (observe (ethernet-packet interface #t ? ? ? ?))) - (subscription (ethernet-packet interface #t ? ? ? ?) #:meta-level 1)))])) - -(define (interface-packet-read-loop interface h control-ch) - (define (blocked) - (match (async-channel-get control-ch) - ['unblock (unblocked)] - ['quit (void)])) - (define (unblocked) - (match (async-channel-try-get control-ch) - ['unblock (unblocked)] - ['quit (void)] - [#f - (define p (raw-interface-read h)) - (define decoded (decode-ethernet-packet interface p)) - (when decoded (send-ground-message decoded)) - (unblocked)])) - (blocked) - (raw-interface-close h)) - -(define (decode-ethernet-packet interface p) - (bit-string-case p - ([ (destination :: binary bytes 6) - (source :: binary bytes 6) - (ethertype :: integer bytes 2) - (body :: binary) ] - (ethernet-packet interface - #t - (bit-string->bytes source) - (bit-string->bytes destination) - ethertype - (bit-string->bytes body))) - (else #f))) - -(define (encode-ethernet-packet p) - (match-define (ethernet-packet _ _ source destination ethertype body) p) - (bit-string->bytes - (bit-string (destination :: binary bytes 6) - (source :: binary bytes 6) - (ethertype :: integer bytes 2) - (body :: binary)))) - -(define (ethernet-packet-pattern interface-name from-wire? ethertype) - (ethernet-packet (ethernet-interface interface-name ?) from-wire? ? ? ethertype ?)) - -(define (lookup-ethernet-hwaddr base-interests interface-name k) - (on-claim #:timeout-msec 5000 - #:on-timeout (lambda () - (log-info "Lookup of ethernet interface ~v failed" interface-name) - '()) - (lambda (_g hwaddrss) - (and (not (set-empty? hwaddrss)) - (let ((hwaddr (car (set-first hwaddrss)))) - (k hwaddr)))) - base-interests - (ethernet-interface interface-name (?!)))) diff --git a/examples/netstack/fetchurl.rkt b/examples/netstack/fetchurl.rkt deleted file mode 100644 index a88c12f..0000000 --- a/examples/netstack/fetchurl.rkt +++ /dev/null @@ -1,48 +0,0 @@ -#lang syndicate/monolithic - -(require syndicate/drivers/timer) -(require "demo-config.rkt") -(require "ethernet.rkt") -(require "arp.rkt") -(require "ip.rkt") -(require "tcp.rkt") -(require "udp.rkt") - -;;(log-events-and-actions? #t) - -(spawn-timer-driver) -(spawn-ethernet-driver) -(spawn-arp-driver) -(spawn-ip-driver) -(spawn-tcp-driver) -(spawn-udp-driver) -(spawn-demo-config) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(let () - (define local-handle (tcp-handle 'httpclient)) - (define remote-handle (tcp-address "129.10.115.92" 80)) - - (spawn (lambda (e seen-peer?) - (match e - [(scn g) - (define peer-present? (trie-non-empty? g)) - (if (and (not peer-present?) seen-peer?) - (begin (printf "URL fetcher exiting.\n") - (quit)) - (transition (or seen-peer? peer-present?) - (message - (tcp-channel - local-handle - remote-handle - #"GET / HTTP/1.0\r\nHost: stockholm.ccs.neu.edu\r\n\r\n"))))] - [(message (tcp-channel _ _ bs)) - (printf "----------------------------------------\n~a\n" bs) - (printf "----------------------------------------\n") - #f] - [_ #f])) - #f - (scn/union (advertisement (tcp-channel local-handle remote-handle ?)) - (subscription (tcp-channel remote-handle local-handle ?)) - (subscription (advertise (tcp-channel remote-handle local-handle ?)))))) diff --git a/examples/netstack/Makefile b/examples/netstack/incremental-highlevel/Makefile similarity index 100% rename from examples/netstack/Makefile rename to examples/netstack/incremental-highlevel/Makefile diff --git a/examples/netstack/README.md b/examples/netstack/incremental-highlevel/README.md similarity index 100% rename from examples/netstack/README.md rename to examples/netstack/incremental-highlevel/README.md diff --git a/examples/netstack/TODO.md b/examples/netstack/incremental-highlevel/TODO.md similarity index 100% rename from examples/netstack/TODO.md rename to examples/netstack/incremental-highlevel/TODO.md diff --git a/examples/netstack/incremental-highlevel/arp.rkt b/examples/netstack/incremental-highlevel/arp.rkt new file mode 100644 index 0000000..030cbc4 --- /dev/null +++ b/examples/netstack/incremental-highlevel/arp.rkt @@ -0,0 +1,196 @@ +#lang syndicate/actor +;; ARP protocol, http://tools.ietf.org/html/rfc826 +;; Only does ARP-over-ethernet. + +(provide (struct-out arp-query) + (struct-out arp-assertion) + (struct-out arp-interface) + spawn-arp-driver) + +(require racket/set) +(require racket/match) +(require/activate syndicate/drivers/timer) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "configuration.rkt") +(require/activate "ethernet.rkt") + +(struct arp-query (protocol protocol-address interface link-address) #:prefab) +(struct arp-assertion (protocol protocol-address interface-name) #:prefab) +(struct arp-interface (interface-name) #:prefab) + +(struct arp-interface-up (interface-name) #:prefab) + +(define ARP-ethertype #x0806) +(define cache-entry-lifetime-msec (* 14400 1000)) +(define wakeup-interval 5000) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (spawn-arp-driver) + (actor #:name 'arp-driver + (react (during/actor (arp-interface $interface-name) + #:name (list 'arp-interface interface-name) + (assert (arp-interface-up interface-name)) + (on-start (define hwaddr (lookup-ethernet-hwaddr interface-name)) + (when (not hwaddr) + (error 'arp "Failed to look up ARP interface ~v" + interface-name)) + (react (run-arp-interface interface-name hwaddr))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(struct cache-key (protocol address) #:transparent) +(struct cache-value (expiry interface address) #:transparent) + +(define (expire-cache c) + (define now (current-inexact-milliseconds)) + (define (not-expired? v) (< now (cache-value-expiry v))) + (for/hash [((k v) (in-hash c)) #:when (not-expired? v)] + (values k v))) + +(define (run-arp-interface interface-name hwaddr) + (log-info "ARP interface ~v ~v" interface-name hwaddr) + (define interface (ethernet-interface interface-name hwaddr)) + + (define (build-packet dest-mac ptype oper sender-ha sender-pa target-ha target-pa) + (define hlen (bytes-length target-ha)) + (define plen (bytes-length target-pa)) + (define packet (bit-string->bytes + (bit-string (1 :: integer bytes 2) + (ptype :: integer bytes 2) + hlen + plen + (oper :: integer bytes 2) + (sender-ha :: binary bytes hlen) + (sender-pa :: binary bytes plen) + (target-ha :: binary bytes hlen) + (target-pa :: binary bytes plen)))) + (ethernet-packet interface + #f + hwaddr + dest-mac + ARP-ethertype + packet)) + + (define (some-asserted-pa ptype) + (match (filter (lambda (k) (equal? (cache-key-protocol k) ptype)) (set->list (assertions))) + ['() #f] + [(list* k _) (cache-key-address k)])) + + (define (send-questions!) + (for [(q (set-subtract (queries) (list->set (hash-keys (cache)))))] + (define pa (some-asserted-pa (cache-key-protocol q))) + (log-info "~a ARP Asking for ~a from ~a" + interface-name + (pretty-bytes (cache-key-address q)) + (and pa (pretty-bytes pa))) + (when pa + (send! (build-packet broadcast-ethernet-address + (cache-key-protocol q) + 1 ;; request + hwaddr + pa + zero-ethernet-address + (cache-key-address q)))))) + + (field [cache (hash)] + [queries (set)] + [assertions (set)]) + + (on-start (define timer-key (list 'arp interface-name)) + (define (arm-timer!) (send! (set-timer timer-key wakeup-interval 'relative))) + (arm-timer!) + (react (on (message (timer-expired timer-key _)) + (cache (expire-cache (cache))) + (send-questions!) + (arm-timer!)))) + + (on (message ($ p (ethernet-packet-pattern interface-name #t ARP-ethertype))) + (match-define (ethernet-packet _ _ source destination _ body) p) + (bit-string-case body + ([ (= 1 :: integer bytes 2) + (ptype :: integer bytes 2) + hlen + plen + (oper :: integer bytes 2) + (sender-hardware-address0 :: binary bytes hlen) + (sender-protocol-address0 :: binary bytes plen) + (target-hardware-address0 :: binary bytes hlen) + (target-protocol-address0 :: binary bytes plen) + (:: binary) ;; The extra zeros exist because ethernet packets + ;; have a minimum size. This is, in part, why IPv4 + ;; headers have a total-length field, so that the + ;; zero padding can be removed. + ] + (let () + (define sender-protocol-address (bit-string->bytes sender-protocol-address0)) + (define sender-hardware-address (bit-string->bytes sender-hardware-address0)) + (define target-protocol-address (bit-string->bytes target-protocol-address0)) + (define learned-key (cache-key ptype sender-protocol-address)) + + (when (and (set-member? (queries) learned-key) ;; it is relevant to our interests + (not (equal? sender-hardware-address + (cache-value-address (hash-ref (cache) + learned-key + (lambda () + (cache-value #f #f #f))))))) + (log-info "~a ARP Adding ~a = ~a to cache" + interface-name + (pretty-bytes sender-protocol-address) + (pretty-bytes sender-hardware-address))) + + (cache (hash-set (expire-cache (cache)) + learned-key + (cache-value (+ (current-inexact-milliseconds) + cache-entry-lifetime-msec) + interface + sender-hardware-address))) + (case oper + [(1) ;; request + (when (set-member? (assertions) (cache-key ptype target-protocol-address)) + (log-info "~a ARP answering request for ~a/~a" + interface-name + ptype + (pretty-bytes target-protocol-address)) + (send! (build-packet sender-hardware-address + ptype + 2 ;; reply + hwaddr + target-protocol-address + sender-hardware-address + sender-protocol-address)))] + [(2) (void)] ;; reply + [else (void)]))) + (else #f))) + + (during (arp-assertion $protocol $protocol-address interface-name) + (define a (cache-key protocol protocol-address)) + (on-start (assertions (set-add (assertions) a)) + (log-info "~a ARP Announcing ~a as ~a" + interface-name + (pretty-bytes (cache-key-address a)) + (pretty-bytes hwaddr)) + (send! (build-packet broadcast-ethernet-address + (cache-key-protocol a) + 2 ;; reply -- gratuitous announcement + hwaddr + (cache-key-address a) + hwaddr + (cache-key-address a)))) + (on-stop (assertions (set-remove (assertions) a)))) + + (during (observe (arp-query $protocol $protocol-address interface _)) + (define key (cache-key protocol protocol-address)) + (on-start (queries (set-add (queries) key)) + (send-questions!)) + (on-stop (queries (set-remove (queries) key))) + (assert #:when (hash-has-key? (cache) key) + (match (hash-ref (cache) key) + [(cache-value _ ifname addr) + (arp-query protocol protocol-address ifname addr)])))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-arp-driver) diff --git a/examples/netstack/checksum.rkt b/examples/netstack/incremental-highlevel/checksum.rkt similarity index 100% rename from examples/netstack/checksum.rkt rename to examples/netstack/incremental-highlevel/checksum.rkt diff --git a/examples/netstack/configuration.rkt b/examples/netstack/incremental-highlevel/configuration.rkt similarity index 100% rename from examples/netstack/configuration.rkt rename to examples/netstack/incremental-highlevel/configuration.rkt diff --git a/examples/netstack/incremental-highlevel/demo-config.rkt b/examples/netstack/incremental-highlevel/demo-config.rkt new file mode 100644 index 0000000..b251acb --- /dev/null +++ b/examples/netstack/incremental-highlevel/demo-config.rkt @@ -0,0 +1,21 @@ +#lang syndicate/actor +;; Demonstration stack configuration for various hosts. + +(require racket/match) +(require (only-in mzlib/os gethostname)) +(require "configuration.rkt") + +(actor + (react + (match (gethostname) + ["skip" + (assert (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "en0")) + (assert (host-route (bytes 192 168 1 222) 24 "en0"))] + [(or "hop" "walk") + (assert (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "wlan0")) + (assert (host-route (bytes 192 168 1 222) 24 "wlan0"))] + ["stockholm.ccs.neu.edu" + (assert (host-route (bytes 129 10 115 94) 24 "eth0")) + (assert (gateway-route (bytes 0 0 0 0) 0 (bytes 129 10 115 1) "eth0"))] + [other + (error 'demo-config "No setup for hostname ~a" other)]))) diff --git a/examples/netstack/dump-bytes.rkt b/examples/netstack/incremental-highlevel/dump-bytes.rkt similarity index 100% rename from examples/netstack/dump-bytes.rkt rename to examples/netstack/incremental-highlevel/dump-bytes.rkt diff --git a/examples/netstack/incremental-highlevel/ethernet.rkt b/examples/netstack/incremental-highlevel/ethernet.rkt new file mode 100644 index 0000000..6391ea7 --- /dev/null +++ b/examples/netstack/incremental-highlevel/ethernet.rkt @@ -0,0 +1,125 @@ +#lang syndicate/actor +;; Ethernet driver + +(provide (struct-out ethernet-packet) + zero-ethernet-address + broadcast-ethernet-address + interface-names + spawn-ethernet-driver + ethernet-packet-pattern + lookup-ethernet-hwaddr) + +(require/activate syndicate/drivers/timer) +(require racket/set) +(require racket/match) +(require racket/async-channel) + +(require packet-socket) +(require bitsyntax) + +(require "configuration.rkt") +(require "dump-bytes.rkt") + +(struct ethernet-packet (interface from-wire? source destination ethertype body) #:prefab) + +(define zero-ethernet-address (bytes 0 0 0 0 0 0)) +(define broadcast-ethernet-address (bytes 255 255 255 255 255 255)) + +(define interface-names (raw-interface-names)) +(log-info "Device names: ~a" interface-names) + +(define (spawn-ethernet-driver) + (actor #:name 'ethernet-driver + (react (during/actor + (observe (ethernet-packet (ethernet-interface $interface-name _) #t _ _ _ _)) + #:name (list 'ethernet-interface interface-name) + + (define h (raw-interface-open interface-name)) + (when (not h) (error 'ethernet "Couldn't open interface ~v" interface-name)) + (log-info "Opened interface ~a, yielding handle ~v" interface-name h) + + (define interface (ethernet-interface interface-name (raw-interface-hwaddr h))) + (assert interface) + + (define control-ch (make-async-channel)) + (thread (lambda () (interface-packet-read-loop interface h control-ch))) + + (on-start (flush!) ;; ensure all subscriptions are in place + (async-channel-put control-ch 'unblock) + (actor #:name (list 'ethernet-interface-quit-monitor interface-name) + (react (on (retracted interface) + (async-channel-put control-ch 'quit))))) + + (on (message ($ p (ethernet-packet interface #t _ _ _ _)) #:meta-level 1) + ;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)" + ;; (ethernet-interface-name (ethernet-packet-interface p)) + ;; (pretty-bytes (ethernet-packet-source p)) + ;; (pretty-bytes (ethernet-packet-destination p)) + ;; (number->string (ethernet-packet-ethertype p) 16)) + ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) + (send! p)) + + (on (message ($ p (ethernet-packet interface #f _ _ _ _))) + ;; (log-info "Interface ~a OUTBOUND packet ~a -> ~a (type 0x~a)" + ;; (ethernet-interface-name (ethernet-packet-interface p)) + ;; (pretty-bytes (ethernet-packet-source p)) + ;; (pretty-bytes (ethernet-packet-destination p)) + ;; (number->string (ethernet-packet-ethertype p) 16)) + ;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p))) + (raw-interface-write h (encode-ethernet-packet p))))))) + +(define (interface-packet-read-loop interface h control-ch) + (define (blocked) + (match (async-channel-get control-ch) + ['unblock (unblocked)] + ['quit (void)])) + (define (unblocked) + (match (async-channel-try-get control-ch) + ['unblock (unblocked)] + ['quit (void)] + [#f + (define p (raw-interface-read h)) + (define decoded (decode-ethernet-packet interface p)) + (when decoded (send-ground-message decoded)) + (unblocked)])) + (blocked) + (raw-interface-close h)) + +(define (decode-ethernet-packet interface p) + (bit-string-case p + ([ (destination :: binary bytes 6) + (source :: binary bytes 6) + (ethertype :: integer bytes 2) + (body :: binary) ] + (ethernet-packet interface + #t + (bit-string->bytes source) + (bit-string->bytes destination) + ethertype + (bit-string->bytes body))) + (else #f))) + +(define (encode-ethernet-packet p) + (match-define (ethernet-packet _ _ source destination ethertype body) p) + (bit-string->bytes + (bit-string (destination :: binary bytes 6) + (source :: binary bytes 6) + (ethertype :: integer bytes 2) + (body :: binary)))) + +(define (ethernet-packet-pattern interface-name from-wire? ethertype) + (ethernet-packet (ethernet-interface interface-name ?) from-wire? ? ? ethertype ?)) + +(define (lookup-ethernet-hwaddr interface-name) + (define timer-id (gensym 'lookup-ethernet-hwaddr)) + (react/suspend (k) + (on-start (send! (set-timer timer-id 5000 'relative))) + (stop-when (message (timer-expired timer-id _)) + (log-info "Lookup of ethernet interface ~v failed" interface-name) + (k #f)) + (stop-when (asserted (ethernet-interface interface-name $hwaddr)) + (k hwaddr)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-ethernet-driver) diff --git a/examples/netstack/incremental-highlevel/fetchurl.rkt b/examples/netstack/incremental-highlevel/fetchurl.rkt new file mode 100644 index 0000000..59616a9 --- /dev/null +++ b/examples/netstack/incremental-highlevel/fetchurl.rkt @@ -0,0 +1,27 @@ +#lang syndicate/actor + +(require/activate syndicate/drivers/timer) +(require/activate "ethernet.rkt") +(require/activate "arp.rkt") +(require/activate "ip.rkt") +(require/activate "tcp.rkt") +(require/activate "udp.rkt") +(require/activate "demo-config.rkt") + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(let () + (define local-handle (tcp-handle 'httpclient)) + (define remote-handle (tcp-address "81.4.107.66" 80)) + + (actor (react + (assert (advertise (tcp-channel local-handle remote-handle _))) + (on (asserted (advertise (tcp-channel remote-handle local-handle _))) + (send! (tcp-channel local-handle + remote-handle + #"GET / HTTP/1.0\r\nHost: leastfixedpoint.com\r\n\r\n"))) + (stop-when (retracted (advertise (tcp-channel remote-handle local-handle _))) + (printf "URL fetcher exiting.\n")) + (on (message (tcp-channel remote-handle local-handle $bs)) + (printf "----------------------------------------\n~a\n" bs) + (printf "----------------------------------------\n"))))) diff --git a/examples/netstack/incremental-highlevel/ip.rkt b/examples/netstack/incremental-highlevel/ip.rkt new file mode 100644 index 0000000..e4a3be9 --- /dev/null +++ b/examples/netstack/incremental-highlevel/ip.rkt @@ -0,0 +1,266 @@ +#lang syndicate/actor + +(provide (struct-out ip-packet) + ip-address->hostname + ip-string->ip-address + apply-netmask + ip-address-in-subnet? + query-local-ip-addresses + broadcast-ip-address + spawn-ip-driver) + +(require racket/set) +(require (only-in racket/string string-split)) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "configuration.rkt") +(require "checksum.rkt") + +(require/activate syndicate/drivers/timer) +(require/activate "ethernet.rkt") +(require/activate "arp.rkt") + +(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces + source + destination + protocol + options + body) + #:prefab) ;; TODO: more fields + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (ip-address->hostname bs) + (bit-string-case bs + ([ a b c d ] (format "~a.~a.~a.~a" a b c d)))) + +(define (ip-string->ip-address str) + (list->bytes (map string->number (string-split str ".")))) + +(define (apply-netmask addr netmask) + (bit-string-case addr + ([ (n :: integer bytes 4) ] + (bit-string ((bitwise-and n (arithmetic-shift #x-100000000 (- netmask))) + :: integer bytes 4))))) + +(define (ip-address-in-subnet? addr network netmask) + (equal? (apply-netmask network netmask) + (apply-netmask addr netmask))) + +(define broadcast-ip-address (bytes 255 255 255 255)) + +(define (query-local-ip-addresses) + (query-set local-ips (host-route $addr _ _) addr)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (spawn-ip-driver) + (actor #:name 'ip-driver + (react + (during/actor (host-route $my-address $netmask $interface-name) + (assert (route-up (host-route my-address netmask interface-name))) + (do-host-route my-address netmask interface-name)) + (during/actor (gateway-route $network $netmask $gateway-addr $interface-name) + (assert (route-up + (gateway-route $network $netmask $gateway-addr $interface-name))) + (do-gateway-route network netmask gateway-addr interface-name)) + (during/actor (net-route $network-addr $netmask $link) + (assert (route-up (net-route network-addr netmask link))) + (do-net-route network-addr netmask link))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Local IP route + +(define (do-host-route my-address netmask interface-name) + (let ((network-addr (apply-netmask my-address netmask))) + (do-normal-ip-route (host-route my-address netmask interface-name) + network-addr + netmask + interface-name)) + + (assert (advertise (ip-packet _ my-address _ PROTOCOL-ICMP _ _))) + (assert (arp-assertion IPv4-ethertype my-address interface-name)) + (on (message (ip-packet _ $peer-address my-address PROTOCOL-ICMP _ $body)) + (bit-string-case body + ([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum + (case type + [(8) ;; ECHO (0 is ECHO-REPLY) + (log-info "Ping of ~a from ~a" + (pretty-bytes my-address) + (pretty-bytes peer-address)) + (define reply-data0 (bit-string 0 + code + (0 :: integer bytes 2) ;; TODO + (rest :: binary))) + (send! (ip-packet #f + my-address + peer-address + PROTOCOL-ICMP + #"" + (ip-checksum 2 reply-data0)))] + [else + (log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a" + type + code + checksum + (pretty-bytes my-address) + (pretty-bytes peer-address) + (dump-bytes->string rest))])) + (else #f)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Gateway IP route + +(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent) + +(define (do-gateway-route network netmask gateway-addr interface-name) + (define the-route (gateway-route network netmask gateway-addr interface-name)) + + (field [routes (set)]) + (query-set* routes (host-route $addr $netmask _) (list addr netmask)) + (query-set* routes (gateway-route $addr $netmask _ _) (list addr netmask)) + (query-set* routes (net-route $addr $netmask _) (list addr netmask)) + + (field [gateway-interface #f] + [gateway-hwaddr #f]) + (on (asserted (arp-query IPv4-ethertype + gateway-addr + ($ iface (ethernet-interface interface-name _)) + $hwaddr)) + (when (not (gateway-hwaddr)) + (log-info "Discovered gateway ~a at ~a on interface ~a." + (ip-address->hostname gateway-addr) + (ethernet-interface-name iface) + (pretty-bytes hwaddr))) + (gateway-interface iface) + (gateway-hwaddr hwaddr)) + + (define (covered-by-some-other-route? addr) + (for/or ([r (in-set (routes))]) + (match-define (list net msk) r) + (and (positive? msk) + (ip-address-in-subnet? addr net msk)))) + + (on (message ($ p (ip-packet _ _ _ _ _ _))) + (when (not (gateway-interface)) + (log-warning "Gateway hwaddr for ~a not known, packet dropped." + (ip-address->hostname gateway-addr))) + (when (and (gateway-interface) + (not (equal? (ip-packet-source-interface p) + (ethernet-interface-name (gateway-interface)))) + (not (covered-by-some-other-route? (ip-packet-destination p)))) + (send! (ethernet-packet (gateway-interface) + #f + (ethernet-interface-hwaddr (gateway-interface)) + (gateway-hwaddr) + IPv4-ethertype + (format-ip-packet p)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; General net route + +(define (do-net-route network-addr netmask link) + (do-normal-ip-route (net-route network-addr netmask link) network-addr netmask link)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Normal IP route + +(define (do-normal-ip-route the-route network netmask interface-name) + (assert (arp-interface interface-name)) + (on (message (ethernet-packet (ethernet-interface interface-name _) #t _ _ IPv4-ethertype $body)) + (define p (parse-ip-packet interface-name body)) + (when p (send! p))) + (on (message ($ p (ip-packet _ _ _ _ _ _))) + (define destination (ip-packet-destination p)) + (when (and (not (equal? (ip-packet-source-interface p) interface-name)) + (ip-address-in-subnet? destination network netmask)) + (define timer-id (gensym 'ippkt)) + (react (on-start (send! (set-timer timer-id 5000 'relative))) + (stop-when (message (timer-expired timer-id _)) + (log-warning "ARP lookup of ~a failed, packet dropped" + (ip-address->hostname destination))) + (stop-when (asserted (arp-query IPv4-ethertype + destination + ($ interface (ethernet-interface interface-name _)) + $destination-hwaddr)) + (send! (ethernet-packet interface + #f + (ethernet-interface-hwaddr interface) + destination-hwaddr + IPv4-ethertype + (format-ip-packet p)))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define IPv4-ethertype #x0800) + +(define IP-VERSION 4) +(define IP-MINIMUM-HEADER-LENGTH 5) + +(define PROTOCOL-ICMP 1) + +(define default-ttl 64) + +(define (parse-ip-packet interface-name body) + ;; (log-info "IP ~a got body ~a" (pretty-bytes my-address) (pretty-bytes body)) + (bit-string-case body + ([ (= IP-VERSION :: bits 4) + (header-length :: bits 4) + service-type + (total-length :: bits 16) + (id :: bits 16) + (flags :: bits 3) + (fragment-offset :: bits 13) + ttl + protocol + (header-checksum :: bits 16) ;; TODO: check checksum + (source-ip0 :: binary bits 32) + (destination-ip0 :: binary bits 32) + (rest :: binary) ] + (let* ((source-ip (bit-string->bytes source-ip0)) + (destination-ip (bit-string->bytes destination-ip0)) + (options-length (* 4 (- header-length IP-MINIMUM-HEADER-LENGTH))) + (data-length (- total-length (* 4 header-length)))) + (if (and (>= header-length 5) + (>= (bit-string-byte-count body) (* header-length 4))) + (bit-string-case rest + ([ (opts :: binary bytes options-length) + (data :: binary bytes data-length) + (:: binary) ] ;; Very short ethernet packets have a trailer of zeros + (ip-packet interface-name + (bit-string->bytes source-ip) + (bit-string->bytes destination-ip) + protocol + (bit-string->bytes opts) + (bit-string->bytes data)))) + #f))) + (else #f))) + +(define (format-ip-packet p) + (match-define (ip-packet _ src dst protocol options body) p) + + (define header-length ;; TODO: ensure options is a multiple of 4 bytes + (+ IP-MINIMUM-HEADER-LENGTH (quotient (bit-string-byte-count options) 4))) + + (define header0 (bit-string (IP-VERSION :: bits 4) + (header-length :: bits 4) + 0 ;; TODO: service type + ((+ (* header-length 4) (bit-string-byte-count body)) + :: bits 16) + (0 :: bits 16) ;; TODO: identifier + (0 :: bits 3) ;; TODO: flags + (0 :: bits 13) ;; TODO: fragments + default-ttl + protocol + (0 :: bits 16) + (src :: binary bits 32) + (dst :: binary bits 32) + (options :: binary))) + (define full-packet (bit-string ((ip-checksum 10 header0) :: binary) (body :: binary))) + + full-packet) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-ip-driver) diff --git a/examples/netstack/incremental-highlevel/main.rkt b/examples/netstack/incremental-highlevel/main.rkt new file mode 100644 index 0000000..c78f114 --- /dev/null +++ b/examples/netstack/incremental-highlevel/main.rkt @@ -0,0 +1,85 @@ +#lang syndicate/actor + + +;;(log-events-and-actions? #t) + +(require/activate syndicate/drivers/timer) +(require/activate "ethernet.rkt") +(require/activate "arp.rkt") +(require/activate "ip.rkt") +(require/activate "tcp.rkt") +(require/activate "udp.rkt") +(require/activate "demo-config.rkt") + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(let () + (local-require (only-in racket/string string-trim)) + + (struct says (who what) #:prefab) + (struct present (who) #:prefab) + + (define (spawn-session them us) + (actor (define (send-to-remote fmt . vs) + (send! (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))) + #:meta-level 1)) + + (define (say who fmt . vs) + (unless (equal? who user) + (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + + (define user (gensym 'user)) + (send-to-remote "Welcome, ~a.\n" user) + + (until (retracted (advertise (tcp-channel them us _)) #:meta-level 1) + (assert (present user)) + (on (asserted (present $who)) (say who "arrived.")) + (on (retracted (present $who)) (say who "departed.")) + + (on (message (says $who $what)) (say who "says: ~a" what)) + + (assert (advertise (tcp-channel us them _)) #:meta-level 1) + (on (message (tcp-channel them us $bs) #:meta-level 1) + (send! (says user (string-trim (bytes->string/utf-8 bs)))))))) + + (dataspace (define us (tcp-listener 5999)) + (forever (assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1) + (on (asserted (advertise (tcp-channel $them us _)) #:meta-level 1) + (spawn-session them us))))) + +(let ((dst (udp-listener 6667))) + (actor (react + (on (message (udp-packet $src dst $body)) + (log-info "Got packet from ~v: ~v" src body) + (send! (udp-packet dst src (string->bytes/utf-8 (format "You said: ~a" body)))))))) + +(let () + (dataspace + (actor (react (field [counter 0]) + (on (message 'bump) + (send! `(counter ,(counter))) + (counter (+ (counter) 1))))) + + (forever (define us (tcp-listener 80)) + (assert (advertise (observe (tcp-channel _ us _))) #:meta-level 1) + (during/actor (advertise (tcp-channel ($ them (tcp-address _ _)) us _)) #:meta-level 1 + (log-info "Got connection from ~v" them) + (field [done? #f]) + (stop-when (rising-edge (done?))) + (assert (advertise (tcp-channel us them _)) #:meta-level 1) + (on (message (tcp-channel them us _) #:meta-level 1)) ;; ignore input + + (on-start (send! 'bump)) + (on (message `(counter ,$counter)) + (define response + (string->bytes/utf-8 + (format (string-append + "HTTP/1.0 200 OK\r\n\r\n" + "

Hello world from syndicate-netstack!

\n" + "

This is running on syndicate's own\n" + "\n" + "TCP/IP stack.

\n" + "

There have been ~a requests prior to this one.

\n") + counter))) + (send! (tcp-channel us them response) #:meta-level 1) + (done? #t)))))) diff --git a/examples/netstack/incremental-highlevel/port-allocator.rkt b/examples/netstack/incremental-highlevel/port-allocator.rkt new file mode 100644 index 0000000..bdafd5f --- /dev/null +++ b/examples/netstack/incremental-highlevel/port-allocator.rkt @@ -0,0 +1,37 @@ +#lang syndicate/actor +;; UDP/TCP port allocator + +(provide spawn-port-allocator + allocate-port! + (struct-out port-allocation-request) + (struct-out port-allocation-reply)) + +(require racket/set) +(require "ip.rkt") + +(struct port-allocation-request (reqid type) #:prefab) +(struct port-allocation-reply (reqid port) #:prefab) + +(define (spawn-port-allocator allocator-type query-used-ports) + (actor #:name (list 'port-allocator allocator-type) + (react + (define local-ips (query-local-ip-addresses)) + (define used-ports (query-used-ports)) + + (begin/dataflow + (log-info "port-allocator ~v used ports: ~v" allocator-type (used-ports))) + + (on (message (port-allocation-request $reqid allocator-type)) + (define currently-used-ports (used-ports)) + (let randomly-allocate-until-unused () + (define p (+ 1024 (random 64512))) + (if (set-member? currently-used-ports p) + (randomly-allocate-until-unused) + (begin (used-ports (set-add currently-used-ports p)) + (send! (port-allocation-reply reqid p))))))))) + +(define (allocate-port! type) + (define reqid (gensym 'allocate-port!)) + (react/suspend (done) + (stop-when (message (port-allocation-reply reqid $port)) (done port)) + (on-start (send! (port-allocation-request reqid type))))) diff --git a/examples/netstack/incremental-highlevel/tcp.rkt b/examples/netstack/incremental-highlevel/tcp.rkt new file mode 100644 index 0000000..b7be776 --- /dev/null +++ b/examples/netstack/incremental-highlevel/tcp.rkt @@ -0,0 +1,555 @@ +#lang syndicate/actor + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + spawn-tcp-driver) + +(require racket/set) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") + +(require/activate syndicate/drivers/timer) +(require "ip.rkt") +(require "port-allocator.rkt") + +;; tcp-address/tcp-address : "kernel" tcp connection state machines +;; tcp-handle/tcp-address : "user" outbound connections +;; tcp-listener/tcp-address : "user" inbound connections + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +(struct tcp-packet (from-wire? + source-ip + source-port + destination-ip + destination-port + sequence-number + ack-number + flags + window-size + options + data) + #:prefab) + +;; (tcp-port-allocation Number (U TcpHandle TcpListener)) +(struct tcp-port-allocation (port handle) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-tcp-driver) + (spawn-port-allocator 'tcp (lambda () (query-set tcp-ports (tcp-port-allocation $p _) p))) + (spawn-kernel-tcp-driver) + (actor #:name 'tcp-inbound-driver + (react + (during/actor (advertise (observe (tcp-channel _ ($ server-addr (tcp-listener _)) _))) + #:name (list 'tcp-listen server-addr) + (match-define (tcp-listener port) server-addr) + (assert (tcp-port-allocation port server-addr)) + (on (asserted (advertise (tcp-channel ($ remote-addr (tcp-address _ _)) + ($ local-addr (tcp-address _ port)) + _))) + (spawn-relay server-addr remote-addr local-addr))))) + (actor #:name 'tcp-outbound-driver + (react + (define local-ips (query-local-ip-addresses)) + (on (asserted (advertise (tcp-channel ($ local-addr (tcp-handle _)) + ($ remote-addr (tcp-address _ _)) + _))) + (define port (allocate-port! 'tcp)) + ;; TODO: Choose a sensible IP address for the outbound + ;; connection. We don't have enough information to do this + ;; well at the moment, so just pick some available local IP + ;; address. + ;; + ;; Interesting note: In some sense, the right answer is + ;; "?". This would give us a form of mobility, where IP + ;; addresses only route to a given bucket-of-state and ONLY + ;; the port number selects a substate therein. That's not + ;; how TCP is defined however so we can't do that. + (define appropriate-ip (set-first (local-ips))) + (define appropriate-host (ip-address->hostname appropriate-ip)) + (match-define (tcp-address remote-host remote-port) remote-addr) + (define remote-ip (ip-string->ip-address remote-host)) + (spawn-relay local-addr remote-addr (tcp-address appropriate-host port)) + (spawn-state-vector remote-ip remote-port appropriate-ip port))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relay between kernel-level and user-level + +(define relay-peer-wait-time-msec 5000) + +(define (spawn-relay local-user-addr remote-addr local-tcp-addr) + (define timer-name (list 'spawn-relay local-tcp-addr remote-addr)) + + (actor #:name (list 'tcp-relay local-user-addr remote-addr local-tcp-addr) + (react + (assert (tcp-port-allocation (tcp-address-port local-tcp-addr) local-user-addr)) + (assert (advertise (tcp-channel remote-addr local-user-addr _))) + (assert (advertise (tcp-channel local-tcp-addr remote-addr _))) + + (field [local-peer-present? #f] + [remote-peer-present? #f]) + + (on-start (send! (set-timer timer-name relay-peer-wait-time-msec 'relative))) + (on (message (timer-expired timer-name _)) + (when (not (and (local-peer-present?) (remote-peer-present?))) + (error 'spawn-relay "TCP relay process timed out waiting for peer"))) + + (on (asserted (observe (tcp-channel remote-addr local-user-addr _))) + (local-peer-present? #t)) + (stop-when (retracted (observe (tcp-channel remote-addr local-user-addr _)))) + + (on (asserted (advertise (tcp-channel remote-addr local-tcp-addr _))) + (remote-peer-present? #t)) + (stop-when (retracted (advertise (tcp-channel remote-addr local-tcp-addr _)))) + + (on (message (tcp-channel local-user-addr remote-addr $bs)) + (send! (tcp-channel local-tcp-addr remote-addr bs))) + + (on (message (tcp-channel remote-addr local-tcp-addr $bs)) + (send! (tcp-channel remote-addr local-user-addr bs)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + +(define PROTOCOL-TCP 6) + +(define (spawn-kernel-tcp-driver) + (actor #:name 'kernel-tcp-driver + (forever + (define local-ips (query-local-ip-addresses)) + + (define active-state-vectors + (query-set active-state-vectors + (observe (tcp-packet #t $si $sp $di $dp _ _ _ _ _ _)) + (list si sp di dp))) + + (define (state-vector-active? statevec) + (set-member? (active-state-vectors) statevec)) + + (define (analyze-incoming-packet src-ip dst-ip body) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + (data-offset :: integer bits 4) + (reserved :: integer bits 3) + (ns :: integer bits 1) + (cwr :: integer bits 1) + (ece :: integer bits 1) + (urg :: integer bits 1) + (ack :: integer bits 1) + (psh :: integer bits 1) + (rst :: integer bits 1) + (syn :: integer bits 1) + (fin :: integer bits 1) + (window-size :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (urgent-pointer :: integer bytes 2) + (rest :: binary) ] + (let* ((flags (set)) + (statevec (list src-ip src-port dst-ip dst-port)) + (old-active-state-vectors (active-state-vectors)) + (spawn-needed? (and (not (state-vector-active? statevec)) + (zero? rst)))) ;; don't bother spawning if it's a rst + (define-syntax-rule (set-flags! v ...) + (begin (unless (zero? v) (set! flags (set-add flags 'v))) ...)) + (set-flags! ns cwr ece urg ack psh rst syn fin) + (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port + sequence-number + ack-number + flags + window-size) + (when spawn-needed? (log-info " - spawn needed!")) + (bit-string-case rest + ([ (opts :: binary bytes (- (* data-offset 4) 20)) + (data :: binary) ] + (let ((packet (tcp-packet #t + src-ip + src-port + dst-ip + dst-port + sequence-number + ack-number + flags + window-size + (bit-string->bytes opts) + (bit-string->bytes data)))) + (when spawn-needed? + (active-state-vectors (set-add (active-state-vectors) statevec)) + (spawn-state-vector src-ip src-port dst-ip dst-port)) + ;; TODO: get packet to the new state-vector process somehow + (send! packet))) + (else #f)))) + (else #f))) + + (begin/dataflow + (log-info "SCN yielded statevecs ~v and local-ips ~v" + (active-state-vectors) + (local-ips))) + + (define (deliver-outbound-packet p) + (match-define (tcp-packet #f + src-ip + src-port + dst-ip + dst-port + sequence-number + ack-number + flags + window-size + options + data) + p) + (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port + sequence-number + ack-number + flags + window-size) + (define (flag-bit sym) (if (set-member? flags sym) 1 0)) + (define payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (sequence-number :: integer bytes 4) + (ack-number :: integer bytes 4) + ((+ 5 (quotient (bit-string-byte-count options) 4)) + :: integer bits 4) ;; TODO: enforce 4-byte alignment + (0 :: integer bits 3) + ((flag-bit 'ns) :: integer bits 1) + ((flag-bit 'cwr) :: integer bits 1) + ((flag-bit 'ece) :: integer bits 1) + ((flag-bit 'urg) :: integer bits 1) + ((flag-bit 'ack) :: integer bits 1) + ((flag-bit 'psh) :: integer bits 1) + ((flag-bit 'rst) :: integer bits 1) + ((flag-bit 'syn) :: integer bits 1) + ((flag-bit 'fin) :: integer bits 1) + (window-size :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (0 :: integer bytes 2) ;; TODO: urgent pointer + (data :: binary))) + (define pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-TCP + ((bit-string-byte-count payload) :: integer bytes 2))) + (send! (ip-packet #f src-ip dst-ip PROTOCOL-TCP #"" + (ip-checksum 16 payload #:pseudo-header pseudo-header)))) + + (on (message (ip-packet $source-if $src $dst PROTOCOL-TCP _ $body)) + (when (and source-if ;; source-if == #f iff packet originates locally + (set-member? (local-ips) dst)) + (analyze-incoming-packet src dst body))) + + (on (message ($ p (tcp-packet #f _ _ _ _ _ _ _ _ _ _))) + (deliver-outbound-packet p))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Per-connection state vector process + +(struct buffer (data ;; bit-string + seqn ;; names leftmost byte in data + window ;; counts bytes from leftmost byte in data + finished?) ;; boolean: true after FIN + #:transparent) + +(define (buffer-push b data) + (struct-copy buffer b [data (bit-string-append (buffer-data b) data)])) + +(define transmit-check-interval-msec 2000) +(define inbound-buffer-limit 65535) +(define maximum-segment-size 536) ;; bytes +(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout +(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I + ;; cheat; RFC 793 says "the present global default is five minutes", which is + ;; reasonable to be getting on with + +(define (seq+ a b) (bitwise-and #xffffffff (+ a b))) + +;; Always positive +(define (seq- larger smaller) + (if (< larger smaller) ;; wraparound has occurred + (+ (- larger smaller) #x100000000) + (- larger smaller))) + +(define (seq> a b) + (< (seq- a b) #x80000000)) + +(define (spawn-state-vector src-ip src-port dst-ip dst-port) + (define src (tcp-address (ip-address->hostname src-ip) src-port)) + (define dst (tcp-address (ip-address->hostname dst-ip) dst-port)) + (define (timer-name kind) (list 'tcp-timer kind src dst)) + + (actor + #:name (list 'tcp-state-vector + (ip-address->hostname src-ip) + src-port + (ip-address->hostname dst-ip) + dst-port) + (react + + (define initial-outbound-seqn + ;; Yuck + (inexact->exact (truncate (* #x100000000 (random))))) + + (field [outbound (buffer #"!" initial-outbound-seqn 0 #f)] ;; dummy data at SYN position + [inbound (buffer #"" #f inbound-buffer-limit #f)] + [syn-acked? #f] + [latest-peer-activity-time (current-inexact-milliseconds)] + ;; ^ the most recent time we heard from our peer + [user-timeout-base-time (current-inexact-milliseconds)] + ;; ^ when the index of the first outbound unacknowledged byte changed + [most-recent-time (current-inexact-milliseconds)] + ;; ^ updated by timer expiry; a field, to trigger quit checks + [quit-because-reset? #f]) + + (let () + (local-require (submod syndicate/actor priorities)) + (on-event #:priority *query-priority* + [_ (most-recent-time (current-inexact-milliseconds))])) + + (define (next-expected-seqn) + (define b (inbound)) + (define v (buffer-seqn b)) + (and v (seq+ v (bit-string-byte-count (buffer-data b))))) + + (define (set-inbound-seqn! seqn) + (inbound (struct-copy buffer (inbound) [seqn seqn]))) + + (define (incorporate-segment! data) + ;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data) + (when (not (buffer-finished? (inbound))) + (inbound (buffer-push (inbound) data)))) + + (define (deliver-inbound-locally!) + (define b (inbound)) + (when (not (bit-string-empty? (buffer-data b))) + (define chunk (bit-string->bytes (buffer-data b))) + (send! (tcp-channel src dst chunk)) + (inbound (struct-copy buffer b + [data #""] + [seqn (seq+ (buffer-seqn b) (bytes-length chunk))])))) + + ;; (Setof Symbol) -> Void + (define (check-fin! flags) + (define b (inbound)) + (unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally + (error 'check-fin "Nonempty inbound buffer")) + (when (set-member? flags 'fin) + (log-info "Closing inbound stream.") + (inbound (struct-copy buffer b + [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte + [finished? #t])))) + + ;; Boolean SeqNum -> Void + (define (discard-acknowledged-outbound! ack? ackn) + (when ack? + (let* ((b (outbound)) + (base (buffer-seqn b)) + (limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b)))) + (ackn (if (seq> ackn limit) limit ackn)) + (ackn (if (seq> base ackn) base ackn)) + (dist (seq- ackn base))) + (define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset! + (user-timeout-base-time (current-inexact-milliseconds)) + (outbound (struct-copy buffer b [data remaining-data] [seqn ackn])) + (syn-acked? (or (syn-acked?) (positive? dist)))))) + + ;; Nat -> Void + (define (update-outbound-window! peer-window) + (outbound (struct-copy buffer (outbound) [window peer-window]))) + + (define (all-output-acknowledged?) + (bit-string-empty? (buffer-data (outbound)))) + + ;; (Option SeqNum) -> Void + (define (send-outbound! old-ackn) + (define b (outbound)) + (define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b)) + (if (buffer-finished? b) 1 0)))) + + (define segment-size (min maximum-segment-size + (if (syn-acked?) (buffer-window b) 1) + ;; ^ can only send SYN until SYN is acked + pending-byte-count)) + (define segment-offset (if (syn-acked?) 0 1)) + (define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset! + (define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset! + (define ackn (next-expected-seqn)) + (define flags (set)) + (when ackn + (set! flags (set-add flags 'ack))) + (when (not (syn-acked?)) + (set! flags (set-add flags 'syn))) + (when (and (buffer-finished? b) + (syn-acked?) + (= segment-size pending-byte-count) + (not (all-output-acknowledged?))) ;; TODO: reexamine. This looks fishy + (set! flags (set-add flags 'fin))) + (define window (min 65535 ;; limit of field width + (max 0 ;; can't be negative + (- (buffer-window (inbound)) + (bit-string-byte-count (buffer-data (inbound))))))) + (unless (and (equal? ackn old-ackn) + (syn-acked?) + (not (set-member? flags 'fin)) + (zero? (bit-string-byte-count chunk))) + (local-require racket/pretty) + (pretty-write `(send-outbound (old-ackn ,old-ackn) + (flags ,flags))) + (flush-output) + (send! (tcp-packet #f dst-ip dst-port src-ip src-port + (buffer-seqn b) + (or ackn 0) + flags + window + #"" + chunk)))) + + (define (bump-peer-activity-time!) + (latest-peer-activity-time (current-inexact-milliseconds))) + + ;; Number -> Boolean + (define (heard-from-peer-within-msec? msec) + (<= (- (most-recent-time) (latest-peer-activity-time)) msec)) + + (define (user-timeout-expired?) + (and (not (all-output-acknowledged?)) + (> (- (most-recent-time) (user-timeout-base-time)) + user-timeout-msec))) + + (define (send-set-transmit-check-timer!) + (send! (set-timer (timer-name 'transmit-check) + transmit-check-interval-msec + 'relative))) + + (define (reset! seqn ackn) + (log-warning "Sending RST from ~a:~a to ~a:~a" + (ip-address->hostname dst-ip) + dst-port + (ip-address->hostname src-ip) + src-port) + (quit-because-reset? #t) + (send! (tcp-packet #f dst-ip dst-port src-ip src-port + seqn + ackn + (set 'ack 'rst) + 0 + #"" + #""))) + + (define (close-outbound-stream!) + (define b (outbound)) + (when (not (buffer-finished? b)) + (outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte + [finished? #t])))) + + (assert #:when (and (syn-acked?) (not (buffer-finished? (inbound)))) + (advertise (tcp-channel src dst _))) + + (stop-when + (rising-edge + (and (buffer-finished? (outbound)) + (buffer-finished? (inbound)) + (all-output-acknowledged?) + (not (heard-from-peer-within-msec? (* 2 1000 maximum-segment-lifetime-sec))))) + ;; Everything is cleanly shut down, and we just need to wait a while for unexpected + ;; packets before we release the state vector. + ) + + (stop-when + (rising-edge (user-timeout-expired?)) + ;; We've been plaintively retransmitting for user-timeout-msec without hearing anything + ;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but + ;; it will do for now? TODO + (log-info "TCP_USER_TIMEOUT fired.")) + + (stop-when (rising-edge (quit-because-reset?))) + + (define/query-value local-peer-seen? #f (observe (tcp-channel src dst _)) #t + #:on-remove (begin + (log-info "Closing outbound stream.") + (close-outbound-stream!) + (send-outbound! (buffer-seqn (inbound))))) + + (define/query-value listener-listening? + #f + (observe (advertise (tcp-channel _ (tcp-listener dst-port) _))) + #t) + + (on (message (tcp-packet #t src-ip src-port dst-ip dst-port + $seqn $ackn $flags $window $options $data)) + (define old-ackn (buffer-seqn (inbound))) + (define expected (next-expected-seqn)) + (define is-syn? (set-member? flags 'syn)) + (define is-fin? (set-member? flags 'fin)) + (cond + [(set-member? flags 'rst) (quit-because-reset? #t)] + [(and (not expected) ;; no syn yet + (or (not is-syn?) ;; and this isn't it + (and (not (listener-listening?)) ;; or it is, but no listener... + (not (local-peer-seen?))))) ;; ...and no outbound client + (reset! ackn ;; this is *our* seqn + (seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0))) + ;; ^^ this is what we should acknowledge... + )] + [else + (cond + [(not expected) ;; haven't seen syn yet, but we know this is it + (set-inbound-seqn! (seq+ seqn 1)) + (incorporate-segment! data)] + [(= expected seqn) + (incorporate-segment! data)] + [else (void)]) + (deliver-inbound-locally!) + (check-fin! flags) + (discard-acknowledged-outbound! (set-member? flags 'ack) ackn) + (update-outbound-window! window) + (send-outbound! old-ackn) + (bump-peer-activity-time!)])) + + (on (message (tcp-channel dst src $bs)) + (define old-ackn (buffer-seqn (inbound))) + ;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs) + + (when (all-output-acknowledged?) + ;; Only move user-timeout-base-time if there wasn't + ;; already some outstanding output. + (user-timeout-base-time (current-inexact-milliseconds))) + + (outbound (buffer-push (outbound) bs)) + (send-outbound! old-ackn)) + + (on-start (send-set-transmit-check-timer!)) + (on (message (timer-expired (timer-name 'transmit-check) _)) + (define old-ackn (buffer-seqn (inbound))) + ;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of + ;; retransmitting outbound data as well as a means of checking for an expired + ;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained + ;; approach. + (send-set-transmit-check-timer!) + (send-outbound! old-ackn))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-tcp-driver) diff --git a/examples/netstack/incremental-highlevel/udp.rkt b/examples/netstack/incremental-highlevel/udp.rkt new file mode 100644 index 0000000..ed42eba --- /dev/null +++ b/examples/netstack/incremental-highlevel/udp.rkt @@ -0,0 +1,142 @@ +#lang syndicate/actor + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + udp-address? + udp-local-address? + (struct-out udp-packet) + spawn-udp-driver) + +(require racket/set) +(require bitsyntax) + +(require "dump-bytes.rkt") +(require "checksum.rkt") +(require "configuration.rkt") +(require "ip.rkt") +(require "port-allocator.rkt") + +;; udp-address/udp-address : "kernel" udp connection state machines +;; udp-handle/udp-address : "user" outbound connections +;; udp-listener/udp-address : "user" inbound connections + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; USER-level protocol +(struct udp-packet (source destination body) #:prefab) + +;; KERNEL-level protocol +(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab) +(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-udp-driver) + (spawn-port-allocator 'udp (lambda () (query-set udp-ports (udp-port-allocation $p _) p))) + (spawn-kernel-udp-driver) + (actor #:name 'udp-driver + (react (on (asserted (observe (udp-packet _ ($ h (udp-listener _)) _))) + (spawn-udp-relay (udp-listener-port h) h)) + (on (asserted (observe (udp-packet _ ($ h (udp-handle _)) _))) + (actor #:name (list 'udp-transient h) + (spawn-udp-relay (allocate-port! 'udp) h)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relaying + +(define (spawn-udp-relay local-port local-user-addr) + (actor #:name (list 'udp-relay local-port local-user-addr) + (log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr) + + (define any-remote (udp-remote-address ? ?)) + + (react (stop-when (retracted (observe (udp-packet any-remote local-user-addr _)))) + (assert (advertise (udp-packet any-remote local-user-addr _))) + (assert (udp-port-allocation local-port local-user-addr)) + + (during (host-route $ip _ _) + (assert (advertise (udp-datagram ip local-port _ _ _))) + (on (message (udp-datagram $source-ip $source-port ip local-port $bs)) + (send! + (udp-packet (udp-remote-address (ip-address->hostname source-ip) + source-port) + local-user-addr + bs)))) + + (define local-ips (query-local-ip-addresses)) + (on (message (udp-packet local-user-addr ($ remote-addr any-remote) $bs)) + ;; Choose arbitrary local IP address for outbound packet! + ;; TODO: what can be done? Must I examine the routing table? + (match-define (udp-remote-address remote-host remote-port) remote-addr) + (define remote-ip (ip-string->ip-address remote-host)) + (send! (udp-datagram (set-first (local-ips)) + local-port + remote-ip + remote-port + bs)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + +(define PROTOCOL-UDP 17) + +(define (spawn-kernel-udp-driver) + (actor #:name 'kernel-udp-driver + (forever + (assert (advertise (ip-packet #f _ _ PROTOCOL-UDP _ _))) + + (define local-ips (query-local-ip-addresses)) + + (on (message (ip-packet $source-if $src-ip $dst-ip PROTOCOL-UDP _ $body)) + (when (and source-if (set-member? (local-ips) dst-ip)) + (bit-string-case body + ([ (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + (length :: integer bytes 2) + (checksum :: integer bytes 2) ;; TODO: check checksum + (data :: binary) ] + (bit-string-case data + ([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes + (:: binary) ] + (send! (udp-datagram src-ip src-port dst-ip dst-port + (bit-string->bytes payload)))) + (else #f))) + (else #f)))) + + (on (message (udp-datagram $src-ip $src-port $dst-ip $dst-port $bs)) + (when (set-member? (local-ips) src-ip) + (let* ((payload (bit-string (src-port :: integer bytes 2) + (dst-port :: integer bytes 2) + ((+ 8 (bit-string-byte-count bs)) + :: integer bytes 2) + (0 :: integer bytes 2) ;; checksum location + (bs :: binary))) + (pseudo-header (bit-string (src-ip :: binary bytes 4) + (dst-ip :: binary bytes 4) + 0 + PROTOCOL-UDP + ((bit-string-byte-count payload) + :: integer bytes 2))) + (checksummed-payload (ip-checksum #:pseudo-header pseudo-header + 6 payload))) + (send! (ip-packet #f src-ip dst-ip PROTOCOL-UDP #"" + checksummed-payload)))))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(spawn-udp-driver) diff --git a/examples/netstack/ip.rkt b/examples/netstack/ip.rkt deleted file mode 100644 index 6f2e0b3..0000000 --- a/examples/netstack/ip.rkt +++ /dev/null @@ -1,327 +0,0 @@ -#lang racket/base - -(provide (struct-out ip-packet) - ip-address->hostname - ip-string->ip-address - apply-netmask - ip-address-in-subnet? - gestalt->local-ip-addresses - observe-local-ip-addresses-gestalt - broadcast-ip-address - spawn-ip-driver) - -(require racket/set) -(require racket/match) -(require (only-in racket/string string-split)) -(require syndicate/monolithic) -(require syndicate/drivers/timer) -(require syndicate/demand-matcher) -(require bitsyntax) - -(require "dump-bytes.rkt") -(require "configuration.rkt") -(require "checksum.rkt") -(require "ethernet.rkt") -(require "arp.rkt") -(require "on-claim.rkt") - -(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces - source - destination - protocol - options - body) - #:prefab) ;; TODO: more fields - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define (ip-address->hostname bs) - (bit-string-case bs - ([ a b c d ] (format "~a.~a.~a.~a" a b c d)))) - -(define (ip-string->ip-address str) - (list->bytes (map string->number (string-split str ".")))) - -(define (apply-netmask addr netmask) - (bit-string-case addr - ([ (n :: integer bytes 4) ] - (bit-string ((bitwise-and n (arithmetic-shift #x-100000000 (- netmask))) - :: integer bytes 4))))) - -(define (ip-address-in-subnet? addr network netmask) - (equal? (apply-netmask network netmask) - (apply-netmask addr netmask))) - -(define broadcast-ip-address (bytes 255 255 255 255)) - -(define local-ip-address-projector (host-route (?!) ? ?)) -(define (gestalt->local-ip-addresses g) (trie-project/set/single g local-ip-address-projector)) -(define observe-local-ip-addresses-gestalt (subscription (host-route ? ? ?))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define (spawn-ip-driver) - (list - (spawn-demand-matcher (host-route (?!) (?!) (?!)) - (route-up (host-route (?!) (?!) (?!))) - spawn-host-route) - (spawn-demand-matcher (gateway-route (?!) (?!) (?!) (?!)) - (route-up (gateway-route (?!) (?!) (?!) (?!))) - spawn-gateway-route) - (spawn-demand-matcher (net-route (?!) (?!) (?!)) - (route-up (net-route (?!) (?!) (?!))) - spawn-net-route))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Local IP route - -(define (spawn-host-route my-address netmask interface-name) - (list - (let ((network-addr (apply-netmask my-address netmask))) - (spawn-normal-ip-route (host-route my-address netmask interface-name) - network-addr - netmask - interface-name)) - (spawn (lambda (e s) - (match e - [(scn (? trie-empty?)) (quit)] - [(message (ip-packet _ peer-address _ _ _ body)) - (bit-string-case body - ([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum - (case type - [(8) ;; ECHO (0 is ECHO-REPLY) - (log-info "Ping of ~a from ~a" - (pretty-bytes my-address) - (pretty-bytes peer-address)) - (define reply-data0 (bit-string 0 - code - (0 :: integer bytes 2) ;; TODO - (rest :: binary))) - (transition s (message (ip-packet #f - my-address - peer-address - PROTOCOL-ICMP - #"" - (ip-checksum 2 reply-data0))))] - [else - (log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a" - type - code - checksum - (pretty-bytes my-address) - (pretty-bytes peer-address) - (dump-bytes->string rest)) - #f])) - (else #f))] - [_ #f])) - (void) - (scn/union (advertisement (ip-packet ? my-address ? PROTOCOL-ICMP ? ?)) - (subscription (ip-packet ? ? my-address PROTOCOL-ICMP ? ?)) - (assertion (arp-assertion IPv4-ethertype my-address interface-name)) - (subscription (host-route my-address netmask interface-name)))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Gateway IP route - -(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent) - -(define (spawn-gateway-route network netmask gateway-addr interface-name) - (define the-route (gateway-route network netmask gateway-addr interface-name)) - - (define host-route-projector (host-route (?!) (?!) ?)) - (define gateway-route-projector (gateway-route (?!) (?!) ? ?)) - (define net-route-projector (net-route (?!) (?!) ?)) - (define gateway-arp-projector (arp-query IPv4-ethertype - gateway-addr - (?! (ethernet-interface interface-name ?)) - (?!))) - - (define (covered-by-some-other-route? addr routes) - (for/or ([r (in-set routes)]) - (match-define (list net msk) r) - (and (positive? msk) - (ip-address-in-subnet? addr net msk)))) - - (spawn (lambda (e s) - (match e - [(scn g) - (define host-ips+netmasks (trie-project/set #:take 2 g host-route-projector)) - (define gw-nets+netmasks (trie-project/set #:take 2 g gateway-route-projector)) - (define net-nets+netmasks (trie-project/set #:take 2 g net-route-projector)) - (define gw-ip+hwaddr - (let ((vs (trie-project/set #:take 2 g gateway-arp-projector))) - (and vs (not (set-empty? vs)) (set-first vs)))) - (when (and gw-ip+hwaddr (not (gateway-route-state-gateway-hwaddr s))) - (log-info "Discovered gateway ~a at ~a on interface ~a." - (ip-address->hostname gateway-addr) - (ethernet-interface-name (car gw-ip+hwaddr)) - (pretty-bytes (cadr gw-ip+hwaddr)))) - (if (trie-empty? (project-assertions g (?! the-route))) - (quit) - (transition (gateway-route-state - (set-union host-ips+netmasks - gw-nets+netmasks - net-nets+netmasks) - (and gw-ip+hwaddr (car gw-ip+hwaddr)) - (and gw-ip+hwaddr (cadr gw-ip+hwaddr))) - '()))] - [(message (? ip-packet? p)) - (define gw-if (gateway-route-state-gateway-interface s)) - (when (not gw-if) - (log-warning "Gateway hwaddr for ~a not known, packet dropped." - (ip-address->hostname gateway-addr))) - (and gw-if - (not (equal? (ip-packet-source-interface p) (ethernet-interface-name gw-if))) - (not (covered-by-some-other-route? (ip-packet-destination p) - (gateway-route-state-routes s))) - (transition s - (message (ethernet-packet gw-if - #f - (ethernet-interface-hwaddr gw-if) - (gateway-route-state-gateway-hwaddr s) - IPv4-ethertype - (format-ip-packet p)))))] - [_ #f])) - (gateway-route-state (set) #f #f) - (scn/union (subscription the-route) - (assertion (route-up the-route)) - (subscription (ip-packet ? ? ? ? ? ?)) - observe-local-ip-addresses-gestalt - (subscription (net-route ? ? ?)) - (subscription (gateway-route ? ? ? ?)) - (subscription (projection->pattern gateway-arp-projector))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; General net route - -(define (spawn-net-route network-addr netmask link) - (spawn-normal-ip-route (net-route network-addr netmask link) network-addr netmask link)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Normal IP route - -(define (spawn-normal-ip-route the-route network netmask interface-name) - (spawn (lambda (e s) - (match e - [(scn (? trie-empty?)) (quit)] - [(message (ethernet-packet _ _ _ _ _ body)) - (define p (parse-ip-packet interface-name body)) - (and p (transition s (message p)))] - [(message (? ip-packet? p)) - (define destination (ip-packet-destination p)) - (and (not (equal? (ip-packet-source-interface p) interface-name)) - (ip-address-in-subnet? destination network netmask) - (transition - s - (lookup-arp destination - (ethernet-interface interface-name ?) - trie-empty - (lambda (interface destination-hwaddr) - (message (ethernet-packet interface - #f - (ethernet-interface-hwaddr interface) - destination-hwaddr - IPv4-ethertype - (format-ip-packet p)))))))] - [_ #f])) - (void) - (scn/union (subscription the-route) - (assertion (route-up the-route)) - (subscription (ethernet-packet-pattern interface-name #t IPv4-ethertype)) - (assertion (arp-interface interface-name)) - (subscription (ip-packet ? ? ? ? ? ?))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(define IPv4-ethertype #x0800) - -(define IP-VERSION 4) -(define IP-MINIMUM-HEADER-LENGTH 5) - -(define PROTOCOL-ICMP 1) - -(define default-ttl 64) - -(define (parse-ip-packet interface-name body) - ;; (log-info "IP ~a got body ~a" (pretty-bytes my-address) (pretty-bytes body)) - (bit-string-case body - ([ (= IP-VERSION :: bits 4) - (header-length :: bits 4) - service-type - (total-length :: bits 16) - (id :: bits 16) - (flags :: bits 3) - (fragment-offset :: bits 13) - ttl - protocol - (header-checksum :: bits 16) ;; TODO: check checksum - (source-ip0 :: binary bits 32) - (destination-ip0 :: binary bits 32) - (rest :: binary) ] - (let* ((source-ip (bit-string->bytes source-ip0)) - (destination-ip (bit-string->bytes destination-ip0)) - (options-length (* 4 (- header-length IP-MINIMUM-HEADER-LENGTH))) - (data-length (- total-length (* 4 header-length)))) - (if (and (>= header-length 5) - (>= (bit-string-byte-count body) (* header-length 4))) - (bit-string-case rest - ([ (opts :: binary bytes options-length) - (data :: binary bytes data-length) - (:: binary) ] ;; Very short ethernet packets have a trailer of zeros - (ip-packet interface-name - (bit-string->bytes source-ip) - (bit-string->bytes destination-ip) - protocol - (bit-string->bytes opts) - (bit-string->bytes data)))) - #f))) - (else #f))) - -(define (format-ip-packet p) - (match-define (ip-packet _ src dst protocol options body) p) - - (define header-length ;; TODO: ensure options is a multiple of 4 bytes - (+ IP-MINIMUM-HEADER-LENGTH (quotient (bit-string-byte-count options) 4))) - - (define header0 (bit-string (IP-VERSION :: bits 4) - (header-length :: bits 4) - 0 ;; TODO: service type - ((+ (* header-length 4) (bit-string-byte-count body)) - :: bits 16) - (0 :: bits 16) ;; TODO: identifier - (0 :: bits 3) ;; TODO: flags - (0 :: bits 13) ;; TODO: fragments - default-ttl - protocol - (0 :: bits 16) - (src :: binary bits 32) - (dst :: binary bits 32) - (options :: binary))) - (define full-packet (bit-string ((ip-checksum 10 header0) :: binary) (body :: binary))) - - full-packet) - -(define (lookup-arp ipaddr query-interface-pattern base-gestalt k) - (on-claim #:name (string->symbol (format "lookup-arp:~a" (ip-address->hostname ipaddr))) - (lambda (_g arp-results) - (if (not arp-results) - (error 'ip "Someone has published a wildcard arp result") - (and (not (set-empty? arp-results)) - (match (set-first arp-results) - [(list interface hwaddr) - (log-info "ARP lookup yielded ~a on ~a for ~a" - (pretty-bytes hwaddr) - (ethernet-interface-name interface) - (ip-address->hostname ipaddr)) - (when (> (set-count arp-results) 1) - (log-warning "Ambiguous ARP result for ~a: ~v" - (ip-address->hostname ipaddr) - arp-results)) - (k interface hwaddr)])))) - base-gestalt - (arp-query IPv4-ethertype ipaddr (?! query-interface-pattern) (?!)) - #:timeout-msec 5000 - #:on-timeout (lambda () - (log-warning "ARP lookup of ~a failed, packet dropped" - (ip-address->hostname ipaddr)) - '()))) diff --git a/examples/netstack/main.rkt b/examples/netstack/main.rkt deleted file mode 100644 index 9d694ac..0000000 --- a/examples/netstack/main.rkt +++ /dev/null @@ -1,121 +0,0 @@ -#lang syndicate/monolithic - -(require syndicate/demand-matcher) -(require syndicate/drivers/timer) -(require "demo-config.rkt") -(require "ethernet.rkt") -(require "arp.rkt") -(require "ip.rkt") -(require "tcp.rkt") -(require "udp.rkt") - -;;(log-events-and-actions? #t) - -(spawn-timer-driver) -(spawn-ethernet-driver) -(spawn-arp-driver) -(spawn-ip-driver) -(spawn-tcp-driver) -(spawn-udp-driver) -(spawn-demo-config) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(let () - (local-require racket/set racket/string) - - (define (spawn-session them us) - (define user (gensym 'user)) - (define remote-detector (at-meta (?!))) - (define peer-detector (advertise `(,(?!) says ,?))) - (define (send-to-remote fmt . vs) - (message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))) - (define (say who fmt . vs) - (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) - (list (send-to-remote "Welcome, ~a.\n" user) - (spawn - (lambda (e peers) - (match e - [(message (at-meta (tcp-channel _ _ bs))) - (transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] - [(message `(,who says ,what)) - (transition peers (say who "says: ~a" what))] - [(scn assertions) - (if (trie-empty? (trie-project assertions remote-detector)) - (quit (send-to-remote "Goodbye!\n")) - (let ((new-peers (trie-project/set/single assertions peer-detector))) - (define arrived (set-subtract new-peers peers)) - (define departed (set-subtract peers new-peers)) - (transition new-peers - (list (for/list [(who arrived)] (say who "arrived.")) - (for/list [(who departed)] (say who "departed."))))))] - [#f #f])) - (set) - (scn/union - (subscription `(,? says ,?)) ;; read actual chat messages - (subscription (advertise `(,? says ,?))) ;; observe peer presence - (advertisement `(,user says ,?)) ;; advertise our presence - (subscription (tcp-channel them us ?) #:meta-level 1) ;; read from remote client - (subscription (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client - (advertisement (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client - )))) - - (spawn-dataspace - (spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) - (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) - #:meta-level 1 - spawn-session)) - ) - -(let () - (spawn (lambda (e s) - (match e - [(message (udp-packet src dst body)) - (log-info "Got packet from ~v: ~v" src body) - (transition s (message - (udp-packet dst - src - (string->bytes/utf-8 (format "You said: ~a" body)))))] - [_ #f])) - (void) - (scn (subscription (udp-packet ? (udp-listener 6667) ?))))) - -(let () - (define (spawn-session them us) - (list - (message 'bump) - (spawn (lambda (e s) - (match e - [(message `(counter ,counter)) - (define response - (string->bytes/utf-8 - (format (string-append - "HTTP/1.0 200 OK\r\n\r\n" - "

Hello world from syndicate-monolithic-netstack!

\n" - "

This is running on syndicate-monolithic's own\n" - "\n" - "TCP/IP stack.

\n" - "

There have been ~a requests prior to this one.

") - counter))) - (quit (message (at-meta (tcp-channel us them response))))] - [_ #f])) - (void) - (scn/union (subscription `(counter ,?)) - (subscription (tcp-channel them us ?) #:meta-level 1) - (subscription (advertise (tcp-channel them us ?)) #:meta-level 1) - (advertisement (tcp-channel us them ?) #:meta-level 1))))) - - (spawn-dataspace - (spawn (lambda (e counter) - (match e - [(message 'bump) - (transition (+ counter 1) (message `(counter ,counter)))] - [_ #f])) - 0 - (scn (subscription 'bump))) - (spawn-demand-matcher (advertise (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)) - (observe (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?)) - #:meta-level 1 - spawn-session)) - - ) diff --git a/examples/netstack/on-claim.rkt b/examples/netstack/on-claim.rkt deleted file mode 100644 index d517518..0000000 --- a/examples/netstack/on-claim.rkt +++ /dev/null @@ -1,47 +0,0 @@ -#lang racket/base - -(provide on-claim) - -(require syndicate/monolithic) -(require syndicate/drivers/timer) - -;; (Trie (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action))) -;; Trie Projection ... -;; -> Action -;; Spawns a process that observes the given projections. Any time the -;; environment's interests change in a relevant way, calls -;; check-and-maybe-spawn-fn with the aggregate interests and the -;; projection results. If check-and-maybe-spawn-fn returns #f, -;; continues to wait; otherwise, takes the action(s) returned, and -;; quits. -(define (on-claim #:timeout-msec [timeout-msec #f] - #:on-timeout [timeout-handler (lambda () '())] - #:name [name #f] - check-and-maybe-spawn-fn - base-interests - . projections) - (define timer-id (gensym 'on-claim)) - (define (on-claim-handler e state) - (match e - [(scn new-aggregate) - (define projection-results - (map (lambda (p) (trie-project/set #:take (projection-arity p) new-aggregate p)) - projections)) - (define maybe-spawn (apply check-and-maybe-spawn-fn - new-aggregate - projection-results)) - (if maybe-spawn - (quit maybe-spawn) - #f)] - [(message (timer-expired (== timer-id) _)) - (quit (timeout-handler))] - [_ #f])) - (list - (when timeout-msec (message (set-timer timer-id timeout-msec 'relative))) - (spawn #:name name - on-claim-handler - (void) - (scn/union base-interests - (assertion-set-union* - (map (lambda (p) (subscription (projection->pattern p))) projections)) - (subscription (timer-expired timer-id ?)))))) diff --git a/examples/netstack/port-allocator.rkt b/examples/netstack/port-allocator.rkt deleted file mode 100644 index 5dc7229..0000000 --- a/examples/netstack/port-allocator.rkt +++ /dev/null @@ -1,38 +0,0 @@ -#lang racket/base -;; UDP/TCP port allocator - -(provide spawn-port-allocator - (struct-out port-allocation-request)) - -(require racket/set) -(require racket/match) -(require syndicate/monolithic) -(require "ip.rkt") - -(struct port-allocation-request (type k) #:prefab) - -(struct port-allocator-state (used-ports local-ips) #:transparent) - -(define (spawn-port-allocator allocator-type observer-gestalt compute-used-ports) - (spawn #:name (string->symbol (format "port-allocator:~a" allocator-type)) - (lambda (e s) - (match e - [(scn g) - (define local-ips (or (gestalt->local-ip-addresses g) (set))) - (define new-used-ports (compute-used-ports g local-ips)) - (log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports) - (transition (port-allocator-state new-used-ports local-ips) '())] - [(message (port-allocation-request _ k)) - (define currently-used-ports (port-allocator-state-used-ports s)) - (let randomly-allocate-until-unused () - (define p (+ 1024 (random 64512))) - (if (set-member? currently-used-ports p) - (randomly-allocate-until-unused) - (transition (struct-copy port-allocator-state s - [used-ports (set-add currently-used-ports p)]) - (k p (port-allocator-state-local-ips s)))))] - [_ #f])) - (port-allocator-state (set) (set)) - (scn/union (subscription (port-allocation-request allocator-type ?)) - observe-local-ip-addresses-gestalt - observer-gestalt))) diff --git a/examples/netstack/tcp.rkt b/examples/netstack/tcp.rkt deleted file mode 100644 index 66ee0ea..0000000 --- a/examples/netstack/tcp.rkt +++ /dev/null @@ -1,665 +0,0 @@ -#lang racket/base - -(provide (struct-out tcp-address) - (struct-out tcp-handle) - (struct-out tcp-listener) - (struct-out tcp-channel) - spawn-tcp-driver) - -(require racket/set) -(require racket/match) -(require syndicate/monolithic) -(require syndicate/drivers/timer) -(require syndicate/demand-matcher) -(require bitsyntax) - -(require "dump-bytes.rkt") -(require "checksum.rkt") -(require "ip.rkt") -(require "port-allocator.rkt") - -;; tcp-address/tcp-address : "kernel" tcp connection state machines -;; tcp-handle/tcp-address : "user" outbound connections -;; tcp-listener/tcp-address : "user" inbound connections - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Protocol messages - -(struct tcp-address (host port) #:prefab) -(struct tcp-handle (id) #:prefab) -(struct tcp-listener (port) #:prefab) - -(struct tcp-channel (source destination subpacket) #:prefab) - -(struct tcp-packet (from-wire? - source-ip - source-port - destination-ip - destination-port - sequence-number - ack-number - flags - window-size - options - data) - #:prefab) - -;; (tcp-port-allocation Number (U TcpHandle TcpListener)) -(struct tcp-port-allocation (port handle) #:prefab) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; User-accessible driver startup - -(define (spawn-tcp-driver) - (list (spawn-demand-matcher #:name 'tcp-inbound-driver - (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?))) - (advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?))) - (lambda (server-addr) - (match-define (tcp-listener port) server-addr) - ;; TODO: have listener shut down once user-level listener does - (list - (spawn #:name (string->symbol - (format "tcp-listener-port-reservation:~a" port)) - (lambda (e s) #f) - (void) - (scn (assertion (tcp-port-allocation port server-addr)))) - (spawn-demand-matcher - #:name (string->symbol (format "tcp-listener:~a" port)) - (advertise (tcp-channel (?! (tcp-address ? ?)) - (?! (tcp-address ? port)) - ?)) - (observe (tcp-channel (?! (tcp-address ? ?)) - (?! (tcp-address ? port)) - ?)) - (spawn-relay server-addr))))) - (spawn-demand-matcher #:name 'tcp-outbound-driver - (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) - (observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) - allocate-port-and-spawn-socket) - (spawn-tcp-port-allocator) - (spawn-kernel-tcp-driver))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Port allocation - -(define (spawn-tcp-port-allocator) - (spawn-port-allocator 'tcp - (subscription (tcp-port-allocation ? ?)) - (lambda (g local-ips) - (project-assertions g (tcp-port-allocation (?!) ?))))) - -(define (allocate-port-and-spawn-socket local-addr remote-addr) - (message (port-allocation-request - 'tcp - (lambda (port local-ips) - ;; TODO: Choose a sensible IP address for the outbound - ;; connection. We don't have enough information to do this - ;; well at the moment, so just pick some available local IP - ;; address. - ;; - ;; Interesting note: In some sense, the right answer is - ;; "?". This would give us a form of mobility, where IP - ;; addresses only route to a given bucket-of-state and ONLY - ;; the port number selects a substate therein. That's not - ;; how TCP is defined however so we can't do that. - (define appropriate-ip (set-first local-ips)) - (define appropriate-host (ip-address->hostname appropriate-ip)) - (match-define (tcp-address remote-host remote-port) remote-addr) - (define remote-ip (ip-string->ip-address remote-host)) - (list - ((spawn-relay local-addr) remote-addr (tcp-address appropriate-host port)) - (spawn-state-vector remote-ip remote-port appropriate-ip port)))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Relay between kernel-level and user-level - -(define relay-peer-wait-time-msec 5000) - -(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr) - (define timer-name (list 'spawn-relay local-tcp-addr remote-addr)) - (define local-peer-traffic (?! (observe (tcp-channel remote-addr local-user-addr ?)))) - (define remote-peer-traffic (?! (advertise (tcp-channel remote-addr local-tcp-addr ?)))) - (list - (message (set-timer timer-name relay-peer-wait-time-msec 'relative)) - (spawn #:name (string->symbol (format "tcp-relay:~v:~v:~v" - local-user-addr - remote-addr - local-tcp-addr)) - (lambda (e state) - (match e - [(scn g) - (define local-peer-absent? (trie-empty? (trie-project g local-peer-traffic))) - (define remote-peer-absent? (trie-empty? (trie-project g remote-peer-traffic))) - (define new-state (+ (if local-peer-absent? 0 1) (if remote-peer-absent? 0 1))) - (if (< new-state state) - (quit) - (transition new-state '()))] - [(message (tcp-channel (== local-user-addr) (== remote-addr) bs)) - (transition state (message (tcp-channel local-tcp-addr remote-addr bs)))] - [(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs)) - (transition state (message (tcp-channel remote-addr local-user-addr bs)))] - [(message (timer-expired _ _)) - #:when (< state 2) ;; we only care if we're not fully connected - (error 'spawn-relay "TCP relay process timed out waiting for peer")] - [_ #f])) - 0 - (scn/union (subscription (projection->pattern local-peer-traffic)) - (subscription (projection->pattern remote-peer-traffic)) - (assertion (tcp-port-allocation (tcp-address-port local-tcp-addr) - local-user-addr)) - (subscription (tcp-channel remote-addr local-tcp-addr ?)) - (subscription (tcp-channel local-user-addr remote-addr ?)) - (advertisement (tcp-channel remote-addr local-user-addr ?)) - (advertisement (tcp-channel local-tcp-addr remote-addr ?)) - (subscription (timer-expired timer-name ?)))))) - - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Codec & kernel-level driver - -(define PROTOCOL-TCP 6) - -(struct codec-state (local-ips active-state-vectors) #:transparent) - -(define (spawn-kernel-tcp-driver) - - (define (state-vector-active? statevec s) - (set-member? (codec-state-active-state-vectors s) statevec)) - - (define (analyze-incoming-packet src-ip dst-ip body s) - (bit-string-case body - ([ (src-port :: integer bytes 2) - (dst-port :: integer bytes 2) - (sequence-number :: integer bytes 4) - (ack-number :: integer bytes 4) - (data-offset :: integer bits 4) - (reserved :: integer bits 3) - (ns :: integer bits 1) - (cwr :: integer bits 1) - (ece :: integer bits 1) - (urg :: integer bits 1) - (ack :: integer bits 1) - (psh :: integer bits 1) - (rst :: integer bits 1) - (syn :: integer bits 1) - (fin :: integer bits 1) - (window-size :: integer bytes 2) - (checksum :: integer bytes 2) ;; TODO: check checksum - (urgent-pointer :: integer bytes 2) - (rest :: binary) ] - (let* ((flags (set)) - (statevec (list src-ip src-port dst-ip dst-port)) - (old-active-state-vectors (codec-state-active-state-vectors s)) - (spawn-needed? (and (not (state-vector-active? statevec s)) - (zero? rst)))) ;; don't bother spawning if it's a rst - (define-syntax-rule (set-flags! v ...) - (begin (unless (zero? v) (set! flags (set-add flags 'v))) ...)) - (set-flags! ns cwr ece urg ack psh rst syn fin) - (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" - (ip-address->hostname src-ip) - src-port - (ip-address->hostname dst-ip) - dst-port - sequence-number - ack-number - flags - window-size) - (when spawn-needed? (log-info " - spawn needed!")) - (bit-string-case rest - ([ (opts :: binary bytes (- (* data-offset 4) 20)) - (data :: binary) ] - (let ((packet (tcp-packet #t - src-ip - src-port - dst-ip - dst-port - sequence-number - ack-number - flags - window-size - (bit-string->bytes opts) - (bit-string->bytes data)))) - (transition (if spawn-needed? - (struct-copy codec-state s - [active-state-vectors - (set-add old-active-state-vectors statevec)]) - s) - (list - (when spawn-needed? (spawn-state-vector src-ip src-port - dst-ip dst-port)) - ;; TODO: get packet to the new state-vector process somehow - (message packet))))) - (else #f)))) - (else #f))) - - (define statevec-projection (observe (tcp-packet ? (?!) (?!) (?!) (?!) ? ? ? ? ? ?))) - - (define (analyze-gestalt g s) - (define local-ips (gestalt->local-ip-addresses g)) - (define statevecs (trie-project/set #:take 4 g statevec-projection)) - (log-info "gestalt yielded statevecs ~v and local-ips ~v" statevecs local-ips) - (transition (struct-copy codec-state s - [local-ips local-ips] - [active-state-vectors statevecs]) '())) - - (define (deliver-outbound-packet p s) - (match-define (tcp-packet #f - src-ip - src-port - dst-ip - dst-port - sequence-number - ack-number - flags - window-size - options - data) - p) - (log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)" - (ip-address->hostname src-ip) - src-port - (ip-address->hostname dst-ip) - dst-port - sequence-number - ack-number - flags - window-size) - (define (flag-bit sym) (if (set-member? flags sym) 1 0)) - (define payload (bit-string (src-port :: integer bytes 2) - (dst-port :: integer bytes 2) - (sequence-number :: integer bytes 4) - (ack-number :: integer bytes 4) - ((+ 5 (quotient (bit-string-byte-count options) 4)) - :: integer bits 4) ;; TODO: enforce 4-byte alignment - (0 :: integer bits 3) - ((flag-bit 'ns) :: integer bits 1) - ((flag-bit 'cwr) :: integer bits 1) - ((flag-bit 'ece) :: integer bits 1) - ((flag-bit 'urg) :: integer bits 1) - ((flag-bit 'ack) :: integer bits 1) - ((flag-bit 'psh) :: integer bits 1) - ((flag-bit 'rst) :: integer bits 1) - ((flag-bit 'syn) :: integer bits 1) - ((flag-bit 'fin) :: integer bits 1) - (window-size :: integer bytes 2) - (0 :: integer bytes 2) ;; checksum location - (0 :: integer bytes 2) ;; TODO: urgent pointer - (data :: binary))) - (define pseudo-header (bit-string (src-ip :: binary bytes 4) - (dst-ip :: binary bytes 4) - 0 - PROTOCOL-TCP - ((bit-string-byte-count payload) :: integer bytes 2))) - (transition s (message (ip-packet #f src-ip dst-ip PROTOCOL-TCP #"" - (ip-checksum 16 payload #:pseudo-header pseudo-header))))) - - (spawn #:name 'kernel-tcp-driver - (lambda (e s) - (match e - [(scn g) - (analyze-gestalt g s)] - [(message (ip-packet source-if src dst _ _ body)) - #:when (and source-if ;; source-if == #f iff packet originates locally - (set-member? (codec-state-local-ips s) dst)) - (analyze-incoming-packet src dst body s)] - [(message (? tcp-packet? p)) - #:when (not (tcp-packet-from-wire? p)) - (deliver-outbound-packet p s)] - [_ #f])) - (codec-state (set) (set)) - (scn/union (subscription (ip-packet ? ? ? PROTOCOL-TCP ? ?)) - (subscription (tcp-packet #f ? ? ? ? ? ? ? ? ? ?)) - (subscription (observe (tcp-packet #t ? ? ? ? ? ? ? ? ? ?))) - observe-local-ip-addresses-gestalt))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Per-connection state vector process - -(struct buffer (data ;; bit-string - seqn ;; names leftmost byte in data - window ;; counts bytes from leftmost byte in data - finished?) ;; boolean: true after FIN - #:transparent) - -(struct conn-state (outbound ;; buffer - inbound ;; buffer - syn-acked? ;; boolean - latest-peer-activity-time ;; from current-inexact-milliseconds - ;; ^ the most recent time we heard from our peer - user-timeout-base-time ;; from current-inexact-milliseconds - ;; ^ when the index of the first outbound unacknowledged byte changed - local-peer-seen? ;; boolean - listener-listening?) ;; boolean - #:transparent) - -(define transmit-check-interval-msec 2000) -(define inbound-buffer-limit 65535) -(define maximum-segment-size 536) ;; bytes -(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout -(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I - ;; cheat; RFC 793 says "the present global default is five minutes", which is - ;; reasonable to be getting on with - -(define (spawn-state-vector src-ip src-port dst-ip dst-port) - (define src (tcp-address (ip-address->hostname src-ip) src-port)) - (define dst (tcp-address (ip-address->hostname dst-ip) dst-port)) - (define (timer-name kind) (list 'tcp-timer kind src dst)) - - (define (next-expected-seqn s) - (define b (conn-state-inbound s)) - (define v (buffer-seqn b)) - (and v (seq+ v (bit-string-byte-count (buffer-data b))))) - - (define (buffer-push b data) - (struct-copy buffer b [data (bit-string-append (buffer-data b) data)])) - - ;; ConnState -> ConnState - (define (set-inbound-seqn seqn s) - (struct-copy conn-state s - [inbound (struct-copy buffer (conn-state-inbound s) [seqn seqn])])) - - ;; Bitstring ConnState -> Transition - (define (incorporate-segment data s) - ;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data) - (transition - (if (buffer-finished? (conn-state-inbound s)) - s - (struct-copy conn-state s [inbound (buffer-push (conn-state-inbound s) data)])) - '())) - - (define (seq+ a b) (bitwise-and #xffffffff (+ a b))) - - ;; Always positive - (define (seq- larger smaller) - (if (< larger smaller) ;; wraparound has occurred - (+ (- larger smaller) #x100000000) - (- larger smaller))) - - (define (seq> a b) - (< (seq- a b) #x80000000)) - - (define local-peer-detector (?! (observe (tcp-channel src dst ?)))) - (define listener-detector (?! (observe (advertise (tcp-channel ? (tcp-listener dst-port) ?))))) - - ;; ConnState -> Gestalt - (define (compute-gestalt s) - (define worldward-facing-gestalt - (subscription (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?))) - (define appward-facing-gestalt - (assertion-set-union - (subscription (projection->pattern local-peer-detector)) - (subscription (projection->pattern listener-detector)) - (subscription (tcp-channel dst src ?)) - (if (and (conn-state-syn-acked? s) - (not (buffer-finished? (conn-state-inbound s)))) - (advertisement (tcp-channel src dst ?)) - trie-empty))) - (assertion-set-union (subscription (timer-expired (timer-name ?) ?)) - worldward-facing-gestalt - appward-facing-gestalt)) - - ;; ConnState -> Transition - (define (deliver-inbound-locally s) - (define b (conn-state-inbound s)) - (if (bit-string-empty? (buffer-data b)) - (transition s '()) - (let ((chunk (bit-string->bytes (buffer-data b)))) - (transition (struct-copy conn-state s - [inbound (struct-copy buffer b - [data #""] - [seqn (seq+ (buffer-seqn b) (bytes-length chunk))])]) - (message (tcp-channel src dst chunk)))))) - - ;; (Setof Symbol) -> ConnState -> Transition - (define ((check-fin flags) s) - (define b (conn-state-inbound s)) - (unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally - (error 'check-fin "Nonempty inbound buffer")) - (if (set-member? flags 'fin) - (let ((new-s (struct-copy conn-state s - [inbound (struct-copy buffer b - [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte - [finished? #t])]))) - (log-info "Closing inbound stream.") - (transition new-s (scn (compute-gestalt new-s)))) - (transition s '()))) - - ;; Boolean SeqNum -> ConnState -> Transition - (define ((discard-acknowledged-outbound ack? ackn) s) - (if (not ack?) - (transition s '()) - (let* ((b (conn-state-outbound s)) - (base (buffer-seqn b)) - (limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b)))) - (ackn (if (seq> ackn limit) limit ackn)) - (ackn (if (seq> base ackn) base ackn)) - (dist (seq- ackn base))) - (define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset! - (define new-s (struct-copy conn-state s - [user-timeout-base-time (current-inexact-milliseconds)] - [outbound (struct-copy buffer b [data remaining-data] [seqn ackn])] - [syn-acked? (or (conn-state-syn-acked? s) - (positive? dist))])) - (transition new-s - (when (and (not (conn-state-syn-acked? s)) (positive? dist)) - (scn (compute-gestalt new-s))))))) - - ;; Nat -> ConnState -> Transition - (define ((update-outbound-window peer-window) s) - (transition (struct-copy conn-state s - [outbound (struct-copy buffer (conn-state-outbound s) - [window peer-window])]) - '())) - - ;; ConnState -> Boolean - (define (all-output-acknowledged? s) - (bit-string-empty? (buffer-data (conn-state-outbound s)))) - - ;; (Option SeqNum) -> ConnState -> Transition - (define ((send-outbound old-ackn) s) - (define b (conn-state-outbound s)) - (define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b)) - (if (buffer-finished? b) 1 0)))) - - (define segment-size (min maximum-segment-size - (if (conn-state-syn-acked? s) (buffer-window b) 1) - ;; ^ can only send SYN until SYN is acked - pending-byte-count)) - (define segment-offset (if (conn-state-syn-acked? s) 0 1)) - (define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset! - (define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset! - (define ackn (next-expected-seqn s)) - (define flags (set)) - (when ackn - (set! flags (set-add flags 'ack))) - (when (not (conn-state-syn-acked? s)) - (set! flags (set-add flags 'syn))) - (when (and (buffer-finished? b) - (conn-state-syn-acked? s) - (= segment-size pending-byte-count) - (not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy - (set! flags (set-add flags 'fin))) - (define window (min 65535 ;; limit of field width - (max 0 ;; can't be negative - (- (buffer-window (conn-state-inbound s)) - (bit-string-byte-count - (buffer-data (conn-state-inbound s))))))) - (transition s - (unless (and (equal? ackn old-ackn) - (conn-state-syn-acked? s) - (not (set-member? flags 'fin)) - (zero? (bit-string-byte-count chunk))) - (local-require racket/pretty) - (pretty-write `(send-outbound (old-ackn ,old-ackn) - (s ,s) - (flags ,flags))) - (flush-output) - (message (tcp-packet #f dst-ip dst-port src-ip src-port - (buffer-seqn b) - (or ackn 0) - flags - window - #"" - chunk))))) - - ;; ConnState -> Transition - (define (bump-peer-activity-time s) - (transition (struct-copy conn-state s - [latest-peer-activity-time (current-inexact-milliseconds)]) - '())) - - ;; ConnState Number -> Boolean - (define (heard-from-peer-within-msec? s msec) - (<= (- (current-inexact-milliseconds) (conn-state-latest-peer-activity-time s)) msec)) - - ;; ConnState -> Boolean - (define (user-timeout-expired? s) - (and (not (all-output-acknowledged? s)) - (> (- (current-inexact-milliseconds) (conn-state-user-timeout-base-time s)) - user-timeout-msec))) - - ;; ConnState -> Transition - (define (quit-when-done s) - (cond - [(and (buffer-finished? (conn-state-outbound s)) - (buffer-finished? (conn-state-inbound s)) - (all-output-acknowledged? s) - (not (heard-from-peer-within-msec? s (* 2 1000 maximum-segment-lifetime-sec)))) - ;; Everything is cleanly shut down, and we just need to wait a while for unexpected - ;; packets before we release the state vector. - (quit)] - [(user-timeout-expired? s) - ;; We've been plaintively retransmitting for user-timeout-msec without hearing anything - ;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but - ;; it will do for now? TODO - (log-info "TCP_USER_TIMEOUT fired.") - (quit)] - [else #f])) - - ;; Action - (define send-set-transmit-check-timer - (message (set-timer (timer-name 'transmit-check) - transmit-check-interval-msec - 'relative))) - - ;; SeqNum SeqNum ConnState -> Transition - (define (reset seqn ackn s) - (log-warning "Sending RST from ~a:~a to ~a:~a" - (ip-address->hostname dst-ip) - dst-port - (ip-address->hostname src-ip) - src-port) - (quit (message (tcp-packet #f dst-ip dst-port src-ip src-port - seqn - ackn - (set 'ack 'rst) - 0 - #"" - #"")))) - - ;; ConnState -> Transition - (define (close-outbound-stream s) - (define b (conn-state-outbound s)) - (transition - (if (buffer-finished? b) - s - (struct-copy conn-state s - [outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte - [finished? #t])])) - '())) - - (define (state-vector-behavior e s) - (define old-ackn (buffer-seqn (conn-state-inbound s))) - (match e - [(scn g) - (log-info "State vector routing-update:\n~a" (trie->pretty-string g)) - (define local-peer-present? (trie-non-empty? (trie-project g local-peer-detector))) - (define listening? (trie-non-empty? (trie-project g listener-detector))) - (define new-s (struct-copy conn-state s [listener-listening? listening?])) - (cond - [(and local-peer-present? (not (conn-state-local-peer-seen? s))) - (transition (struct-copy conn-state new-s [local-peer-seen? #t]) '())] - [(and (not local-peer-present?) (conn-state-local-peer-seen? s)) - (log-info "Closing outbound stream.") - (sequence-transitions (close-outbound-stream new-s) - (send-outbound old-ackn) - quit-when-done)] - [else (transition new-s '())])] - [(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data)) - (define expected (next-expected-seqn s)) - (define is-syn? (set-member? flags 'syn)) - (define is-fin? (set-member? flags 'fin)) - (cond - [(set-member? flags 'rst) (quit)] - [(and (not expected) ;; no syn yet - (or (not is-syn?) ;; and this isn't it - (and (not (conn-state-listener-listening? s)) ;; or it is, but no listener... - (not (conn-state-local-peer-seen? s))))) ;; ...and no outbound client - (reset ackn ;; this is *our* seqn - (seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0))) - ;; ^^ this is what we should acknowledge... - s)] - [else - (sequence-transitions (cond - [(not expected) ;; haven't seen syn yet, but we know this is it - (incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))] - [(= expected seqn) - (incorporate-segment data s)] - [else - (transition s '())]) - deliver-inbound-locally - (check-fin flags) - (discard-acknowledged-outbound (set-member? flags 'ack) ackn) - (update-outbound-window window) - (send-outbound old-ackn) - bump-peer-activity-time - quit-when-done)])] - [(message (tcp-channel _ _ bs)) - ;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs) - (sequence-transitions (transition (struct-copy conn-state s - [user-timeout-base-time - ;; Only move user-timeout-base-time if there wasn't - ;; already some outstanding output. - (if (all-output-acknowledged? s) - (current-inexact-milliseconds) - (conn-state-user-timeout-base-time s))] - [outbound (buffer-push (conn-state-outbound s) bs)]) - '()) - (send-outbound old-ackn) - quit-when-done)] - [(message (timer-expired (== (timer-name 'transmit-check)) _)) - ;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of - ;; retransmitting outbound data as well as a means of checking for an expired - ;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained - ;; approach. - (sequence-transitions (transition s send-set-transmit-check-timer) - (send-outbound old-ackn) - quit-when-done)] - [_ #f])) - - ;; (local-require racket/trace) - ;; (trace state-vector-behavior) - - (define initial-outbound-seqn - ;; Yuck - (inexact->exact (truncate (* #x100000000 (random))))) - - ;; TODO accept input from user process - (list - send-set-transmit-check-timer - (let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position - (buffer #"" #f inbound-buffer-limit #f) - #f - (current-inexact-milliseconds) - (current-inexact-milliseconds) - #f - #f))) - (spawn #:name - (string->symbol (format "tcp-state-vector:~a:~a:~a:~a" - (ip-address->hostname src-ip) - src-port - (ip-address->hostname dst-ip) - dst-port)) - state-vector-behavior - state0 - (scn (compute-gestalt state0)))))) diff --git a/examples/netstack/udp.rkt b/examples/netstack/udp.rkt deleted file mode 100644 index 7cc2726..0000000 --- a/examples/netstack/udp.rkt +++ /dev/null @@ -1,175 +0,0 @@ -#lang racket/base - -(provide (struct-out udp-remote-address) - (struct-out udp-handle) - (struct-out udp-listener) - udp-address? - udp-local-address? - (struct-out udp-packet) - spawn-udp-driver) - -(require racket/set) -(require racket/match) -(require syndicate/monolithic) -(require syndicate/demand-matcher) -(require bitsyntax) - -(require "dump-bytes.rkt") -(require "checksum.rkt") -(require "ip.rkt") -(require "port-allocator.rkt") - -;; udp-address/udp-address : "kernel" udp connection state machines -;; udp-handle/udp-address : "user" outbound connections -;; udp-listener/udp-address : "user" inbound connections - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Protocol messages - -(struct udp-remote-address (host port) #:prefab) -(struct udp-handle (id) #:prefab) -(struct udp-listener (port) #:prefab) - -(define (udp-address? x) - (or (udp-remote-address? x) - (udp-local-address? x))) - -(define (udp-local-address? x) - (or (udp-handle? x) - (udp-listener? x))) - -;; USER-level protocol -(struct udp-packet (source destination body) #:prefab) - -;; KERNEL-level protocol -(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab) -(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress) - -(define any-remote (udp-remote-address ? ?)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; User-accessible driver startup - -(define (spawn-udp-driver) - (list - (spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?)) - (advertise (udp-packet ? (?! (udp-listener ?)) ?)) - (lambda (handle) (spawn-udp-relay (udp-listener-port handle) handle))) - (spawn-demand-matcher (observe (udp-packet ? (?! (udp-handle ?)) ?)) - (advertise (udp-packet ? (?! (udp-handle ?)) ?)) - (lambda (handle) - (message (port-allocation-request - 'udp - (lambda (port local-ips) (spawn-udp-relay port handle)))))) - (spawn-udp-port-allocator) - (spawn-kernel-udp-driver))) - -(define (spawn-udp-port-allocator) - (define udp-projector (udp-port-allocation (?!) ?)) - (spawn-port-allocator 'udp - (subscription (projection->pattern udp-projector)) - (lambda (g local-ips) - (project-assertions g udp-projector)))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Relaying - -(define (spawn-udp-relay local-port local-user-addr) - (log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr) - - (define local-peer-detector (?! (observe (udp-packet any-remote local-user-addr ?)))) - - (define (compute-gestalt local-ips) - (for/fold [(g (assertion-set-union - (subscription (projection->pattern local-peer-detector)) - (advertisement (udp-packet any-remote local-user-addr ?)) - observe-local-ip-addresses-gestalt - (subscription (udp-packet local-user-addr any-remote ?)) - (assertion (udp-port-allocation local-port local-user-addr))))] - [(ip (in-set local-ips))] - (assertion-set-union g - (subscription (udp-datagram ? ? ip local-port ?)) - (advertisement (udp-datagram ip local-port ? ? ?))))) - - (spawn (lambda (e local-ips) - (match e - [(scn g) - (define new-local-ips (gestalt->local-ip-addresses g)) - (if (trie-empty? (trie-project g local-peer-detector)) - (quit) - (transition new-local-ips (scn (compute-gestalt new-local-ips))))] - [(message (udp-packet (== local-user-addr) remote-addr bs)) - ;; Choose arbitrary local IP address for outbound packet! - ;; TODO: what can be done? Must I examine the routing table? - (match-define (udp-remote-address remote-host remote-port) remote-addr) - (define remote-ip (ip-string->ip-address remote-host)) - (transition local-ips (message (udp-datagram (set-first local-ips) - local-port - remote-ip - remote-port - bs)))] - [(message (udp-datagram si sp _ _ bs)) - (transition local-ips - (message (udp-packet (udp-remote-address (ip-address->hostname si) sp) - local-user-addr - bs)))] - [_ #f])) - (set) - (scn (compute-gestalt (set))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Codec & kernel-level driver - -(define PROTOCOL-UDP 17) - -(define (spawn-kernel-udp-driver) - (spawn (lambda (e local-ips) - (match e - [(scn g) - (transition (gestalt->local-ip-addresses g) '())] - [(message (ip-packet source-if src-ip dst-ip _ _ body)) - #:when (and source-if (set-member? local-ips dst-ip)) - (bit-string-case body - ([ (src-port :: integer bytes 2) - (dst-port :: integer bytes 2) - (length :: integer bytes 2) - (checksum :: integer bytes 2) ;; TODO: check checksum - (data :: binary) ] - (bit-string-case data - ([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes - (:: binary) ] - (transition local-ips (message (udp-datagram src-ip - src-port - dst-ip - dst-port - (bit-string->bytes payload))))) - (else #f))) - (else #f))] - [(message (udp-datagram src-ip src-port dst-ip dst-port bs)) - #:when (set-member? local-ips src-ip) - (let* ((payload (bit-string (src-port :: integer bytes 2) - (dst-port :: integer bytes 2) - ((+ 8 (bit-string-byte-count bs)) - :: integer bytes 2) - (0 :: integer bytes 2) ;; checksum location - (bs :: binary))) - (pseudo-header (bit-string (src-ip :: binary bytes 4) - (dst-ip :: binary bytes 4) - 0 - PROTOCOL-UDP - ((bit-string-byte-count payload) - :: integer bytes 2))) - (checksummed-payload (ip-checksum #:pseudo-header pseudo-header - 6 payload))) - (transition local-ips (message (ip-packet #f - src-ip - dst-ip - PROTOCOL-UDP - #"" - checksummed-payload))))] - [_ #f])) - (set) - (scn/union (advertisement (ip-packet #f ? ? PROTOCOL-UDP ? ?)) - (subscription (ip-packet ? ? ? PROTOCOL-UDP ? ?)) - (subscription (udp-datagram ? ? ? ? ?)) - observe-local-ip-addresses-gestalt)))