Copy monolithic netstack implementation to subdir, for future reference

This commit is contained in:
Tony Garnock-Jones 2016-07-21 17:02:34 -04:00
parent 4357424e78
commit b2d5a3f74d
16 changed files with 2009 additions and 0 deletions

View File

@ -0,0 +1,7 @@
all:
run:
raco make main.rkt && racket main.rkt
clean:
find . -name compiled -type d | xargs rm -rf

View File

@ -0,0 +1,16 @@
# TCP/IP Stack
## Linux Firewall Configuration
Imagine a setup where the machine you are running this code has IP
192.168.1.10. This code claims 192.168.1.222 for itself. Now, pinging
192.168.1.222 from some other machine, say 192.168.1.99, will cause
the local kernel to receive the pings and then *forward them on to
192.168.1.222*, which because of the gratuitous ARP announcement, it
knows to be on its own Ethernet MAC address. This causes the ping
requests to repeat endlessly, each time with one lower TTL.
One approach to solving the problem is to prevent the kernel from
forwarding packets addressed to 192.168.1.222. To do this,
sudo iptables -I FORWARD -d 192.168.1.222 -j DROP

View File

@ -0,0 +1,18 @@
Ideas on TCP unit testing:
<https://www.snellman.net/blog/archive/2015-07-09-unit-testing-a-tcp-stack/>
Check behaviour around TCP zero-window probing. Is the correct
behaviour already a consequence of the way `send-outbound` works?
Do something smarter with TCP timers and RTT estimation than the
nothing that's already being done.
TCP options negotiation.
- SACK
- Window scaling
Bugs:
- RST kills a connection even if its sequence number is bogus. Check
to make sure it's in the window. (See
http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41848.pdf
and RFC 5961)

View File

