From aabeb5adcde7c6a7a6ce7c29b2fde874ec52322c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 23 Jan 2016 21:59:33 -0500 Subject: [PATCH] UDP. --- main.rkt | 6 +-- port-allocator.rkt | 11 ++--- udp.rkt | 112 ++++++++++++++++++++++----------------------- 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/main.rkt b/main.rkt index 8feeb27..cb6a11c 100644 --- a/main.rkt +++ b/main.rkt @@ -7,7 +7,7 @@ (require "arp.rkt") (require "ip.rkt") ;; (require "tcp.rkt") -;; (require "udp.rkt") +(require "udp.rkt") ;;(log-events-and-actions? #t) @@ -16,7 +16,7 @@ (spawn-arp-driver) (spawn-ip-driver) ;; (spawn-tcp-driver) -;; (spawn-udp-driver) +(spawn-udp-driver) (spawn-demo-config) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -66,7 +66,7 @@ spawn-session) ) -#;(let () +(let () (spawn (lambda (e s) (match e [(message (udp-packet src dst body)) diff --git a/port-allocator.rkt b/port-allocator.rkt index 1599cd5..533a9e4 100644 --- a/port-allocator.rkt +++ b/port-allocator.rkt @@ -16,12 +16,12 @@ (define (spawn-port-allocator allocator-type observer-gestalt compute-used-ports) (spawn (lambda (e s) (match e - [(routing-update g) + [(scn g) (define local-ips (or (gestalt->local-ip-addresses g) (set))) (define new-used-ports (compute-used-ports g local-ips)) (log-info "port-allocator ~v used ports: ~v" allocator-type new-used-ports) (transition (port-allocator-state new-used-ports local-ips) '())] - [(message (port-allocation-request _ k) _ _) + [(message (port-allocation-request _ k)) (define currently-used-ports (port-allocator-state-used-ports s)) (let randomly-allocate-until-unused () (define p (+ 1024 (random 64512))) @@ -32,7 +32,6 @@ (k p (port-allocator-state-local-ips s)))))] [_ #f])) (port-allocator-state (set) (set)) - (apply gestalt-union - (sub (port-allocation-request allocator-type ?)) - observe-local-ip-addresses-gestalt - observer-gestalt))) + (scn/union (subscription (port-allocation-request allocator-type ?)) + observe-local-ip-addresses-gestalt + observer-gestalt))) diff --git a/udp.rkt b/udp.rkt index d275571..d1302ae 100644 --- a/udp.rkt +++ b/udp.rkt @@ -43,6 +43,7 @@ ;; KERNEL-level protocol (struct udp-datagram (source-ip source-port destination-ip destination-port body) #:prefab) +(struct udp-port-allocation (port handle) #:prefab) ;; (udp-port-allocation Number UdpLocalAddress) (define any-remote (udp-remote-address ? ?)) @@ -51,26 +52,24 @@ (define (spawn-udp-driver) (list - (spawn-demand-matcher (udp-packet ? (?! (udp-listener ?)) ?) - #:demand-is-subscription? #t + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?)) + (advertise (udp-packet ? (?! (udp-listener ?)) ?)) (lambda (handle) (spawn-udp-relay (udp-listener-port handle) handle))) - (spawn-demand-matcher (udp-packet ? (?! (udp-handle ?)) ?) - #:demand-is-subscription? #t + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-handle ?)) ?)) + (advertise (udp-packet ? (?! (udp-handle ?)) ?)) (lambda (handle) - (send (port-allocation-request - 'udp - (lambda (port local-ips) (spawn-udp-relay port handle)))))) + (message (port-allocation-request + 'udp + (lambda (port local-ips) (spawn-udp-relay port handle)))))) (spawn-udp-port-allocator) (spawn-kernel-udp-driver))) (define (spawn-udp-port-allocator) - (define udp-projector (project-pubs (udp-datagram (?!) (?!) ? ? ?))) + (define udp-projector (udp-port-allocation (?!) ?)) (spawn-port-allocator 'udp - (list (projection->gestalt udp-projector)) + (subscription (projection->pattern udp-projector)) (lambda (g local-ips) - (for/set [(e (gestalt-project/keys g udp-projector)) - #:when (set-member? local-ips (car e))] - (cadr e))))) + (project-assertions g udp-projector)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Relaying @@ -78,44 +77,45 @@ (define (spawn-udp-relay local-port local-user-addr) (log-info "Spawning UDP relay ~v / ~v" local-port local-user-addr) - (define local-peer-gestalt (pub (udp-packet any-remote local-user-addr ?) #:level 1)) + (define local-peer-detector (?! (observe (udp-packet any-remote local-user-addr ?)))) (define (compute-gestalt local-ips) - (for/fold [(g (gestalt-union local-peer-gestalt - observe-local-ip-addresses-gestalt - (pub (udp-packet any-remote local-user-addr ?)) - (sub (udp-packet local-user-addr any-remote ?))))] - [(ip (in-set local-ips))] - (gestalt-union g - (sub (udp-datagram ? ? ip local-port ?)) - (pub (udp-datagram ip local-port ? ? ?))))) + (for/fold [(g (assertion-set-union + (subscription (projection->pattern local-peer-detector)) + (advertisement (udp-packet any-remote local-user-addr ?)) + observe-local-ip-addresses-gestalt + (subscription (udp-packet local-user-addr any-remote ?)) + (assertion (udp-port-allocation local-port local-user-addr))))] + [(ip (in-set local-ips))] + (assertion-set-union g + (subscription (udp-datagram ? ? ip local-port ?)) + (advertisement (udp-datagram ip local-port ? ? ?))))) (spawn (lambda (e local-ips) (match e - [(routing-update g) + [(scn g) (define new-local-ips (gestalt->local-ip-addresses g)) - (transition new-local-ips - (list - (when (gestalt-empty? (gestalt-filter g local-peer-gestalt)) (quit)) - (routing-update (compute-gestalt new-local-ips))))] - [(message (udp-packet (== local-user-addr) remote-addr bs) _ _) + (if (trie-empty? (trie-project g (compile-projection local-peer-detector))) + (quit) + (transition new-local-ips (scn (compute-gestalt new-local-ips))))] + [(message (udp-packet (== local-user-addr) remote-addr bs)) ;; Choose arbitrary local IP address for outbound packet! ;; TODO: what can be done? Must I examine the routing table? (match-define (udp-remote-address remote-host remote-port) remote-addr) (define remote-ip (ip-string->ip-address remote-host)) - (transition local-ips (send (udp-datagram (set-first local-ips) - local-port - remote-ip - remote-port - bs)))] - [(message (udp-datagram si sp _ _ bs) _ _) - (transition local-ips (send (udp-packet (udp-remote-address (ip-address->hostname si) - sp) - local-user-addr - bs)))] + (transition local-ips (message (udp-datagram (set-first local-ips) + local-port + remote-ip + remote-port + bs)))] + [(message (udp-datagram si sp _ _ bs)) + (transition local-ips + (message (udp-packet (udp-remote-address (ip-address->hostname si) sp) + local-user-addr + bs)))] [_ #f])) (set) - (compute-gestalt (set)))) + (scn (compute-gestalt (set))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Codec & kernel-level driver @@ -125,9 +125,9 @@ (define (spawn-kernel-udp-driver) (spawn (lambda (e local-ips) (match e - [(routing-update g) + [(scn g) (transition (gestalt->local-ip-addresses g) '())] - [(message (ip-packet source-if src-ip dst-ip _ _ body) _ _) + [(message (ip-packet source-if src-ip dst-ip _ _ body)) #:when (and source-if (set-member? local-ips dst-ip)) (bit-string-case body ([ (src-port :: integer bytes 2) @@ -138,14 +138,14 @@ (bit-string-case data ([ (payload :: binary bytes (- length 8)) ;; min UDP header size is 8 bytes (:: binary) ] - (transition local-ips (send (udp-datagram src-ip - src-port - dst-ip - dst-port - (bit-string->bytes payload))))) + (transition local-ips (message (udp-datagram src-ip + src-port + dst-ip + dst-port + (bit-string->bytes payload))))) (else #f))) (else #f))] - [(message (udp-datagram src-ip src-port dst-ip dst-port bs) _ _) + [(message (udp-datagram src-ip src-port dst-ip dst-port bs)) #:when (set-member? local-ips src-ip) (let* ((payload (bit-string (src-port :: integer bytes 2) (dst-port :: integer bytes 2) @@ -161,15 +161,15 @@ :: integer bytes 2))) (checksummed-payload (ip-checksum #:pseudo-header pseudo-header 6 payload))) - (transition local-ips (send (ip-packet #f - src-ip - dst-ip - PROTOCOL-UDP - #"" - checksummed-payload))))] + (transition local-ips (message (ip-packet #f + src-ip + dst-ip + PROTOCOL-UDP + #"" + checksummed-payload))))] [_ #f])) (set) - (gestalt-union (pub (ip-packet #f ? ? PROTOCOL-UDP ? ?)) - (sub (ip-packet ? ? ? PROTOCOL-UDP ? ?)) - (sub (udp-datagram ? ? ? ? ?)) - observe-local-ip-addresses-gestalt))) + (scn/union (advertisement (ip-packet #f ? ? PROTOCOL-UDP ? ?)) + (subscription (ip-packet ? ? ? PROTOCOL-UDP ? ?)) + (subscription (udp-datagram ? ? ? ? ?)) + observe-local-ip-addresses-gestalt)))