diff --git a/main.rkt b/main.rkt index 32180a1..fb6d194 100644 --- a/main.rkt +++ b/main.rkt @@ -1,12 +1,13 @@ #lang minimart +(require minimart/demand-matcher) (require minimart/drivers/timer) (require "ethernet.rkt") (require "arp.rkt") (require "ip.rkt") (require "tcp.rkt") -(define interface "wlan0") +(define interface "vboxnet0") ;;(log-events-and-actions? #t) @@ -16,8 +17,66 @@ (spawn-ip-driver interface (bytes 192 168 56 222)) (spawn-tcp-driver) +(let () + (local-require racket/set racket/string) + + (define (spawn-session them us) + (define user (gensym 'user)) + (define remote-detector (compile-gestalt-projection (?!))) + (define peer-detector (compile-gestalt-projection `(,(?!) says ,?))) + (define (send-to-remote fmt . vs) + (send #:meta-level 1 (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))) + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + (list (send-to-remote "Welcome, ~a.\n" user) + (spawn (lambda (e old-peers) + (log-info "~a: ~v --> ~v" user e old-peers) + (match e + [(message (tcp-channel _ _ bs) 1 #f) + (transition old-peers + (send `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] + [(message `(,who says ,what) 0 #f) + (transition old-peers (say who "says: ~a" what))] + [(routing-update g) + (define new-peers + (matcher-key-set/single (gestalt-project g 0 0 #t peer-detector))) + (transition + new-peers + (list (when (matcher-empty? (gestalt-project g 1 0 #t remote-detector)) + (quit)) + (for/list [(who (set-subtract new-peers old-peers))] + (say who "arrived.")) + (for/list [(who (set-subtract old-peers new-peers))] + (say who "departed."))))] + [#f #f])) + (set) + (gestalt-union (sub `(,? says ,?)) + (sub `(,? says ,?) #:level 1) + (pub `(,user says ,?)) + (sub (tcp-channel them us ?) #:meta-level 1) + (sub (tcp-channel them us ?) #:meta-level 1 #:level 1) + (pub (tcp-channel us them ?) #:meta-level 1))))) + + (spawn-world + (spawn-demand-matcher (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-listener 5999)) ?) + #:meta-level 1 + spawn-session)) + + ) + (spawn (lambda (e s) - ;; (log-info "SPY: ~v" e) + (local-require racket/pretty) + (match e + [(message m _ _) + (pretty-write `(MAIN ,m))] + [(routing-update g) + (printf "MAIN gestalt:\n") + (pretty-print-gestalt g)] + [_ (void)]) + (flush-output) #f) (void) - (gestalt-union (sub ? #:level 5))) + (gestalt-union + ;;(sub ? #:level 5) + (sub (tcp-channel ? ? ?) #:level 5) + )) diff --git a/tcp.rkt b/tcp.rkt index 8b9107b..6e3c63b 100644 --- a/tcp.rkt +++ b/tcp.rkt @@ -21,14 +21,15 @@ ;; tcp-handle/tcp-address : "user" outbound connections ;; tcp-listener/tcp-address : "user" inbound connections +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + (struct tcp-address (host port) #:prefab) (struct tcp-handle (id) #:prefab) (struct tcp-listener (port) #:prefab) (struct tcp-channel (source destination subpacket) #:prefab) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (struct tcp-packet (from-wire? source-ip source-port @@ -42,11 +43,125 @@ data) #:prefab) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; User-accessible driver startup + +(define (spawn-tcp-driver) + (list (spawn-demand-matcher (tcp-channel ? (?! (tcp-listener ?)) ?) + #:demand-is-subscription? #t + #:demand-level 1 + #:supply-level 2 + (lambda (server-addr) + (match-define (tcp-listener port) server-addr) + (spawn-demand-matcher + (tcp-channel (?! (tcp-address ? ?)) (?! (tcp-address ? port)) ?) + (spawn-relay server-addr)))) + (spawn-demand-matcher (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?) + (lambda (local-addr remote-addr) + (send (tcp-port-allocation-request local-addr remote-addr)))) + (spawn-port-allocator) + (spawn-kernel-tcp-driver))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Port allocator + +(struct tcp-port-allocation-request (local-addr remote-addr) #:prefab) + +(struct port-allocator-state (used-ports local-ips) #:transparent) + +(define (spawn-port-allocator) + (define port-projector + (compile-gestalt-projection (tcp-channel (tcp-address (?!) (?!)) (tcp-address (?!) (?!)) ?))) + (define ip-projector + (compile-gestalt-projection (ip-interface (?!) ?))) + + ;; TODO: Choose a sensible IP address for the outbound connection. + ;; We don't have enough information to do this well at the moment, + ;; so just pick some available local IP address. + ;; + ;; Interesting note: In some sense, the right answer is "?". This + ;; would give us a form of mobility, where IP addresses only route + ;; to a given bucket-of-state and ONLY the port number selects a + ;; substate therein. That's not how TCP is defined however so we + ;; can't do that. + (define (appropriate-ip s) + (set-first (port-allocator-state-local-ips s))) + + (spawn (lambda (e s) + (match e + [(routing-update g) + (define extracted-ips (matcher-key-set (gestalt-project g 0 0 #t ip-projector))) + (define extracted-ports (matcher-key-set (gestalt-project g 0 0 #f port-projector))) + (if (or (not extracted-ports) (not extracted-ips)) + (error 'tcp "Someone has published a wildcard TCP address or IP interface") + (transition (let ((local-ips (for/set [(e (in-set extracted-ips))] (car e)))) + (port-allocator-state + (for/fold [(s (set))] [(e (in-set extracted-ports))] + (match-define (list si sp di dp) e) + (let* ((s (if (set-member? local-ips si) (set-add s sp) s)) + (s (if (set-member? local-ips di) (set-add s dp) s))) + s)) + local-ips)) + '()))] + [(message (tcp-port-allocation-request local-addr remote-addr) _ _) + (define currently-used-ports (port-allocator-state-used-ports s)) + (let randomly-allocate-until-unused () + (define p (+ 1024 (random 64512))) + (if (set-member? currently-used-ports p) + (randomly-allocate-until-unused) + (transition (struct-copy port-allocator-state s + [used-ports (set-add currently-used-ports p)]) + ((spawn-relay local-addr) + remote-addr + (tcp-channel (appropriate-ip s) p)))))] + [_ #f])) + (port-allocator-state (set) (set)) + (gestalt-union (sub (tcp-port-allocation-request ? ?)) + (sub (projection->pattern ip-projector) #:level 1) + (pub (projection->pattern port-projector) #:level 1)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Relay between kernel-level and user-level + +(define ((spawn-relay local-user-addr) remote-addr local-tcp-addr) + (define local-peer-traffic (pub (tcp-channel remote-addr local-user-addr ?) #:level 1)) + (define remote-peer-traffic (sub (tcp-channel remote-addr local-tcp-addr ?) #:level 1)) + (spawn (lambda (e seen-local-peer?) + (local-require racket/pretty) + (pretty-write `(RELAY (local-user-addr ,local-user-addr) + (remote-addr ,remote-addr) + (local-tcp-addr ,local-tcp-addr) + (seen-local-peer? ,seen-local-peer?) + (e ,e))) + (flush-output) + (match e + [(routing-update g) + (define local-peer-absent? (gestalt-empty? (gestalt-filter g local-peer-traffic))) + (transition (or seen-local-peer? (not local-peer-absent?)) + (when (or (and seen-local-peer? local-peer-absent?) + (gestalt-empty? (gestalt-filter g remote-peer-traffic))) + (quit)))] + [(message (tcp-channel (== local-user-addr) (== remote-addr) bs) _ _) + (transition seen-local-peer? (send (tcp-channel local-tcp-addr remote-addr bs)))] + [(message (tcp-channel (== remote-addr) (== local-tcp-addr) bs) _ _) + (transition seen-local-peer? (send (tcp-channel remote-addr local-user-addr bs)))] + [_ #f])) + #f + (gestalt-union local-peer-traffic + remote-peer-traffic + (sub (tcp-channel remote-addr local-tcp-addr ?)) + (sub (tcp-channel local-user-addr remote-addr ?)) + (pub (tcp-channel remote-addr local-user-addr ?)) + (pub (tcp-channel local-tcp-addr remote-addr ?))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Codec & kernel-level driver + (define PROTOCOL-TCP 6) (struct codec-state (active-state-vectors) #:transparent) -(define (spawn-tcp-driver) +(define (spawn-kernel-tcp-driver) (define (flip-statevec statevec) (match-define (list si sp di dp) statevec) @@ -80,7 +195,8 @@ (let* ((flags (set)) (statevec (list src-ip src-port dst-ip dst-port)) (old-active-state-vectors (codec-state-active-state-vectors s)) - (spawn-needed? (not (state-vector-active? statevec s)))) + (spawn-needed? (and (not (state-vector-active? statevec s)) + (zero? rst)))) ;; don't bother spawning if it's a rst (define-syntax-rule (set-flags! v ...) (begin (unless (zero? v) (set! flags (set-add flags 'v))) ...)) (set-flags! ns cwr ece urg ack psh rst syn fin) @@ -200,6 +316,7 @@ #:level 1)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Per-connection state vector process (struct buffer (data ;; bit-string seqn ;; names leftmost byte in data @@ -210,7 +327,8 @@ (struct conn-state (outbound ;; buffer inbound ;; buffer syn-acked? ;; boolean - latest-activity-time) ;; from current-inexact-milliseconds + latest-activity-time ;; from current-inexact-milliseconds + local-peer-seen?) ;; boolean #:transparent) (define transmit-check-interval-msec 100) @@ -255,6 +373,17 @@ (define (seq> a b) (< (seq- a b) #x80000000)) + ;; ConnState -> Gestalt + (define (compute-gestalt s) + (gestalt-union (sub (timer-expired (timer-name ?) ?)) + (sub (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?)) + (pub (tcp-packet #f dst-ip dst-port src-ip src-port ? ? ? ? ? ?)) + (sub (tcp-channel dst src ?)) + (if (not (buffer-finished? (conn-state-inbound s))) + (pub (tcp-channel src dst ?)) + (gestalt-empty)) + (pub (tcp-channel src dst ?) #:level 1))) + ;; ConnState -> Transition (define (deliver-inbound-locally s) (define b (conn-state-inbound s)) @@ -272,16 +401,15 @@ (define b (conn-state-inbound s)) (unless (bit-string-empty? (buffer-data b)) ;; assured by deliver-inbound-locally (error 'check-fin "Nonempty inbound buffer")) - (transition - (if (set-member? flags 'fin) - (struct-copy conn-state s - [inbound (struct-copy buffer b - [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte - [finished? #t])]) - s) - '())) + (if (set-member? flags 'fin) + (let ((new-s (struct-copy conn-state s + [inbound (struct-copy buffer b + [seqn (seq+ (buffer-seqn b) 1)] ;; reliable: count fin as a byte + [finished? #t])]))) + (transition new-s (routing-update (compute-gestalt new-s)))) + (transition s '()))) - ;; Boolean Nat -> ConnState -> Transition + ;; Boolean SeqNum -> ConnState -> Transition (define ((discard-acknowledged-outbound ack? ackn) s) (transition (if (not ack?) @@ -309,13 +437,15 @@ (define (all-output-acknowledged? s) (bit-string-empty? (buffer-data (conn-state-outbound s)))) - ;; ConnState -> Transition + ;; (Option SeqNum) -> ConnState -> Transition (define ((send-outbound old-ackn) s) (define b (conn-state-outbound s)) (define pending-byte-count (max 0 (- (bit-string-byte-count (buffer-data b)) (if (buffer-finished? b) 1 0)))) + (define segment-size (min maximum-segment-size - (buffer-window b) + (if (conn-state-syn-acked? s) (buffer-window b) 1) + ;; ^ can only send SYN until SYN is acked pending-byte-count)) (define segment-offset (if (conn-state-syn-acked? s) 0 1)) (define-values (chunk0 remaining-data) @@ -329,8 +459,9 @@ (when (not (conn-state-syn-acked? s)) (set! flags (set-add flags 'syn))) (when (and (buffer-finished? b) + (conn-state-syn-acked? s) (= segment-size pending-byte-count) - (not (all-output-acknowledged? s))) + (not (all-output-acknowledged? s))) ;; TODO: reexamine. This looks fishy (set! flags (set-add flags 'fin))) (define window (min 65535 ;; limit of field width (max 0 ;; can't be negative @@ -340,12 +471,14 @@ (transition s (unless (and (equal? ackn old-ackn) (conn-state-syn-acked? s) + (not (set-member? flags 'fin)) (zero? (bit-string-byte-count chunk))) - (send (tcp-packet #f - dst-ip - dst-port - src-ip - src-port + (local-require racket/pretty) + (pretty-write `(send-outbound (old-ackn ,old-ackn) + (s ,s) + (flags ,flags))) + (flush-output) + (send (tcp-packet #f dst-ip dst-port src-ip src-port (buffer-seqn b) (or ackn 0) flags @@ -361,13 +494,13 @@ ;; ConnState -> Transition (define (quit-when-done s) - (if (and (buffer-finished? (conn-state-outbound s)) - (buffer-finished? (conn-state-inbound s)) - (all-output-acknowledged? s) - (> (- (current-inexact-milliseconds) (conn-state-latest-activity-time s)) - (* 2 1000 maximum-segment-lifetime-sec))) - (transition s (quit)) - (transition s '()))) + (transition s (when (and (buffer-finished? (conn-state-outbound s)) + (buffer-finished? (conn-state-inbound s)) + (all-output-acknowledged? s) + (> (- (current-inexact-milliseconds) + (conn-state-latest-activity-time s)) + (* 2 1000 maximum-segment-lifetime-sec))) + (quit)))) ;; Action (define send-set-transmit-check-timer @@ -375,23 +508,74 @@ transmit-check-interval-msec 'relative))) + ;; ConnState -> Transition + (define (reset seqn ackn is-fin? s) + (log-warning "Sending RST from ~a:~a to ~a:~a" + (ip-address->hostname dst-ip) + dst-port + (ip-address->hostname src-ip) + src-port) + (transition s + (list + (send (tcp-packet #f dst-ip dst-port src-ip src-port + seqn + (seq+ ackn (if is-fin? 1 0)) + (set 'ack 'rst) + 0 + #"" + #"")) + (quit)))) + + ;; ConnState -> ConnState + (define (close-outbound-stream s) + (transition + (struct-copy conn-state s + [outbound (struct-copy buffer (buffer-push (conn-state-outbound s) #"!") ;; dummy FIN byte + [finished? #t])]) + '())) + (define (state-vector-behavior e s) (define old-ackn (buffer-seqn (conn-state-inbound s))) (match e + [(routing-update g) + (log-info "State vector routing-update:\n~a" (gestalt->pretty-string g)) + (define local-peer-present? (not (gestalt-empty? g))) + (cond + [(and local-peer-present? (not (conn-state-local-peer-seen? s))) + (transition (struct-copy conn-state s [local-peer-seen? #t]) '())] + [(and (not local-peer-present?) (conn-state-local-peer-seen? s)) + (log-info "Closing outbound stream.") + (sequence-transitions (close-outbound-stream s) + (send-outbound old-ackn) + bump-activity-time + quit-when-done)] + [else #f])] [(message (tcp-packet #t _ _ _ _ seqn ackn flags window options data) _ _) (define expected (next-expected-seqn s)) - (sequence-transitions (if (not expected) ;; haven't seen syn yet... - (if (set-member? flags 'syn) ;; ... and this is it - (incorporate-segment data - (set-inbound-seqn (seq+ seqn 1) s)) - (transition s '())) - (if (= expected seqn) - (incorporate-segment data s) - (transition s '()))) - deliver-inbound-locally - (check-fin flags) - (discard-acknowledged-outbound (set-member? flags 'ack) ackn) - (update-outbound-window window) + (if (and (not expected) ;; no syn yet + (not (set-member? flags 'syn))) ;; and this isn't it + (reset ackn ;; this is *our* seqn + seqn ;; this is what we should acknowledge... + (set-member? flags 'fin) ;; ... +1, if fin is set + s) + (sequence-transitions (cond + [(not expected) ;; haven't seen syn yet, but we know this is it + (incorporate-segment data (set-inbound-seqn (seq+ seqn 1) s))] + [(= expected seqn) + (incorporate-segment data s)] + [else + (transition s '())]) + deliver-inbound-locally + (check-fin flags) + (discard-acknowledged-outbound (set-member? flags 'ack) ackn) + (update-outbound-window window) + (send-outbound old-ackn) + bump-activity-time + quit-when-done))] + [(message (tcp-channel _ _ bs) _ _) + (sequence-transitions (transition (struct-copy conn-state s + [outbound (buffer-push (conn-state-outbound s) bs)]) + '()) (send-outbound old-ackn) bump-activity-time quit-when-done)] @@ -412,13 +596,11 @@ ;; TODO append a dummy byte at FIN position in outbound buffer (list send-set-transmit-check-timer - (spawn state-vector-behavior - (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position - (buffer #"" #f inbound-buffer-limit #f) - #f - (current-inexact-milliseconds)) - (gestalt-union (sub (timer-expired (timer-name ?) ?)) - (sub (tcp-packet #t src-ip src-port dst-ip dst-port ? ? ? ? ? ?)) - (pub (tcp-packet #f dst-ip dst-port src-ip src-port ? ? ? ? ? ?)) - (sub (tcp-channel dst src ?)) - (pub (tcp-channel src dst ?)))))) \ No newline at end of file + (let ((state0 (conn-state (buffer #"!" initial-outbound-seqn 0 #f) ;; dummy data at SYN position + (buffer #"" #f inbound-buffer-limit #f) + #f + (current-inexact-milliseconds) + #f))) + (spawn state-vector-behavior + state0 + (compute-gestalt state0)))))