diff --git a/network-query.rkt b/network-query.rkt index a9af378..db2a7a7 100644 --- a/network-query.rkt +++ b/network-query.rkt @@ -5,10 +5,10 @@ (require "api.rkt") (require "codec.rkt") (require "zonedb.rkt") -(require "../racket-matrix/os-big-bang.rkt") -(require "../racket-matrix/os-udp.rkt") -(require "../racket-matrix/os-timer.rkt") -(require "os-dns.rkt") +(require "../racket-matrix/os2.rkt") +(require "../racket-matrix/os2-udp.rkt") +(require "../racket-matrix/os2-timer.rkt") +(require "os2-dns.rkt") (provide network-query (struct-out network-reply)) @@ -214,13 +214,13 @@ ;; network-query : UdpAddress Question DomainName NEListOf UniqueId -> BootK (define (network-query s q zone-origin server-names unique-id) - (os-big-bang/transition - (try-next-server (network-query-state (network-request s q zone-origin server-names unique-id) - first-timeout - (hash) - '() - #f - server-names)))) + (lambda (self-pid) + (try-next-server (network-query-state (network-request s q zone-origin server-names unique-id) + first-timeout + (hash) + '() + #f + server-names)))) (define (try-next-server w) (match w @@ -248,31 +248,31 @@ [current-name current-name] [remaining-names remaining-names]) (send-message subq) - (subscribe/fresh subscription-id - (message-handlers w - [(answered-question (== subq) ans) - (define ips - (map make-dns-address (set->list (extract-addresses current-name ans)))) - (extend-transition - (try-next-server (struct-copy network-query-state w - [known-addresses (hash-set known-addresses - current-name - ips)] - [remaining-addresses ips])) - (unsubscribe subscription-id))])))))] + (role/fresh subq-id (topic-subscriber (answered-question subq (wild))) + #:state w + [(answered-question (== subq) ans) + (define ips + (map make-dns-address (set->list (extract-addresses current-name ans)))) + (extend-transition + (try-next-server (struct-copy network-query-state w + [known-addresses (hash-set known-addresses + current-name + ips)] + [remaining-addresses ips])) + (delete-role subq-id))]))))] [(network-query-state req timeout _ (cons current-ip remaining-ips) _ _) (define rpc-id (gensym 'network-query/allocate-query-id)) (transition w (send-message `(request ,rpc-id allocate-query-id)) - (subscribe rpc-id - (message-handlers w - [`(reply ,(== rpc-id) ,id) - (extend-transition (send-request (struct-copy network-query-state w - [remaining-addresses remaining-ips]) - id - timeout - current-ip) - (unsubscribe rpc-id))])))])) + (role rpc-id (topic-subscriber `(reply ,rpc-id ,(wild))) + #:state w + [`(reply ,(== rpc-id) ,id) + (extend-transition (send-request (struct-copy network-query-state w + [remaining-addresses remaining-ips]) + id + timeout + current-ip) + (delete-role rpc-id))]))])) (define (on-answer w ans server-ip) (match ans @@ -309,30 +309,32 @@ (transition w (send-message (dns-request query s server-ip)) (send-message (set-timer subscription-id (* timeout 1000) 'relative)) - (subscribe subscription-id - (message-handlers w - [(timer-expired (== subscription-id) _) - (log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds" - q query-id - zone-origin server-ip - timeout)) - (extend-transition (try-next-server w) - (unsubscribe subscription-id) - (send-message (list 'release-query-id query-id)))] - [(dns-reply reply-message source (== s)) - ;; TODO: maybe receive only specifically from the queried IP address? - (log-debug - (format - "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v" - q zone-origin server-ip - (inexact->exact (round (- (current-inexact-milliseconds) start-time))) - (dns-message-answers reply-message) - (dns-message-authorities reply-message) - (dns-message-additional reply-message))) - (if (not (= (dns-message-id reply-message) (dns-message-id query))) - w - (extend-transition (on-answer w - (filter-dns-reply q reply-message zone-origin) - server-ip) - (unsubscribe subscription-id) - (send-message (list 'release-query-id query-id))))])))) + (role subscription-id + (set (topic-subscriber (timer-expired subscription-id (wild))) + (topic-subscriber (dns-reply (wild) (wild) s))) + #:state w + [(timer-expired (== subscription-id) _) + (log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds" + q query-id + zone-origin server-ip + timeout)) + (extend-transition (try-next-server w) + (delete-role subscription-id) + (send-message (list 'release-query-id query-id)))] + [(dns-reply reply-message source (== s)) + ;; TODO: maybe receive only specifically from the queried IP address? + (log-debug + (format + "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v" + q zone-origin server-ip + (inexact->exact (round (- (current-inexact-milliseconds) start-time))) + (dns-message-answers reply-message) + (dns-message-authorities reply-message) + (dns-message-additional reply-message))) + (if (not (= (dns-message-id reply-message) (dns-message-id query))) + w + (extend-transition (on-answer w + (filter-dns-reply q reply-message zone-origin) + server-ip) + (delete-role subscription-id) + (send-message (list 'release-query-id query-id))))]))) diff --git a/proxy.rkt b/proxy.rkt index 85de992..9a816df 100644 --- a/proxy.rkt +++ b/proxy.rkt @@ -11,11 +11,10 @@ (require "zonedb.rkt") (require "network-query.rkt") (require "resolver.rkt") -(require "../racket-matrix/os.rkt") -(require "../racket-matrix/os-big-bang.rkt") -(require "../racket-matrix/os-udp.rkt") -(require "../racket-matrix/os-timer.rkt") -(require "os-dns.rkt") +(require "../racket-matrix/os2.rkt") +(require "../racket-matrix/os2-udp.rkt") +(require "../racket-matrix/os2-timer.rkt") +(require "os2-dns.rkt") (require racket/pretty) @@ -27,77 +26,72 @@ ;; start-proxy : UInt16 CompiledZone CompiledZone -> Void (define (start-proxy port-number zone roots-only) + (define server-addr (udp-listener port-number)) + (define client-addr (udp-handle 'dns-client)) - (define boot-server - (os-big-bang 'no-state/boot-server - (spawn dns-spy) - (spawn (timer-relay 'timer-relay:dns)) - (spawn (query-id-allocator)) - (send-meta-message `(request create-server-socket (udp new ,port-number 512))) - (subscribe/fresh wait-id - (meta-message-handlers w - [`(reply create-server-socket ,s) - (transition w - (unsubscribe wait-id) - (send-meta-message - `(request create-client-socket (udp new 0 512))) - (client-socket-waiter s))])))) + (log-info "Ready.") - (define (client-socket-waiter s) - (subscribe/fresh wait-id - (meta-message-handlers w - [`(reply create-client-socket ,c) - (log-info "Ready.") - (transition w - (unsubscribe wait-id) - (spawn (dns-read-driver s)) - (spawn (dns-write-driver s)) - (spawn (dns-read-driver c)) - (spawn (dns-write-driver c)) - (spawn (packet-dispatcher s)) - (spawn (question-dispatcher zone roots-only c)))]))) - - (ground-vm (os-big-bang (void) - ;;(spawn udp-spy) - (spawn udp-driver) - (spawn (timer-driver 'timer-driver)) - (spawn (nested-vm boot-server))))) + (ground-vm + (transition 'no-state + ;;(spawn udp-spy) + (spawn udp-driver #:debug-name 'udp-driver) + (spawn (timer-driver 'timer-driver) #:debug-name 'timer-driver) + (spawn (nested-vm + 'dns-vm + (transition 'no-state + (spawn dns-spy #:debug-name 'dns-spy) + (spawn (timer-relay 'timer-relay:dns) #:debug-name 'timer-relay) + (spawn (query-id-allocator) #:debug-name 'query-id-allocator) + (spawn (dns-read-driver server-addr) #:debug-name 'server-dns-reader) + (spawn (dns-write-driver server-addr) #:debug-name 'server-dns-writer) + (spawn (dns-read-driver client-addr) #:debug-name 'client-dns-reader) + (spawn (dns-write-driver client-addr) #:debug-name 'client-dns-writer) + (spawn (packet-dispatcher server-addr) #:debug-name 'packet-dispatcher) + (spawn (question-dispatcher zone roots-only client-addr) + #:debug-name 'question-dispatcher))) + #:debug-name 'dns-vm)))) (define (query-id-allocator) ;; TODO: track how many are allocated and throttle requests if too ;; many are in flight - (os-big-bang (set) ;; SetOf, all active query IDs - (subscribe 'query-id-request-handler - (message-handlers allocated - [`(request ,reply-addr allocate-query-id) - (let recheck () - (define n (random 65536)) - (if (set-member? allocated n) - (recheck) - (transition (set-add allocated n) - (send-message `(reply ,reply-addr ,n)))))] - [`(release-query-id ,n) - (transition (set-remove allocated n))])))) + (transition (set) ;; SetOf, all active query IDs + (role 'query-id-request-handler (topic-subscriber `(request ,(wild) allocate-query-id)) + #:state allocated + [`(request ,reply-addr allocate-query-id) + (let recheck () + (define n (random 65536)) + (if (set-member? allocated n) + (recheck) + (transition (set-add allocated n) + (send-message `(reply ,reply-addr ,n)))))]) + (role 'query-id-release-handler (topic-subscriber `(release-query-id ,(wild))) + #:state allocated + [`(release-query-id ,n) + (transition (set-remove allocated n))]))) (define (packet-dispatcher s) - (os-big-bang (set) ;; SetOf - (subscribe 'packet-dispatcher - (message-handlers old-active-requests - [(? bad-dns-packet? p) - (log-error (pretty-format p)) - ;; TODO: ^ perhaps use metalevel events? perhaps don't bother though - old-active-requests] - [(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket - (define req-id (active-request source (dns-message-id m))) - ;; TODO: when we have presence/error-handling, remove req-id - ;; from active requests once request-handler pseudothread exits. - (if (set-member? old-active-requests req-id) - old-active-requests ;; ignore retransmitted duplicates - (transition (set-add old-active-requests req-id) - (spawn (packet-relay req-id r))))] - [(and r (dns-reply m (== s) sink)) - (define req-id (active-request sink (dns-message-id m))) - (set-remove old-active-requests req-id)])))) + (transition (set) ;; SetOf + (role 'error-logger (topic-subscriber (bad-dns-packet (wild) (wild) (wild) (wild))) + #:state old-active-requests + [p + (log-error (pretty-format p)) + ;; TODO: ^ perhaps use metalevel events? perhaps don't bother though + old-active-requests]) + (role 'request-booter (topic-subscriber (dns-request (wild) (wild) s)) + #:state old-active-requests + [(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket + (define req-id (active-request source (dns-message-id m))) + ;; TODO: when we have presence/error-handling, remove req-id + ;; from active requests once request-handler pseudothread exits. + (if (set-member? old-active-requests req-id) + old-active-requests ;; ignore retransmitted duplicates + (transition (set-add old-active-requests req-id) + (spawn (packet-relay req-id r) #:debug-name (list 'packet-relay req-id))))]) + (role 'reply-cleanup (topic-subscriber (dns-reply (wild) s (wild))) + #:state old-active-requests + [(and r (dns-reply m (== s) sink)) + (define req-id (active-request sink (dns-message-id m))) + (set-remove old-active-requests req-id)]))) (define (packet-relay req-id request) (match-define (dns-request request-message request-source request-sink) request) @@ -126,39 +120,40 @@ (match (dns-message-questions request-message) ['() ;; No questions! - (os-big-bang 'no-state/packet-relay + (transition 'no-state/packet-relay (send-message (answer->reply #f (empty-complete-answer))))] [(cons original-question _) ;; At least one question (log-debug (format "Looking up ~v with query id ~v" original-question (dns-message-id request-message))) - (os-big-bang 'no-state/packet-relay + (transition 'no-state/packet-relay (send-message original-question) - (subscribe/fresh wait-id - (message-handlers w - [(answered-question (== original-question) answer) - (log-debug (format "Final answer to ~v with query id ~v is ~v" - original-question - (dns-message-id request-message) - answer)) - (transition w - (unsubscribe wait-id) - (send-message (answer->reply original-question answer)))])))])) + (role/fresh wait-id (topic-subscriber (answered-question original-question (wild))) + #:state w + [(answered-question (== original-question) answer) + (log-debug (format "Final answer to ~v with query id ~v is ~v" + original-question + (dns-message-id request-message) + answer)) + (transition w + (delete-role wait-id) + (send-message (answer->reply original-question answer)))]))])) (define (glueless-question-handler roots-only-zone q client-sock) ;; Restart q, an overly-glueless question, from the roots. (define restarted-question (restart-question q)) - (os-big-bang 'no-state - (subscribe/fresh relay - (message-handlers w - [(answered-question (== restarted-question) ans) - ;; We got the answer to our restarted question; now transform - ;; it into an answer to the original question, to unblock the - ;; original questioner. - (transition w - (unsubscribe relay) - (send-message (answered-question q ans)))])) - (spawn (question-handler roots-only-zone restarted-question client-sock)))) + (transition 'no-state + (role/fresh relay (topic-subscriber (answered-question restarted-question (wild))) + #:state w + [(answered-question (== restarted-question) ans) + ;; We got the answer to our restarted question; now transform + ;; it into an answer to the original question, to unblock the + ;; original questioner. + (transition w + (delete-role relay) + (send-message (answered-question q ans)))]) + (spawn (question-handler roots-only-zone restarted-question client-sock) + #:debug-name (list 'glueless-question-handler-inner restarted-question)))) (define (question-dispatcher seed-zone roots-only client-sock) (define (transition-and-set-timers new-zone timers) @@ -167,60 +162,67 @@ (match-define (cons name ttl) timerspec) (send-message (set-timer (list 'check-dns-expiry name) (* ttl 1000) 'relative))))) (define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone)) - (os-big-bang/transition - (extend-transition (transition-and-set-timers cleaned-seed-zone initial-timers) - ;; TODO: consider deduping questions here too? - (subscribe 'question-handler-factory - (message-handlers zone - [`(debug-dump) - (with-output-to-file "zone-proxy.zone" - (lambda () - (write-bytes (bit-string->bytes (zone->bit-string zone)))) - #:mode 'binary - #:exists 'replace) - (with-output-to-file "zone-proxy.dump" - (lambda () - (display "----------------------------------------------------------------------\n") - (display (seconds->date (current-seconds))) - (newline) - (for* ([(name rrmap) zone] [(rr expiry) rrmap]) - (write (list rr expiry)) - (newline)) - (newline)) - #:mode 'text - #:exists 'append) - (with-output-to-file "zone-proxy.debug" - (lambda () - (display "----------------------------------------------------------------------\n") - (display (seconds->date (current-seconds))) - (newline) - (pretty-write current-ground-transition)) - #:mode 'text - #:exists 'append) - zone] - [(? question? q) - (transition zone - (cond - [(question-cyclic? q) - (log-warning (format "Cyclic question ~v" q)) - (send-message (answered-question q (empty-complete-answer)))] - [(question-too-glueless? q) - (log-warning (format "Overly-glueless question ~v" q)) - (spawn (glueless-question-handler roots-only q client-sock))] - [else - (spawn (question-handler zone q client-sock))]))] - [(network-reply _ answer) - (define-values (new-zone timers) (incorporate-complete-answer answer zone)) - (transition-and-set-timers new-zone timers)] - [(timer-expired (list 'check-dns-expiry name) now-msec) - (zone-expire-name zone name (/ now-msec 1000.0))]))))) + (extend-transition (transition-and-set-timers cleaned-seed-zone initial-timers) + ;; TODO: consider deduping questions here too? + (role 'debug-dumper (topic-subscriber `(debug-dump)) + #:state zone + [`(debug-dump) + (with-output-to-file "zone-proxy.zone" + (lambda () + (write-bytes (bit-string->bytes (zone->bit-string zone)))) + #:mode 'binary + #:exists 'replace) + (with-output-to-file "zone-proxy.dump" + (lambda () + (display "----------------------------------------------------------------------\n") + (display (seconds->date (current-seconds))) + (newline) + (for* ([(name rrmap) zone] [(rr expiry) rrmap]) + (write (list rr expiry)) + (newline)) + (newline)) + #:mode 'text + #:exists 'append) + (with-output-to-file "zone-proxy.debug" + (lambda () + (display "----------------------------------------------------------------------\n") + (display (seconds->date (current-seconds))) + (newline) + (pretty-write current-ground-transition)) + #:mode 'text + #:exists 'append) + zone]) + (role 'question-handler-factory (topic-subscriber (question (wild) (wild) (wild) (wild))) + #:state zone + [(? question? q) + (transition zone + (cond + [(question-cyclic? q) + (log-warning (format "Cyclic question ~v" q)) + (send-message (answered-question q (empty-complete-answer)))] + [(question-too-glueless? q) + (log-warning (format "Overly-glueless question ~v" q)) + (spawn (glueless-question-handler roots-only q client-sock) + #:debug-name (list 'glueless-question-handler-outer q))] + [else + (spawn (question-handler zone q client-sock) + #:debug-name (list 'question-handler q))]))]) + (role 'network-reply-snoop (topic-subscriber (network-reply (wild) (wild))) + #:state zone + [(network-reply _ answer) + (define-values (new-zone timers) (incorporate-complete-answer answer zone)) + (transition-and-set-timers new-zone timers)]) + (role 'timer-expiry-handler + (topic-subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild))) + #:state zone + [(timer-expired (list 'check-dns-expiry name) now-msec) + (zone-expire-name zone name (/ now-msec 1000.0))]))) (struct question-state (zone q client-sock nameservers-tried retry-count) #:prefab) (struct expanding-cnames (q accumulator remaining-count) #:prefab) (define (question-handler zone q client-sock) - (os-big-bang/transition - (retry-question (question-state zone q client-sock (set) 0)))) + (retry-question (question-state zone q client-sock (set) 0))) (define (send-empty-reply w q) (transition w (send-message (answered-question q (empty-complete-answer))))) @@ -248,33 +250,34 @@ q zone-origin (map rr-rdata (set->list nameserver-rrs)) - referral-id)) - (subscribe referral-id - (message-handlers w - [(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN - (transition w - (unsubscribe referral-id) - (send-message (answered-question q #f)))] - [(network-reply (== referral-id) ans) - (define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone)) - (when (log-level? (current-logger) 'debug) - (log-debug (format "Referral ~v results in origin ~v:~n" - referral-id zone-origin)) - (for ([k (set-union (list->set (hash-keys zone)) - (list->set (hash-keys new-zone)))] - #:when (in-bailiwick? k zone-origin)) - (log-debug (format "Old ~v ~v~nNew ~v ~v" - k (hash-ref zone k 'missing) - k (hash-ref new-zone k 'missing)))) - (log-debug "=-=-=-=-=-=")) - (define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr))) - (extend-transition - (retry-question (struct-copy question-state w - [nameservers-tried (set-union nameservers-tried - nameserver-names)] - [zone new-zone] - [retry-count (+ old-retry-count 1)])) - (unsubscribe referral-id))])))] + referral-id) + #:debug-name (list 'network-query q)) + (role referral-id (topic-subscriber (network-reply referral-id (wild))) + #:state w + [(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN + (transition w + (delete-role referral-id) + (send-message (answered-question q #f)))] + [(network-reply (== referral-id) ans) + (define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone)) + (when (log-level? (current-logger) 'debug) + (log-debug (format "Referral ~v results in origin ~v:~n" + referral-id zone-origin)) + (for ([k (set-union (list->set (hash-keys zone)) + (list->set (hash-keys new-zone)))] + #:when (in-bailiwick? k zone-origin)) + (log-debug (format "Old ~v ~v~nNew ~v ~v" + k (hash-ref zone k 'missing) + k (hash-ref new-zone k 'missing)))) + (log-debug "=-=-=-=-=-=")) + (define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr))) + (extend-transition + (retry-question (struct-copy question-state w + [nameservers-tried (set-union nameservers-tried + nameserver-names)] + [zone new-zone] + [retry-count (+ old-retry-count 1)])) + (delete-role referral-id))]))] [(? complete-answer? ans) (transition w (send-message (answered-question q ans)))] [(partial-answer base cnames) @@ -283,17 +286,18 @@ ;; TODO: record chains of CNAMEs to avoid pathologically-long chains (define cname-q (cname-question cname q)) (list (send-message cname-q) - (subscribe/fresh subscription-id - (message-handlers (expanding-cnames q acc remaining) - [(answered-question (== cname-q) ans) - (define new-acc (if ans (merge-answers acc ans) acc)) - (define new-remaining (- remaining 1)) - (define new-w (expanding-cnames q new-acc new-remaining)) - (transition new-w - (unsubscribe subscription-id) - (if (zero? new-remaining) - (send-message (answered-question q new-acc)) - '()))])))) + (role/fresh subscription-id + (topic-subscriber (answered-question cname-q (wild))) + #:state (expanding-cnames q acc remaining) + [(answered-question (== cname-q) ans) + (define new-acc (if ans (merge-answers acc ans) acc)) + (define new-remaining (- remaining 1)) + (define new-w (expanding-cnames q new-acc new-remaining)) + (transition new-w + (delete-role subscription-id) + (if (zero? new-remaining) + (send-message (answered-question q new-acc)) + '()))]))) cnames))])])) (require "test-rrs.rkt")