Initial translation of proxy and network-query to os2

This commit is contained in:
Tony Garnock-Jones 2012-05-03 16:00:43 -04:00
parent f466fd6a05
commit f79bf86111
2 changed files with 243 additions and 237 deletions

View File

@ -5,10 +5,10 @@
(require "api.rkt")
(require "codec.rkt")
(require "zonedb.rkt")
(require "../racket-matrix/os-big-bang.rkt")
(require "../racket-matrix/os-udp.rkt")
(require "../racket-matrix/os-timer.rkt")
(require "os-dns.rkt")
(require "../racket-matrix/os2.rkt")
(require "../racket-matrix/os2-udp.rkt")
(require "../racket-matrix/os2-timer.rkt")
(require "os2-dns.rkt")
(provide network-query
(struct-out network-reply))
@ -214,13 +214,13 @@
;; network-query : UdpAddress Question DomainName NEListOf<DomainName> UniqueId -> BootK
(define (network-query s q zone-origin server-names unique-id)
(os-big-bang/transition
(try-next-server (network-query-state (network-request s q zone-origin server-names unique-id)
first-timeout
(hash)
'()
#f
server-names))))
(lambda (self-pid)
(try-next-server (network-query-state (network-request s q zone-origin server-names unique-id)
first-timeout
(hash)
'()
#f
server-names))))
(define (try-next-server w)
(match w
@ -248,31 +248,31 @@
[current-name current-name]
[remaining-names remaining-names])
(send-message subq)
(subscribe/fresh subscription-id
(message-handlers w
[(answered-question (== subq) ans)
(define ips
(map make-dns-address (set->list (extract-addresses current-name ans))))
(extend-transition
(try-next-server (struct-copy network-query-state w
[known-addresses (hash-set known-addresses
current-name
ips)]
[remaining-addresses ips]))
(unsubscribe subscription-id))])))))]
(role/fresh subq-id (topic-subscriber (answered-question subq (wild)))
#:state w
[(answered-question (== subq) ans)
(define ips
(map make-dns-address (set->list (extract-addresses current-name ans))))
(extend-transition
(try-next-server (struct-copy network-query-state w
[known-addresses (hash-set known-addresses
current-name
ips)]
[remaining-addresses ips]))
(delete-role subq-id))]))))]
[(network-query-state req timeout _ (cons current-ip remaining-ips) _ _)
(define rpc-id (gensym 'network-query/allocate-query-id))
(transition w
(send-message `(request ,rpc-id allocate-query-id))
(subscribe rpc-id
(message-handlers w
[`(reply ,(== rpc-id) ,id)
(extend-transition (send-request (struct-copy network-query-state w
[remaining-addresses remaining-ips])
id
timeout
current-ip)
(unsubscribe rpc-id))])))]))
(role rpc-id (topic-subscriber `(reply ,rpc-id ,(wild)))
#:state w
[`(reply ,(== rpc-id) ,id)
(extend-transition (send-request (struct-copy network-query-state w
[remaining-addresses remaining-ips])
id
timeout
current-ip)
(delete-role rpc-id))]))]))
(define (on-answer w ans server-ip)
(match ans
@ -309,30 +309,32 @@
(transition w
(send-message (dns-request query s server-ip))
(send-message (set-timer subscription-id (* timeout 1000) 'relative))
(subscribe subscription-id
(message-handlers w
[(timer-expired (== subscription-id) _)
(log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds"
q query-id
zone-origin server-ip
timeout))
(extend-transition (try-next-server w)
(unsubscribe subscription-id)
(send-message (list 'release-query-id query-id)))]
[(dns-reply reply-message source (== s))
;; TODO: maybe receive only specifically from the queried IP address?
(log-debug
(format
"Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v"
q zone-origin server-ip
(inexact->exact (round (- (current-inexact-milliseconds) start-time)))
(dns-message-answers reply-message)
(dns-message-authorities reply-message)
(dns-message-additional reply-message)))
(if (not (= (dns-message-id reply-message) (dns-message-id query)))
w
(extend-transition (on-answer w
(filter-dns-reply q reply-message zone-origin)
server-ip)
(unsubscribe subscription-id)
(send-message (list 'release-query-id query-id))))]))))
(role subscription-id
(set (topic-subscriber (timer-expired subscription-id (wild)))
(topic-subscriber (dns-reply (wild) (wild) s)))
#:state w
[(timer-expired (== subscription-id) _)
(log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds"
q query-id
zone-origin server-ip
timeout))
(extend-transition (try-next-server w)
(delete-role subscription-id)
(send-message (list 'release-query-id query-id)))]
[(dns-reply reply-message source (== s))
;; TODO: maybe receive only specifically from the queried IP address?
(log-debug
(format
"Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v"
q zone-origin server-ip
(inexact->exact (round (- (current-inexact-milliseconds) start-time)))
(dns-message-answers reply-message)
(dns-message-authorities reply-message)
(dns-message-additional reply-message)))
(if (not (= (dns-message-id reply-message) (dns-message-id query)))
w
(extend-transition (on-answer w
(filter-dns-reply q reply-message zone-origin)
server-ip)
(delete-role subscription-id)
(send-message (list 'release-query-id query-id))))])))

360
proxy.rkt
View File

@ -11,11 +11,10 @@
(require "zonedb.rkt")
(require "network-query.rkt")
(require "resolver.rkt")
(require "../racket-matrix/os.rkt")
(require "../racket-matrix/os-big-bang.rkt")
(require "../racket-matrix/os-udp.rkt")
(require "../racket-matrix/os-timer.rkt")
(require "os-dns.rkt")
(require "../racket-matrix/os2.rkt")
(require "../racket-matrix/os2-udp.rkt")
(require "../racket-matrix/os2-timer.rkt")
(require "os2-dns.rkt")
(require racket/pretty)
@ -27,77 +26,72 @@
;; start-proxy : UInt16 CompiledZone CompiledZone -> Void
(define (start-proxy port-number zone roots-only)
(define server-addr (udp-listener port-number))
(define client-addr (udp-handle 'dns-client))
(define boot-server
(os-big-bang 'no-state/boot-server
(spawn dns-spy)
(spawn (timer-relay 'timer-relay:dns))
(spawn (query-id-allocator))
(send-meta-message `(request create-server-socket (udp new ,port-number 512)))
(subscribe/fresh wait-id
(meta-message-handlers w
[`(reply create-server-socket ,s)
(transition w
(unsubscribe wait-id)
(send-meta-message
`(request create-client-socket (udp new 0 512)))
(client-socket-waiter s))]))))
(log-info "Ready.")
(define (client-socket-waiter s)
(subscribe/fresh wait-id
(meta-message-handlers w
[`(reply create-client-socket ,c)
(log-info "Ready.")
(transition w
(unsubscribe wait-id)
(spawn (dns-read-driver s))
(spawn (dns-write-driver s))
(spawn (dns-read-driver c))
(spawn (dns-write-driver c))
(spawn (packet-dispatcher s))
(spawn (question-dispatcher zone roots-only c)))])))
(ground-vm (os-big-bang (void)
;;(spawn udp-spy)
(spawn udp-driver)
(spawn (timer-driver 'timer-driver))
(spawn (nested-vm boot-server)))))
(ground-vm
(transition 'no-state
;;(spawn udp-spy)
(spawn udp-driver #:debug-name 'udp-driver)
(spawn (timer-driver 'timer-driver) #:debug-name 'timer-driver)
(spawn (nested-vm
'dns-vm
(transition 'no-state
(spawn dns-spy #:debug-name 'dns-spy)
(spawn (timer-relay 'timer-relay:dns) #:debug-name 'timer-relay)
(spawn (query-id-allocator) #:debug-name 'query-id-allocator)
(spawn (dns-read-driver server-addr) #:debug-name 'server-dns-reader)
(spawn (dns-write-driver server-addr) #:debug-name 'server-dns-writer)
(spawn (dns-read-driver client-addr) #:debug-name 'client-dns-reader)
(spawn (dns-write-driver client-addr) #:debug-name 'client-dns-writer)
(spawn (packet-dispatcher server-addr) #:debug-name 'packet-dispatcher)
(spawn (question-dispatcher zone roots-only client-addr)
#:debug-name 'question-dispatcher)))
#:debug-name 'dns-vm))))
(define (query-id-allocator)
;; TODO: track how many are allocated and throttle requests if too
;; many are in flight
(os-big-bang (set) ;; SetOf<UInt16>, all active query IDs
(subscribe 'query-id-request-handler
(message-handlers allocated
[`(request ,reply-addr allocate-query-id)
(let recheck ()
(define n (random 65536))
(if (set-member? allocated n)
(recheck)
(transition (set-add allocated n)
(send-message `(reply ,reply-addr ,n)))))]
[`(release-query-id ,n)
(transition (set-remove allocated n))]))))
(transition (set) ;; SetOf<UInt16>, all active query IDs
(role 'query-id-request-handler (topic-subscriber `(request ,(wild) allocate-query-id))
#:state allocated
[`(request ,reply-addr allocate-query-id)
(let recheck ()
(define n (random 65536))
(if (set-member? allocated n)
(recheck)
(transition (set-add allocated n)
(send-message `(reply ,reply-addr ,n)))))])
(role 'query-id-release-handler (topic-subscriber `(release-query-id ,(wild)))
#:state allocated
[`(release-query-id ,n)
(transition (set-remove allocated n))])))
(define (packet-dispatcher s)
(os-big-bang (set) ;; SetOf<ActiveRequest>
(subscribe 'packet-dispatcher
(message-handlers old-active-requests
[(? bad-dns-packet? p)
(log-error (pretty-format p))
;; TODO: ^ perhaps use metalevel events? perhaps don't bother though
old-active-requests]
[(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket
(define req-id (active-request source (dns-message-id m)))
;; TODO: when we have presence/error-handling, remove req-id
;; from active requests once request-handler pseudothread exits.
(if (set-member? old-active-requests req-id)
old-active-requests ;; ignore retransmitted duplicates
(transition (set-add old-active-requests req-id)
(spawn (packet-relay req-id r))))]
[(and r (dns-reply m (== s) sink))
(define req-id (active-request sink (dns-message-id m)))
(set-remove old-active-requests req-id)]))))
(transition (set) ;; SetOf<ActiveRequest>
(role 'error-logger (topic-subscriber (bad-dns-packet (wild) (wild) (wild) (wild)))
#:state old-active-requests
[p
(log-error (pretty-format p))
;; TODO: ^ perhaps use metalevel events? perhaps don't bother though
old-active-requests])
(role 'request-booter (topic-subscriber (dns-request (wild) (wild) s))
#:state old-active-requests
[(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket
(define req-id (active-request source (dns-message-id m)))
;; TODO: when we have presence/error-handling, remove req-id
;; from active requests once request-handler pseudothread exits.
(if (set-member? old-active-requests req-id)
old-active-requests ;; ignore retransmitted duplicates
(transition (set-add old-active-requests req-id)
(spawn (packet-relay req-id r) #:debug-name (list 'packet-relay req-id))))])
(role 'reply-cleanup (topic-subscriber (dns-reply (wild) s (wild)))
#:state old-active-requests
[(and r (dns-reply m (== s) sink))
(define req-id (active-request sink (dns-message-id m)))
(set-remove old-active-requests req-id)])))
(define (packet-relay req-id request)
(match-define (dns-request request-message request-source request-sink) request)
@ -126,39 +120,40 @@
(match (dns-message-questions request-message)
['()
;; No questions!
(os-big-bang 'no-state/packet-relay
(transition 'no-state/packet-relay
(send-message (answer->reply #f (empty-complete-answer))))]
[(cons original-question _)
;; At least one question
(log-debug (format "Looking up ~v with query id ~v"
original-question (dns-message-id request-message)))
(os-big-bang 'no-state/packet-relay
(transition 'no-state/packet-relay
(send-message original-question)
(subscribe/fresh wait-id
(message-handlers w
[(answered-question (== original-question) answer)
(log-debug (format "Final answer to ~v with query id ~v is ~v"
original-question
(dns-message-id request-message)
answer))
(transition w
(unsubscribe wait-id)
(send-message (answer->reply original-question answer)))])))]))
(role/fresh wait-id (topic-subscriber (answered-question original-question (wild)))
#:state w
[(answered-question (== original-question) answer)
(log-debug (format "Final answer to ~v with query id ~v is ~v"
original-question
(dns-message-id request-message)
answer))
(transition w
(delete-role wait-id)
(send-message (answer->reply original-question answer)))]))]))
(define (glueless-question-handler roots-only-zone q client-sock)
;; Restart q, an overly-glueless question, from the roots.
(define restarted-question (restart-question q))
(os-big-bang 'no-state
(subscribe/fresh relay
(message-handlers w
[(answered-question (== restarted-question) ans)
;; We got the answer to our restarted question; now transform
;; it into an answer to the original question, to unblock the
;; original questioner.
(transition w
(unsubscribe relay)
(send-message (answered-question q ans)))]))
(spawn (question-handler roots-only-zone restarted-question client-sock))))
(transition 'no-state
(role/fresh relay (topic-subscriber (answered-question restarted-question (wild)))
#:state w
[(answered-question (== restarted-question) ans)
;; We got the answer to our restarted question; now transform
;; it into an answer to the original question, to unblock the
;; original questioner.
(transition w
(delete-role relay)
(send-message (answered-question q ans)))])
(spawn (question-handler roots-only-zone restarted-question client-sock)
#:debug-name (list 'glueless-question-handler-inner restarted-question))))
(define (question-dispatcher seed-zone roots-only client-sock)
(define (transition-and-set-timers new-zone timers)
@ -167,60 +162,67 @@
(match-define (cons name ttl) timerspec)
(send-message (set-timer (list 'check-dns-expiry name) (* ttl 1000) 'relative)))))
(define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone))
(os-big-bang/transition
(extend-transition (transition-and-set-timers cleaned-seed-zone initial-timers)
;; TODO: consider deduping questions here too?
(subscribe 'question-handler-factory
(message-handlers zone
[`(debug-dump)
(with-output-to-file "zone-proxy.zone"
(lambda ()
(write-bytes (bit-string->bytes (zone->bit-string zone))))
#:mode 'binary
#:exists 'replace)
(with-output-to-file "zone-proxy.dump"
(lambda ()
(display "----------------------------------------------------------------------\n")
(display (seconds->date (current-seconds)))
(newline)
(for* ([(name rrmap) zone] [(rr expiry) rrmap])
(write (list rr expiry))
(newline))
(newline))
#:mode 'text
#:exists 'append)
(with-output-to-file "zone-proxy.debug"
(lambda ()
(display "----------------------------------------------------------------------\n")
(display (seconds->date (current-seconds)))
(newline)
(pretty-write current-ground-transition))
#:mode 'text
#:exists 'append)
zone]
[(? question? q)
(transition zone
(cond
[(question-cyclic? q)
(log-warning (format "Cyclic question ~v" q))
(send-message (answered-question q (empty-complete-answer)))]
[(question-too-glueless? q)
(log-warning (format "Overly-glueless question ~v" q))
(spawn (glueless-question-handler roots-only q client-sock))]
[else
(spawn (question-handler zone q client-sock))]))]
[(network-reply _ answer)
(define-values (new-zone timers) (incorporate-complete-answer answer zone))
(transition-and-set-timers new-zone timers)]
[(timer-expired (list 'check-dns-expiry name) now-msec)
(zone-expire-name zone name (/ now-msec 1000.0))])))))
(extend-transition (transition-and-set-timers cleaned-seed-zone initial-timers)
;; TODO: consider deduping questions here too?
(role 'debug-dumper (topic-subscriber `(debug-dump))
#:state zone
[`(debug-dump)
(with-output-to-file "zone-proxy.zone"
(lambda ()
(write-bytes (bit-string->bytes (zone->bit-string zone))))
#:mode 'binary
#:exists 'replace)
(with-output-to-file "zone-proxy.dump"
(lambda ()
(display "----------------------------------------------------------------------\n")
(display (seconds->date (current-seconds)))
(newline)
(for* ([(name rrmap) zone] [(rr expiry) rrmap])
(write (list rr expiry))
(newline))
(newline))
#:mode 'text
#:exists 'append)
(with-output-to-file "zone-proxy.debug"
(lambda ()
(display "----------------------------------------------------------------------\n")
(display (seconds->date (current-seconds)))
(newline)
(pretty-write current-ground-transition))
#:mode 'text
#:exists 'append)
zone])
(role 'question-handler-factory (topic-subscriber (question (wild) (wild) (wild) (wild)))
#:state zone
[(? question? q)
(transition zone
(cond
[(question-cyclic? q)
(log-warning (format "Cyclic question ~v" q))
(send-message (answered-question q (empty-complete-answer)))]
[(question-too-glueless? q)
(log-warning (format "Overly-glueless question ~v" q))
(spawn (glueless-question-handler roots-only q client-sock)
#:debug-name (list 'glueless-question-handler-outer q))]
[else
(spawn (question-handler zone q client-sock)
#:debug-name (list 'question-handler q))]))])
(role 'network-reply-snoop (topic-subscriber (network-reply (wild) (wild)))
#:state zone
[(network-reply _ answer)
(define-values (new-zone timers) (incorporate-complete-answer answer zone))
(transition-and-set-timers new-zone timers)])
(role 'timer-expiry-handler
(topic-subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild)))
#:state zone
[(timer-expired (list 'check-dns-expiry name) now-msec)
(zone-expire-name zone name (/ now-msec 1000.0))])))
(struct question-state (zone q client-sock nameservers-tried retry-count) #:prefab)
(struct expanding-cnames (q accumulator remaining-count) #:prefab)
(define (question-handler zone q client-sock)
(os-big-bang/transition
(retry-question (question-state zone q client-sock (set) 0))))
(retry-question (question-state zone q client-sock (set) 0)))
(define (send-empty-reply w q)
(transition w (send-message (answered-question q (empty-complete-answer)))))
@ -248,33 +250,34 @@
q
zone-origin
(map rr-rdata (set->list nameserver-rrs))
referral-id))
(subscribe referral-id
(message-handlers w
[(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN
(transition w
(unsubscribe referral-id)
(send-message (answered-question q #f)))]
[(network-reply (== referral-id) ans)
(define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone))
(when (log-level? (current-logger) 'debug)
(log-debug (format "Referral ~v results in origin ~v:~n"
referral-id zone-origin))
(for ([k (set-union (list->set (hash-keys zone))
(list->set (hash-keys new-zone)))]
#:when (in-bailiwick? k zone-origin))
(log-debug (format "Old ~v ~v~nNew ~v ~v"
k (hash-ref zone k 'missing)
k (hash-ref new-zone k 'missing))))
(log-debug "=-=-=-=-=-="))
(define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr)))
(extend-transition
(retry-question (struct-copy question-state w
[nameservers-tried (set-union nameservers-tried
nameserver-names)]
[zone new-zone]
[retry-count (+ old-retry-count 1)]))
(unsubscribe referral-id))])))]
referral-id)
#:debug-name (list 'network-query q))
(role referral-id (topic-subscriber (network-reply referral-id (wild)))
#:state w
[(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN
(transition w
(delete-role referral-id)
(send-message (answered-question q #f)))]
[(network-reply (== referral-id) ans)
(define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone))
(when (log-level? (current-logger) 'debug)
(log-debug (format "Referral ~v results in origin ~v:~n"
referral-id zone-origin))
(for ([k (set-union (list->set (hash-keys zone))
(list->set (hash-keys new-zone)))]
#:when (in-bailiwick? k zone-origin))
(log-debug (format "Old ~v ~v~nNew ~v ~v"
k (hash-ref zone k 'missing)
k (hash-ref new-zone k 'missing))))
(log-debug "=-=-=-=-=-="))
(define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr)))
(extend-transition
(retry-question (struct-copy question-state w
[nameservers-tried (set-union nameservers-tried
nameserver-names)]
[zone new-zone]
[retry-count (+ old-retry-count 1)]))
(delete-role referral-id))]))]
[(? complete-answer? ans)
(transition w (send-message (answered-question q ans)))]
[(partial-answer base cnames)
@ -283,17 +286,18 @@
;; TODO: record chains of CNAMEs to avoid pathologically-long chains
(define cname-q (cname-question cname q))
(list (send-message cname-q)
(subscribe/fresh subscription-id
(message-handlers (expanding-cnames q acc remaining)
[(answered-question (== cname-q) ans)
(define new-acc (if ans (merge-answers acc ans) acc))
(define new-remaining (- remaining 1))
(define new-w (expanding-cnames q new-acc new-remaining))
(transition new-w
(unsubscribe subscription-id)
(if (zero? new-remaining)
(send-message (answered-question q new-acc))
'()))]))))
(role/fresh subscription-id
(topic-subscriber (answered-question cname-q (wild)))
#:state (expanding-cnames q acc remaining)
[(answered-question (== cname-q) ans)
(define new-acc (if ans (merge-answers acc ans) acc))
(define new-remaining (- remaining 1))
(define new-w (expanding-cnames q new-acc new-remaining))
(transition new-w
(delete-role subscription-id)
(if (zero? new-remaining)
(send-message (answered-question q new-acc))
'()))])))
cnames))])]))
(require "test-rrs.rkt")