@ -0,0 +1,235 @@
#lang racket/base
;; ARP protocol, http://tools.ietf.org/html/rfc826
;; Only does ARP-over-ethernet.
(provide (struct-out arp-query)
(struct-out arp-assertion)
(struct-out arp-interface)
spawn-arp-driver)
(require racket/set)
(require racket/match)
(require syndicate/monolithic)
(require syndicate/drivers/timer)
(require syndicate/demand-matcher)
(require bitsyntax)
(require "dump-bytes.rkt")
(require "configuration.rkt")
(require "ethernet.rkt")
(struct arp-query (protocol protocol-address interface link-address) #:prefab)
(struct arp-assertion (protocol protocol-address interface-name) #:prefab)
(struct arp-interface (interface-name) #:prefab)
(struct arp-interface-up (interface-name) #:prefab)
(define ARP-ethertype #x0806)
(define cache-entry-lifetime-msec (* 14400 1000))
(define wakeup-interval 5000)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (spawn-arp-driver)
(spawn-demand-matcher (arp-interface (?!))
(arp-interface-up (?!))
spawn-arp-interface))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(struct cache-key (protocol address) #:transparent)
(struct cache-value (expiry interface address) #:transparent)
(struct state (cache queries assertions) #:transparent)
(define (spawn-arp-interface interface-name)
(log-info "spawn-arp-interface ~v" interface-name)
(lookup-ethernet-hwaddr (assertion (arp-interface-up interface-name))
interface-name
(lambda (hwaddr) (spawn-arp-interface* interface-name hwaddr))))
(define (spawn-arp-interface* interface-name hwaddr)
(log-info "spawn-arp-interface* ~v ~v" interface-name hwaddr)
(define interface (ethernet-interface interface-name hwaddr))
(define (expire-cache cache)
(define now (current-inexact-milliseconds))
(define (not-expired? v) (< now (cache-value-expiry v)))
(for/hash [((k v) (in-hash cache)) #:when (not-expired? v)]
(values k v)))
(define timer-key (list 'arp interface-name))
(define (set-wakeup-alarm)
(message (set-timer timer-key wakeup-interval 'relative)))
(define (compute-gestalt cache)
(scn/union (subscription (timer-expired timer-key ?))
(subscription interface)
(subscription (ethernet-packet-pattern interface-name #t ARP-ethertype))
(assertion (arp-interface-up interface-name))
(subscription (arp-assertion ? ? interface-name))
(subscription (observe (arp-query ? ? interface ?)))
(for/fold [(g trie-empty)] [((k v) (in-hash cache))]
(assertion-set-union g (assertion (arp-query (cache-key-protocol k)
(cache-key-address k)
(cache-value-interface v)
(cache-value-address v)))))))
(define (build-packet dest-mac ptype oper sender-ha sender-pa target-ha target-pa)
(define hlen (bytes-length target-ha))
(define plen (bytes-length target-pa))
(define packet (bit-string->bytes
(bit-string (1 :: integer bytes 2)
(ptype :: integer bytes 2)
hlen
plen
(oper :: integer bytes 2)
(sender-ha :: binary bytes hlen)
(sender-pa :: binary bytes plen)
(target-ha :: binary bytes hlen)
(target-pa :: binary bytes plen))))
(ethernet-packet interface
#f
hwaddr
dest-mac
ARP-ethertype
packet))
(define (analyze-incoming-packet source destination body s)
(bit-string-case body
([ (= 1 :: integer bytes 2)
(ptype :: integer bytes 2)
hlen
plen
(oper :: integer bytes 2)
(sender-hardware-address0 :: binary bytes hlen)
(sender-protocol-address0 :: binary bytes plen)
(target-hardware-address0 :: binary bytes hlen)
(target-protocol-address0 :: binary bytes plen)
(:: binary) ;; The extra zeros exist because ethernet packets
;; have a minimum size. This is, in part, why
;; IPv4 headers have a total-length field, so
;; that the zero padding can be removed.
]
(let ()
(define sender-protocol-address (bit-string->bytes sender-protocol-address0))
(define sender-hardware-address (bit-string->bytes sender-hardware-address0))
(define target-protocol-address (bit-string->bytes target-protocol-address0))
(define learned-key (cache-key ptype sender-protocol-address))
(when (and (set-member? (state-queries s) learned-key) ;; it is relevant to our interests
(not (equal? sender-hardware-address
(cache-value-address (hash-ref (state-cache s)
learned-key
(lambda ()
(cache-value #f #f #f)))))))
(log-info "~a ARP Adding ~a = ~a to cache"
interface-name
(pretty-bytes sender-protocol-address)
(pretty-bytes sender-hardware-address)))
(define cache (hash-set (expire-cache (state-cache s))
learned-key
(cache-value (+ (current-inexact-milliseconds)
cache-entry-lifetime-msec)
interface
sender-hardware-address)))
(transition (struct-copy state s [cache cache])
(list
(case oper
[(1) ;; request
(if (set-member? (state-assertions s)
(cache-key ptype target-protocol-address))
(begin
(log-info "~a ARP answering request for ~a/~a"
interface-name
ptype
(pretty-bytes target-protocol-address))
(message (build-packet sender-hardware-address
ptype
2 ;; reply
hwaddr
target-protocol-address
sender-hardware-address
sender-protocol-address)))
'())]
[(2) '()] ;; reply
[else '()])
(compute-gestalt cache)))))
(else #f)))
(define queries-projection (observe (arp-query (?!) (?!) ? ?)))
(define (gestalt->queries g)
(for/set [(e (in-set (trie-project/set #:take 2 g queries-projection)))]
(match-define (list ptype pa) e)
(cache-key ptype pa)))
(define assertions-projection (arp-assertion (?!) (?!) ?))
(define (gestalt->assertions g)
(for/set [(e (in-set (trie-project/set #:take 2 g assertions-projection)))]
(match-define (list ptype pa) e)
(cache-key ptype pa)))
(define (analyze-gestalt g s)
(define new-assertions (gestalt->assertions g))
(define added-assertions (set-subtract new-assertions (state-assertions s)))
(define new-s (struct-copy state s [queries (gestalt->queries g)] [assertions new-assertions]))
(if (trie-empty? (project-assertions g (arp-interface interface-name)))
(quit)
(transition new-s
(list
(for/list [(a (in-set added-assertions))]
(log-info "~a ARP Announcing ~a as ~a"
interface-name
(pretty-bytes (cache-key-address a))
(pretty-bytes hwaddr))
(message (build-packet broadcast-ethernet-address
(cache-key-protocol a)
2 ;; reply -- gratuitous announcement
hwaddr
(cache-key-address a)
hwaddr
(cache-key-address a))))))))
(define (send-questions s)
(define unanswered-queries
(set-subtract (state-queries s) (list->set (hash-keys (state-cache s)))))
(define (some-asserted-pa ptype)
(match (filter (lambda (k) (equal? (cache-key-protocol k) ptype))
(set->list (state-assertions s)))
['() #f]
[(list* k _) (cache-key-address k)]))
(transition s
(for/list [(q (in-set unanswered-queries))]
(define pa (some-asserted-pa (cache-key-protocol q)))
(log-info "~a ARP Asking for ~a from ~a"
interface-name
(pretty-bytes (cache-key-address q))
(and pa (pretty-bytes pa)))
(when pa
(message (build-packet broadcast-ethernet-address
(cache-key-protocol q)
1 ;; request
hwaddr
pa
zero-ethernet-address
(cache-key-address q)))))))
(list (set-wakeup-alarm)
(spawn (lambda (e s)
;; (log-info "ARP ~a ~a: ~v // ~v" interface-name (pretty-bytes hwaddr) e s)
(match e
[(scn g)
(sequence-transitions (analyze-gestalt g s)
send-questions)]
[(message (ethernet-packet _ _ source destination _ body))
(analyze-incoming-packet source destination body s)]
[(message (timer-expired _ _))
(define new-s (struct-copy state s
[cache (expire-cache (state-cache s))]))
(sequence-transitions (transition new-s
(list (set-wakeup-alarm)
(compute-gestalt (state-cache new-s))))
send-questions)]
[_ #f]))
(state (hash) (set) (set))
(compute-gestalt (hash)))))

View File

@ -0,0 +1,52 @@
#lang racket/base
(provide ones-complement-sum16 ip-checksum)
(require bitsyntax)
(require "dump-bytes.rkt")
(define (ones-complement-+16 a b)
(define c (+ a b))
(bitwise-and #xffff (+ (arithmetic-shift c -16) c)))
(define (ones-complement-sum16 bs)
(bit-string-case bs
([ (n :: integer bytes 2) (rest :: binary) ]
(ones-complement-+16 n (ones-complement-sum16 rest)))
([ odd-byte ]
(arithmetic-shift odd-byte 8))
([ ]
0)))
(define (ones-complement-negate16-safely x)
(define r (bitwise-and #xffff (bitwise-not x)))
(if (= r 0) #xffff r))
(define (ip-checksum offset blob #:pseudo-header [pseudo-header #""])
(bit-string-case blob
([ (prefix :: binary bytes offset)
(:: binary bytes 2)
(suffix :: binary) ]
;; (log-info "Packet pre checksum:\n~a" (dump-bytes->string blob))
(define result (ones-complement-+16
(ones-complement-sum16 pseudo-header)
(ones-complement-+16 (ones-complement-sum16 prefix)
(ones-complement-sum16 suffix))))
;; (log-info "result: ~a" (number->string result 16))
(define checksum (ones-complement-negate16-safely result))
;; (log-info "Checksum ~a" (number->string checksum 16))
(define final-packet (bit-string (prefix :: binary)
(checksum :: integer bytes 2)
(suffix :: binary)))
;; (log-info "Packet with checksum:\n~a" (dump-bytes->string final-packet))
final-packet)))
(module+ test
(require rackunit)
(check-equal? (ones-complement-negate16-safely
(ones-complement-sum16 (bytes #x45 #x00 #x00 #x54
#x00 #x00 #x00 #x00
#x40 #x01 #x00 #x00
#xc0 #xa8 #x01 #xde
#xc0 #xa8 #x01 #x8f)))
#xf5eb))

View File

@ -0,0 +1,21 @@
#lang racket/base
(provide (struct-out ethernet-interface)
(struct-out host-route)
(struct-out gateway-route)
(struct-out net-route)
(struct-out route-up))
(struct ethernet-interface (name hwaddr) #:prefab)
;; A Route is one of
;; - (host-route IpAddrBytes NetmaskNat InterfaceName), an own-IP route
;; - (gateway-route NetAddrBytes NetmaskNat IpAddrBytes InterfaceName), a gateway for a subnet
;; - (net-route NetAddrBytes NetmaskNat InterfaceName), an ethernet route for a subnet
;; NetmaskNat in a net-route is a default route.
(struct host-route (ip-addr netmask interface-name) #:prefab)
(struct gateway-route (network-addr netmask gateway-addr interface-name) #:prefab)
(struct net-route (network-addr netmask link) #:prefab)
(struct route-up (route) #:prefab) ;; assertion: the given Route is running

View File

@ -0,0 +1,25 @@
#lang racket/base
;; Demonstration stack configuration for various hosts.
(require racket/match)
(require syndicate/monolithic)
(require (only-in mzlib/os gethostname))
(require "configuration.rkt")
(provide spawn-demo-config)
(define (spawn-demo-config)
(spawn (lambda (e s) #f)
(void)
(match (gethostname)
["skip"
(scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "en0"))
(assertion (host-route (bytes 192 168 1 222) 24 "en0")))]
[(or "hop" "walk")
(scn/union (assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 192 168 1 1) "wlan0"))
(assertion (host-route (bytes 192 168 1 222) 24 "wlan0")))]
["stockholm.ccs.neu.edu"
(scn/union (assertion (host-route (bytes 129 10 115 94) 24 "eth0"))
(assertion (gateway-route (bytes 0 0 0 0) 0 (bytes 129 10 115 1) "eth0")))]
[else
(error 'spawn-demo-config "No setup for hostname ~a" (gethostname))])))

View File

@ -0,0 +1,80 @@
#lang racket/base
;; Copyright (C) 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>
;;
;; dump-bytes.rkt is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; dump-bytes.rkt is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with dump-bytes.rkt. If not, see <http://www.gnu.org/licenses/>.
;; Pretty hex dump output of a Bytes.
(provide dump-bytes!
dump-bytes->string
pretty-bytes)
(require (only-in bitsyntax bit-string->bytes))
(require (only-in file/sha1 bytes->hex-string))
(define (pretty-bytes bs)
(bytes->hex-string (bit-string->bytes bs)))
;; Exact Exact -> String
;; Returns the "0"-padded, width-digit hex representation of n
(define (hex width n)
(define s (number->string n 16))
(define slen (string-length s))
(cond
((< slen width) (string-append (make-string (- width slen) #\0) s))
((= slen width) s)
((> slen width) (substring s 0 width))))
;; Bytes Exact -> Void
;; Prints a pretty hex/ASCII dump of bs on (current-output-port).
(define (dump-bytes! bs0 [requested-count #f] #:base [baseaddr 0])
(define bs (bit-string->bytes bs0))
(define count (if requested-count (min requested-count (bytes-length bs)) (bytes-length bs)))
(define clipped (subbytes bs 0 count))
(define (dump-hex i)
(if (< i count)
(display (hex 2 (bytes-ref clipped i)))
(display " "))
(display #\space))
(define (dump-char i)
(if (< i count)
(let ((ch (bytes-ref clipped i)))
(if (<= 32 ch 127)
(display (integer->char ch))
(display #\.)))
(display #\space)))
(define (for-each-between f low high)
(do ((i low (+ i 1)))
((= i high))
(f i)))
(define (dump-line i)
(display (hex 8 (+ i baseaddr)))
(display #\space)
(for-each-between dump-hex i (+ i 8))
(display ": ")
(for-each-between dump-hex (+ i 8) (+ i 16))
(display #\space)
(for-each-between dump-char i (+ i 8))
(display " : ")
(for-each-between dump-char (+ i 8) (+ i 16))
(newline))
(do ((i 0 (+ i 16)))
((>= i count))
(dump-line i)))
(define (dump-bytes->string bs [requested-count #f] #:base [baseaddr 0])
(define s (open-output-string))
(parameterize ((current-output-port s))
(dump-bytes! bs requested-count #:base baseaddr))
(get-output-string s))

View File

@ -0,0 +1,134 @@
#lang racket/base
;; Ethernet driver
(provide (struct-out ethernet-packet)
zero-ethernet-address
broadcast-ethernet-address
interface-names
spawn-ethernet-driver
ethernet-packet-pattern
lookup-ethernet-hwaddr)
(require racket/set)
(require racket/match)
(require racket/async-channel)
(require syndicate/monolithic)
(require syndicate/demand-matcher)
(require "on-claim.rkt")
(require packet-socket)
(require bitsyntax)
(require "configuration.rkt")
(require "dump-bytes.rkt")
(struct ethernet-packet (interface from-wire? source destination ethertype body) #:prefab)
(define zero-ethernet-address (bytes 0 0 0 0 0 0))
(define broadcast-ethernet-address (bytes 255 255 255 255 255 255))
(define interface-names (raw-interface-names))
(log-info "Device names: ~a" interface-names)
(define (spawn-ethernet-driver)
(spawn-demand-matcher (observe (ethernet-packet (ethernet-interface (?!) ?) #t ? ? ? ?))
(ethernet-interface (?!) ?)
spawn-interface-tap))
(define (spawn-interface-tap interface-name)
(define h (raw-interface-open interface-name))
(define interface (ethernet-interface interface-name (raw-interface-hwaddr h)))
(cond
[(not h)
(log-error "ethernet: Couldn't open interface ~v" interface-name)
'()]
[else
(log-info "Opened interface ~a, yielding handle ~v" interface-name h)
(define control-ch (make-async-channel))
(thread (lambda () (interface-packet-read-loop interface h control-ch)))
(spawn (lambda (e h)
(match e
[(scn g)
(if (trie-empty? g)
(begin (async-channel-put control-ch 'quit)
(quit))
(begin (async-channel-put control-ch 'unblock)
#f))]
[(message (at-meta (? ethernet-packet? p)))
;; (log-info "Interface ~a inbound packet ~a -> ~a (type 0x~a)"
;; (ethernet-interface-name (ethernet-packet-interface p))
;; (pretty-bytes (ethernet-packet-source p))
;; (pretty-bytes (ethernet-packet-destination p))
;; (number->string (ethernet-packet-ethertype p) 16))
;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p)))
(transition h (message p))]
[(message (? ethernet-packet? p))
;; (log-info "Interface ~a OUTBOUND packet ~a -> ~a (type 0x~a)"
;; (ethernet-interface-name (ethernet-packet-interface p))
;; (pretty-bytes (ethernet-packet-source p))
;; (pretty-bytes (ethernet-packet-destination p))
;; (number->string (ethernet-packet-ethertype p) 16))
;; (log-info "~a" (dump-bytes->string (ethernet-packet-body p)))
(raw-interface-write h (encode-ethernet-packet p))
#f]
[_ #f]))
h
(scn/union (assertion interface)
(subscription (ethernet-packet interface #f ? ? ? ?))
(subscription (observe (ethernet-packet interface #t ? ? ? ?)))
(subscription (ethernet-packet interface #t ? ? ? ?) #:meta-level 1)))]))
(define (interface-packet-read-loop interface h control-ch)
(define (blocked)
(match (async-channel-get control-ch)
['unblock (unblocked)]
['quit (void)]))
(define (unblocked)
(match (async-channel-try-get control-ch)
['unblock (unblocked)]
['quit (void)]
[#f
(define p (raw-interface-read h))
(define decoded (decode-ethernet-packet interface p))
(when decoded (send-ground-message decoded))
(unblocked)]))
(blocked)
(raw-interface-close h))
(define (decode-ethernet-packet interface p)
(bit-string-case p
([ (destination :: binary bytes 6)
(source :: binary bytes 6)
(ethertype :: integer bytes 2)
(body :: binary) ]
(ethernet-packet interface
#t
(bit-string->bytes source)
(bit-string->bytes destination)
ethertype
(bit-string->bytes body)))
(else #f)))
(define (encode-ethernet-packet p)
(match-define (ethernet-packet _ _ source destination ethertype body) p)
(bit-string->bytes
(bit-string (destination :: binary bytes 6)
(source :: binary bytes 6)
(ethertype :: integer bytes 2)
(body :: binary))))
(define (ethernet-packet-pattern interface-name from-wire? ethertype)
(ethernet-packet (ethernet-interface interface-name ?) from-wire? ? ? ethertype ?))
(define (lookup-ethernet-hwaddr base-interests interface-name k)
(on-claim #:timeout-msec 5000
#:on-timeout (lambda ()
(log-info "Lookup of ethernet interface ~v failed" interface-name)
'())
(lambda (_g hwaddrss)
(and (not (set-empty? hwaddrss))
(let ((hwaddr (car (set-first hwaddrss))))
(k hwaddr))))
base-interests
(ethernet-interface interface-name (?!))))

View File

@ -0,0 +1,48 @@
#lang syndicate/monolithic
(require syndicate/drivers/timer)
(require "demo-config.rkt")
(require "ethernet.rkt")
(require "arp.rkt")
(require "ip.rkt")
(require "tcp.rkt")
(require "udp.rkt")
;;(log-events-and-actions? #t)
(spawn-timer-driver)
(spawn-ethernet-driver)
(spawn-arp-driver)
(spawn-ip-driver)
(spawn-tcp-driver)
(spawn-udp-driver)
(spawn-demo-config)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(let ()
(define local-handle (tcp-handle 'httpclient))
(define remote-handle (tcp-address "129.10.115.92" 80))
(spawn (lambda (e seen-peer?)
(match e
[(scn g)
(define peer-present? (trie-non-empty? g))
(if (and (not peer-present?) seen-peer?)
(begin (printf "URL fetcher exiting.\n")
(quit))
(transition (or seen-peer? peer-present?)
(message
(tcp-channel
local-handle
remote-handle
#"GET / HTTP/1.0\r\nHost: stockholm.ccs.neu.edu\r\n\r\n"))))]
[(message (tcp-channel _ _ bs))
(printf "----------------------------------------\n~a\n" bs)
(printf "----------------------------------------\n")
#f]
[_ #f]))
#f
(scn/union (advertisement (tcp-channel local-handle remote-handle ?))
(subscription (tcp-channel remote-handle local-handle ?))
(subscription (advertise (tcp-channel remote-handle local-handle ?))))))

View File

@ -0,0 +1,327 @@
#lang racket/base
(provide (struct-out ip-packet)
ip-address->hostname
ip-string->ip-address
apply-netmask
ip-address-in-subnet?
gestalt->local-ip-addresses
observe-local-ip-addresses-gestalt
broadcast-ip-address
spawn-ip-driver)
(require racket/set)
(require racket/match)
(require (only-in racket/string string-split))
(require syndicate/monolithic)
(require syndicate/drivers/timer)
(require syndicate/demand-matcher)
(require bitsyntax)
(require "dump-bytes.rkt")
(require "configuration.rkt")
(require "checksum.rkt")
(require "ethernet.rkt")
(require "arp.rkt")
(require "on-claim.rkt")
(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces
source
destination
protocol
options
body)
#:prefab) ;; TODO: more fields
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (ip-address->hostname bs)
(bit-string-case bs
([ a b c d ] (format "~a.~a.~a.~a" a b c d))))
(define (ip-string->ip-address str)
(list->bytes (map string->number (string-split str "."))))
(define (apply-netmask addr netmask)
(bit-string-case addr
([ (n :: integer bytes 4) ]
(bit-string ((bitwise-and n (arithmetic-shift #x-100000000 (- netmask)))
:: integer bytes 4)))))
(define (ip-address-in-subnet? addr network netmask)
(equal? (apply-netmask network netmask)
(apply-netmask addr netmask)))
(define broadcast-ip-address (bytes 255 255 255 255))
(define local-ip-address-projector (host-route (?!) ? ?))
(define (gestalt->local-ip-addresses g) (trie-project/set/single g local-ip-address-projector))
(define observe-local-ip-addresses-gestalt (subscription (host-route ? ? ?)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (spawn-ip-driver)
(list
(spawn-demand-matcher (host-route (?!) (?!) (?!))
(route-up (host-route (?!) (?!) (?!)))
spawn-host-route)
(spawn-demand-matcher (gateway-route (?!) (?!) (?!) (?!))
(route-up (gateway-route (?!) (?!) (?!) (?!)))
spawn-gateway-route)
(spawn-demand-matcher (net-route (?!) (?!) (?!))
(route-up (net-route (?!) (?!) (?!)))
spawn-net-route)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Local IP route
(define (spawn-host-route my-address netmask interface-name)
(list
(let ((network-addr (apply-netmask my-address netmask)))
(spawn-normal-ip-route (host-route my-address netmask interface-name)
network-addr
netmask
interface-name))
(spawn (lambda (e s)
(match e
[(scn (? trie-empty?)) (quit)]
[(message (ip-packet _ peer-address _ _ _ body))
(bit-string-case body
([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum
(case type
[(8) ;; ECHO (0 is ECHO-REPLY)
(log-info "Ping of ~a from ~a"
(pretty-bytes my-address)
(pretty-bytes peer-address))
(define reply-data0 (bit-string 0
code
(0 :: integer bytes 2) ;; TODO
(rest :: binary)))
(transition s (message (ip-packet #f
my-address
peer-address
PROTOCOL-ICMP
#""
(ip-checksum 2 reply-data0))))]
[else
(log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a"
type
code
checksum
(pretty-bytes my-address)
(pretty-bytes peer-address)
(dump-bytes->string rest))
#f]))
(else #f))]
[_ #f]))
(void)
(scn/union (advertisement (ip-packet ? my-address ? PROTOCOL-ICMP ? ?))
(subscription (ip-packet ? ? my-address PROTOCOL-ICMP ? ?))
(assertion (arp-assertion IPv4-ethertype my-address interface-name))
(subscription (host-route my-address netmask interface-name))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Gateway IP route
(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent)
(define (spawn-gateway-route network netmask gateway-addr interface-name)
(define the-route (gateway-route network netmask gateway-addr interface-name))
(define host-route-projector (host-route (?!) (?!) ?))
(define gateway-route-projector (gateway-route (?!) (?!) ? ?))
(define net-route-projector (net-route (?!) (?!) ?))
(define gateway-arp-projector (arp-query IPv4-ethertype
gateway-addr
(?! (ethernet-interface interface-name ?))
(?!)))
(define (covered-by-some-other-route? addr routes)
(for/or ([r (in-set routes)])
(match-define (list net msk) r)
(and (positive? msk)
(ip-address-in-subnet? addr net msk))))
(spawn (lambda (e s)
(match e
[(scn g)
(define host-ips+netmasks (trie-project/set #:take 2 g host-route-projector))
(define gw-nets+netmasks (trie-project/set #:take 2 g gateway-route-projector))
(define net-nets+netmasks (trie-project/set #:take 2 g net-route-projector))
(define gw-ip+hwaddr
(let ((vs (trie-project/set #:take 2 g gateway-arp-projector)))
(and vs (not (set-empty? vs)) (set-first vs))))
(when (and gw-ip+hwaddr (not (gateway-route-state-gateway-hwaddr s)))
(log-info "Discovered gateway ~a at ~a on interface ~a."
(ip-address->hostname gateway-addr)
(ethernet-interface-name (car gw-ip+hwaddr))
(pretty-bytes (cadr gw-ip+hwaddr))))
(if (trie-empty? (project-assertions g (?! the-route)))
(quit)
(transition (gateway-route-state
(set-union host-ips+netmasks
gw-nets+netmasks
net-nets+netmasks)
(and gw-ip+hwaddr (car gw-ip+hwaddr))
(and gw-ip+hwaddr (cadr gw-ip+hwaddr)))
'()))]
[(message (? ip-packet? p))
(define gw-if (gateway-route-state-gateway-interface s))
(when (not gw-if)
(log-warning "Gateway hwaddr for ~a not known, packet dropped."
(ip-address->hostname gateway-addr)))
(and gw-if
(not (equal? (ip-packet-source-interface p) (ethernet-interface-name gw-if)))
(not (covered-by-some-other-route? (ip-packet-destination p)
(gateway-route-state-routes s)))
(transition s
(message (ethernet-packet gw-if
#f
(ethernet-interface-hwaddr gw-if)
(gateway-route-state-gateway-hwaddr s)
IPv4-ethertype
(format-ip-packet p)))))]
[_ #f]))
(gateway-route-state (set) #f #f)
(scn/union (subscription the-route)
(assertion (route-up the-route))
(subscription (ip-packet ? ? ? ? ? ?))
observe-local-ip-addresses-gestalt
(subscription (net-route ? ? ?))
(subscription (gateway-route ? ? ? ?))
(subscription (projection->pattern gateway-arp-projector)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; General net route
(define (spawn-net-route network-addr netmask link)
(spawn-normal-ip-route (net-route network-addr netmask link) network-addr netmask link))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Normal IP route
(define (spawn-normal-ip-route the-route network netmask interface-name)
(spawn (lambda (e s)
(match e
[(scn (? trie-empty?)) (quit)]
[(message (ethernet-packet _ _ _ _ _ body))
(define p (parse-ip-packet interface-name body))
(and p (transition s (message p)))]
[(message (? ip-packet? p))
(define destination (ip-packet-destination p))
(and (not (equal? (ip-packet-source-interface p) interface-name))
(ip-address-in-subnet? destination network netmask)
(transition
s
(lookup-arp destination
(ethernet-interface interface-name ?)
trie-empty
(lambda (interface destination-hwaddr)
(message (ethernet-packet interface
#f
(ethernet-interface-hwaddr interface)
destination-hwaddr
IPv4-ethertype
(format-ip-packet p)))))))]
[_ #f]))
(void)
(scn/union (subscription the-route)
(assertion (route-up the-route))
(subscription (ethernet-packet-pattern interface-name #t IPv4-ethertype))
(assertion (arp-interface interface-name))
(subscription (ip-packet ? ? ? ? ? ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define IPv4-ethertype #x0800)
(define IP-VERSION 4)
(define IP-MINIMUM-HEADER-LENGTH 5)
(define PROTOCOL-ICMP 1)
(define default-ttl 64)
(define (parse-ip-packet interface-name body)
;; (log-info "IP ~a got body ~a" (pretty-bytes my-address) (pretty-bytes body))
(bit-string-case body
([ (= IP-VERSION :: bits 4)
(header-length :: bits 4)
service-type
(total-length :: bits 16)
(id :: bits 16)
(flags :: bits 3)
(fragment-offset :: bits 13)
ttl
protocol
(header-checksum :: bits 16) ;; TODO: check checksum
(source-ip0 :: binary bits 32)
(destination-ip0 :: binary bits 32)
(rest :: binary) ]
(let* ((source-ip (bit-string->bytes source-ip0))
(destination-ip (bit-string->bytes destination-ip0))
(options-length (* 4 (- header-length IP-MINIMUM-HEADER-LENGTH)))
(data-length (- total-length (* 4 header-length))))
(if (and (>= header-length 5)
(>= (bit-string-byte-count body) (* header-length 4)))
(bit-string-case rest
([ (opts :: binary bytes options-length)
(data :: binary bytes data-length)
(:: binary) ] ;; Very short ethernet packets have a trailer of zeros
(ip-packet interface-name
(bit-string->bytes source-ip)
(bit-string->bytes destination-ip)
protocol
(bit-string->bytes opts)
(bit-string->bytes data))))
#f)))
(else #f)))
(define (format-ip-packet p)
(match-define (ip-packet _ src dst protocol options body) p)
(define header-length ;; TODO: ensure options is a multiple of 4 bytes
(+ IP-MINIMUM-HEADER-LENGTH (quotient (bit-string-byte-count options) 4)))
(define header0 (bit-string (IP-VERSION :: bits 4)
(header-length :: bits 4)
0 ;; TODO: service type
((+ (* header-length 4) (bit-string-byte-count body))
:: bits 16)
(0 :: bits 16) ;; TODO: identifier
(0 :: bits 3) ;; TODO: flags
(0 :: bits 13) ;; TODO: fragments
default-ttl
protocol
(0 :: bits 16)
(src :: binary bits 32)
(dst :: binary bits 32)
(options :: binary)))
(define full-packet (bit-string ((ip-checksum 10 header0) :: binary) (body :: binary)))
full-packet)
(define (lookup-arp ipaddr query-interface-pattern base-gestalt k)
(on-claim #:name (string->symbol (format "lookup-arp:~a" (ip-address->hostname ipaddr)))
(lambda (_g arp-results)
(if (not arp-results)
(error 'ip "Someone has published a wildcard arp result")
(and (not (set-empty? arp-results))
(match (set-first arp-results)
[(list interface hwaddr)
(log-info "ARP lookup yielded ~a on ~a for ~a"
(pretty-bytes hwaddr)
(ethernet-interface-name interface)
(ip-address->hostname ipaddr))
(when (> (set-count arp-results) 1)
(log-warning "Ambiguous ARP result for ~a: ~v"
(ip-address->hostname ipaddr)
arp-results))
(k interface hwaddr)]))))
base-gestalt
(arp-query IPv4-ethertype ipaddr (?! query-interface-pattern) (?!))
#:timeout-msec 5000
#:on-timeout (lambda ()
(log-warning "ARP lookup of ~a failed, packet dropped"
(ip-address->hostname ipaddr))
'())))

View File

@ -0,0 +1,121 @@
#lang syndicate/monolithic
(require syndicate/demand-matcher)
(require syndicate/drivers/timer)
(require "demo-config.rkt")
(require "ethernet.rkt")
(require "arp.rkt")
(require "ip.rkt")
(require "tcp.rkt")
(require "udp.rkt")
;;(log-events-and-actions? #t)
(spawn-timer-driver)
(spawn-ethernet-driver)
(spawn-arp-driver)
(spawn-ip-driver)
(spawn-tcp-driver)
(spawn-udp-driver)
(spawn-demo-config)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(let ()
(local-require racket/set racket/string)
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (at-meta (?!)))
(define peer-detector (advertise `(,(?!) says ,?)))
(define (send-to-remote fmt . vs)
(message (at-meta (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn
(lambda (e peers)
(match e
[(message (at-meta (tcp-channel _ _ bs)))
(transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
[(message `(,who says ,what))
(transition peers (say who "says: ~a" what))]
[(scn assertions)
(if (trie-empty? (trie-project assertions remote-detector))
(quit (send-to-remote "Goodbye!\n"))
(let ((new-peers (trie-project/set/single assertions peer-detector)))
(define arrived (set-subtract new-peers peers))
(define departed (set-subtract peers new-peers))
(transition new-peers
(list (for/list [(who arrived)] (say who "arrived."))
(for/list [(who departed)] (say who "departed."))))))]
[#f #f]))
(set)
(scn/union
(subscription `(,? says ,?)) ;; read actual chat messages
(subscription (advertise `(,? says ,?))) ;; observe peer presence
(advertisement `(,user says ,?)) ;; advertise our presence
(subscription (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(subscription (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(advertisement (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
))))
(spawn-dataspace
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
#:meta-level 1
spawn-session))
)
(let ()
(spawn (lambda (e s)
(match e
[(message (udp-packet src dst body))
(log-info "Got packet from ~v: ~v" src body)
(transition s (message
(udp-packet dst
src
(string->bytes/utf-8 (format "You said: ~a" body)))))]
[_ #f]))
(void)
(scn (subscription (udp-packet ? (udp-listener 6667) ?)))))
(let ()
(define (spawn-session them us)
(list
(message 'bump)
(spawn (lambda (e s)
(match e
[(message `(counter ,counter))
(define response
(string->bytes/utf-8
(format (string-append
"HTTP/1.0 200 OK\r\n\r\n"
"<h1>Hello world from syndicate-monolithic-netstack!</h1>\n"
"<p>This is running on syndicate-monolithic's own\n"
"<a href='https://github.com/tonyg/syndicate/'>\n"
"TCP/IP stack</a>.</p>\n"
"<p>There have been ~a requests prior to this one.</p>")
counter)))
(quit (message (at-meta (tcp-channel us them response))))]
[_ #f]))
(void)
(scn/union (subscription `(counter ,?))
(subscription (tcp-channel them us ?) #:meta-level 1)
(subscription (advertise (tcp-channel them us ?)) #:meta-level 1)
(advertisement (tcp-channel us them ?) #:meta-level 1)))))
(spawn-dataspace
(spawn (lambda (e counter)
(match e
[(message 'bump)
(transition (+ counter 1) (message `(counter ,counter)))]
[_ #f]))
0
(scn (subscription 'bump)))
(spawn-demand-matcher (advertise (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?))
(observe (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 80)) ?))
#:meta-level 1
spawn-session))
)

View File

@ -0,0 +1,47 @@
#lang racket/base
(provide on-claim)
(require syndicate/monolithic)
(require syndicate/drivers/timer)
;; (Trie (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action)))
;; Trie Projection ...
;; -> Action
;; Spawns a process that observes the given projections. Any time the
;; environment's interests change in a relevant way, calls
;; check-and-maybe-spawn-fn with the aggregate interests and the
;; projection results. If check-and-maybe-spawn-fn returns #f,
;; continues to wait; otherwise, takes the action(s) returned, and
;; quits.
(define (on-claim #:timeout-msec [timeout-msec #f]
#:on-timeout [timeout-handler (lambda () '())]
#:name [name #f]
check-and-maybe-spawn-fn
base-interests
. projections)
(define timer-id (gensym 'on-claim))
(define (on-claim-handler e state)
(match e
[(scn new-aggregate)
(define projection-results
(map (lambda (p) (trie-project/set #:take (projection-arity p) new-aggregate p))
projections))
(define maybe-spawn (apply check-and-maybe-spawn-fn
new-aggregate
projection-results))
(if maybe-spawn
(quit maybe-spawn)
#f)]
[(message (timer-expired (== timer-id) _))
(quit (timeout-handler))]
[_ #f]))
(list
(when timeout-msec (message (set-timer timer-id timeout-msec 'relative)))
(spawn #:name name
on-claim-handler
(void)
(scn/union base-interests
(assertion-set-union*
(map (lambda (p) (subscription (projection->pattern p))) projections))
(subscription (timer-expired timer-id ?))))))

View File

@ -0,0 +1,38 @@
#lang racket/base
;; UDP/TCP port allocator
(provide spawn-port-allocator
(struct-out port-allocation-request))
(require racket/set)
(require racket/match)
(require syndicate/monolithic)
(require "ip.rkt")
(struct port-allocation-request (type k) #:prefab)
(struct port-allocator-state (used-ports local-ips) #:transparent)
(define (spawn-port-allocator allocator-type observer-gestalt compute-used-ports)
(spawn #:name (string->symbol (format "port-allocator:~a" allocator-type))
(lambda (e s)
(match e
[(scn g)
(define local-ips (or (gestalt->local-ip-addresses g) (set)))
(define new-used-ports (compute-used-ports g local-ips))
(log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports)
(transition (port-allocator-state new-used-ports local-ips) '())]
[(message (port-allocation-request _ k))
(define currently-used-ports (port-allocator-state-used-ports s))
(let randomly-allocate-until-unused ()
(define p (+ 1024 (random 64512)))
(if (set-member? currently-used-ports p)
(randomly-allocate-until-unused)
(transition (struct-copy port-allocator-state s
[used-ports (set-add currently-used-ports p)])
(k p (port-allocator-state-local-ips s)))))]
[_ #f]))
(port-allocator-state (set) (set))
(scn/union (subscription (port-allocation-request allocator-type ?))
observe-local-ip-addresses-gestalt
observer-gestalt)))

View File

@ -0,0 +1,665 @@
#lang racket/base
(provide (struct-out tcp-address)
(struct-out tcp-handle)
(struct-out tcp-listener)
(struct-out tcp-channel)
spawn-tcp-driver)
(require racket/set)
(require racket/match)
(require syndicate/monolithic)
(require syndicate/drivers/timer)
(require syndicate/demand-matcher)
(require bitsyntax)
(require "dump-bytes.rkt")
(require "checksum.rkt")
(require "ip.rkt")
(require "port-allocator.rkt")
;; tcp-address/tcp-address : "kernel" tcp connection state machines
;; tcp-handle/tcp-address : "user" outbound connections
;; tcp-listener/tcp-address : "user" inbound connections
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(struct tcp-address (host port) #:prefab)
(struct tcp-handle (id) #:prefab)
(struct tcp-listener (port) #:prefab)
(struct tcp-channel (source destination subpacket) #:prefab)
(struct tcp-packet (from-wire?
source-ip
source-port
destination-ip
destination-port
sequence-number
ack-number
flags
window-size
options
data)
#:prefab)
;; (tcp-port-allocation Number (U TcpHandle TcpListener))
(struct tcp-port-allocation (port handle) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; User-accessible driver startup
(define (spawn-tcp-driver)
(list (spawn-demand-matcher #:name 'tcp-inbound-driver
(advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?)))
(advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?)))
(lambda (server-addr)
(match-define (tcp-listener port) server-addr)
;; TODO: have listener shut down once user-level listener does
(list
(spawn #:name (string->symbol
(format "tcp-listener-port-reservation:~a" port))
(lambda (e s) #f)
(void)
(scn (assertion (tcp-port-allocation port server-addr))))
(spawn-demand-matcher
#:name (string->symbol (format "tcp-listener:~a" port))
(advertise (tcp-channel (?! (tcp-address ? ?))
(?! (tcp-address ? port))
?))
(observe (tcp-channel (?! (tcp-address ? ?))
(?! (tcp-address ? port))
?))
(spawn-relay server-addr)))))
(spawn-demand-matcher #:name 'tcp-outbound-driver
(advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
(observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
allocate-port-and-spawn-socket)
(spawn-tcp-port-allocator)
(spawn-kernel-tcp-driver)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Port allocation
(define (spawn-tcp-port-allocator)
(spawn-port-allocator 'tcp
(subscription (tcp-port-allocation ? ?))
(lambda (g local-ips)
(project-assertions g (tcp-port-allocation (?!) ?)))))
(define (allocate-port-and-spawn-socket local-addr remote-addr)
(message (port-allocation-request
'tcp
(lambda (port local-ips)
;; TODO: Choose a sensible IP address for the outbound
;; connection. We don't have enough information to do this
;; well at the moment, so just pick some available local IP
;; address.
;;
;; Interesting note: In some sense, the right answer is
;; "?". This would give us a form of mobility, where IP
;; addresses only route to a given bucket-of-state and ONLY
;; the port number selects a substate therein. That's not
;; how TCP is defined however so we can't do that.
(define appropriate-ip (set-first local-ips))
(define appropriate-host (ip-address->hostname appropriate-ip))
(match-define (tcp-address remote-host remote-port) remote-addr)
(define remote-ip (ip-string->ip-address remote-host))
(list
((spawn-relay local-addr) remote-addr (tcp-address appropriate-host port))
(spawn-state-vector remote-ip remote-port appropriate-ip port))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Relay between kernel-level and user-level
(define relay-peer-wait-time-msec 5000)
(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr)
(define timer-name (list 'spawn-relay local-tcp-addr remote-addr))
(define local-peer-traffic (?! (observe (tcp-channel remote-addr local-user-addr ?))))
(define remote-peer-traffic (?! (advertise (tcp-channel remote-addr local-tcp-addr ?))))
(list
(message (set-timer timer-name relay-peer-wait-time-msec 'relative))
(spawn #:name (string->symbol (format "tcp-relay:~v:~v:~v"
local-user-addr
remote-addr
local-tcp-addr))
(lambda (e state)
(match e
[(scn g)
(define local-peer-absent? (trie-empty? (trie-project g local-peer-traffic)))
(define remote-peer-absent? (trie-empty? (trie-project g remote-peer-traffic)))
(define new-state (+ (if local-peer-absent? 0 1) (if remote-peer-absent? 0 1)))
(if (< new-state state)
(quit)
(transition new-state '()))]
[(message (tcp-channel (== local-user-addr) (== remote-addr) bs))
(transition state (message (tcp-channel local-tcp-addr remote-addr bs)))]
[(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs))
(transition state (message (tcp-channel remote-addr local-user-addr bs)))]
[(message (timer-expired _ _))
#:when (< state 2) ;; we only care if we're not fully connected
(error 'spawn-relay "TCP relay process timed out waiting for peer")]
[_ #f]))
0
(scn/union (subscription (projection->pattern local-peer-traffic))
(subscription (projection->pattern remote-peer-traffic))
(assertion (tcp-port-allocation (tcp-address-port local-tcp-addr)
local-user-addr))
(subscription (tcp-channel remote-addr local-tcp-addr ?))
(subscription (tcp-channel local-user-addr remote-addr ?))
(advertisement (tcp-channel remote-addr local-user-addr ?))
(advertisement (tcp-channel local-tcp-addr remote-addr ?))
(subscription (timer-expired timer-name ?))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Codec & kernel-level driver
(define PROTOCOL-TCP 6)
(struct codec-state (local-ips active-state-vectors) #:transparent)
(define (spawn-kernel-tcp-driver)
(define (state-vector-active? statevec s)
(set-member? (codec-state-active-state-vectors s) statevec))
(define (analyze-incoming-packet src-ip dst-ip body s)
(bit-string-case body
([ (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
(sequence-number :: integer bytes 4)
(ack-number :: integer bytes 4)
(data-offset :: integer bits 4)
(reserved :: integer bits 3)
(ns :: integer bits 1)
(cwr :: integer bits 1)
(ece :: integer bits 1)
(urg :: integer bits 1)
(ack :: integer bits 1)
(psh :: integer bits 1)
(rst :: integer bits 1)
(syn :: integer bits 1)
(fin :: integer bits 1)
(window-size :: integer bytes 2)
(checksum :: integer bytes 2) ;; TODO: check checksum
(urgent-pointer :: integer bytes 2)
(rest :: binary) ]
(let* ((flags (set))
(statevec (list src-ip src-port dst-ip dst-port))
(old-active-state-vectors (codec-state-active-state-vectors s))
(spawn-needed? (and (not (state-vector-active? statevec s))
(zero? rst)))) ;; don't bother spawning if it's a rst
(define-syntax-rule (set-flags! v ...)
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
(set-flags! ns cwr ece urg ack psh rst syn fin)
(log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)"
(ip-address->hostname src-ip)
src-port
(ip-address->hostname dst-ip)
dst-port
sequence-number
ack-number
flags
window-size)
(when spawn-needed? (log-info " - spawn needed!"))
(bit-string-case rest
([ (opts :: binary bytes (- (* data-offset 4) 20))
(data :: binary) ]
(let ((packet (tcp-packet #t
src-ip
src-port
dst-ip
dst-port
sequence-number
ack-number
flags
window-size
(bit-string->bytes opts)
(bit-string->bytes data))))
(transition (if spawn-needed?
(struct-copy codec-state s
[active-state-vectors
(set-add old-active-state-vectors statevec)])
s)
(list
(when spawn-needed? (spawn-state-vector src-ip src-port
dst-ip dst-port))
;; TODO: get packet to the new state-vector process somehow
(message packet)))))
(else #f))))
(else #f)))
(define statevec-projection (observe (tcp-packet ? (?!) (?!) (?!) (?!) ? ? ? ? ? ?)))
(define (analyze-gestalt g s)
(define local-ips (gestalt->local-ip-addresses g))
(define statevecs (trie-project/set #:take 4 g statevec-projection))
(log-info "gestalt yielded statevecs ~v and local-ips ~v" statevecs local-ips)
(transition (struct-copy codec-state s
[local-ips local-ips]
[active-state-vectors statevecs]) '()))
(define (deliver-outbound-packet p s)
(match-define (tcp-packet #f
src-ip
src-port
dst-ip
dst-port
sequence-number
ack-number
flags
window-size
options
data)
p)
(log-info "TCP ~a:~a -> ~a:~a (seq ~a, ack ~a, flags ~a, window ~a)"
(ip-address->hostname src-ip)
src-port
(ip-address->hostname dst-ip)
dst-port
sequence-number
ack-number
flags
window-size)
(define (flag-bit sym) (if (set-member? flags sym) 1 0))
(define payload (bit-string (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
(sequence-number :: integer bytes 4)
(ack-number :: integer bytes 4)
((+ 5 (quotient (bit-string-byte-count options) 4))
:: integer bits 4) ;; TODO: enforce 4-byte alignment
(0 :: integer bits 3)
((flag-bit 'ns) :: integer bits 1)
((flag-bit 'cwr) :: integer bits 1)
((flag-bit 'ece) :: integer bits 1)
((flag-bit 'urg) :: integer bits 1)
((flag-bit 'ack) :: integer bits 1)
((flag-bit 'psh) :: integer bits 1)
((flag-bit 'rst) :: integer bits 1)
((flag-bit 'syn) :: integer bits 1)
((flag-bit 'fin) :: integer bits 1)
(window-size :: integer bytes 2)
(0 :: integer bytes 2) ;; checksum location
(0 :: integer bytes 2) ;; TODO: urgent pointer
(data :: binary)))
(define pseudo-header (bit-string (src-ip :: binary bytes 4)
(dst-ip :: binary bytes 4)
0
PROTOCOL-TCP
((bit-string-byte-count payload) :: integer bytes 2)))
(transition s (message (ip-packet #f src-ip dst-ip PROTOCOL-TCP #""
(ip-checksum 16 payload #:pseudo-header pseudo-header)))))
(spawn #:name 'kernel-tcp-driver
(lambda (e s)
(match e
[(scn g)
(analyze-gestalt g s)]
[(message (ip-packet source-if src dst _ _ body))
#:when (and source-if ;; source-if == #f iff packet originates locally
(set-member? (codec-state-local-ips s) dst))
(analyze-incoming-packet src dst body s)]
[(message (? tcp-packet? p))
#:when (not (tcp-packet-from-wire? p))
(deliver-outbound-packet p s)]
[_ #f]))
(codec-state (set) (set))
(scn/union (subscription (ip-packet ? ? ? PROTOCOL-TCP ? ?))
(subscription (tcp-packet #f ? ? ? ? ? ? ? ? ? ?))
(subscription (observe (tcp-packet #t ? ? ? ? ? ? ? ? ? ?)))
observe-local-ip-addresses-gestalt)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Per-connection state vector process
(struct buffer (data ;; bit-string
seqn ;; names leftmost byte in data
window ;; counts bytes from leftmost byte in data
finished?) ;; boolean: true after FIN
#:transparent)
(struct conn-state (outbound ;; buffer
inbound ;; buffer
syn-acked? ;; boolean
latest-peer-activity-time ;; from current-inexact-milliseconds
;; ^ the most recent time we heard from our peer
user-timeout-base-time ;; from current-inexact-milliseconds
;; ^ when the index of the first outbound unacknowledged byte changed
local-peer-seen? ;; boolean
listener-listening?) ;; boolean
#:transparent)
(define transmit-check-interval-msec 2000)
(define inbound-buffer-limit 65535)
(define maximum-segment-size 536) ;; bytes
(define maximum-segment-lifetime-sec (* 2 60)) ;; two minutes; 2MSL is TIME-WAIT timeout
(define user-timeout-msec (* 5 60 1000)) ;; per RFC 793, this should be per-connection, but I
;; cheat; RFC 793 says "the present global default is five minutes", which is
;; reasonable to be getting on with
(define (spawn-state-vector src-ip src-port dst-ip dst-port)
(define src (tcp-address (ip-address->hostname src-ip) src-port))
(define dst (tcp-address (ip-address->hostname dst-ip) dst-port))
(define (timer-name kind) (list 'tcp-timer kind src dst))
(define (next-expected-seqn s)
(define b (conn-state-inbound s))
(define v (buffer-seqn b))
(and v (seq+ v (bit-string-byte-count (buffer-data b)))))
(define (buffer-push b data)
(struct-copy buffer b [data (bit-string-append (buffer-data b) data)]))
;; ConnState -> ConnState
(define (set-inbound-seqn seqn s)
(struct-copy conn-state s
[inbound (struct-copy buffer (conn-state-inbound s) [seqn seqn])]))
;; Bitstring ConnState -> Transition
(define (incorporate-segment data s)
;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data)
(transition
(if (buffer-finished? (conn-state-inbound s))
s
(struct-copy conn-state s [inbound (buffer-push (conn-state-inbound s) data)]))
'()))
(define (seq+ a b) (bitwise-and #xffffffff (+ a b)))
;; Always positive
(define (seq- larger smaller)
(if (< larger smaller) ;; wraparound has occurred
(+ (- larger smaller) #x100000000)
(- larger smaller)))
(define (seq> a b)
(< (seq- a b) #x80000000))
(define local-peer-detector (?! (observe (tcp-channel src dst ?))))
(define listener-detector (?! (observe (advertise (tcp-channel ? (tcp-listener dst-port) ?)))))
;; ConnState -> Gestalt
(define (compute-gestalt s)
(define worldward-facing-gestalt
(subscription (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?)))
(define appward-facing-gestalt
(assertion-set-union
(subscription (projection->pattern local-peer-detector))
(subscription (projection->pattern listener-detector))
(subscription (tcp-channel dst src ?))
(if (and (conn-state-syn-acked? s)
(not (buffer-finished? (conn-state-inbound s))))
(advertisement (tcp-channel src dst ?))
trie-empty)))
(assertion-set-union (subscription (timer-expired (timer-name ?) ?))
worldward-facing-gestalt
appward-facing-gestalt))
;; ConnState -> Transition
(define (deliver-inbound-locally s)
(define b (conn-state-inbound s))
(if (bit-string-empty? (buffer-data b))
(transition s '())
(let ((chunk (bit-string->bytes (buffer-data b))))
(transition (struct-copy conn-state s
[inbound (struct-copy buffer b
[data #""]
[seqn (seq+ (buffer-seqn b) (bytes-length chunk))])])
(message (tcp-channel src dst chunk))))))
;; (Setof Symbol) -> ConnState -> Transition
(define ((check-fin flags) s)
(define b (conn-state-inbound s))
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
(error 'check-fin "Nonempty inbound buffer"))
(if (set-member? flags 'fin)
(let ((new-s (struct-copy conn-state s
[inbound (struct-copy buffer b
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
[finished? #t])])))
(log-info "Closing inbound stream.")
(transition new-s (scn (compute-gestalt new-s))))
(transition s '())))
;; Boolean SeqNum -> ConnState -> Transition
(define ((discard-acknowledged-outbound ack? ackn) s)
(if (not ack?)
(transition s '())
(let* ((b (conn-state-outbound s))
(base (buffer-seqn b))
(limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b))))
(ackn (if (seq> ackn limit) limit ackn))
(ackn (if (seq> base ackn) base ackn))
(dist (seq- ackn base)))
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
(define new-s (struct-copy conn-state s
[user-timeout-base-time (current-inexact-milliseconds)]
[outbound (struct-copy buffer b [data remaining-data] [seqn ackn])]
[syn-acked? (or (conn-state-syn-acked? s)
(positive? dist))]))
(transition new-s
(when (and (not (conn-state-syn-acked? s)) (positive? dist))
(scn (compute-gestalt new-s)))))))
;; Nat -> ConnState -> Transition
(define ((update-outbound-window peer-window) s)
(transition (struct-copy conn-state s
[outbound (struct-copy buffer (conn-state-outbound s)
[window peer-window])])
'()))
;; ConnState -> Boolean
(define (all-output-acknowledged? s)
(bit-string-empty? (buffer-data (conn-state-outbound s))))
;; (Option SeqNum) -> ConnState -> Transition
(define ((send-outbound old-ackn) s)
(define b (conn-state-outbound s))
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
(if (buffer-finished? b) 1 0))))
(define segment-size (min maximum-segment-size
(if (conn-state-syn-acked? s) (buffer-window b) 1)
;; ^ can only send SYN until SYN is acked
pending-byte-count))
(define segment-offset (if (conn-state-syn-acked? s) 0 1))
(define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset!
(define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset!
(define ackn (next-expected-seqn s))
(define flags (set))
(when ackn
(set! flags (set-add flags 'ack)))
(when (not (conn-state-syn-acked? s))
(set! flags (set-add flags 'syn)))
(when (and (buffer-finished? b)
(conn-state-syn-acked? s)
(= segment-size pending-byte-count)
(not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy
(set! flags (set-add flags 'fin)))
(define window (min 65535 ;; limit of field width
(max 0 ;; can't be negative
(- (buffer-window (conn-state-inbound s))
(bit-string-byte-count
(buffer-data (conn-state-inbound s)))))))
(transition s
(unless (and (equal? ackn old-ackn)
(conn-state-syn-acked? s)
(not (set-member? flags 'fin))
(zero? (bit-string-byte-count chunk)))
(local-require racket/pretty)
(pretty-write `(send-outbound (old-ackn ,old-ackn)
(s ,s)
(flags ,flags)))
(flush-output)
(message (tcp-packet #f dst-ip dst-port src-ip src-port
(buffer-seqn b)
(or ackn 0)
flags
window
#""
chunk)))))
;; ConnState -> Transition
(define (bump-peer-activity-time s)
(transition (struct-copy conn-state s
[latest-peer-activity-time (current-inexact-milliseconds)])
'()))
;; ConnState Number -> Boolean
(define (heard-from-peer-within-msec? s msec)
(<= (- (current-inexact-milliseconds) (conn-state-latest-peer-activity-time s)) msec))
;; ConnState -> Boolean
(define (user-timeout-expired? s)
(and (not (all-output-acknowledged? s))
(> (- (current-inexact-milliseconds) (conn-state-user-timeout-base-time s))
user-timeout-msec)))
;; ConnState -> Transition
(define (quit-when-done s)
(cond
[(and (buffer-finished? (conn-state-outbound s))
(buffer-finished? (conn-state-inbound s))
(all-output-acknowledged? s)
(not (heard-from-peer-within-msec? s (* 2 1000 maximum-segment-lifetime-sec))))
;; Everything is cleanly shut down, and we just need to wait a while for unexpected
;; packets before we release the state vector.
(quit)]
[(user-timeout-expired? s)
;; We've been plaintively retransmitting for user-timeout-msec without hearing anything
;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but
;; it will do for now? TODO
(log-info "TCP_USER_TIMEOUT fired.")
(quit)]
[else #f]))
;; Action
(define send-set-transmit-check-timer
(message (set-timer (timer-name 'transmit-check)
transmit-check-interval-msec
'relative)))
;; SeqNum SeqNum ConnState -> Transition
(define (reset seqn ackn s)
(log-warning "Sending RST from ~a:~a to ~a:~a"
(ip-address->hostname dst-ip)
dst-port
(ip-address->hostname src-ip)
src-port)
(quit (message (tcp-packet #f dst-ip dst-port src-ip src-port
seqn
ackn
(set 'ack 'rst)
0
#""
#""))))
;; ConnState -> Transition
(define (close-outbound-stream s)
(define b (conn-state-outbound s))
(transition
(if (buffer-finished? b)
s
(struct-copy conn-state s
[outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte
[finished? #t])]))
'()))
(define (state-vector-behavior e s)
(define old-ackn (buffer-seqn (conn-state-inbound s)))
(match e
[(scn g)
(log-info "State vector routing-update:\n~a" (trie->pretty-string g))
(define local-peer-present? (trie-non-empty? (trie-project g local-peer-detector)))
(define listening? (trie-non-empty? (trie-project g listener-detector)))
(define new-s (struct-copy conn-state s [listener-listening? listening?]))
(cond
[(and local-peer-present? (not (conn-state-local-peer-seen? s)))
(transition (struct-copy conn-state new-s [local-peer-seen? #t]) '())]
[(and (not local-peer-present?) (conn-state-local-peer-seen? s))
(log-info "Closing outbound stream.")
(sequence-transitions (close-outbound-stream new-s)
(send-outbound old-ackn)
quit-when-done)]
[else (transition new-s '())])]
[(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data))
(define expected (next-expected-seqn s))
(define is-syn? (set-member? flags 'syn))
(define is-fin? (set-member? flags 'fin))
(cond
[(set-member? flags 'rst) (quit)]
[(and (not expected) ;; no syn yet
(or (not is-syn?) ;; and this isn't it
(and (not (conn-state-listener-listening? s)) ;; or it is, but no listener...
(not (conn-state-local-peer-seen? s))))) ;; ...and no outbound client
(reset ackn ;; this is *our* seqn
(seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0)))
;; ^^ this is what we should acknowledge...
s)]
[else
(sequence-transitions (cond
[(not expected) ;; haven't seen syn yet, but we know this is it
(incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))]
[(= expected seqn)
(incorporate-segment data s)]
[else
(transition s '())])
deliver-inbound-locally
(check-fin flags)
(discard-acknowledged-outbound (set-member? flags 'ack) ackn)
(update-outbound-window window)
(send-outbound old-ackn)
bump-peer-activity-time
quit-when-done)])]
[(message (tcp-channel _ _ bs))
;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs)
(sequence-transitions (transition (struct-copy conn-state s
[user-timeout-base-time
;; Only move user-timeout-base-time if there wasn't
;; already some outstanding output.
(if (all-output-acknowledged? s)
(current-inexact-milliseconds)
(conn-state-user-timeout-base-time s))]
[outbound (buffer-push (conn-state-outbound s) bs)])
'())
(send-outbound old-ackn)
quit-when-done)]
[(message (timer-expired (== (timer-name 'transmit-check)) _))
;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of
;; retransmitting outbound data as well as a means of checking for an expired
;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained
;; approach.
(sequence-transitions (transition s send-set-transmit-check-timer)
(send-outbound old-ackn)
quit-when-done)]
[_ #f]))
;; (local-require racket/trace)
;; (trace state-vector-behavior)
(define initial-outbound-seqn
;; Yuck
(inexact->exact (truncate (* #x100000000 (random)))))
;; TODO accept input from user process
(list
send-set-transmit-check-timer
(let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position
(buffer #"" #f inbound-buffer-limit #f)
#f
(current-inexact-milliseconds)
(current-inexact-milliseconds)
#f
#f)))
(spawn #:name
(string->symbol (format "tcp-state-vector:~a:~a:~a:~a"
(ip-address->hostname src-ip)
src-port
(ip-address->hostname dst-ip)
dst-port))
state-vector-behavior
state0
(scn (compute-gestalt state0))))))

View File

@ -0,0 +1,175 @@
#lang racket/base
(provide (struct-out udp-remote-address)
(struct-out udp-handle)
(struct-out udp-listener)
udp-address?
udp-local-address?
(struct-out udp-packet)
spawn-udp-driver)
(require racket/set)
(require racket/match)
(require syndicate/monolithic)
(require syndicate/demand-matcher)
(require bitsyntax)
(require "dump-bytes.rkt")
(require "checksum.rkt")
(require "ip.rkt")
(require "port-allocator.rkt")
;; udp-address/udp-address : "kernel" udp connection state machines
;; udp-handle/udp-address : "user" outbound connections
;; udp-listener/udp-address : "user" inbound connections
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(struct udp-remote-address (host port) #:prefab)
(struct udp-handle (id) #:prefab)
(struct udp-listener (port) #:prefab)
(define (udp-address? x)
(or (udp-remote-address? x)
(udp-local-address? x)))
(define (udp-local-address? x)
(or (udp-handle? x)
(udp-listener? x)))
;; USER-level protocol
(struct udp-packet (source destination body) #:prefab)
;; KERNEL-level protocol
(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab)
(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress)
(define any-remote (udp-remote-address ? ?))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; User-accessible driver startup
(define (spawn-udp-driver)
(list
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?))
(advertise (udp-packet ? (?! (udp-listener ?)) ?))
(lambda (handle) (spawn-udp-relay (udp-listener-port handle) handle)))
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-handle ?)) ?))
(advertise (udp-packet ? (?! (udp-handle ?)) ?))
(lambda (handle)
(message (port-allocation-request
'udp
(lambda (port local-ips) (spawn-udp-relay port handle))))))
(spawn-udp-port-allocator)
(spawn-kernel-udp-driver)))
(define (spawn-udp-port-allocator)
(define udp-projector (udp-port-allocation (?!) ?))
(spawn-port-allocator 'udp
(subscription (projection->pattern udp-projector))
(lambda (g local-ips)
(project-assertions g udp-projector))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Relaying
(define (spawn-udp-relay local-port local-user-addr)
(log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr)
(define local-peer-detector (?! (observe (udp-packet any-remote local-user-addr ?))))
(define (compute-gestalt local-ips)
(for/fold [(g (assertion-set-union
(subscription (projection->pattern local-peer-detector))
(advertisement (udp-packet any-remote local-user-addr ?))
observe-local-ip-addresses-gestalt
(subscription (udp-packet local-user-addr any-remote ?))
(assertion (udp-port-allocation local-port local-user-addr))))]
[(ip (in-set local-ips))]
(assertion-set-union g
(subscription (udp-datagram ? ? ip local-port ?))
(advertisement (udp-datagram ip local-port ? ? ?)))))
(spawn (lambda (e local-ips)
(match e
[(scn g)
(define new-local-ips (gestalt->local-ip-addresses g))
(if (trie-empty? (trie-project g local-peer-detector))
(quit)
(transition new-local-ips (scn (compute-gestalt new-local-ips))))]
[(message (udp-packet (== local-user-addr) remote-addr bs))
;; Choose arbitrary local IP address for outbound packet!
;; TODO: what can be done? Must I examine the routing table?
(match-define (udp-remote-address remote-host remote-port) remote-addr)
(define remote-ip (ip-string->ip-address remote-host))
(transition local-ips (message (udp-datagram (set-first local-ips)
local-port
remote-ip
remote-port
bs)))]
[(message (udp-datagram si sp _ _ bs))
(transition local-ips
(message (udp-packet (udp-remote-address (ip-address->hostname si) sp)
local-user-addr
bs)))]
[_ #f]))
(set)
(scn (compute-gestalt (set)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Codec & kernel-level driver
(define PROTOCOL-UDP 17)
(define (spawn-kernel-udp-driver)
(spawn (lambda (e local-ips)
(match e
[(scn g)
(transition (gestalt->local-ip-addresses g) '())]
[(message (ip-packet source-if src-ip dst-ip _ _ body))
#:when (and source-if (set-member? local-ips dst-ip))
(bit-string-case body
([ (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
(length :: integer bytes 2)
(checksum :: integer bytes 2) ;; TODO: check checksum
(data :: binary) ]
(bit-string-case data
([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes
(:: binary) ]
(transition local-ips (message (udp-datagram src-ip
src-port
dst-ip
dst-port
(bit-string->bytes payload)))))
(else #f)))
(else #f))]
[(message (udp-datagram src-ip src-port dst-ip dst-port bs))
#:when (set-member? local-ips src-ip)
(let* ((payload (bit-string (src-port :: integer bytes 2)
(dst-port :: integer bytes 2)
((+ 8 (bit-string-byte-count bs))
:: integer bytes 2)
(0 :: integer bytes 2) ;; checksum location
(bs :: binary)))
(pseudo-header (bit-string (src-ip :: binary bytes 4)
(dst-ip :: binary bytes 4)
0
PROTOCOL-UDP
((bit-string-byte-count payload)
:: integer bytes 2)))
(checksummed-payload (ip-checksum #:pseudo-header pseudo-header
6 payload)))
(transition local-ips (message (ip-packet #f
src-ip
dst-ip
PROTOCOL-UDP
#""
checksummed-payload))))]
[_ #f]))
(set)
(scn/union (advertisement (ip-packet #f ? ? PROTOCOL-UDP ? ?))
(subscription (ip-packet ? ? ? PROTOCOL-UDP ? ?))
(subscription (udp-datagram ? ? ? ? ?))
observe-local-ip-addresses-gestalt)))