diff --git a/network-query.rkt b/network-query.rkt index c3c99b2..b39169c 100644 --- a/network-query.rkt +++ b/network-query.rkt @@ -5,10 +5,10 @@ (require "api.rkt") (require "codec.rkt") (require "zonedb.rkt") -(require "../racket-matrix/os2.rkt") -(require "../racket-matrix/os2-udp.rkt") -(require "../racket-matrix/os2-timer.rkt") -(require "os2-dns.rkt") +(require racket-typed-matrix/sugar-untyped) +(require racket-typed-matrix/drivers/udp-untyped) +(require racket-typed-matrix/drivers/timer-untyped) +(require "tk-dns.rkt") (provide network-query (struct-out network-reply)) @@ -210,7 +210,7 @@ (format "~a.~a.~a.~a" a b c d)) (define (make-dns-address ip-address) - (udp-address (ip->host-name ip-address) 53)) + (udp-remote-address (ip->host-name ip-address) 53)) ;; network-query : UdpAddress Question DomainName NEListOf UniqueId -> BootK (define (network-query s q zone-origin server-names unique-id) @@ -248,32 +248,33 @@ [current-name current-name] [remaining-names remaining-names]) (send-message subq) - (role/fresh subq-id (topic-subscriber (answered-question subq (wild))) - #:state w - [(answered-question (== subq) ans) - (define ips - (map make-dns-address (set->list (extract-addresses current-name ans)))) - (sequence-actions - (try-next-server (struct-copy network-query-state w - [known-addresses (hash-set known-addresses - current-name - ips)] - [remaining-addresses ips])) - (delete-role subq-id))]))))] + (endpoint #:subscriber (answered-question subq (wild)) + #:let-name subq-id + #:state w + [(answered-question (== subq) ans) + (let ((ips (map make-dns-address + (set->list (extract-addresses current-name ans))))) + (sequence-actions + (try-next-server (struct-copy network-query-state w + [known-addresses (hash-set known-addresses + current-name + ips)] + [remaining-addresses ips])) + (delete-endpoint subq-id)))]))))] [(network-query-state req timeout _ (cons current-ip remaining-ips) _ _) (define rpc-id (gensym 'network-query/allocate-query-id)) (transition w (send-message `(request ,rpc-id allocate-query-id)) - (role (topic-subscriber `(reply ,rpc-id ,(wild))) - #:name rpc-id - #:state w - [`(reply ,(== rpc-id) ,id) - (sequence-actions (send-request (struct-copy network-query-state w - [remaining-addresses remaining-ips]) - id - timeout - current-ip) - (delete-role rpc-id))]))])) + (endpoint #:subscriber `(reply ,rpc-id ,(wild)) + #:name rpc-id + #:state w + [`(reply ,(== rpc-id) ,id) + (sequence-actions (send-request (struct-copy network-query-state w + [remaining-addresses remaining-ips]) + id + timeout + current-ip) + (delete-endpoint rpc-id))]))])) (define (on-answer w ans server-ip) (match ans @@ -301,7 +302,8 @@ (define (send-request w query-id timeout server-ip) (match-define (network-request s q zone-origin _ _) (network-query-state-request w)) (define query (make-dns-query-message q query-id)) - (define subscription-id (list s query-id)) + (define reply-wait-id (list s query-id 'reply-wait)) + (define timeout-id (list s query-id 'timeout)) (define start-time (current-inexact-milliseconds)) (log-debug (format "Sending ~v ~v to ~v ~v with ~v seconds of timeout" q query-id @@ -309,33 +311,40 @@ timeout)) (transition w (send-message (dns-request query s server-ip)) - (send-message (set-timer subscription-id (* timeout 1000) 'relative)) - (role (set (topic-subscriber (timer-expired subscription-id (wild))) - (topic-subscriber (dns-reply (wild) (wild) s))) - #:name subscription-id - #:state w - [(timer-expired (== subscription-id) _) - (log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds" - q query-id - zone-origin server-ip - timeout)) - (sequence-actions (try-next-server w) - (delete-role subscription-id) - (send-message (list 'release-query-id query-id)))] - [(dns-reply reply-message source (== s)) - ;; TODO: maybe receive only specifically from the queried IP address? - (log-debug - (format - "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v" - q zone-origin server-ip - (inexact->exact (round (- (current-inexact-milliseconds) start-time))) - (dns-message-answers reply-message) - (dns-message-authorities reply-message) - (dns-message-additional reply-message))) - (if (not (= (dns-message-id reply-message) (dns-message-id query))) - (transition w) - (sequence-actions (on-answer w - (filter-dns-reply q reply-message zone-origin) - server-ip) - (delete-role subscription-id) - (send-message (list 'release-query-id query-id))))]))) + (send-message (set-timer timeout-id (* timeout 1000) 'relative)) + ;; TODO: Restore this to a "join" when proper pattern-unions are implemented + (endpoint #:subscriber (timer-expired timeout-id (wild)) + #:name timeout-id + #:state w + [(timer-expired (== timeout-id) _) + (begin + (log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds" + q query-id + zone-origin server-ip + timeout)) + (sequence-actions (try-next-server w) + (delete-endpoint timeout-id) + (delete-endpoint reply-wait-id) + (send-message (list 'release-query-id query-id))))]) + (endpoint #:subscriber (dns-reply (wild) (wild) s) + #:name reply-wait-id + #:state w + [(dns-reply reply-message source (== s)) + ;; TODO: maybe receive only specifically from the queried IP address? + (begin + (log-debug + (format + "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v" + q zone-origin server-ip + (inexact->exact (round (- (current-inexact-milliseconds) start-time))) + (dns-message-answers reply-message) + (dns-message-authorities reply-message) + (dns-message-additional reply-message))) + (if (not (= (dns-message-id reply-message) (dns-message-id query))) + (transition w) + (sequence-actions (on-answer w + (filter-dns-reply q reply-message zone-origin) + server-ip) + (delete-endpoint timeout-id) + (delete-endpoint reply-wait-id) + (send-message (list 'release-query-id query-id)))))]))) diff --git a/proxy.rkt b/proxy.rkt index edf843d..c9913ea 100644 --- a/proxy.rkt +++ b/proxy.rkt @@ -11,10 +11,11 @@ (require "zonedb.rkt") (require "network-query.rkt") (require "resolver.rkt") -(require "../racket-matrix/os2.rkt") -(require "../racket-matrix/os2-udp.rkt") -(require "../racket-matrix/os2-timer.rkt") -(require "os2-dns.rkt") +(require racket-typed-matrix/sugar-untyped) +(require racket-typed-matrix/support/spy) +(require racket-typed-matrix/drivers/timer-untyped) +(require racket-typed-matrix/drivers/udp-untyped) +(require "tk-dns.rkt") (require racket/pretty) @@ -32,65 +33,59 @@ (log-info "Ready.") (ground-vm - (transition 'no-state - ;;(spawn udp-spy) - (spawn udp-driver #:debug-name 'udp-driver) - (spawn (timer-driver 'timer-driver) #:debug-name 'timer-driver) - (spawn (nested-vm #:debug-name 'dns-vm - (transition 'no-state - (spawn dns-spy #:debug-name 'dns-spy) - (spawn (timer-relay 'timer-relay:dns) #:debug-name 'timer-relay) - (spawn (query-id-allocator) #:debug-name 'query-id-allocator) - (spawn (dns-read-driver server-addr) #:debug-name 'server-dns-reader) - (spawn (dns-write-driver server-addr) #:debug-name 'server-dns-writer) - (spawn (dns-read-driver client-addr) #:debug-name 'client-dns-reader) - (spawn (dns-write-driver client-addr) #:debug-name 'client-dns-writer) - (spawn (packet-dispatcher server-addr) #:debug-name 'packet-dispatcher) - (spawn (question-dispatcher zone roots-only client-addr) - #:debug-name 'question-dispatcher))) - #:debug-name 'dns-vm)))) + (generic-spy 'UDP) + (udp-driver) + (timer-driver) + (nested-vm #:debug-name 'dns-vm + (dns-spy) + (timer-relay 'timer-relay:dns) + (spawn #:debug-name 'query-id-allocator #:child (query-id-allocator)) + (spawn #:debug-name 'server-dns-reader #:child (dns-read-driver server-addr)) + (spawn #:debug-name 'server-dns-writer #:child (dns-write-driver server-addr)) + (spawn #:debug-name 'client-dns-reader #:child (dns-read-driver client-addr)) + (spawn #:debug-name 'client-dns-writer #:child (dns-write-driver client-addr)) + (spawn #:debug-name 'packet-dispatcher #:child (packet-dispatcher server-addr)) + (spawn #:debug-name 'question-dispatcher + #:child (question-dispatcher zone roots-only client-addr))))) (define (query-id-allocator) ;; TODO: track how many are allocated and throttle requests if too ;; many are in flight (transition (set) ;; SetOf, all active query IDs - (role (topic-subscriber `(request ,(wild) allocate-query-id)) - #:state allocated - [`(request ,reply-addr allocate-query-id) - (let recheck () - (define n (random 65536)) - (if (set-member? allocated n) - (recheck) - (transition (set-add allocated n) - (send-message `(reply ,reply-addr ,n)))))]) - (role (topic-subscriber `(release-query-id ,(wild))) - #:state allocated - [`(release-query-id ,n) - (transition (set-remove allocated n))]))) + (endpoint #:subscriber `(request ,(wild) allocate-query-id) + #:state allocated + [`(request ,reply-addr allocate-query-id) + (let recheck () + (define n (random 65536)) + (if (set-member? allocated n) + (recheck) + (transition (set-add allocated n) + (send-message `(reply ,reply-addr ,n)))))]) + (endpoint #:subscriber `(release-query-id ,(wild)) + #:state allocated + [`(release-query-id ,n) + (transition (set-remove allocated n))]))) (define (packet-dispatcher s) (transition (set) ;; SetOf - (role (topic-subscriber (bad-dns-packet (wild) (wild) (wild) (wild))) - #:state old-active-requests - [p - (log-error (pretty-format p)) - ;; TODO: ^ perhaps use metalevel events? perhaps don't bother though - (transition old-active-requests)]) - (role (topic-subscriber (dns-request (wild) (wild) s)) - #:state old-active-requests - [(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket - (define req-id (active-request source (dns-message-id m))) - ;; TODO: when we have presence/error-handling, remove req-id - ;; from active requests once request-handler pseudothread exits. - (if (set-member? old-active-requests req-id) - (transition old-active-requests) ;; ignore retransmitted duplicates - (transition (set-add old-active-requests req-id) - (spawn (packet-relay req-id r) #:debug-name (list 'packet-relay req-id))))]) - (role (topic-subscriber (dns-reply (wild) s (wild))) - #:state old-active-requests - [(and r (dns-reply m (== s) sink)) - (define req-id (active-request sink (dns-message-id m))) - (transition (set-remove old-active-requests req-id))]))) + (endpoint #:subscriber (bad-dns-packet (wild) (wild) (wild) (wild)) + [p (begin (log-error (pretty-format p)) '())]) + (endpoint #:subscriber (dns-request (wild) (wild) s) + #:state old-active-requests + [(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket + (let ((req-id (active-request source (dns-message-id m)))) + ;; TODO: when we have presence/error-handling, remove req-id + ;; from active requests once request-handler pseudothread exits. + (if (set-member? old-active-requests req-id) + (transition old-active-requests) ;; ignore retransmitted duplicates + (transition (set-add old-active-requests req-id) + (spawn #:debug-name (list 'packet-relay req-id) + #:child (packet-relay req-id r)))))]) + (endpoint #:subscriber (dns-reply (wild) s (wild)) + #:state old-active-requests + [(and r (dns-reply m (== s) sink)) + (let ((req-id (active-request sink (dns-message-id m)))) + (transition (set-remove old-active-requests req-id)))]))) (define (packet-relay req-id request) (match-define (dns-request request-message request-source request-sink) request) @@ -127,32 +122,34 @@ original-question (dns-message-id request-message))) (transition 'no-state/packet-relay (send-message original-question) - (role/fresh wait-id (topic-subscriber (answered-question original-question (wild))) - #:state w - [(answered-question (== original-question) answer) - (log-debug (format "Final answer to ~v with query id ~v is ~v" - original-question - (dns-message-id request-message) - answer)) - (transition w - (delete-role wait-id) - (send-message (answer->reply original-question answer)))]))])) + (endpoint #:subscriber (answered-question original-question (wild)) + #:state w + #:let-name wait-id + [(answered-question (== original-question) answer) + (begin (log-debug (format "Final answer to ~v with query id ~v is ~v" + original-question + (dns-message-id request-message) + answer)) + (transition w + (delete-endpoint wait-id) + (send-message (answer->reply original-question answer))))]))])) (define (glueless-question-handler roots-only-zone q client-sock) ;; Restart q, an overly-glueless question, from the roots. (define restarted-question (restart-question q)) (transition 'no-state - (role/fresh relay (topic-subscriber (answered-question restarted-question (wild))) - #:state w - [(answered-question (== restarted-question) ans) - ;; We got the answer to our restarted question; now transform - ;; it into an answer to the original question, to unblock the - ;; original questioner. - (transition w - (delete-role relay) - (send-message (answered-question q ans)))]) - (spawn (question-handler roots-only-zone restarted-question client-sock) - #:debug-name (list 'glueless-question-handler-inner restarted-question)))) + (endpoint #:subscriber (answered-question restarted-question (wild)) + #:state w + #:let-name relay + [(answered-question (== restarted-question) ans) + ;; We got the answer to our restarted question; now transform + ;; it into an answer to the original question, to unblock the + ;; original questioner. + (transition w + (delete-endpoint relay) + (send-message (answered-question q ans)))]) + (spawn #:debug-name (list 'glueless-question-handler-inner restarted-question) + #:child (question-handler roots-only-zone restarted-question client-sock)))) (define (question-dispatcher seed-zone roots-only client-sock) (define (transition-and-set-timers new-zone timers) @@ -163,58 +160,59 @@ (define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone)) (sequence-actions (transition-and-set-timers cleaned-seed-zone initial-timers) ;; TODO: consider deduping questions here too? - (role (topic-subscriber `(debug-dump)) - #:state zone - [`(debug-dump) - (with-output-to-file "zone-proxy.zone" - (lambda () - (write-bytes (bit-string->bytes (zone->bit-string zone)))) - #:mode 'binary - #:exists 'replace) - (with-output-to-file "zone-proxy.dump" - (lambda () - (display "----------------------------------------------------------------------\n") - (display (seconds->date (current-seconds))) - (newline) - (for* ([(name rrmap) zone] [(rr expiry) rrmap]) - (write (list rr expiry)) - (newline)) - (newline)) - #:mode 'text - #:exists 'append) - (with-output-to-file "zone-proxy.debug" - (lambda () - (display "----------------------------------------------------------------------\n") - (display (seconds->date (current-seconds))) - (newline) - (pretty-write current-ground-transition)) - #:mode 'text - #:exists 'append) - (transition zone)]) - (role (topic-subscriber (question (wild) (wild) (wild) (wild))) - #:state zone - [(? question? q) - (transition zone - (cond - [(question-cyclic? q) - (log-warning (format "Cyclic question ~v" q)) - (send-message (answered-question q (empty-complete-answer)))] - [(question-too-glueless? q) - (log-warning (format "Overly-glueless question ~v" q)) - (spawn (glueless-question-handler roots-only q client-sock) - #:debug-name (list 'glueless-question-handler-outer q))] - [else - (spawn (question-handler zone q client-sock) - #:debug-name (list 'question-handler q))]))]) - (role (topic-subscriber (network-reply (wild) (wild))) - #:state zone - [(network-reply _ answer) - (define-values (new-zone timers) (incorporate-complete-answer answer zone)) - (transition-and-set-timers new-zone timers)]) - (role (topic-subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild))) - #:state zone - [(timer-expired (list 'check-dns-expiry name) now-msec) - (transition (zone-expire-name zone name (/ now-msec 1000.0)))]))) + (endpoint #:subscriber `(debug-dump) + #:state zone + [`(debug-dump) + (begin + (with-output-to-file "zone-proxy.zone" + (lambda () + (write-bytes (bit-string->bytes (zone->bit-string zone)))) + #:mode 'binary + #:exists 'replace) + (with-output-to-file "zone-proxy.dump" + (lambda () + (display "----------------------------------------------------------------------\n") + (display (seconds->date (current-seconds))) + (newline) + (for* ([(name rrmap) zone] [(rr expiry) rrmap]) + (write (list rr expiry)) + (newline)) + (newline)) + #:mode 'text + #:exists 'append) + ;; (with-output-to-file "zone-proxy.debug" + ;; (lambda () + ;; (display "----------------------------------------------------------------------\n") + ;; (display (seconds->date (current-seconds))) + ;; (newline) + ;; (pretty-write current-ground-transition)) + ;; #:mode 'text + ;; #:exists 'append) + '())]) + (endpoint #:subscriber (question (wild) (wild) (wild) (wild)) + #:state zone + [(? question? q) + (transition zone + (cond + [(question-cyclic? q) + (log-warning (format "Cyclic question ~v" q)) + (send-message (answered-question q (empty-complete-answer)))] + [(question-too-glueless? q) + (log-warning (format "Overly-glueless question ~v" q)) + (spawn #:debug-name (list 'glueless-question-handler-outer q) + #:child (glueless-question-handler roots-only q client-sock))] + [else + (spawn #:debug-name (list 'question-handler q) + #:child (question-handler zone q client-sock))]))]) + (endpoint #:subscriber (network-reply (wild) (wild)) + #:state zone + [(network-reply _ answer) + (let-values (((new-zone timers) (incorporate-complete-answer answer zone))) + (transition-and-set-timers new-zone timers))]) + (endpoint #:subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild)) + #:state zone + [(timer-expired (list 'check-dns-expiry name) now-msec) + (transition (zone-expire-name zone name (/ now-msec 1000.0)))]))) (struct question-state (zone q client-sock nameservers-tried retry-count) #:prefab) (struct expanding-cnames (q accumulator remaining-count) #:prefab) @@ -244,39 +242,39 @@ q referral-id (domain-labels zone-origin) (map domain-labels (set-map nameserver-rrs rr-rdata)))) (transition w - (spawn (network-query client-sock - q - zone-origin - (map rr-rdata (set->list nameserver-rrs)) - referral-id) - #:debug-name (list 'network-query q)) - (role (topic-subscriber (network-reply referral-id (wild))) - #:name referral-id - #:state w - [(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN - (transition w - (delete-role referral-id) - (send-message (answered-question q #f)))] - [(network-reply (== referral-id) ans) - (define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone)) - (when (log-level? (current-logger) 'debug) - (log-debug (format "Referral ~v results in origin ~v:~n" - referral-id zone-origin)) - (for ([k (set-union (list->set (hash-keys zone)) - (list->set (hash-keys new-zone)))] - #:when (in-bailiwick? k zone-origin)) - (log-debug (format "Old ~v ~v~nNew ~v ~v" - k (hash-ref zone k 'missing) - k (hash-ref new-zone k 'missing)))) - (log-debug "=-=-=-=-=-=")) - (define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr))) - (sequence-actions - (retry-question (struct-copy question-state w - [nameservers-tried (set-union nameservers-tried - nameserver-names)] - [zone new-zone] - [retry-count (+ old-retry-count 1)])) - (delete-role referral-id))]))] + (spawn #:debug-name (list 'network-query q) + #:child (network-query client-sock + q + zone-origin + (map rr-rdata (set->list nameserver-rrs)) + referral-id)) + (endpoint #:subscriber (network-reply referral-id (wild)) + #:name referral-id + #:state w + [(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN + (transition w + (delete-endpoint referral-id) + (send-message (answered-question q #f)))] + [(network-reply (== referral-id) ans) + (let-values (((new-zone ignored-timers) (incorporate-complete-answer ans zone))) + (when (log-level? (current-logger) 'debug) + (log-debug (format "Referral ~v results in origin ~v:~n" + referral-id zone-origin)) + (for ([k (set-union (list->set (hash-keys zone)) + (list->set (hash-keys new-zone)))] + #:when (in-bailiwick? k zone-origin)) + (log-debug (format "Old ~v ~v~nNew ~v ~v" + k (hash-ref zone k 'missing) + k (hash-ref new-zone k 'missing)))) + (log-debug "=-=-=-=-=-=")) + (define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr))) + (sequence-actions + (retry-question (struct-copy question-state w + [nameservers-tried (set-union nameservers-tried + nameserver-names)] + [zone new-zone] + [retry-count (+ old-retry-count 1)])) + (delete-endpoint referral-id)))]))] [(? complete-answer? ans) (transition w (send-message (answered-question q ans)))] [(partial-answer base cnames) @@ -285,18 +283,19 @@ ;; TODO: record chains of CNAMEs to avoid pathologically-long chains (define cname-q (cname-question cname q)) (list (send-message cname-q) - (role/fresh subscription-id - (topic-subscriber (answered-question cname-q (wild))) - #:state (expanding-cnames q acc remaining) - [(answered-question (== cname-q) ans) - (define new-acc (if ans (merge-answers acc ans) acc)) - (define new-remaining (- remaining 1)) - (define new-w (expanding-cnames q new-acc new-remaining)) - (transition new-w - (delete-role subscription-id) - (if (zero? new-remaining) - (send-message (answered-question q new-acc)) - '()))]))) + (endpoint #:subscriber (answered-question cname-q (wild)) + #:state (expanding-cnames q acc remaining) + #:let-name subscription-id + [(answered-question (== cname-q) ans) + (let () + (define new-acc (if ans (merge-answers acc ans) acc)) + (define new-remaining (- remaining 1)) + (define new-w (expanding-cnames q new-acc new-remaining)) + (transition new-w + (delete-endpoint subscription-id) + (if (zero? new-remaining) + (send-message (answered-question q new-acc)) + '())))]))) cnames))])])) (require "test-rrs.rkt")