Migrate ip, port-allocator, udp and tcp to syndicate/actor
This commit is contained in:
parent
04f1c56a5a
commit
09dfaf7d0e
|
@ -1,29 +1,25 @@
|
||||||
#lang racket/base
|
#lang syndicate/actor
|
||||||
|
|
||||||
(provide (struct-out ip-packet)
|
(provide (struct-out ip-packet)
|
||||||
ip-address->hostname
|
ip-address->hostname
|
||||||
ip-string->ip-address
|
ip-string->ip-address
|
||||||
apply-netmask
|
apply-netmask
|
||||||
ip-address-in-subnet?
|
ip-address-in-subnet?
|
||||||
gestalt->local-ip-addresses
|
query-local-ip-addresses
|
||||||
observe-local-ip-addresses-gestalt
|
|
||||||
broadcast-ip-address
|
broadcast-ip-address
|
||||||
spawn-ip-driver)
|
spawn-ip-driver)
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
(require racket/match)
|
|
||||||
(require (only-in racket/string string-split))
|
(require (only-in racket/string string-split))
|
||||||
(require syndicate/monolithic)
|
|
||||||
(require syndicate/drivers/timer)
|
|
||||||
(require syndicate/demand-matcher)
|
|
||||||
(require bitsyntax)
|
(require bitsyntax)
|
||||||
|
|
||||||
(require "dump-bytes.rkt")
|
(require "dump-bytes.rkt")
|
||||||
(require "configuration.rkt")
|
(require "configuration.rkt")
|
||||||
(require "checksum.rkt")
|
(require "checksum.rkt")
|
||||||
(require "ethernet.rkt")
|
|
||||||
(require "arp.rkt")
|
(require/activate syndicate/drivers/timer)
|
||||||
(require "on-claim.rkt")
|
(require/activate "ethernet.rkt")
|
||||||
|
(require/activate "arp.rkt")
|
||||||
|
|
||||||
(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces
|
(struct ip-packet (source-interface ;; string for an ethernet interface, or #f for local interfaces
|
||||||
source
|
source
|
||||||
|
@ -54,38 +50,38 @@
|
||||||
|
|
||||||
(define broadcast-ip-address (bytes 255 255 255 255))
|
(define broadcast-ip-address (bytes 255 255 255 255))
|
||||||
|
|
||||||
(define local-ip-address-projector (host-route (?!) ? ?))
|
(define (query-local-ip-addresses)
|
||||||
(define (gestalt->local-ip-addresses g) (trie-project/set/single g local-ip-address-projector))
|
(query-set local-ips (host-route $addr _ _) addr))
|
||||||
(define observe-local-ip-addresses-gestalt (subscription (host-route ? ? ?)))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(define (spawn-ip-driver)
|
(define (spawn-ip-driver)
|
||||||
(list
|
(actor #:name 'ip-driver
|
||||||
(spawn-demand-matcher (host-route (?!) (?!) (?!))
|
(react
|
||||||
(route-up (host-route (?!) (?!) (?!)))
|
(during/actor (host-route $my-address $netmask $interface-name)
|
||||||
spawn-host-route)
|
(assert (route-up (host-route my-address netmask interface-name)))
|
||||||
(spawn-demand-matcher (gateway-route (?!) (?!) (?!) (?!))
|
(do-host-route my-address netmask interface-name))
|
||||||
(route-up (gateway-route (?!) (?!) (?!) (?!)))
|
(during/actor (gateway-route $network $netmask $gateway-addr $interface-name)
|
||||||
spawn-gateway-route)
|
(assert (route-up
|
||||||
(spawn-demand-matcher (net-route (?!) (?!) (?!))
|
(gateway-route $network $netmask $gateway-addr $interface-name)))
|
||||||
(route-up (net-route (?!) (?!) (?!)))
|
(do-gateway-route network netmask gateway-addr interface-name))
|
||||||
spawn-net-route)))
|
(during/actor (net-route $network-addr $netmask $link)
|
||||||
|
(assert (route-up (net-route network-addr netmask link)))
|
||||||
|
(do-net-route network-addr netmask link)))))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Local IP route
|
;; Local IP route
|
||||||
|
|
||||||
(define (spawn-host-route my-address netmask interface-name)
|
(define (do-host-route my-address netmask interface-name)
|
||||||
(list
|
|
||||||
(let ((network-addr (apply-netmask my-address netmask)))
|
(let ((network-addr (apply-netmask my-address netmask)))
|
||||||
(spawn-normal-ip-route (host-route my-address netmask interface-name)
|
(do-normal-ip-route (host-route my-address netmask interface-name)
|
||||||
network-addr
|
network-addr
|
||||||
netmask
|
netmask
|
||||||
interface-name))
|
interface-name))
|
||||||
(spawn (lambda (e s)
|
|
||||||
(match e
|
(assert (advertise (ip-packet _ my-address _ PROTOCOL-ICMP _ _)))
|
||||||
[(scn (? trie-empty?)) (quit)]
|
(assert (arp-assertion IPv4-ethertype my-address interface-name))
|
||||||
[(message (ip-packet _ peer-address _ _ _ body))
|
(on (message (ip-packet _ $peer-address my-address PROTOCOL-ICMP _ $body))
|
||||||
(bit-string-case body
|
(bit-string-case body
|
||||||
([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum
|
([ type code (checksum :: integer bytes 2) (rest :: binary) ] ;; TODO: check cksum
|
||||||
(case type
|
(case type
|
||||||
|
@ -97,12 +93,12 @@
|
||||||
code
|
code
|
||||||
(0 :: integer bytes 2) ;; TODO
|
(0 :: integer bytes 2) ;; TODO
|
||||||
(rest :: binary)))
|
(rest :: binary)))
|
||||||
(transition s (message (ip-packet #f
|
(send! (ip-packet #f
|
||||||
my-address
|
my-address
|
||||||
peer-address
|
peer-address
|
||||||
PROTOCOL-ICMP
|
PROTOCOL-ICMP
|
||||||
#""
|
#""
|
||||||
(ip-checksum 2 reply-data0))))]
|
(ip-checksum 2 reply-data0)))]
|
||||||
[else
|
[else
|
||||||
(log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a"
|
(log-info "ICMP ~a/~a (cksum ~a) to ~a from ~a:\n~a"
|
||||||
type
|
type
|
||||||
|
@ -110,126 +106,90 @@
|
||||||
checksum
|
checksum
|
||||||
(pretty-bytes my-address)
|
(pretty-bytes my-address)
|
||||||
(pretty-bytes peer-address)
|
(pretty-bytes peer-address)
|
||||||
(dump-bytes->string rest))
|
(dump-bytes->string rest))]))
|
||||||
#f]))
|
(else #f))))
|
||||||
(else #f))]
|
|
||||||
[_ #f]))
|
|
||||||
(void)
|
|
||||||
(scn/union (advertisement (ip-packet ? my-address ? PROTOCOL-ICMP ? ?))
|
|
||||||
(subscription (ip-packet ? ? my-address PROTOCOL-ICMP ? ?))
|
|
||||||
(assertion (arp-assertion IPv4-ethertype my-address interface-name))
|
|
||||||
(subscription (host-route my-address netmask interface-name))))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Gateway IP route
|
;; Gateway IP route
|
||||||
|
|
||||||
(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent)
|
(struct gateway-route-state (routes gateway-interface gateway-hwaddr) #:transparent)
|
||||||
|
|
||||||
(define (spawn-gateway-route network netmask gateway-addr interface-name)
|
(define (do-gateway-route network netmask gateway-addr interface-name)
|
||||||
(define the-route (gateway-route network netmask gateway-addr interface-name))
|
(define the-route (gateway-route network netmask gateway-addr interface-name))
|
||||||
|
|
||||||
(define host-route-projector (host-route (?!) (?!) ?))
|
(field [routes (set)])
|
||||||
(define gateway-route-projector (gateway-route (?!) (?!) ? ?))
|
(query-set* routes (host-route $addr $netmask _) (list addr netmask))
|
||||||
(define net-route-projector (net-route (?!) (?!) ?))
|
(query-set* routes (gateway-route $addr $netmask _ _) (list addr netmask))
|
||||||
(define gateway-arp-projector (arp-query IPv4-ethertype
|
(query-set* routes (net-route $addr $netmask _) (list addr netmask))
|
||||||
gateway-addr
|
|
||||||
(?! (ethernet-interface interface-name ?))
|
|
||||||
(?!)))
|
|
||||||
|
|
||||||
(define (covered-by-some-other-route? addr routes)
|
(field [gateway-interface #f]
|
||||||
(for/or ([r (in-set routes)])
|
[gateway-hwaddr #f])
|
||||||
|
(on (asserted (arp-query IPv4-ethertype
|
||||||
|
gateway-addr
|
||||||
|
($ iface (ethernet-interface interface-name _))
|
||||||
|
$hwaddr))
|
||||||
|
(when (not (gateway-hwaddr))
|
||||||
|
(log-info "Discovered gateway ~a at ~a on interface ~a."
|
||||||
|
(ip-address->hostname gateway-addr)
|
||||||
|
(ethernet-interface-name iface)
|
||||||
|
(pretty-bytes hwaddr)))
|
||||||
|
(gateway-interface iface)
|
||||||
|
(gateway-hwaddr hwaddr))
|
||||||
|
|
||||||
|
(define (covered-by-some-other-route? addr)
|
||||||
|
(for/or ([r (in-set (routes))])
|
||||||
(match-define (list net msk) r)
|
(match-define (list net msk) r)
|
||||||
(and (positive? msk)
|
(and (positive? msk)
|
||||||
(ip-address-in-subnet? addr net msk))))
|
(ip-address-in-subnet? addr net msk))))
|
||||||
|
|
||||||
(spawn (lambda (e s)
|
(on (message ($ p (ip-packet _ _ _ _ _ _)))
|
||||||
(match e
|
(when (not (gateway-interface))
|
||||||
[(scn g)
|
|
||||||
(define host-ips+netmasks (trie-project/set #:take 2 g host-route-projector))
|
|
||||||
(define gw-nets+netmasks (trie-project/set #:take 2 g gateway-route-projector))
|
|
||||||
(define net-nets+netmasks (trie-project/set #:take 2 g net-route-projector))
|
|
||||||
(define gw-ip+hwaddr
|
|
||||||
(let ((vs (trie-project/set #:take 2 g gateway-arp-projector)))
|
|
||||||
(and vs (not (set-empty? vs)) (set-first vs))))
|
|
||||||
(when (and gw-ip+hwaddr (not (gateway-route-state-gateway-hwaddr s)))
|
|
||||||
(log-info "Discovered gateway ~a at ~a on interface ~a."
|
|
||||||
(ip-address->hostname gateway-addr)
|
|
||||||
(ethernet-interface-name (car gw-ip+hwaddr))
|
|
||||||
(pretty-bytes (cadr gw-ip+hwaddr))))
|
|
||||||
(if (trie-empty? (project-assertions g (?! the-route)))
|
|
||||||
(quit)
|
|
||||||
(transition (gateway-route-state
|
|
||||||
(set-union host-ips+netmasks
|
|
||||||
gw-nets+netmasks
|
|
||||||
net-nets+netmasks)
|
|
||||||
(and gw-ip+hwaddr (car gw-ip+hwaddr))
|
|
||||||
(and gw-ip+hwaddr (cadr gw-ip+hwaddr)))
|
|
||||||
'()))]
|
|
||||||
[(message (? ip-packet? p))
|
|
||||||
(define gw-if (gateway-route-state-gateway-interface s))
|
|
||||||
(when (not gw-if)
|
|
||||||
(log-warning "Gateway hwaddr for ~a not known, packet dropped."
|
(log-warning "Gateway hwaddr for ~a not known, packet dropped."
|
||||||
(ip-address->hostname gateway-addr)))
|
(ip-address->hostname gateway-addr)))
|
||||||
(and gw-if
|
(when (and (gateway-interface)
|
||||||
(not (equal? (ip-packet-source-interface p) (ethernet-interface-name gw-if)))
|
(not (equal? (ip-packet-source-interface p)
|
||||||
(not (covered-by-some-other-route? (ip-packet-destination p)
|
(ethernet-interface-name (gateway-interface))))
|
||||||
(gateway-route-state-routes s)))
|
(not (covered-by-some-other-route? (ip-packet-destination p))))
|
||||||
(transition s
|
(send! (ethernet-packet (gateway-interface)
|
||||||
(message (ethernet-packet gw-if
|
|
||||||
#f
|
#f
|
||||||
(ethernet-interface-hwaddr gw-if)
|
(ethernet-interface-hwaddr (gateway-interface))
|
||||||
(gateway-route-state-gateway-hwaddr s)
|
(gateway-hwaddr)
|
||||||
IPv4-ethertype
|
IPv4-ethertype
|
||||||
(format-ip-packet p)))))]
|
(format-ip-packet p))))))
|
||||||
[_ #f]))
|
|
||||||
(gateway-route-state (set) #f #f)
|
|
||||||
(scn/union (subscription the-route)
|
|
||||||
(assertion (route-up the-route))
|
|
||||||
(subscription (ip-packet ? ? ? ? ? ?))
|
|
||||||
observe-local-ip-addresses-gestalt
|
|
||||||
(subscription (net-route ? ? ?))
|
|
||||||
(subscription (gateway-route ? ? ? ?))
|
|
||||||
(subscription (projection->pattern gateway-arp-projector)))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; General net route
|
;; General net route
|
||||||
|
|
||||||
(define (spawn-net-route network-addr netmask link)
|
(define (do-net-route network-addr netmask link)
|
||||||
(spawn-normal-ip-route (net-route network-addr netmask link) network-addr netmask link))
|
(do-normal-ip-route (net-route network-addr netmask link) network-addr netmask link))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Normal IP route
|
;; Normal IP route
|
||||||
|
|
||||||
(define (spawn-normal-ip-route the-route network netmask interface-name)
|
(define (do-normal-ip-route the-route network netmask interface-name)
|
||||||
(spawn (lambda (e s)
|
(assert (arp-interface interface-name))
|
||||||
(match e
|
(on (message (ethernet-packet (ethernet-interface interface-name _) #t _ _ IPv4-ethertype $body))
|
||||||
[(scn (? trie-empty?)) (quit)]
|
|
||||||
[(message (ethernet-packet _ _ _ _ _ body))
|
|
||||||
(define p (parse-ip-packet interface-name body))
|
(define p (parse-ip-packet interface-name body))
|
||||||
(and p (transition s (message p)))]
|
(when p (send! p)))
|
||||||
[(message (? ip-packet? p))
|
(on (message ($ p (ip-packet _ _ _ _ _ _)))
|
||||||
(define destination (ip-packet-destination p))
|
(define destination (ip-packet-destination p))
|
||||||
(and (not (equal? (ip-packet-source-interface p) interface-name))
|
(when (and (not (equal? (ip-packet-source-interface p) interface-name))
|
||||||
(ip-address-in-subnet? destination network netmask)
|
(ip-address-in-subnet? destination network netmask))
|
||||||
(transition
|
(define timer-id (gensym 'ippkt))
|
||||||
s
|
(react (on-start (send! (set-timer timer-id 5000 'relative)))
|
||||||
(lookup-arp destination
|
(stop-when (message (timer-expired timer-id _))
|
||||||
(ethernet-interface interface-name ?)
|
(log-warning "ARP lookup of ~a failed, packet dropped"
|
||||||
trie-empty
|
(ip-address->hostname destination)))
|
||||||
(lambda (interface destination-hwaddr)
|
(stop-when (asserted (arp-query IPv4-ethertype
|
||||||
(message (ethernet-packet interface
|
destination
|
||||||
|
($ interface (ethernet-interface interface-name _))
|
||||||
|
$destination-hwaddr))
|
||||||
|
(send! (ethernet-packet interface
|
||||||
#f
|
#f
|
||||||
(ethernet-interface-hwaddr interface)
|
(ethernet-interface-hwaddr interface)
|
||||||
destination-hwaddr
|
destination-hwaddr
|
||||||
IPv4-ethertype
|
IPv4-ethertype
|
||||||
(format-ip-packet p)))))))]
|
(format-ip-packet p))))))))
|
||||||
[_ #f]))
|
|
||||||
(void)
|
|
||||||
(scn/union (subscription the-route)
|
|
||||||
(assertion (route-up the-route))
|
|
||||||
(subscription (ethernet-packet-pattern interface-name #t IPv4-ethertype))
|
|
||||||
(assertion (arp-interface interface-name))
|
|
||||||
(subscription (ip-packet ? ? ? ? ? ?)))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
@ -301,27 +261,6 @@
|
||||||
|
|
||||||
full-packet)
|
full-packet)
|
||||||
|
|
||||||
(define (lookup-arp ipaddr query-interface-pattern base-gestalt k)
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
(on-claim #:name (string->symbol (format "lookup-arp:~a" (ip-address->hostname ipaddr)))
|
|
||||||
(lambda (_g arp-results)
|
(spawn-ip-driver)
|
||||||
(if (not arp-results)
|
|
||||||
(error 'ip "Someone has published a wildcard arp result")
|
|
||||||
(and (not (set-empty? arp-results))
|
|
||||||
(match (set-first arp-results)
|
|
||||||
[(list interface hwaddr)
|
|
||||||
(log-info "ARP lookup yielded ~a on ~a for ~a"
|
|
||||||
(pretty-bytes hwaddr)
|
|
||||||
(ethernet-interface-name interface)
|
|
||||||
(ip-address->hostname ipaddr))
|
|
||||||
(when (> (set-count arp-results) 1)
|
|
||||||
(log-warning "Ambiguous ARP result for ~a: ~v"
|
|
||||||
(ip-address->hostname ipaddr)
|
|
||||||
arp-results))
|
|
||||||
(k interface hwaddr)]))))
|
|
||||||
base-gestalt
|
|
||||||
(arp-query IPv4-ethertype ipaddr (?! query-interface-pattern) (?!))
|
|
||||||
#:timeout-msec 5000
|
|
||||||
#:on-timeout (lambda ()
|
|
||||||
(log-warning "ARP lookup of ~a failed, packet dropped"
|
|
||||||
(ip-address->hostname ipaddr))
|
|
||||||
'())))
|
|
||||||
|
|
|
@ -1,17 +1,14 @@
|
||||||
#lang syndicate/actor
|
#lang syndicate/actor
|
||||||
|
|
||||||
(require "ip.rkt")
|
|
||||||
(require "tcp.rkt")
|
|
||||||
(require "udp.rkt")
|
|
||||||
|
|
||||||
;;(log-events-and-actions? #t)
|
;;(log-events-and-actions? #t)
|
||||||
|
|
||||||
(require/activate syndicate/drivers/timer)
|
(require/activate syndicate/drivers/timer)
|
||||||
(require/activate "ethernet.rkt")
|
(require/activate "ethernet.rkt")
|
||||||
(require/activate "arp.rkt")
|
(require/activate "arp.rkt")
|
||||||
(spawn-ip-driver)
|
(require/activate "ip.rkt")
|
||||||
(spawn-tcp-driver)
|
(require/activate "tcp.rkt")
|
||||||
(spawn-udp-driver)
|
(require/activate "udp.rkt")
|
||||||
(require/activate "demo-config.rkt")
|
(require/activate "demo-config.rkt")
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
|
@ -1,38 +1,37 @@
|
||||||
#lang racket/base
|
#lang syndicate/actor
|
||||||
;; UDP/TCP port allocator
|
;; UDP/TCP port allocator
|
||||||
|
|
||||||
(provide spawn-port-allocator
|
(provide spawn-port-allocator
|
||||||
(struct-out port-allocation-request))
|
allocate-port!
|
||||||
|
(struct-out port-allocation-request)
|
||||||
|
(struct-out port-allocation-reply))
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
(require racket/match)
|
|
||||||
(require syndicate/monolithic)
|
|
||||||
(require "ip.rkt")
|
(require "ip.rkt")
|
||||||
|
|
||||||
(struct port-allocation-request (type k) #:prefab)
|
(struct port-allocation-request (reqid type) #:prefab)
|
||||||
|
(struct port-allocation-reply (reqid port) #:prefab)
|
||||||
|
|
||||||
(struct port-allocator-state (used-ports local-ips) #:transparent)
|
(define (spawn-port-allocator allocator-type query-used-ports)
|
||||||
|
(actor #:name (list 'port-allocator allocator-type)
|
||||||
|
(react
|
||||||
|
(define local-ips (query-local-ip-addresses))
|
||||||
|
(define used-ports (query-used-ports))
|
||||||
|
|
||||||
(define (spawn-port-allocator allocator-type observer-gestalt compute-used-ports)
|
;; TODO: How can I get this to run whenever used-ports changes?
|
||||||
(spawn #:name (string->symbol (format "port-allocator:~a" allocator-type))
|
;; (log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports)
|
||||||
(lambda (e s)
|
|
||||||
(match e
|
(on (message (port-allocation-request $reqid allocator-type))
|
||||||
[(scn g)
|
(define currently-used-ports (used-ports))
|
||||||
(define local-ips (or (gestalt->local-ip-addresses g) (set)))
|
|
||||||
(define new-used-ports (compute-used-ports g local-ips))
|
|
||||||
(log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports)
|
|
||||||
(transition (port-allocator-state new-used-ports local-ips) '())]
|
|
||||||
[(message (port-allocation-request _ k))
|
|
||||||
(define currently-used-ports (port-allocator-state-used-ports s))
|
|
||||||
(let randomly-allocate-until-unused ()
|
(let randomly-allocate-until-unused ()
|
||||||
(define p (+ 1024 (random 64512)))
|
(define p (+ 1024 (random 64512)))
|
||||||
(if (set-member? currently-used-ports p)
|
(if (set-member? currently-used-ports p)
|
||||||
(randomly-allocate-until-unused)
|
(randomly-allocate-until-unused)
|
||||||
(transition (struct-copy port-allocator-state s
|
(begin (used-ports (set-add currently-used-ports p))
|
||||||
[used-ports (set-add currently-used-ports p)])
|
(send! (port-allocation-reply reqid p)))))))))
|
||||||
(k p (port-allocator-state-local-ips s)))))]
|
|
||||||
[_ #f]))
|
(define (allocate-port! type)
|
||||||
(port-allocator-state (set) (set))
|
(define reqid (gensym 'allocate-port!))
|
||||||
(scn/union (subscription (port-allocation-request allocator-type ?))
|
(react/suspend (done)
|
||||||
observe-local-ip-addresses-gestalt
|
(stop-when (message (port-allocation-reply reqid $port)) (done port))
|
||||||
observer-gestalt)))
|
(on-start (send! (port-allocation-request reqid type)))))
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#lang racket/base
|
#lang syndicate/actor
|
||||||
|
|
||||||
(provide (struct-out tcp-address)
|
(provide (struct-out tcp-address)
|
||||||
(struct-out tcp-handle)
|
(struct-out tcp-handle)
|
||||||
|
@ -7,14 +7,12 @@
|
||||||
spawn-tcp-driver)
|
spawn-tcp-driver)
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
(require racket/match)
|
|
||||||
(require syndicate/monolithic)
|
|
||||||
(require syndicate/drivers/timer)
|
|
||||||
(require syndicate/demand-matcher)
|
|
||||||
(require bitsyntax)
|
(require bitsyntax)
|
||||||
|
|
||||||
(require "dump-bytes.rkt")
|
(require "dump-bytes.rkt")
|
||||||
(require "checksum.rkt")
|
(require "checksum.rkt")
|
||||||
|
|
||||||
|
(require/activate syndicate/drivers/timer)
|
||||||
(require "ip.rkt")
|
(require "ip.rkt")
|
||||||
(require "port-allocator.rkt")
|
(require "port-allocator.rkt")
|
||||||
|
|
||||||
|
@ -51,47 +49,25 @@
|
||||||
;; User-accessible driver startup
|
;; User-accessible driver startup
|
||||||
|
|
||||||
(define (spawn-tcp-driver)
|
(define (spawn-tcp-driver)
|
||||||
(list (spawn-demand-matcher #:name 'tcp-inbound-driver
|
(spawn-port-allocator 'tcp (lambda () (query-set tcp-ports (tcp-port-allocation $p _) p)))
|
||||||
(advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?)))
|
(spawn-kernel-tcp-driver)
|
||||||
(advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?)))
|
(actor #:name 'tcp-inbound-driver
|
||||||
(lambda (server-addr)
|
(react
|
||||||
|
(during/actor (advertise (observe (tcp-channel _ ($ server-addr (tcp-listener _)) _)))
|
||||||
|
#:name (list 'tcp-listen server-addr)
|
||||||
(match-define (tcp-listener port) server-addr)
|
(match-define (tcp-listener port) server-addr)
|
||||||
;; TODO: have listener shut down once user-level listener does
|
(assert (tcp-port-allocation port server-addr))
|
||||||
(list
|
(on (asserted (advertise (tcp-channel ($ remote-addr (tcp-address _ _))
|
||||||
(spawn #:name (string->symbol
|
($ local-addr (tcp-address _ port))
|
||||||
(format "tcp-listener-port-reservation:~a" port))
|
_)))
|
||||||
(lambda (e s) #f)
|
(spawn-relay server-addr remote-addr local-addr)))))
|
||||||
(void)
|
(actor #:name 'tcp-outbound-driver
|
||||||
(scn (assertion (tcp-port-allocation port server-addr))))
|
(react
|
||||||
(spawn-demand-matcher
|
(define local-ips (query-local-ip-addresses))
|
||||||
#:name (string->symbol (format "tcp-listener:~a" port))
|
(on (asserted (advertise (tcp-channel ($ local-addr (tcp-handle _))
|
||||||
(advertise (tcp-channel (?! (tcp-address ? ?))
|
($ remote-addr (tcp-address _ _))
|
||||||
(?! (tcp-address ? port))
|
_)))
|
||||||
?))
|
(define port (allocate-port! 'tcp))
|
||||||
(observe (tcp-channel (?! (tcp-address ? ?))
|
|
||||||
(?! (tcp-address ? port))
|
|
||||||
?))
|
|
||||||
(spawn-relay server-addr)))))
|
|
||||||
(spawn-demand-matcher #:name 'tcp-outbound-driver
|
|
||||||
(advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
|
|
||||||
(observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
|
|
||||||
allocate-port-and-spawn-socket)
|
|
||||||
(spawn-tcp-port-allocator)
|
|
||||||
(spawn-kernel-tcp-driver)))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Port allocation
|
|
||||||
|
|
||||||
(define (spawn-tcp-port-allocator)
|
|
||||||
(spawn-port-allocator 'tcp
|
|
||||||
(subscription (tcp-port-allocation ? ?))
|
|
||||||
(lambda (g local-ips)
|
|
||||||
(project-assertions g (tcp-port-allocation (?!) ?)))))
|
|
||||||
|
|
||||||
(define (allocate-port-and-spawn-socket local-addr remote-addr)
|
|
||||||
(message (port-allocation-request
|
|
||||||
'tcp
|
|
||||||
(lambda (port local-ips)
|
|
||||||
;; TODO: Choose a sensible IP address for the outbound
|
;; TODO: Choose a sensible IP address for the outbound
|
||||||
;; connection. We don't have enough information to do this
|
;; connection. We don't have enough information to do this
|
||||||
;; well at the moment, so just pick some available local IP
|
;; well at the moment, so just pick some available local IP
|
||||||
|
@ -102,71 +78,68 @@
|
||||||
;; addresses only route to a given bucket-of-state and ONLY
|
;; addresses only route to a given bucket-of-state and ONLY
|
||||||
;; the port number selects a substate therein. That's not
|
;; the port number selects a substate therein. That's not
|
||||||
;; how TCP is defined however so we can't do that.
|
;; how TCP is defined however so we can't do that.
|
||||||
(define appropriate-ip (set-first local-ips))
|
(define appropriate-ip (set-first (local-ips)))
|
||||||
(define appropriate-host (ip-address->hostname appropriate-ip))
|
(define appropriate-host (ip-address->hostname appropriate-ip))
|
||||||
(match-define (tcp-address remote-host remote-port) remote-addr)
|
(match-define (tcp-address remote-host remote-port) remote-addr)
|
||||||
(define remote-ip (ip-string->ip-address remote-host))
|
(define remote-ip (ip-string->ip-address remote-host))
|
||||||
(list
|
(spawn-relay local-addr remote-addr (tcp-address appropriate-host port))
|
||||||
((spawn-relay local-addr) remote-addr (tcp-address appropriate-host port))
|
(spawn-state-vector remote-ip remote-port appropriate-ip port)))))
|
||||||
(spawn-state-vector remote-ip remote-port appropriate-ip port))))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Relay between kernel-level and user-level
|
;; Relay between kernel-level and user-level
|
||||||
|
|
||||||
(define relay-peer-wait-time-msec 5000)
|
(define relay-peer-wait-time-msec 5000)
|
||||||
|
|
||||||
(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr)
|
(define (spawn-relay local-user-addr remote-addr local-tcp-addr)
|
||||||
(define timer-name (list 'spawn-relay local-tcp-addr remote-addr))
|
(define timer-name (list 'spawn-relay local-tcp-addr remote-addr))
|
||||||
(define local-peer-traffic (?! (observe (tcp-channel remote-addr local-user-addr ?))))
|
|
||||||
(define remote-peer-traffic (?! (advertise (tcp-channel remote-addr local-tcp-addr ?))))
|
|
||||||
(list
|
|
||||||
(message (set-timer timer-name relay-peer-wait-time-msec 'relative))
|
|
||||||
(spawn #:name (string->symbol (format "tcp-relay:~v:~v:~v"
|
|
||||||
local-user-addr
|
|
||||||
remote-addr
|
|
||||||
local-tcp-addr))
|
|
||||||
(lambda (e state)
|
|
||||||
(match e
|
|
||||||
[(scn g)
|
|
||||||
(define local-peer-absent? (trie-empty? (trie-project g local-peer-traffic)))
|
|
||||||
(define remote-peer-absent? (trie-empty? (trie-project g remote-peer-traffic)))
|
|
||||||
(define new-state (+ (if local-peer-absent? 0 1) (if remote-peer-absent? 0 1)))
|
|
||||||
(if (< new-state state)
|
|
||||||
(quit)
|
|
||||||
(transition new-state '()))]
|
|
||||||
[(message (tcp-channel (== local-user-addr) (== remote-addr) bs))
|
|
||||||
(transition state (message (tcp-channel local-tcp-addr remote-addr bs)))]
|
|
||||||
[(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs))
|
|
||||||
(transition state (message (tcp-channel remote-addr local-user-addr bs)))]
|
|
||||||
[(message (timer-expired _ _))
|
|
||||||
#:when (< state 2) ;; we only care if we're not fully connected
|
|
||||||
(error 'spawn-relay "TCP relay process timed out waiting for peer")]
|
|
||||||
[_ #f]))
|
|
||||||
0
|
|
||||||
(scn/union (subscription (projection->pattern local-peer-traffic))
|
|
||||||
(subscription (projection->pattern remote-peer-traffic))
|
|
||||||
(assertion (tcp-port-allocation (tcp-address-port local-tcp-addr)
|
|
||||||
local-user-addr))
|
|
||||||
(subscription (tcp-channel remote-addr local-tcp-addr ?))
|
|
||||||
(subscription (tcp-channel local-user-addr remote-addr ?))
|
|
||||||
(advertisement (tcp-channel remote-addr local-user-addr ?))
|
|
||||||
(advertisement (tcp-channel local-tcp-addr remote-addr ?))
|
|
||||||
(subscription (timer-expired timer-name ?))))))
|
|
||||||
|
|
||||||
|
(actor #:name (list 'tcp-relay local-user-addr remote-addr local-tcp-addr)
|
||||||
|
(react
|
||||||
|
(assert (tcp-port-allocation (tcp-address-port local-tcp-addr) local-user-addr))
|
||||||
|
(assert (advertise (tcp-channel remote-addr local-user-addr _)))
|
||||||
|
(assert (advertise (tcp-channel local-tcp-addr remote-addr _)))
|
||||||
|
|
||||||
|
(field [local-peer-present? #f]
|
||||||
|
[remote-peer-present? #f])
|
||||||
|
|
||||||
|
(on-start (send! (set-timer timer-name relay-peer-wait-time-msec 'relative)))
|
||||||
|
(on (message (timer-expired timer-name _))
|
||||||
|
(when (not (and (local-peer-present?) (remote-peer-present?)))
|
||||||
|
(error 'spawn-relay "TCP relay process timed out waiting for peer")))
|
||||||
|
|
||||||
|
(on (asserted (observe (tcp-channel remote-addr local-user-addr _)))
|
||||||
|
(local-peer-present? #t))
|
||||||
|
(stop-when (retracted (observe (tcp-channel remote-addr local-user-addr _))))
|
||||||
|
|
||||||
|
(on (asserted (advertise (tcp-channel remote-addr local-tcp-addr _)))
|
||||||
|
(remote-peer-present? #t))
|
||||||
|
(stop-when (retracted (advertise (tcp-channel remote-addr local-tcp-addr _))))
|
||||||
|
|
||||||
|
(on (message (tcp-channel local-user-addr remote-addr $bs))
|
||||||
|
(send! (tcp-channel local-tcp-addr remote-addr bs)))
|
||||||
|
|
||||||
|
(on (message (tcp-channel remote-addr local-tcp-addr $bs))
|
||||||
|
(send! (tcp-channel remote-addr local-user-addr bs))))))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Codec & kernel-level driver
|
;; Codec & kernel-level driver
|
||||||
|
|
||||||
(define PROTOCOL-TCP 6)
|
(define PROTOCOL-TCP 6)
|
||||||
|
|
||||||
(struct codec-state (local-ips active-state-vectors) #:transparent)
|
|
||||||
|
|
||||||
(define (spawn-kernel-tcp-driver)
|
(define (spawn-kernel-tcp-driver)
|
||||||
|
(actor #:name 'kernel-tcp-driver
|
||||||
|
(forever
|
||||||
|
(define local-ips (query-local-ip-addresses))
|
||||||
|
|
||||||
(define (state-vector-active? statevec s)
|
(define active-state-vectors
|
||||||
(set-member? (codec-state-active-state-vectors s) statevec))
|
(query-set active-state-vectors
|
||||||
|
(observe (observe (tcp-packet _ $si $sp $di $dp _ _ _ _ _ _)))
|
||||||
|
(list si sp di dp)))
|
||||||
|
|
||||||
(define (analyze-incoming-packet src-ip dst-ip body s)
|
(define (state-vector-active? statevec)
|
||||||
|
(set-member? (active-state-vectors) statevec))
|
||||||
|
|
||||||
|
(define (analyze-incoming-packet src-ip dst-ip body)
|
||||||
(bit-string-case body
|
(bit-string-case body
|
||||||
([ (src-port :: integer bytes 2)
|
([ (src-port :: integer bytes 2)
|
||||||
(dst-port :: integer bytes 2)
|
(dst-port :: integer bytes 2)
|
||||||
|
@ -189,8 +162,8 @@
|
||||||
(rest :: binary) ]
|
(rest :: binary) ]
|
||||||
(let* ((flags (set))
|
(let* ((flags (set))
|
||||||
(statevec (list src-ip src-port dst-ip dst-port))
|
(statevec (list src-ip src-port dst-ip dst-port))
|
||||||
(old-active-state-vectors (codec-state-active-state-vectors s))
|
(old-active-state-vectors (active-state-vectors))
|
||||||
(spawn-needed? (and (not (state-vector-active? statevec s))
|
(spawn-needed? (and (not (state-vector-active? statevec))
|
||||||
(zero? rst)))) ;; don't bother spawning if it's a rst
|
(zero? rst)))) ;; don't bother spawning if it's a rst
|
||||||
(define-syntax-rule (set-flags! v ...)
|
(define-syntax-rule (set-flags! v ...)
|
||||||
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
|
(begin (unless (zero? v) (set! flags (set-add flags 'v))) ...))
|
||||||
|
@ -219,30 +192,19 @@
|
||||||
window-size
|
window-size
|
||||||
(bit-string->bytes opts)
|
(bit-string->bytes opts)
|
||||||
(bit-string->bytes data))))
|
(bit-string->bytes data))))
|
||||||
(transition (if spawn-needed?
|
(when spawn-needed?
|
||||||
(struct-copy codec-state s
|
(active-state-vectors (set-add (active-state-vectors) statevec))
|
||||||
[active-state-vectors
|
(spawn-state-vector src-ip src-port dst-ip dst-port))
|
||||||
(set-add old-active-state-vectors statevec)])
|
|
||||||
s)
|
|
||||||
(list
|
|
||||||
(when spawn-needed? (spawn-state-vector src-ip src-port
|
|
||||||
dst-ip dst-port))
|
|
||||||
;; TODO: get packet to the new state-vector process somehow
|
;; TODO: get packet to the new state-vector process somehow
|
||||||
(message packet)))))
|
(send! packet)))
|
||||||
(else #f))))
|
(else #f))))
|
||||||
(else #f)))
|
(else #f)))
|
||||||
|
|
||||||
(define statevec-projection (observe (tcp-packet ? (?!) (?!) (?!) (?!) ? ? ? ? ? ?)))
|
;; TODO: again, want to print this when local-ips or
|
||||||
|
;; active-state-vectors change.
|
||||||
|
;; (log-info "gestalt yielded statevecs ~v and local-ips ~v" statevecs local-ips)
|
||||||
|
|
||||||
(define (analyze-gestalt g s)
|
(define (deliver-outbound-packet p)
|
||||||
(define local-ips (gestalt->local-ip-addresses g))
|
|
||||||
(define statevecs (trie-project/set #:take 4 g statevec-projection))
|
|
||||||
(log-info "gestalt yielded statevecs ~v and local-ips ~v" statevecs local-ips)
|
|
||||||
(transition (struct-copy codec-state s
|
|
||||||
[local-ips local-ips]
|
|
||||||
[active-state-vectors statevecs]) '()))
|
|
||||||
|
|
||||||
(define (deliver-outbound-packet p s)
|
|
||||||
(match-define (tcp-packet #f
|
(match-define (tcp-packet #f
|
||||||
src-ip
|
src-ip
|
||||||
src-port
|
src-port
|
||||||
|
@ -290,27 +252,16 @@
|
||||||
0
|
0
|
||||||
PROTOCOL-TCP
|
PROTOCOL-TCP
|
||||||
((bit-string-byte-count payload) :: integer bytes 2)))
|
((bit-string-byte-count payload) :: integer bytes 2)))
|
||||||
(transition s (message (ip-packet #f src-ip dst-ip PROTOCOL-TCP #""
|
(send! (ip-packet #f src-ip dst-ip PROTOCOL-TCP #""
|
||||||
(ip-checksum 16 payload #:pseudo-header pseudo-header)))))
|
(ip-checksum 16 payload #:pseudo-header pseudo-header))))
|
||||||
|
|
||||||
(spawn #:name 'kernel-tcp-driver
|
(on (message (ip-packet $source-if $src $dst PROTOCOL-TCP _ $body))
|
||||||
(lambda (e s)
|
(when (and source-if ;; source-if == #f iff packet originates locally
|
||||||
(match e
|
(set-member? (local-ips) dst))
|
||||||
[(scn g)
|
(analyze-incoming-packet src dst body)))
|
||||||
(analyze-gestalt g s)]
|
|
||||||
[(message (ip-packet source-if src dst _ _ body))
|
(on (message ($ p (tcp-packet #f _ _ _ _ _ _ _ _ _ _)))
|
||||||
#:when (and source-if ;; source-if == #f iff packet originates locally
|
(deliver-outbound-packet p)))))
|
||||||
(set-member? (codec-state-local-ips s) dst))
|
|
||||||
(analyze-incoming-packet src dst body s)]
|
|
||||||
[(message (? tcp-packet? p))
|
|
||||||
#:when (not (tcp-packet-from-wire? p))
|
|
||||||
(deliver-outbound-packet p s)]
|
|
||||||
[_ #f]))
|
|
||||||
(codec-state (set) (set))
|
|
||||||
(scn/union (subscription (ip-packet ? ? ? PROTOCOL-TCP ? ?))
|
|
||||||
(subscription (tcp-packet #f ? ? ? ? ? ? ? ? ? ?))
|
|
||||||
(subscription (observe (tcp-packet #t ? ? ? ? ? ? ? ? ? ?)))
|
|
||||||
observe-local-ip-addresses-gestalt)))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Per-connection state vector process
|
;; Per-connection state vector process
|
||||||
|
@ -321,16 +272,8 @@
|
||||||
finished?) ;; boolean: true after FIN
|
finished?) ;; boolean: true after FIN
|
||||||
#:transparent)
|
#:transparent)
|
||||||
|
|
||||||
(struct conn-state (outbound ;; buffer
|
(define (buffer-push b data)
|
||||||
inbound ;; buffer
|
(struct-copy buffer b [data (bit-string-append (buffer-data b) data)]))
|
||||||
syn-acked? ;; boolean
|
|
||||||
latest-peer-activity-time ;; from current-inexact-milliseconds
|
|
||||||
;; ^ the most recent time we heard from our peer
|
|
||||||
user-timeout-base-time ;; from current-inexact-milliseconds
|
|
||||||
;; ^ when the index of the first outbound unacknowledged byte changed
|
|
||||||
local-peer-seen? ;; boolean
|
|
||||||
listener-listening?) ;; boolean
|
|
||||||
#:transparent)
|
|
||||||
|
|
||||||
(define transmit-check-interval-msec 2000)
|
(define transmit-check-interval-msec 2000)
|
||||||
(define inbound-buffer-limit 65535)
|
(define inbound-buffer-limit 65535)
|
||||||
|
@ -340,326 +283,265 @@
|
||||||
;; cheat; RFC 793 says "the present global default is five minutes", which is
|
;; cheat; RFC 793 says "the present global default is five minutes", which is
|
||||||
;; reasonable to be getting on with
|
;; reasonable to be getting on with
|
||||||
|
|
||||||
|
(define (seq+ a b) (bitwise-and #xffffffff (+ a b)))
|
||||||
|
|
||||||
|
;; Always positive
|
||||||
|
(define (seq- larger smaller)
|
||||||
|
(if (< larger smaller) ;; wraparound has occurred
|
||||||
|
(+ (- larger smaller) #x100000000)
|
||||||
|
(- larger smaller)))
|
||||||
|
|
||||||
|
(define (seq> a b)
|
||||||
|
(< (seq- a b) #x80000000))
|
||||||
|
|
||||||
(define (spawn-state-vector src-ip src-port dst-ip dst-port)
|
(define (spawn-state-vector src-ip src-port dst-ip dst-port)
|
||||||
(define src (tcp-address (ip-address->hostname src-ip) src-port))
|
(define src (tcp-address (ip-address->hostname src-ip) src-port))
|
||||||
(define dst (tcp-address (ip-address->hostname dst-ip) dst-port))
|
(define dst (tcp-address (ip-address->hostname dst-ip) dst-port))
|
||||||
(define (timer-name kind) (list 'tcp-timer kind src dst))
|
(define (timer-name kind) (list 'tcp-timer kind src dst))
|
||||||
|
|
||||||
(define (next-expected-seqn s)
|
(actor
|
||||||
(define b (conn-state-inbound s))
|
#:name (list 'tcp-state-vector
|
||||||
|
(ip-address->hostname src-ip)
|
||||||
|
src-port
|
||||||
|
(ip-address->hostname dst-ip)
|
||||||
|
dst-port)
|
||||||
|
(react
|
||||||
|
|
||||||
|
(define initial-outbound-seqn
|
||||||
|
;; Yuck
|
||||||
|
(inexact->exact (truncate (* #x100000000 (random)))))
|
||||||
|
|
||||||
|
(field [outbound (buffer #"!" initial-outbound-seqn 0 #f)] ;; dummy data at SYN position
|
||||||
|
[inbound (buffer #"" #f inbound-buffer-limit #f)]
|
||||||
|
[syn-acked? #f]
|
||||||
|
[latest-peer-activity-time (current-inexact-milliseconds)]
|
||||||
|
;; ^ the most recent time we heard from our peer
|
||||||
|
[user-timeout-base-time (current-inexact-milliseconds)]
|
||||||
|
;; ^ when the index of the first outbound unacknowledged byte changed
|
||||||
|
[quit-because-reset? #f])
|
||||||
|
|
||||||
|
(define (next-expected-seqn)
|
||||||
|
(define b (inbound))
|
||||||
(define v (buffer-seqn b))
|
(define v (buffer-seqn b))
|
||||||
(and v (seq+ v (bit-string-byte-count (buffer-data b)))))
|
(and v (seq+ v (bit-string-byte-count (buffer-data b)))))
|
||||||
|
|
||||||
(define (buffer-push b data)
|
(define (set-inbound-seqn! seqn)
|
||||||
(struct-copy buffer b [data (bit-string-append (buffer-data b) data)]))
|
(inbound (struct-copy buffer (inbound) [seqn seqn])))
|
||||||
|
|
||||||
;; ConnState -> ConnState
|
(define (incorporate-segment! data)
|
||||||
(define (set-inbound-seqn seqn s)
|
|
||||||
(struct-copy conn-state s
|
|
||||||
[inbound (struct-copy buffer (conn-state-inbound s) [seqn seqn])]))
|
|
||||||
|
|
||||||
;; Bitstring ConnState -> Transition
|
|
||||||
(define (incorporate-segment data s)
|
|
||||||
;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data)
|
;; (log-info "GOT INBOUND STUFF TO DELIVER ~v" data)
|
||||||
(transition
|
(when (not (buffer-finished? (inbound)))
|
||||||
(if (buffer-finished? (conn-state-inbound s))
|
(inbound (buffer-push (inbound) data))))
|
||||||
s
|
|
||||||
(struct-copy conn-state s [inbound (buffer-push (conn-state-inbound s) data)]))
|
|
||||||
'()))
|
|
||||||
|
|
||||||
(define (seq+ a b) (bitwise-and #xffffffff (+ a b)))
|
(define (deliver-inbound-locally!)
|
||||||
|
(define b (inbound))
|
||||||
;; Always positive
|
(when (not (bit-string-empty? (buffer-data b)))
|
||||||
(define (seq- larger smaller)
|
(define chunk (bit-string->bytes (buffer-data b)))
|
||||||
(if (< larger smaller) ;; wraparound has occurred
|
(send! (tcp-channel src dst chunk))
|
||||||
(+ (- larger smaller) #x100000000)
|
(inbound (struct-copy buffer b
|
||||||
(- larger smaller)))
|
|
||||||
|
|
||||||
(define (seq> a b)
|
|
||||||
(< (seq- a b) #x80000000))
|
|
||||||
|
|
||||||
(define local-peer-detector (?! (observe (tcp-channel src dst ?))))
|
|
||||||
(define listener-detector (?! (observe (advertise (tcp-channel ? (tcp-listener dst-port) ?)))))
|
|
||||||
|
|
||||||
;; ConnState -> Gestalt
|
|
||||||
(define (compute-gestalt s)
|
|
||||||
(define worldward-facing-gestalt
|
|
||||||
(subscription (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?)))
|
|
||||||
(define appward-facing-gestalt
|
|
||||||
(assertion-set-union
|
|
||||||
(subscription (projection->pattern local-peer-detector))
|
|
||||||
(subscription (projection->pattern listener-detector))
|
|
||||||
(subscription (tcp-channel dst src ?))
|
|
||||||
(if (and (conn-state-syn-acked? s)
|
|
||||||
(not (buffer-finished? (conn-state-inbound s))))
|
|
||||||
(advertisement (tcp-channel src dst ?))
|
|
||||||
trie-empty)))
|
|
||||||
(assertion-set-union (subscription (timer-expired (timer-name ?) ?))
|
|
||||||
worldward-facing-gestalt
|
|
||||||
appward-facing-gestalt))
|
|
||||||
|
|
||||||
;; ConnState -> Transition
|
|
||||||
(define (deliver-inbound-locally s)
|
|
||||||
(define b (conn-state-inbound s))
|
|
||||||
(if (bit-string-empty? (buffer-data b))
|
|
||||||
(transition s '())
|
|
||||||
(let ((chunk (bit-string->bytes (buffer-data b))))
|
|
||||||
(transition (struct-copy conn-state s
|
|
||||||
[inbound (struct-copy buffer b
|
|
||||||
[data #""]
|
[data #""]
|
||||||
[seqn (seq+ (buffer-seqn b) (bytes-length chunk))])])
|
[seqn (seq+ (buffer-seqn b) (bytes-length chunk))]))))
|
||||||
(message (tcp-channel src dst chunk))))))
|
|
||||||
|
|
||||||
;; (Setof Symbol) -> ConnState -> Transition
|
;; (Setof Symbol) -> Void
|
||||||
(define ((check-fin flags) s)
|
(define (check-fin! flags)
|
||||||
(define b (conn-state-inbound s))
|
(define b (inbound))
|
||||||
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
(unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally
|
||||||
(error 'check-fin "Nonempty inbound buffer"))
|
(error 'check-fin "Nonempty inbound buffer"))
|
||||||
(if (set-member? flags 'fin)
|
(when (set-member? flags 'fin)
|
||||||
(let ((new-s (struct-copy conn-state s
|
|
||||||
[inbound (struct-copy buffer b
|
|
||||||
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
|
||||||
[finished? #t])])))
|
|
||||||
(log-info "Closing inbound stream.")
|
(log-info "Closing inbound stream.")
|
||||||
(transition new-s (scn (compute-gestalt new-s))))
|
(inbound (struct-copy buffer b
|
||||||
(transition s '())))
|
[seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte
|
||||||
|
[finished? #t]))))
|
||||||
|
|
||||||
;; Boolean SeqNum -> ConnState -> Transition
|
;; Boolean SeqNum -> Void
|
||||||
(define ((discard-acknowledged-outbound ack? ackn) s)
|
(define (discard-acknowledged-outbound! ack? ackn)
|
||||||
(if (not ack?)
|
(when ack?
|
||||||
(transition s '())
|
(let* ((b (outbound))
|
||||||
(let* ((b (conn-state-outbound s))
|
|
||||||
(base (buffer-seqn b))
|
(base (buffer-seqn b))
|
||||||
(limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b))))
|
(limit (seq+ (buffer-seqn b) (bit-string-byte-count (buffer-data b))))
|
||||||
(ackn (if (seq> ackn limit) limit ackn))
|
(ackn (if (seq> ackn limit) limit ackn))
|
||||||
(ackn (if (seq> base ackn) base ackn))
|
(ackn (if (seq> base ackn) base ackn))
|
||||||
(dist (seq- ackn base)))
|
(dist (seq- ackn base)))
|
||||||
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
|
(define remaining-data (bit-string-drop (buffer-data b) (* dist 8))) ;; bit offset!
|
||||||
(define new-s (struct-copy conn-state s
|
(user-timeout-base-time (current-inexact-milliseconds))
|
||||||
[user-timeout-base-time (current-inexact-milliseconds)]
|
(outbound (struct-copy buffer b [data remaining-data] [seqn ackn]))
|
||||||
[outbound (struct-copy buffer b [data remaining-data] [seqn ackn])]
|
(syn-acked? (or (syn-acked?) (positive? dist))))))
|
||||||
[syn-acked? (or (conn-state-syn-acked? s)
|
|
||||||
(positive? dist))]))
|
|
||||||
(transition new-s
|
|
||||||
(when (and (not (conn-state-syn-acked? s)) (positive? dist))
|
|
||||||
(scn (compute-gestalt new-s)))))))
|
|
||||||
|
|
||||||
;; Nat -> ConnState -> Transition
|
;; Nat -> Void
|
||||||
(define ((update-outbound-window peer-window) s)
|
(define (update-outbound-window! peer-window)
|
||||||
(transition (struct-copy conn-state s
|
(outbound (struct-copy buffer (outbound) [window peer-window])))
|
||||||
[outbound (struct-copy buffer (conn-state-outbound s)
|
|
||||||
[window peer-window])])
|
|
||||||
'()))
|
|
||||||
|
|
||||||
;; ConnState -> Boolean
|
(define (all-output-acknowledged?)
|
||||||
(define (all-output-acknowledged? s)
|
(bit-string-empty? (buffer-data (outbound))))
|
||||||
(bit-string-empty? (buffer-data (conn-state-outbound s))))
|
|
||||||
|
|
||||||
;; (Option SeqNum) -> ConnState -> Transition
|
;; (Option SeqNum) -> Void
|
||||||
(define ((send-outbound old-ackn) s)
|
(define (send-outbound! old-ackn)
|
||||||
(define b (conn-state-outbound s))
|
(define b (outbound))
|
||||||
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
|
(define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b))
|
||||||
(if (buffer-finished? b) 1 0))))
|
(if (buffer-finished? b) 1 0))))
|
||||||
|
|
||||||
(define segment-size (min maximum-segment-size
|
(define segment-size (min maximum-segment-size
|
||||||
(if (conn-state-syn-acked? s) (buffer-window b) 1)
|
(if (syn-acked?) (buffer-window b) 1)
|
||||||
;; ^ can only send SYN until SYN is acked
|
;; ^ can only send SYN until SYN is acked
|
||||||
pending-byte-count))
|
pending-byte-count))
|
||||||
(define segment-offset (if (conn-state-syn-acked? s) 0 1))
|
(define segment-offset (if (syn-acked?) 0 1))
|
||||||
(define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset!
|
(define chunk0 (bit-string-take (buffer-data b) (* segment-size 8))) ;; bit offset!
|
||||||
(define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset!
|
(define chunk (bit-string-drop chunk0 (* segment-offset 8))) ;; bit offset!
|
||||||
(define ackn (next-expected-seqn s))
|
(define ackn (next-expected-seqn))
|
||||||
(define flags (set))
|
(define flags (set))
|
||||||
(when ackn
|
(when ackn
|
||||||
(set! flags (set-add flags 'ack)))
|
(set! flags (set-add flags 'ack)))
|
||||||
(when (not (conn-state-syn-acked? s))
|
(when (not (syn-acked?))
|
||||||
(set! flags (set-add flags 'syn)))
|
(set! flags (set-add flags 'syn)))
|
||||||
(when (and (buffer-finished? b)
|
(when (and (buffer-finished? b)
|
||||||
(conn-state-syn-acked? s)
|
(syn-acked?)
|
||||||
(= segment-size pending-byte-count)
|
(= segment-size pending-byte-count)
|
||||||
(not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy
|
(not (all-output-acknowledged?))) ;; TODO: reexamine. This looks fishy
|
||||||
(set! flags (set-add flags 'fin)))
|
(set! flags (set-add flags 'fin)))
|
||||||
(define window (min 65535 ;; limit of field width
|
(define window (min 65535 ;; limit of field width
|
||||||
(max 0 ;; can't be negative
|
(max 0 ;; can't be negative
|
||||||
(- (buffer-window (conn-state-inbound s))
|
(- (buffer-window (inbound))
|
||||||
(bit-string-byte-count
|
(bit-string-byte-count (buffer-data (inbound)))))))
|
||||||
(buffer-data (conn-state-inbound s)))))))
|
|
||||||
(transition s
|
|
||||||
(unless (and (equal? ackn old-ackn)
|
(unless (and (equal? ackn old-ackn)
|
||||||
(conn-state-syn-acked? s)
|
(syn-acked?)
|
||||||
(not (set-member? flags 'fin))
|
(not (set-member? flags 'fin))
|
||||||
(zero? (bit-string-byte-count chunk)))
|
(zero? (bit-string-byte-count chunk)))
|
||||||
(local-require racket/pretty)
|
(local-require racket/pretty)
|
||||||
(pretty-write `(send-outbound (old-ackn ,old-ackn)
|
(pretty-write `(send-outbound (old-ackn ,old-ackn)
|
||||||
(s ,s)
|
|
||||||
(flags ,flags)))
|
(flags ,flags)))
|
||||||
(flush-output)
|
(flush-output)
|
||||||
(message (tcp-packet #f dst-ip dst-port src-ip src-port
|
(send! (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||||
(buffer-seqn b)
|
(buffer-seqn b)
|
||||||
(or ackn 0)
|
(or ackn 0)
|
||||||
flags
|
flags
|
||||||
window
|
window
|
||||||
#""
|
#""
|
||||||
chunk)))))
|
chunk))))
|
||||||
|
|
||||||
;; ConnState -> Transition
|
(define (bump-peer-activity-time!)
|
||||||
(define (bump-peer-activity-time s)
|
(latest-peer-activity-time (current-inexact-milliseconds)))
|
||||||
(transition (struct-copy conn-state s
|
|
||||||
[latest-peer-activity-time (current-inexact-milliseconds)])
|
|
||||||
'()))
|
|
||||||
|
|
||||||
;; ConnState Number -> Boolean
|
;; Number -> Boolean
|
||||||
(define (heard-from-peer-within-msec? s msec)
|
(define (heard-from-peer-within-msec? msec)
|
||||||
(<= (- (current-inexact-milliseconds) (conn-state-latest-peer-activity-time s)) msec))
|
(<= (- (current-inexact-milliseconds) (latest-peer-activity-time)) msec))
|
||||||
|
|
||||||
;; ConnState -> Boolean
|
(define (user-timeout-expired?)
|
||||||
(define (user-timeout-expired? s)
|
(and (not (all-output-acknowledged?))
|
||||||
(and (not (all-output-acknowledged? s))
|
(> (- (current-inexact-milliseconds) (user-timeout-base-time))
|
||||||
(> (- (current-inexact-milliseconds) (conn-state-user-timeout-base-time s))
|
|
||||||
user-timeout-msec)))
|
user-timeout-msec)))
|
||||||
|
|
||||||
;; ConnState -> Transition
|
(define (send-set-transmit-check-timer!)
|
||||||
(define (quit-when-done s)
|
(send! (set-timer (timer-name 'transmit-check)
|
||||||
(cond
|
|
||||||
[(and (buffer-finished? (conn-state-outbound s))
|
|
||||||
(buffer-finished? (conn-state-inbound s))
|
|
||||||
(all-output-acknowledged? s)
|
|
||||||
(not (heard-from-peer-within-msec? s (* 2 1000 maximum-segment-lifetime-sec))))
|
|
||||||
;; Everything is cleanly shut down, and we just need to wait a while for unexpected
|
|
||||||
;; packets before we release the state vector.
|
|
||||||
(quit)]
|
|
||||||
[(user-timeout-expired? s)
|
|
||||||
;; We've been plaintively retransmitting for user-timeout-msec without hearing anything
|
|
||||||
;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but
|
|
||||||
;; it will do for now? TODO
|
|
||||||
(log-info "TCP_USER_TIMEOUT fired.")
|
|
||||||
(quit)]
|
|
||||||
[else #f]))
|
|
||||||
|
|
||||||
;; Action
|
|
||||||
(define send-set-transmit-check-timer
|
|
||||||
(message (set-timer (timer-name 'transmit-check)
|
|
||||||
transmit-check-interval-msec
|
transmit-check-interval-msec
|
||||||
'relative)))
|
'relative)))
|
||||||
|
|
||||||
;; SeqNum SeqNum ConnState -> Transition
|
(define (reset! seqn ackn)
|
||||||
(define (reset seqn ackn s)
|
|
||||||
(log-warning "Sending RST from ~a:~a to ~a:~a"
|
(log-warning "Sending RST from ~a:~a to ~a:~a"
|
||||||
(ip-address->hostname dst-ip)
|
(ip-address->hostname dst-ip)
|
||||||
dst-port
|
dst-port
|
||||||
(ip-address->hostname src-ip)
|
(ip-address->hostname src-ip)
|
||||||
src-port)
|
src-port)
|
||||||
(quit (message (tcp-packet #f dst-ip dst-port src-ip src-port
|
(quit-because-reset? #t)
|
||||||
|
(send! (tcp-packet #f dst-ip dst-port src-ip src-port
|
||||||
seqn
|
seqn
|
||||||
ackn
|
ackn
|
||||||
(set 'ack 'rst)
|
(set 'ack 'rst)
|
||||||
0
|
0
|
||||||
#""
|
#""
|
||||||
#""))))
|
#"")))
|
||||||
|
|
||||||
;; ConnState -> Transition
|
(define (close-outbound-stream!)
|
||||||
(define (close-outbound-stream s)
|
(define b (outbound))
|
||||||
(define b (conn-state-outbound s))
|
(when (not (buffer-finished? b))
|
||||||
(transition
|
(outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte
|
||||||
(if (buffer-finished? b)
|
[finished? #t]))))
|
||||||
s
|
|
||||||
(struct-copy conn-state s
|
|
||||||
[outbound (struct-copy buffer (buffer-push b #"!") ;; dummy FIN byte
|
|
||||||
[finished? #t])]))
|
|
||||||
'()))
|
|
||||||
|
|
||||||
(define (state-vector-behavior e s)
|
(assert #:when (and (syn-acked?) (not (buffer-finished? (inbound))))
|
||||||
(define old-ackn (buffer-seqn (conn-state-inbound s)))
|
(advertise (tcp-channel src dst _)))
|
||||||
(match e
|
|
||||||
[(scn g)
|
(stop-when
|
||||||
(log-info "State vector routing-update:\n~a" (trie->pretty-string g))
|
(rising-edge
|
||||||
(define local-peer-present? (trie-non-empty? (trie-project g local-peer-detector)))
|
(and (buffer-finished? (outbound))
|
||||||
(define listening? (trie-non-empty? (trie-project g listener-detector)))
|
(buffer-finished? (inbound))
|
||||||
(define new-s (struct-copy conn-state s [listener-listening? listening?]))
|
(all-output-acknowledged?)
|
||||||
(cond
|
(not (heard-from-peer-within-msec? (* 2 1000 maximum-segment-lifetime-sec)))))
|
||||||
[(and local-peer-present? (not (conn-state-local-peer-seen? s)))
|
;; Everything is cleanly shut down, and we just need to wait a while for unexpected
|
||||||
(transition (struct-copy conn-state new-s [local-peer-seen? #t]) '())]
|
;; packets before we release the state vector.
|
||||||
[(and (not local-peer-present?) (conn-state-local-peer-seen? s))
|
)
|
||||||
|
|
||||||
|
(stop-when
|
||||||
|
(rising-edge (user-timeout-expired?))
|
||||||
|
;; We've been plaintively retransmitting for user-timeout-msec without hearing anything
|
||||||
|
;; back; this is a crude approximation of the real condition for TCP_USER_TIMEOUT, but
|
||||||
|
;; it will do for now? TODO
|
||||||
|
(log-info "TCP_USER_TIMEOUT fired."))
|
||||||
|
|
||||||
|
(stop-when (rising-edge (quit-because-reset?)))
|
||||||
|
|
||||||
|
(define/query-value local-peer-seen? #f (observe (tcp-channel src dst _)) #t
|
||||||
|
#:on-remove (begin
|
||||||
(log-info "Closing outbound stream.")
|
(log-info "Closing outbound stream.")
|
||||||
(sequence-transitions (close-outbound-stream new-s)
|
(close-outbound-stream!)
|
||||||
(send-outbound old-ackn)
|
(send-outbound! (buffer-seqn (inbound)))))
|
||||||
quit-when-done)]
|
|
||||||
[else (transition new-s '())])]
|
(define/query-value listener-listening?
|
||||||
[(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data))
|
#f
|
||||||
(define expected (next-expected-seqn s))
|
(observe (advertise (tcp-channel _ (tcp-listener dst-port) _)))
|
||||||
|
#t)
|
||||||
|
|
||||||
|
(on (message (tcp-packet #t src-ip src-port dst-ip dst-port
|
||||||
|
$seqn $ackn $flags $window $options $data))
|
||||||
|
(define old-ackn (buffer-seqn (inbound)))
|
||||||
|
(define expected (next-expected-seqn))
|
||||||
(define is-syn? (set-member? flags 'syn))
|
(define is-syn? (set-member? flags 'syn))
|
||||||
(define is-fin? (set-member? flags 'fin))
|
(define is-fin? (set-member? flags 'fin))
|
||||||
(cond
|
(cond
|
||||||
[(set-member? flags 'rst) (quit)]
|
[(set-member? flags 'rst) (quit-because-reset? #t)]
|
||||||
[(and (not expected) ;; no syn yet
|
[(and (not expected) ;; no syn yet
|
||||||
(or (not is-syn?) ;; and this isn't it
|
(or (not is-syn?) ;; and this isn't it
|
||||||
(and (not (conn-state-listener-listening? s)) ;; or it is, but no listener...
|
(and (not (listener-listening?)) ;; or it is, but no listener...
|
||||||
(not (conn-state-local-peer-seen? s))))) ;; ...and no outbound client
|
(not (local-peer-seen?))))) ;; ...and no outbound client
|
||||||
(reset ackn ;; this is *our* seqn
|
(reset! ackn ;; this is *our* seqn
|
||||||
(seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0)))
|
(seq+ seqn (+ (if is-syn? 1 0) (if is-fin? 1 0)))
|
||||||
;; ^^ this is what we should acknowledge...
|
;; ^^ this is what we should acknowledge...
|
||||||
s)]
|
)]
|
||||||
[else
|
[else
|
||||||
(sequence-transitions (cond
|
(cond
|
||||||
[(not expected) ;; haven't seen syn yet, but we know this is it
|
[(not expected) ;; haven't seen syn yet, but we know this is it
|
||||||
(incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))]
|
(set-inbound-seqn! (seq+ seqn 1))
|
||||||
|
(incorporate-segment! data)]
|
||||||
[(= expected seqn)
|
[(= expected seqn)
|
||||||
(incorporate-segment data s)]
|
(incorporate-segment! data)]
|
||||||
[else
|
[else (void)])
|
||||||
(transition s '())])
|
(deliver-inbound-locally!)
|
||||||
deliver-inbound-locally
|
(check-fin! flags)
|
||||||
(check-fin flags)
|
(discard-acknowledged-outbound! (set-member? flags 'ack) ackn)
|
||||||
(discard-acknowledged-outbound (set-member? flags 'ack) ackn)
|
(update-outbound-window! window)
|
||||||
(update-outbound-window window)
|
(send-outbound! old-ackn)
|
||||||
(send-outbound old-ackn)
|
(bump-peer-activity-time!)]))
|
||||||
bump-peer-activity-time
|
|
||||||
quit-when-done)])]
|
(on (message (tcp-channel dst src $bs))
|
||||||
[(message (tcp-channel _ _ bs))
|
(define old-ackn (buffer-seqn (inbound)))
|
||||||
;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs)
|
;; (log-info "GOT MORE STUFF TO DELIVER ~v" bs)
|
||||||
(sequence-transitions (transition (struct-copy conn-state s
|
|
||||||
[user-timeout-base-time
|
(when (all-output-acknowledged?)
|
||||||
;; Only move user-timeout-base-time if there wasn't
|
;; Only move user-timeout-base-time if there wasn't
|
||||||
;; already some outstanding output.
|
;; already some outstanding output.
|
||||||
(if (all-output-acknowledged? s)
|
(user-timeout-base-time (current-inexact-milliseconds)))
|
||||||
(current-inexact-milliseconds)
|
|
||||||
(conn-state-user-timeout-base-time s))]
|
(outbound (buffer-push (outbound) bs))
|
||||||
[outbound (buffer-push (conn-state-outbound s) bs)])
|
(send-outbound! old-ackn))
|
||||||
'())
|
|
||||||
(send-outbound old-ackn)
|
(on-start (send-set-transmit-check-timer!))
|
||||||
quit-when-done)]
|
(on (message (timer-expired (timer-name 'transmit-check) _))
|
||||||
[(message (timer-expired (== (timer-name 'transmit-check)) _))
|
(define old-ackn (buffer-seqn (inbound)))
|
||||||
;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of
|
;; TODO: I am abusing this timer for multiple tasks. Notably, this is a (crude) means of
|
||||||
;; retransmitting outbound data as well as a means of checking for an expired
|
;; retransmitting outbound data as well as a means of checking for an expired
|
||||||
;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained
|
;; TCP_USER_TIMEOUT. A better design would have separate timers and a more fine-grained
|
||||||
;; approach.
|
;; approach.
|
||||||
(sequence-transitions (transition s send-set-transmit-check-timer)
|
(send-set-transmit-check-timer!)
|
||||||
(send-outbound old-ackn)
|
(send-outbound! old-ackn)))))
|
||||||
quit-when-done)]
|
|
||||||
[_ #f]))
|
|
||||||
|
|
||||||
;; (local-require racket/trace)
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; (trace state-vector-behavior)
|
|
||||||
|
|
||||||
(define initial-outbound-seqn
|
(spawn-tcp-driver)
|
||||||
;; Yuck
|
|
||||||
(inexact->exact (truncate (* #x100000000 (random)))))
|
|
||||||
|
|
||||||
;; TODO accept input from user process
|
|
||||||
(list
|
|
||||||
send-set-transmit-check-timer
|
|
||||||
(let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position
|
|
||||||
(buffer #"" #f inbound-buffer-limit #f)
|
|
||||||
#f
|
|
||||||
(current-inexact-milliseconds)
|
|
||||||
(current-inexact-milliseconds)
|
|
||||||
#f
|
|
||||||
#f)))
|
|
||||||
(spawn #:name
|
|
||||||
(string->symbol (format "tcp-state-vector:~a:~a:~a:~a"
|
|
||||||
(ip-address->hostname src-ip)
|
|
||||||
src-port
|
|
||||||
(ip-address->hostname dst-ip)
|
|
||||||
dst-port))
|
|
||||||
state-vector-behavior
|
|
||||||
state0
|
|
||||||
(scn (compute-gestalt state0))))))
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#lang racket/base
|
#lang syndicate/actor
|
||||||
|
|
||||||
(provide (struct-out udp-remote-address)
|
(provide (struct-out udp-remote-address)
|
||||||
(struct-out udp-handle)
|
(struct-out udp-handle)
|
||||||
|
@ -9,13 +9,11 @@
|
||||||
spawn-udp-driver)
|
spawn-udp-driver)
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
(require racket/match)
|
|
||||||
(require syndicate/monolithic)
|
|
||||||
(require syndicate/demand-matcher)
|
|
||||||
(require bitsyntax)
|
(require bitsyntax)
|
||||||
|
|
||||||
(require "dump-bytes.rkt")
|
(require "dump-bytes.rkt")
|
||||||
(require "checksum.rkt")
|
(require "checksum.rkt")
|
||||||
|
(require "configuration.rkt")
|
||||||
(require "ip.rkt")
|
(require "ip.rkt")
|
||||||
(require "port-allocator.rkt")
|
(require "port-allocator.rkt")
|
||||||
|
|
||||||
|
@ -45,77 +43,52 @@
|
||||||
(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab)
|
(struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab)
|
||||||
(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress)
|
(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress)
|
||||||
|
|
||||||
(define any-remote (udp-remote-address ? ?))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; User-accessible driver startup
|
;; User-accessible driver startup
|
||||||
|
|
||||||
(define (spawn-udp-driver)
|
(define (spawn-udp-driver)
|
||||||
(list
|
(spawn-port-allocator 'udp (lambda () (query-set udp-ports (udp-port-allocation $p _) p)))
|
||||||
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?))
|
(spawn-kernel-udp-driver)
|
||||||
(advertise (udp-packet ? (?! (udp-listener ?)) ?))
|
(actor #:name 'udp-driver
|
||||||
(lambda (handle) (spawn-udp-relay (udp-listener-port handle) handle)))
|
(react (on (asserted (observe (udp-packet _ ($ h (udp-listener _)) _)))
|
||||||
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-handle ?)) ?))
|
(spawn-udp-relay (udp-listener-port h) h))
|
||||||
(advertise (udp-packet ? (?! (udp-handle ?)) ?))
|
(on (asserted (observe (udp-packet _ ($ h (udp-handle _)) _)))
|
||||||
(lambda (handle)
|
(actor #:name (list 'udp-transient h)
|
||||||
(message (port-allocation-request
|
(spawn-udp-relay (allocate-port! 'udp) h))))))
|
||||||
'udp
|
|
||||||
(lambda (port local-ips) (spawn-udp-relay port handle))))))
|
|
||||||
(spawn-udp-port-allocator)
|
|
||||||
(spawn-kernel-udp-driver)))
|
|
||||||
|
|
||||||
(define (spawn-udp-port-allocator)
|
|
||||||
(define udp-projector (udp-port-allocation (?!) ?))
|
|
||||||
(spawn-port-allocator 'udp
|
|
||||||
(subscription (projection->pattern udp-projector))
|
|
||||||
(lambda (g local-ips)
|
|
||||||
(project-assertions g udp-projector))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Relaying
|
;; Relaying
|
||||||
|
|
||||||
(define (spawn-udp-relay local-port local-user-addr)
|
(define (spawn-udp-relay local-port local-user-addr)
|
||||||
|
(actor #:name (list 'udp-relay local-port local-user-addr)
|
||||||
(log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr)
|
(log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr)
|
||||||
|
|
||||||
(define local-peer-detector (?! (observe (udp-packet any-remote local-user-addr ?))))
|
(define any-remote (udp-remote-address ? ?))
|
||||||
|
|
||||||
(define (compute-gestalt local-ips)
|
(react (stop-when (retracted (observe (udp-packet any-remote local-user-addr _))))
|
||||||
(for/fold [(g (assertion-set-union
|
(assert (advertise (udp-packet any-remote local-user-addr _)))
|
||||||
(subscription (projection->pattern local-peer-detector))
|
(assert (udp-port-allocation local-port local-user-addr))
|
||||||
(advertisement (udp-packet any-remote local-user-addr ?))
|
|
||||||
observe-local-ip-addresses-gestalt
|
|
||||||
(subscription (udp-packet local-user-addr any-remote ?))
|
|
||||||
(assertion (udp-port-allocation local-port local-user-addr))))]
|
|
||||||
[(ip (in-set local-ips))]
|
|
||||||
(assertion-set-union g
|
|
||||||
(subscription (udp-datagram ? ? ip local-port ?))
|
|
||||||
(advertisement (udp-datagram ip local-port ? ? ?)))))
|
|
||||||
|
|
||||||
(spawn (lambda (e local-ips)
|
(during (host-route $ip _ _)
|
||||||
(match e
|
(assert (advertise (udp-datagram ip local-port _ _ _)))
|
||||||
[(scn g)
|
(on (message (udp-datagram $source-ip $source-port ip local-port $bs))
|
||||||
(define new-local-ips (gestalt->local-ip-addresses g))
|
(send!
|
||||||
(if (trie-empty? (trie-project g local-peer-detector))
|
(udp-packet (udp-remote-address (ip-address->hostname source-ip)
|
||||||
(quit)
|
source-port)
|
||||||
(transition new-local-ips (scn (compute-gestalt new-local-ips))))]
|
local-user-addr
|
||||||
[(message (udp-packet (== local-user-addr) remote-addr bs))
|
bs))))
|
||||||
|
|
||||||
|
(define local-ips (query-local-ip-addresses))
|
||||||
|
(on (message (udp-packet local-user-addr ($ remote-addr any-remote) $bs))
|
||||||
;; Choose arbitrary local IP address for outbound packet!
|
;; Choose arbitrary local IP address for outbound packet!
|
||||||
;; TODO: what can be done? Must I examine the routing table?
|
;; TODO: what can be done? Must I examine the routing table?
|
||||||
(match-define (udp-remote-address remote-host remote-port) remote-addr)
|
(match-define (udp-remote-address remote-host remote-port) remote-addr)
|
||||||
(define remote-ip (ip-string->ip-address remote-host))
|
(define remote-ip (ip-string->ip-address remote-host))
|
||||||
(transition local-ips (message (udp-datagram (set-first local-ips)
|
(send! (udp-datagram (set-first (local-ips))
|
||||||
local-port
|
local-port
|
||||||
remote-ip
|
remote-ip
|
||||||
remote-port
|
remote-port
|
||||||
bs)))]
|
bs))))))
|
||||||
[(message (udp-datagram si sp _ _ bs))
|
|
||||||
(transition local-ips
|
|
||||||
(message (udp-packet (udp-remote-address (ip-address->hostname si) sp)
|
|
||||||
local-user-addr
|
|
||||||
bs)))]
|
|
||||||
[_ #f]))
|
|
||||||
(set)
|
|
||||||
(scn (compute-gestalt (set)))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Codec & kernel-level driver
|
;; Codec & kernel-level driver
|
||||||
|
@ -123,12 +96,14 @@
|
||||||
(define PROTOCOL-UDP 17)
|
(define PROTOCOL-UDP 17)
|
||||||
|
|
||||||
(define (spawn-kernel-udp-driver)
|
(define (spawn-kernel-udp-driver)
|
||||||
(spawn (lambda (e local-ips)
|
(actor #:name 'kernel-udp-driver
|
||||||
(match e
|
(forever
|
||||||
[(scn g)
|
(assert (advertise (ip-packet #f _ _ PROTOCOL-UDP _ _)))
|
||||||
(transition (gestalt->local-ip-addresses g) '())]
|
|
||||||
[(message (ip-packet source-if src-ip dst-ip _ _ body))
|
(define local-ips (query-local-ip-addresses))
|
||||||
#:when (and source-if (set-member? local-ips dst-ip))
|
|
||||||
|
(on (message (ip-packet $source-if $src-ip $dst-ip PROTOCOL-UDP _ $body))
|
||||||
|
(when (and source-if (set-member? (local-ips) dst-ip))
|
||||||
(bit-string-case body
|
(bit-string-case body
|
||||||
([ (src-port :: integer bytes 2)
|
([ (src-port :: integer bytes 2)
|
||||||
(dst-port :: integer bytes 2)
|
(dst-port :: integer bytes 2)
|
||||||
|
@ -138,15 +113,13 @@
|
||||||
(bit-string-case data
|
(bit-string-case data
|
||||||
([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes
|
([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes
|
||||||
(:: binary) ]
|
(:: binary) ]
|
||||||
(transition local-ips (message (udp-datagram src-ip
|
(send! (udp-datagram src-ip src-port dst-ip dst-port
|
||||||
src-port
|
(bit-string->bytes payload))))
|
||||||
dst-ip
|
|
||||||
dst-port
|
|
||||||
(bit-string->bytes payload)))))
|
|
||||||
(else #f)))
|
(else #f)))
|
||||||
(else #f))]
|
(else #f))))
|
||||||
[(message (udp-datagram src-ip src-port dst-ip dst-port bs))
|
|
||||||
#:when (set-member? local-ips src-ip)
|
(on (message (udp-datagram $src-ip $src-port $dst-ip $dst-port $bs))
|
||||||
|
(when (set-member? (local-ips) src-ip)
|
||||||
(let* ((payload (bit-string (src-port :: integer bytes 2)
|
(let* ((payload (bit-string (src-port :: integer bytes 2)
|
||||||
(dst-port :: integer bytes 2)
|
(dst-port :: integer bytes 2)
|
||||||
((+ 8 (bit-string-byte-count bs))
|
((+ 8 (bit-string-byte-count bs))
|
||||||
|
@ -161,15 +134,9 @@
|
||||||
:: integer bytes 2)))
|
:: integer bytes 2)))
|
||||||
(checksummed-payload (ip-checksum #:pseudo-header pseudo-header
|
(checksummed-payload (ip-checksum #:pseudo-header pseudo-header
|
||||||
6 payload)))
|
6 payload)))
|
||||||
(transition local-ips (message (ip-packet #f
|
(send! (ip-packet #f src-ip dst-ip PROTOCOL-UDP #""
|
||||||
src-ip
|
checksummed-payload))))))))
|
||||||
dst-ip
|
|
||||||
PROTOCOL-UDP
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
#""
|
|
||||||
checksummed-payload))))]
|
(spawn-udp-driver)
|
||||||
[_ #f]))
|
|
||||||
(set)
|
|
||||||
(scn/union (advertisement (ip-packet #f ? ? PROTOCOL-UDP ? ?))
|
|
||||||
(subscription (ip-packet ? ? ? PROTOCOL-UDP ? ?))
|
|
||||||
(subscription (udp-datagram ? ? ? ? ?))
|
|
||||||
observe-local-ip-addresses-gestalt)))
|
|
||||||
|
|
Loading…
Reference in New Issue