Progress on Syndicate conversion of proxy.rkt; doesn't run yet
This commit is contained in:
parent
aea344fd81
commit
53af1e0dd5
|
@ -123,27 +123,10 @@
|
|||
|
||||
(define first-timeout 3) ;; seconds
|
||||
|
||||
;; A NetworkRequest is a (network-request UdpAddress Question
|
||||
;; DomainName NEListOf<DomainName> UniqueID) representing the
|
||||
;; parameters used to start and process a network query.
|
||||
(struct network-request (client-socket question zone-origin server-names unique-id) #:transparent)
|
||||
|
||||
;; A NetworkReply is a (network-reply UniqueID Maybe<CompleteAnswer>)
|
||||
;; representing the final result of a network query.
|
||||
(struct network-reply (unique-id answer) #:transparent)
|
||||
|
||||
;; A NetworkQueryState is a (network-query-state NetworkRequest
|
||||
;; Integer Map<DomainName,ListOf<UdpAddress>> ListOf<UdpAddress>
|
||||
;; Maybe<DomainName> ListOf<DomainName>), representing an in-progress
|
||||
;; DNS network query.
|
||||
(struct network-query-state (request
|
||||
timeout
|
||||
known-addresses
|
||||
remaining-addresses
|
||||
current-name
|
||||
remaining-names)
|
||||
#:transparent)
|
||||
|
||||
;; (: next-timeout : Natural -> (Option Natural))
|
||||
(define (next-timeout timeout)
|
||||
(cond
|
||||
|
@ -210,9 +193,9 @@
|
|||
(f (dns-message-additional message))))]
|
||||
[(name-error) #f]
|
||||
[else
|
||||
(log-info (format "Abnormal response-code ~v in response to questions ~v"
|
||||
(log-info "Abnormal response-code ~v in response to questions ~v"
|
||||
(dns-message-response-code message)
|
||||
(dns-message-questions message)))
|
||||
(dns-message-questions message))
|
||||
'bad-answer]))
|
||||
|
||||
;; (: ip->host-name : IPv4 -> String)
|
||||
|
@ -226,152 +209,113 @@
|
|||
|
||||
;; (: network-query : (All (ParentState)
|
||||
;; UdpAddress Question DomainName (Listof DomainName) Any ->
|
||||
;; (Action ParentState)))
|
||||
;; Void))
|
||||
(define (network-query s q zone-origin server-names unique-id)
|
||||
(spawn #:name (list 'network-query q)
|
||||
(try-next-server
|
||||
(network-query-state (network-request s q zone-origin server-names unique-id)
|
||||
first-timeout
|
||||
#hash()
|
||||
'()
|
||||
#f
|
||||
server-names))))
|
||||
(actor*
|
||||
#:name (list 'network-query q)
|
||||
(field [timeout first-timeout]
|
||||
[known-addresses #hash()] ;; Hash DomainName (Listof UdpAddress)
|
||||
[remaining-addresses '()] ;; Listof UdpAddress
|
||||
[current-name #f] ;; Option DomainName
|
||||
[remaining-names server-names]) ;; Listof DomainName
|
||||
|
||||
;; (: try-next-server : NetworkQueryState -> (Transition NetworkQueryState))
|
||||
(define (try-next-server w)
|
||||
(define timeout (network-query-state-timeout w))
|
||||
(if (not timeout)
|
||||
(define (on-answer ans server-ip)
|
||||
(match ans
|
||||
['bad-answer ;; can come from filter-dns-reply
|
||||
(try-next-server)]
|
||||
['lame-delegation ;; can come from filter-dns-reply
|
||||
(log-info "Lame delegation received from ~v (at ~v) in bailiwick ~v in response to ~v"
|
||||
(current-name)
|
||||
server-ip
|
||||
zone-origin
|
||||
q)
|
||||
(when (and (current-name) server-ip)
|
||||
;; Actually remove the offending IP address so it's never tried again.
|
||||
(known-addresses (hash-update (known-addresses)
|
||||
(current-name)
|
||||
(lambda (addrs) (remove server-ip addrs)))))
|
||||
(try-next-server)]
|
||||
[(and (or (? complete-answer?) #f) ans)
|
||||
(send! (network-reply unique-id ans))]))
|
||||
|
||||
(define (try-next-server)
|
||||
(if (not (timeout))
|
||||
;; No more timeouts to try, so give up.
|
||||
(on-answer w (empty-complete-answer) #f)
|
||||
(match w
|
||||
[(network-query-state req _ _ '() _ '())
|
||||
(on-answer (empty-complete-answer) #f)
|
||||
(match (remaining-addresses)
|
||||
['()
|
||||
(match (remaining-names)
|
||||
['()
|
||||
;; No more addresses to try with this timeout. Refill the list
|
||||
;; and bump the timeout and retry.
|
||||
;; TODO: randomize ordering of servers in list. (Don't forget the initial fill.)
|
||||
(try-next-server (struct-copy network-query-state w
|
||||
[timeout (next-timeout timeout)]
|
||||
[remaining-addresses '()]
|
||||
[current-name #f]
|
||||
[remaining-names (network-request-server-names req)]))]
|
||||
[(network-query-state req _ known-addresses '() _ (cons current-name remaining-names))
|
||||
(if (hash-has-key? known-addresses current-name)
|
||||
(try-next-server (struct-copy network-query-state w
|
||||
[remaining-addresses (hash-ref known-addresses current-name)]
|
||||
[current-name current-name]
|
||||
[remaining-names remaining-names]))
|
||||
(let ((subq (ns-question current-name (network-request-question req))))
|
||||
(transition (struct-copy network-query-state w
|
||||
[current-name current-name]
|
||||
[remaining-names remaining-names])
|
||||
(send-message subq)
|
||||
(let-fresh (subq-id)
|
||||
(name-endpoint subq-id
|
||||
(subscriber (answered-question subq (wild))
|
||||
(match-state w
|
||||
(on-message
|
||||
[(answered-question (== subq) ans)
|
||||
(let ((ips (map make-dns-address
|
||||
(set->list (extract-addresses current-name ans)))))
|
||||
(sequence-actions
|
||||
(try-next-server (struct-copy network-query-state w
|
||||
[known-addresses (hash-set known-addresses
|
||||
current-name
|
||||
ips)]
|
||||
[remaining-addresses ips]))
|
||||
(delete-endpoint subq-id)))]))))))))]
|
||||
[(network-query-state req _ _ (cons current-ip remaining-ips) _ _)
|
||||
(timeout (next-timeout (timeout)))
|
||||
(current-name #f)
|
||||
(remaining-names server-names)
|
||||
(try-next-server)]
|
||||
[(cons next-name new-remaining-names)
|
||||
(current-name next-name)
|
||||
(remaining-names new-remaining-names)
|
||||
(if (hash-has-key? (known-addresses) next-name)
|
||||
(begin (remaining-addresses (hash-ref (known-addresses) (current-name)))
|
||||
(try-next-server))
|
||||
(let ((subq (ns-question next-name q)))
|
||||
(react (on-start (send! subq))
|
||||
(stop-when (message (answered-question subq $ans))
|
||||
(define ips
|
||||
(for/list [(a (extract-addresses next-name ans))]
|
||||
(make-dns-address a)))
|
||||
(known-addresses (hash-set (known-addresses) next-name ips))
|
||||
(remaining-addresses ips)
|
||||
(try-next-server)))))])]
|
||||
[(cons current-ip new-remaining-ips)
|
||||
(remaining-addresses new-remaining-ips)
|
||||
(define rpc-id (gensym 'network-query/allocate-query-id))
|
||||
(transition w
|
||||
(send-message `(request ,rpc-id allocate-query-id))
|
||||
(name-endpoint rpc-id
|
||||
(subscriber `(reply ,rpc-id ,(wild))
|
||||
(match-state w
|
||||
(on-message
|
||||
[`(reply ,(== rpc-id) ,(? exact-nonnegative-integer? id))
|
||||
(sequence-actions (send-request (struct-copy network-query-state w
|
||||
[remaining-addresses remaining-ips])
|
||||
id
|
||||
timeout
|
||||
current-ip)
|
||||
(delete-endpoint rpc-id))])))))])))
|
||||
(react (on-start (send! `(request ,rpc-id allocate-query-id)))
|
||||
(stop-when (message `(reply ,rpc-id ,(? exact-nonnegative-integer? $id)))
|
||||
(remaining-addresses new-remaining-ips)
|
||||
(send-request id current-ip)))])))
|
||||
|
||||
;; (: on-answer : NetworkQueryState CheckedAnswer (Option UdpAddress)
|
||||
;; -> (Transition NetworkQueryState))
|
||||
(define (on-answer w ans server-ip)
|
||||
(match ans
|
||||
['bad-answer ;; can come from filter-dns-reply
|
||||
(try-next-server w)]
|
||||
['lame-delegation ;; can come from filter-dns-reply
|
||||
(match-define (network-query-state req _ known-addresses _ current-name _) w)
|
||||
(match-define (network-request _ q zone-origin _ _) req)
|
||||
(log-info (format "Lame delegation received from ~v (at ~v) in bailiwick ~v in response to ~v"
|
||||
current-name
|
||||
server-ip
|
||||
zone-origin
|
||||
q))
|
||||
(try-next-server (if (and current-name server-ip)
|
||||
;; Actually remove the offending IP address so it's never tried again.
|
||||
(struct-copy network-query-state w
|
||||
[known-addresses (hash-update known-addresses
|
||||
current-name
|
||||
(lambda (addrs)
|
||||
(remove server-ip addrs)))])
|
||||
w))]
|
||||
[(and (or (? complete-answer?) #f) ans)
|
||||
(transition w
|
||||
(send-message (network-reply (network-request-unique-id (network-query-state-request w))
|
||||
ans)))]))
|
||||
|
||||
;; (: send-request : NetworkQueryState Nonnegative-Integer Natural UdpAddress
|
||||
;; -> (Transition NetworkQueryState))
|
||||
(define (send-request w query-id timeout server-ip)
|
||||
(match-define (network-request s q zone-origin _ _) (network-query-state-request w))
|
||||
(define (send-request query-id server-ip)
|
||||
(define query (make-dns-query-message q query-id))
|
||||
(define reply-wait-id (list s query-id 'reply-wait))
|
||||
(define timeout-id (list s query-id 'timeout))
|
||||
(define start-time (current-inexact-milliseconds))
|
||||
(log-debug (format "Sending ~v ~v to ~v ~v with ~v seconds of timeout"
|
||||
(log-debug "Sending ~v ~v to ~v ~v with ~v seconds of timeout"
|
||||
q query-id
|
||||
zone-origin server-ip
|
||||
timeout))
|
||||
(transition w
|
||||
(send-message (dns-request query s server-ip))
|
||||
(send-message (set-timer timeout-id (* timeout 1000) 'relative))
|
||||
;; TODO: Restore this to a "join" when proper pattern-unions are implemented
|
||||
(name-endpoint timeout-id
|
||||
(subscriber (timer-expired timeout-id (wild))
|
||||
(match-state w
|
||||
(on-message
|
||||
[(timer-expired (== timeout-id) _)
|
||||
(begin
|
||||
(log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds"
|
||||
(timeout))
|
||||
;; NB: ANALYSIS: Previous implementation of this used a
|
||||
;; quasi-join, where one endpoint deleted the other. Here the two
|
||||
;; stop-when clauses do a similar job. Also, we can pull the
|
||||
;; `release-query-id` send up to an on-stop clause.
|
||||
(react (on-start
|
||||
(send! (dns-request query s server-ip))
|
||||
(send! (set-timer timeout-id (* (timeout) 1000) 'relative)))
|
||||
(on-stop
|
||||
(send! (list 'release-query-id query-id)))
|
||||
(stop-when (message (timer-expired timeout-id _))
|
||||
(log-debug "Timed out ~v ~v to ~v ~v after ~v seconds"
|
||||
q query-id
|
||||
zone-origin server-ip
|
||||
timeout))
|
||||
(sequence-actions (try-next-server w)
|
||||
(delete-endpoint timeout-id)
|
||||
(delete-endpoint reply-wait-id)
|
||||
(send-message (list 'release-query-id query-id))))]))))
|
||||
(name-endpoint reply-wait-id
|
||||
(subscriber (dns-reply (wild) (wild) s)
|
||||
(match-state w
|
||||
(on-message
|
||||
[(dns-reply reply-message source (== s))
|
||||
(timeout))
|
||||
(try-next-server))
|
||||
(stop-when (message (dns-reply
|
||||
(? (lambda (m) (= (dns-message-id m)
|
||||
(dns-message-id query)))
|
||||
$reply-message)
|
||||
$source
|
||||
s))
|
||||
;; TODO: maybe receive only specifically from the queried IP address?
|
||||
(begin
|
||||
(log-debug
|
||||
(format
|
||||
"Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v"
|
||||
(log-debug "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v"
|
||||
q zone-origin server-ip
|
||||
(inexact->exact (round (- (current-inexact-milliseconds) start-time)))
|
||||
(dns-message-answers reply-message)
|
||||
(dns-message-authorities reply-message)
|
||||
(dns-message-additional reply-message)))
|
||||
(if (not (= (dns-message-id reply-message) (dns-message-id query)))
|
||||
(transition w)
|
||||
(sequence-actions (on-answer w
|
||||
(filter-dns-reply q reply-message zone-origin)
|
||||
server-ip)
|
||||
(delete-endpoint timeout-id)
|
||||
(delete-endpoint reply-wait-id)
|
||||
(send-message (list 'release-query-id query-id)))))]))))))
|
||||
(dns-message-additional reply-message))
|
||||
(on-answer (filter-dns-reply q reply-message zone-origin) server-ip))))
|
||||
|
||||
(try-next-server)))
|
||||
|
||||
(define ((dns-message-id-matches? expected) m)
|
||||
(= (dns-message-id m) expected))
|
||||
|
|
307
proxy.rkt
307
proxy.rkt
|
@ -28,7 +28,7 @@
|
|||
(require "zonedb.rkt")
|
||||
(require "network-query.rkt")
|
||||
(require "resolver.rkt")
|
||||
(require syndicate)
|
||||
(require (except-in syndicate dataspace assert))
|
||||
(require syndicate/actor)
|
||||
(require syndicate/drivers/timer)
|
||||
(require syndicate/drivers/udp)
|
||||
|
@ -49,73 +49,62 @@
|
|||
|
||||
(log-info "Ready.")
|
||||
|
||||
(ground-vm
|
||||
(generic-spy 'UDP)
|
||||
(udp-driver)
|
||||
(timer-driver)
|
||||
(spawn-vm #:debug-name 'dns-vm
|
||||
(name-process 'dns-spy (spawn (dns-spy)))
|
||||
(timer-relay 'timer-relay:dns)
|
||||
(name-process 'query-id-allocator (spawn (query-id-allocator)))
|
||||
(name-process 'server-dns-reader (spawn (dns-read-driver server-addr)))
|
||||
(name-process 'server-dns-writer (spawn (dns-write-driver server-addr)))
|
||||
(name-process 'client-dns-reader (spawn (dns-read-driver client-addr)))
|
||||
(name-process 'client-dns-writer (spawn (dns-write-driver client-addr)))
|
||||
(name-process 'packet-dispatcher (spawn (packet-dispatcher server-addr)))
|
||||
(name-process 'question-dispatcher (spawn (question-dispatcher zone
|
||||
roots-only
|
||||
client-addr))))))
|
||||
(run-ground
|
||||
(spawn-timer-driver)
|
||||
(spawn-udp-driver)
|
||||
(dataspace #:name 'dns-vm
|
||||
(dns-spy)
|
||||
(actor #:name 'timer-relay:dns
|
||||
(on (message (inbound ($ m (timer-expired _ _)))) (send! m))
|
||||
(on (message ($ m (set-timer _ _ _))) (send! (outbound m))))
|
||||
(query-id-allocator)
|
||||
(dns-read-driver server-addr)
|
||||
(dns-write-driver server-addr)
|
||||
(dns-read-driver client-addr)
|
||||
(dns-write-driver client-addr)
|
||||
(packet-dispatcher server-addr)
|
||||
(question-dispatcher zone roots-only client-addr)
|
||||
(forever))))
|
||||
|
||||
;; (: query-id-allocator : -> (Transition (Setof Natural)))
|
||||
;; (: query-id-allocator : -> Void)
|
||||
(define (query-id-allocator)
|
||||
;; TODO: track how many are allocated and throttle requests if too
|
||||
;; many are in flight
|
||||
(transition (set) ;; all active query IDs
|
||||
(subscriber `(request ,(wild) allocate-query-id)
|
||||
(match-state allocated
|
||||
(on-message
|
||||
[`(request ,reply-addr allocate-query-id)
|
||||
(actor #:name 'query-id-allocator
|
||||
(field [allocated (set)])
|
||||
(on (message `(request ,$reply-addr allocate-query-id))
|
||||
(let recheck ()
|
||||
(define n (random 65536))
|
||||
(if (set-member? allocated n)
|
||||
(if (set-member? (allocated) n)
|
||||
(recheck)
|
||||
(transition (set-add allocated n)
|
||||
(send-message `(reply ,reply-addr ,n)))))])))
|
||||
(subscriber `(release-query-id ,(wild))
|
||||
(match-state allocated
|
||||
(on-message
|
||||
[`(release-query-id ,(? exact-nonnegative-integer? n))
|
||||
(transition (set-remove allocated n))])))))
|
||||
(begin (allocated (set-add (allocated) n))
|
||||
(send! `(reply ,reply-addr ,n))))))
|
||||
(on (message `(release-query-id ,(? exact-nonnegative-integer? $n)))
|
||||
(allocated (set-remove (allocated) n)))))
|
||||
|
||||
;; (: packet-dispatcher : UdpAddress -> (Transition (Setof ActiveRequest)))
|
||||
;; (: packet-dispatcher : UdpAddress -> Void)
|
||||
(define (packet-dispatcher s)
|
||||
(transition (set)
|
||||
(subscriber (bad-dns-packet (wild) (wild) (wild) (wild))
|
||||
(on-message [p (begin (log-error (pretty-format p)) '())]))
|
||||
(subscriber (dns-request (wild) (wild) s)
|
||||
(match-state old-active-requests
|
||||
(on-message
|
||||
[(and r (dns-request m source (== s)))
|
||||
(actor #:name 'packet-dispatcher
|
||||
(field [old-active-requests (set)])
|
||||
(on (message ($ p (bad-dns-packet _ _ _ _)))
|
||||
(log-error "~a" (pretty-format p)))
|
||||
(on (message ($ r (dns-request $m $source s)))
|
||||
;; ^ We only listen for requests on our server socket
|
||||
(let ((req-id (active-request source (dns-message-id m))))
|
||||
(define req-id (active-request source (dns-message-id m)))
|
||||
;; TODO: when we have presence/error-handling, remove req-id
|
||||
;; from active requests once request-handler pseudothread exits.
|
||||
(if (set-member? old-active-requests req-id)
|
||||
(transition old-active-requests)
|
||||
(when (not (set-member? (old-active-requests) req-id))
|
||||
;; ^ ignore retransmitted duplicates
|
||||
(transition (set-add old-active-requests req-id)
|
||||
(name-process (list 'packet-relay req-id)
|
||||
(spawn (packet-relay req-id r))))))])))
|
||||
(subscriber (dns-reply (wild) s (wild))
|
||||
(match-state old-active-requests
|
||||
(on-message
|
||||
[(and r (dns-reply m (== s) sink))
|
||||
(let ((req-id (active-request sink (dns-message-id m))))
|
||||
(transition (set-remove old-active-requests req-id)))])))))
|
||||
(old-active-requests (set-add (old-active-requests) req-id))
|
||||
(packet-relay req-id r)))
|
||||
(on (message ($ r (dns-reply $m s $sink)))
|
||||
(define req-id (active-request sink (dns-message-id m)))
|
||||
(old-active-requests (set-remove (old-active-requests) req-id)))))
|
||||
|
||||
;; (: packet-relay : ActiveRequest DNSRequest -> (Transition Void))
|
||||
;; (: packet-relay : ActiveRequest DNSRequest -> Void)
|
||||
(define (packet-relay req-id request)
|
||||
(match-define (dns-request request-message request-source request-sink) request)
|
||||
|
||||
;; (: answer->reply : (Option Question) (Option CompleteAnswer) -> DNSReply)
|
||||
(define (answer->reply q a)
|
||||
(define-values (response-code ns us ds)
|
||||
|
@ -139,68 +128,57 @@
|
|||
ds)
|
||||
request-sink
|
||||
request-source))
|
||||
|
||||
(actor*
|
||||
#:name (list 'packet-relay req-id)
|
||||
|
||||
;; TODO: pay attention to recursion-desired flag
|
||||
(match (dns-message-questions request-message)
|
||||
['()
|
||||
;; No questions!
|
||||
(transition/no-state
|
||||
(send-message (answer->reply #f (empty-complete-answer))))]
|
||||
(send! (answer->reply #f (empty-complete-answer)))]
|
||||
[(cons original-question _)
|
||||
;; At least one question
|
||||
(log-debug (format "Looking up ~v with query id ~v"
|
||||
original-question (dns-message-id request-message)))
|
||||
(transition/no-state
|
||||
(send-message original-question)
|
||||
(let-fresh (wait-id)
|
||||
(name-endpoint wait-id
|
||||
(subscriber (answered-question original-question (wild))
|
||||
(on-message
|
||||
[(answered-question (== original-question) answer)
|
||||
(begin (log-debug (format "Final answer to ~v with query id ~v is ~v"
|
||||
(send! original-question)
|
||||
(react (stop-when (message (answered-question original-question $answer))
|
||||
(log-debug "Final answer to ~v with query id ~v is ~v"
|
||||
original-question
|
||||
(dns-message-id request-message)
|
||||
answer))
|
||||
(list (delete-endpoint wait-id)
|
||||
(send-message (answer->reply original-question answer))))])))))]))
|
||||
answer)
|
||||
(send! (answer->reply original-question answer))))])))
|
||||
|
||||
;; (: glueless-question-handler : CompiledZone Question UdpAddress -> (Transition Void))
|
||||
;; (: glueless-question-handler : CompiledZone Question UdpAddress -> Void)
|
||||
(define (glueless-question-handler roots-only-zone q client-sock)
|
||||
;; Restart q, an overly-glueless question, from the roots.
|
||||
(define restarted-question (restart-question q))
|
||||
(transition/no-state
|
||||
(let-fresh (relay)
|
||||
(name-endpoint relay
|
||||
(subscriber (answered-question restarted-question (wild))
|
||||
(on-message
|
||||
[(answered-question (== restarted-question) ans)
|
||||
(actor #:name (list 'glueless-question-handler q)
|
||||
(stop-when (message (answered-question restarted-question $ans))
|
||||
;; We got the answer to our restarted question; now transform
|
||||
;; it into an answer to the original question, to unblock the
|
||||
;; original questioner.
|
||||
(list (delete-endpoint relay)
|
||||
(send-message (answered-question q ans)))]))))
|
||||
(name-process (list 'glueless-question-handler-inner restarted-question)
|
||||
(spawn (question-handler roots-only-zone restarted-question client-sock)))))
|
||||
(send! (answered-question q ans)))
|
||||
(on-start (question-handler roots-only-zone restarted-question client-sock))))
|
||||
|
||||
;; (: question-dispatcher : CompiledZone CompiledZone UdpAddress -> (Transition CompiledZone))
|
||||
;; (: question-dispatcher : CompiledZone CompiledZone UdpAddress -> Void)
|
||||
(define (question-dispatcher seed-zone roots-only client-sock)
|
||||
;; (: transition-and-set-timers : CompiledZone (Setof (Pairof DomainName Real))
|
||||
;; -> (Transition CompiledZone))
|
||||
(define (transition-and-set-timers new-zone timers)
|
||||
(transition new-zone
|
||||
(define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone))
|
||||
(actor #:name 'question-dispatcher
|
||||
(field [zone cleaned-seed-zone])
|
||||
(on-start (set-timers! initial-timers))
|
||||
|
||||
(define (set-timers! timers)
|
||||
(for/list ([timerspec timers])
|
||||
(match-define (cons name ttl) timerspec)
|
||||
(send-message (set-timer (list 'check-dns-expiry name) (* ttl 1000) 'relative)))))
|
||||
(define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone))
|
||||
(sequence-actions (transition-and-set-timers cleaned-seed-zone initial-timers)
|
||||
(send! (set-timer (list 'check-dns-expiry name) (* ttl 1000) 'relative))))
|
||||
|
||||
;; TODO: consider deduping questions here too?
|
||||
(subscriber `(debug-dump)
|
||||
(match-state zone
|
||||
(on-message
|
||||
[`(debug-dump)
|
||||
(begin
|
||||
|
||||
(on (message `(debug-dump))
|
||||
(with-output-to-file "zone-proxy.zone"
|
||||
(lambda ()
|
||||
(write-bytes (bit-string->bytes (zone->bit-string zone))))
|
||||
(write-bytes (bit-string->bytes (zone->bit-string (zone)))))
|
||||
#:mode 'binary
|
||||
#:exists 'replace)
|
||||
(with-output-to-file "zone-proxy.dump"
|
||||
|
@ -208,8 +186,8 @@
|
|||
(display "----------------------------------------------------------------------\n")
|
||||
(display (seconds->date (current-seconds)))
|
||||
(newline)
|
||||
(for ([name (in-hash-keys zone)])
|
||||
(define rrmap (hash-ref zone name))
|
||||
(for ([name (in-hash-keys (zone))])
|
||||
(define rrmap (hash-ref (zone) name))
|
||||
(for ([rr (in-hash-keys rrmap)])
|
||||
(define expiry (hash-ref rrmap rr))
|
||||
(write (list rr expiry))
|
||||
|
@ -225,130 +203,99 @@
|
|||
;; (pretty-write current-ground-transition))
|
||||
;; #:mode 'text
|
||||
;; #:exists 'append)
|
||||
(transition zone))])))
|
||||
(subscriber (question (wild) (wild) (wild) (wild))
|
||||
(match-state zone
|
||||
(on-message
|
||||
[(? question? q)
|
||||
(transition zone
|
||||
)
|
||||
|
||||
(on (message ($ q (question _ _ _ _)))
|
||||
(cond
|
||||
[(question-cyclic? q)
|
||||
(log-warning (format "Cyclic question ~v" q))
|
||||
(send-message (answered-question q (empty-complete-answer)))]
|
||||
(send! (answered-question q (empty-complete-answer)))]
|
||||
[(question-too-glueless? q)
|
||||
(log-warning (format "Overly-glueless question ~v" q))
|
||||
(name-process (list 'glueless-question-handler-outer q)
|
||||
(spawn (glueless-question-handler roots-only q client-sock)))]
|
||||
(glueless-question-handler roots-only q client-sock)]
|
||||
[else
|
||||
(name-process (list 'question-handler q)
|
||||
(spawn (question-handler zone q client-sock)))]))])))
|
||||
(subscriber (network-reply (wild) (wild))
|
||||
(match-state zone
|
||||
(on-message
|
||||
[(network-reply _ answer)
|
||||
(let-values (((new-zone timers) (incorporate-complete-answer answer zone #t)))
|
||||
(transition-and-set-timers new-zone timers))])))
|
||||
(subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild))
|
||||
(match-state zone
|
||||
(on-message
|
||||
[(timer-expired (list 'check-dns-expiry (? domain? name)) (? number? now-msec))
|
||||
(transition (zone-expire-name zone name (/ now-msec 1000.0)))])))))
|
||||
(question-handler (zone) q client-sock)]))
|
||||
|
||||
(struct question-state (zone q client-sock nameservers-tried retry-count) #:transparent)
|
||||
(on (message (network-reply _ $answer))
|
||||
(define-values (new-zone timers) (incorporate-complete-answer answer (zone) #t))
|
||||
(zone new-zone)
|
||||
(set-timers! timers))
|
||||
|
||||
(struct expanding-cnames (q accumulator remaining-count) #:transparent)
|
||||
(on (message (timer-expired (list 'check-dns-expiry (? domain? $name))
|
||||
(? number? $now-msec)))
|
||||
(zone (zone-expire-name (zone) name (/ now-msec 1000.0))))))
|
||||
|
||||
;; (define-type QHState (U QuestionState ExpandingCNAMEs))
|
||||
;; (: send-empty-reply! : Question -> Void)
|
||||
(define (send-empty-reply! q)
|
||||
(send! (answered-question q (empty-complete-answer))))
|
||||
|
||||
;; (: question-handler : CompiledZone Question UdpAddress -> (Transition QHState))
|
||||
(define (question-handler zone q client-sock)
|
||||
(retry-question (question-state zone q client-sock (set) 0)))
|
||||
|
||||
;; (: send-empty-reply : QHState Question -> (Transition QHState))
|
||||
(define (send-empty-reply w q)
|
||||
(transition w (send-message (answered-question q (empty-complete-answer)))))
|
||||
|
||||
;; (: retry-question : QHState -> (Transition QHState))
|
||||
(define (retry-question w)
|
||||
(match w
|
||||
[(question-state _ q _ _ 20) ;; TODO: is this a sensible limit?
|
||||
;; (: question-handler : CompiledZone Question UdpAddress -> Void)
|
||||
(define (question-handler zone0 q client-sock)
|
||||
(actor*
|
||||
#:name (list 'question-handler q)
|
||||
(let retry-question ((zone zone0)
|
||||
(nameservers-tried (set))
|
||||
(retry-count 0))
|
||||
(if (= retry-count 20) ;; TODO: is this a sensible limit?
|
||||
;; Too many retries, i.e. too many referrals.
|
||||
(log-error (format "Too many retries: ~v" w))
|
||||
(send-empty-reply w q)]
|
||||
[(question-state zone q client-sock nameservers-tried old-retry-count)
|
||||
(begin (log-error (format "Too many retries: ~v" q))
|
||||
(send-empty-reply! q))
|
||||
;; Credit remaining. Try once more (perhaps for the first time, in fact).
|
||||
(define resolution-result (resolve-from-zone q zone #f nameservers-tried))
|
||||
(let ((resolution-result (resolve-from-zone q zone #f nameservers-tried)))
|
||||
(log-debug (format "Resolution result: ~v" resolution-result))
|
||||
|
||||
(match resolution-result
|
||||
[#f ;; We're not authoritative so this is just a signal that we can't answer usefully
|
||||
(send-empty-reply w q)]
|
||||
(send-empty-reply! q)]
|
||||
|
||||
[(referral zone-origin nameserver-rrs _)
|
||||
(define referral-id (gensym 'referral))
|
||||
(log-debug (format "Referral for ~v id ~v to ~v servers ~v"
|
||||
q referral-id (domain-labels zone-origin)
|
||||
(map domain-labels (set-map nameserver-rrs rr-rdata-domain-name))))
|
||||
(transition w
|
||||
(network-query client-sock
|
||||
q
|
||||
zone-origin
|
||||
(map rr-rdata-domain-name (set->list nameserver-rrs))
|
||||
referral-id)
|
||||
(name-endpoint referral-id
|
||||
(subscriber (network-reply referral-id (wild))
|
||||
(match-state w
|
||||
(on-message
|
||||
[(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN
|
||||
(transition w
|
||||
(delete-endpoint referral-id)
|
||||
(send-message (answered-question q #f)))]
|
||||
[(network-reply (== referral-id) ans)
|
||||
(let-values (((new-zone ignored-timers)
|
||||
(incorporate-complete-answer ans zone #f)))
|
||||
(react (stop-when
|
||||
(message (network-reply referral-id $ans))
|
||||
(cond [(not ans) ;; name-error/NXDOMAIN
|
||||
(send! (answered-question q #f))]
|
||||
[else
|
||||
(define-values (new-zone _ignored-timers)
|
||||
(incorporate-complete-answer ans zone #f))
|
||||
(when (log-level? (current-logger) 'debug)
|
||||
(log-debug (format "Referral ~v results in origin ~v:~n"
|
||||
referral-id zone-origin))
|
||||
(log-debug "Referral ~v results in origin ~v:~n"
|
||||
referral-id zone-origin)
|
||||
(for ([k (set-union (list->set (hash-keys zone))
|
||||
(list->set (hash-keys new-zone)))]
|
||||
#:when (in-bailiwick? k zone-origin))
|
||||
(log-debug (format "Old ~v ~v~nNew ~v ~v"
|
||||
(log-debug "Old ~v ~v~nNew ~v ~v"
|
||||
k (hash-ref zone k (lambda () 'missing))
|
||||
k (hash-ref new-zone k (lambda () 'missing)))))
|
||||
k (hash-ref new-zone k (lambda () 'missing))))
|
||||
(log-debug "=-=-=-=-=-="))
|
||||
(define nameserver-names
|
||||
(list->set
|
||||
(for/list ([rr nameserver-rrs])
|
||||
(rr-rdata-domain-name rr))))
|
||||
(sequence-actions
|
||||
(retry-question (struct-copy question-state w
|
||||
[nameservers-tried (set-union nameservers-tried
|
||||
nameserver-names)]
|
||||
[zone new-zone]
|
||||
[retry-count (+ old-retry-count 1)]))
|
||||
(delete-endpoint referral-id)))])))))]
|
||||
(for/set ([rr nameserver-rrs]) (rr-rdata-domain-name rr)))
|
||||
(retry-question new-zone
|
||||
(set-union nameservers-tried nameserver-names)
|
||||
(+ retry-count 1))])))]
|
||||
|
||||
[(? complete-answer? ans)
|
||||
(transition w (send-message (answered-question q ans)))]
|
||||
(send! (answered-question q ans))]
|
||||
|
||||
[(partial-answer base cnames)
|
||||
(transition (expanding-cnames q base (length cnames))
|
||||
(map (lambda (cname)
|
||||
;; TODO: record chains of CNAMEs to avoid pathologically-long chains
|
||||
(react (field [acc base]
|
||||
[remaining (length cnames)])
|
||||
(on-start (for [(cname cnames)]
|
||||
(define cname-q (cname-question cname q))
|
||||
(list (send-message cname-q)
|
||||
(let-fresh (subscription-id)
|
||||
(name-endpoint subscription-id
|
||||
(subscriber (answered-question cname-q (wild))
|
||||
(match-state (expanding-cnames q acc remaining)
|
||||
(on-message
|
||||
[(answered-question (== cname-q) ans)
|
||||
(let ()
|
||||
(define new-acc (if ans (merge-answers acc ans) acc))
|
||||
(define new-remaining (- remaining 1))
|
||||
(define new-w (expanding-cnames q new-acc new-remaining))
|
||||
(transition new-w
|
||||
(delete-endpoint subscription-id)
|
||||
(if (zero? new-remaining)
|
||||
(send-message (answered-question q new-acc))
|
||||
'())))])))))))
|
||||
cnames))])]))
|
||||
(react (on-start (send! cname-q))
|
||||
(stop-when (message (answered-question cname-q $ans))
|
||||
(acc (if ans (merge-answers (acc) ans) (acc)))
|
||||
(remaining (- (remaining) 1))))))
|
||||
(stop-when (rising-edge (zero? (remaining)))
|
||||
(send! (answered-question q (acc)))))]))))))
|
||||
|
||||
(require "test-rrs.rkt")
|
||||
(require racket/file)
|
||||
|
|
19
tk-dns.rkt
19
tk-dns.rkt
|
@ -39,31 +39,34 @@
|
|||
|
||||
(define (dns-read-driver s)
|
||||
(actor
|
||||
#:name (list 'dns-read-driver s)
|
||||
(on (message (inbound (udp-packet $source s #"")))
|
||||
(log-info "Debug dump packet received")
|
||||
(send! `(debug-dump)))
|
||||
(on (message (inbound (udp-packet $source s $body)))
|
||||
(when (positive? (bytes-length body))
|
||||
(send!
|
||||
(with-handlers ((exn:fail? (lambda (e)
|
||||
(bad-dns-packet body source s 'unparseable))))
|
||||
(define message (packet->dns-message body))
|
||||
(case (dns-message-direction message)
|
||||
((request) (dns-request message source s))
|
||||
((response) (dns-reply message source s))))))))
|
||||
((response) (dns-reply message source s)))))))))
|
||||
|
||||
(define (translate message s sink)
|
||||
(with-handlers ((exn:fail? (lambda (e)
|
||||
(send! (bad-dns-packet message s sink 'unencodable)))))
|
||||
(send! (outbound (udp-packet s sink (dns-message->packet message))))))
|
||||
(with-handlers ((exn:fail? (lambda (e) (bad-dns-packet message s sink 'unencodable))))
|
||||
(outbound (udp-packet s sink (dns-message->packet message)))))
|
||||
|
||||
(define (dns-write-driver s)
|
||||
(actor (on (message (dns-request $message s $sink))
|
||||
(translate message s sink))
|
||||
(actor #:name (list 'dns-write-driver s)
|
||||
(on (message (dns-request $message s $sink))
|
||||
(send! (translate message s sink)))
|
||||
(on (message (dns-reply $message s $sink))
|
||||
(translate message s sink))))
|
||||
(send! (translate message s sink)))))
|
||||
|
||||
(define (dns-spy)
|
||||
(actor (on (message (dns-request $message $source $sink))
|
||||
(actor #:name 'dns-spy
|
||||
(on (message (dns-request $message $source $sink))
|
||||
(log-info (format "DNS: ~v asks ~v ~v~n : ~v"
|
||||
source sink (dns-message-id message)
|
||||
(dns-message-questions message))))
|
||||
|
|
Loading…
Reference in New Issue