Further porting to typed kernel

This commit is contained in:
Tony Garnock-Jones 2012-10-31 17:48:55 -04:00
parent a8b6d50c36
commit 510ba9d750
2 changed files with 239 additions and 231 deletions

View File

@ -5,10 +5,10 @@
(require "api.rkt") (require "api.rkt")
(require "codec.rkt") (require "codec.rkt")
(require "zonedb.rkt") (require "zonedb.rkt")
(require "../racket-matrix/os2.rkt") (require racket-typed-matrix/sugar-untyped)
(require "../racket-matrix/os2-udp.rkt") (require racket-typed-matrix/drivers/udp-untyped)
(require "../racket-matrix/os2-timer.rkt") (require racket-typed-matrix/drivers/timer-untyped)
(require "os2-dns.rkt") (require "tk-dns.rkt")
(provide network-query (provide network-query
(struct-out network-reply)) (struct-out network-reply))
@ -210,7 +210,7 @@
(format "~a.~a.~a.~a" a b c d)) (format "~a.~a.~a.~a" a b c d))
(define (make-dns-address ip-address) (define (make-dns-address ip-address)
(udp-address (ip->host-name ip-address) 53)) (udp-remote-address (ip->host-name ip-address) 53))
;; network-query : UdpAddress Question DomainName NEListOf<DomainName> UniqueId -> BootK ;; network-query : UdpAddress Question DomainName NEListOf<DomainName> UniqueId -> BootK
(define (network-query s q zone-origin server-names unique-id) (define (network-query s q zone-origin server-names unique-id)
@ -248,32 +248,33 @@
[current-name current-name] [current-name current-name]
[remaining-names remaining-names]) [remaining-names remaining-names])
(send-message subq) (send-message subq)
(role/fresh subq-id (topic-subscriber (answered-question subq (wild))) (endpoint #:subscriber (answered-question subq (wild))
#:state w #:let-name subq-id
[(answered-question (== subq) ans) #:state w
(define ips [(answered-question (== subq) ans)
(map make-dns-address (set->list (extract-addresses current-name ans)))) (let ((ips (map make-dns-address
(sequence-actions (set->list (extract-addresses current-name ans)))))
(try-next-server (struct-copy network-query-state w (sequence-actions
[known-addresses (hash-set known-addresses (try-next-server (struct-copy network-query-state w
current-name [known-addresses (hash-set known-addresses
ips)] current-name
[remaining-addresses ips])) ips)]
(delete-role subq-id))]))))] [remaining-addresses ips]))
(delete-endpoint subq-id)))]))))]
[(network-query-state req timeout _ (cons current-ip remaining-ips) _ _) [(network-query-state req timeout _ (cons current-ip remaining-ips) _ _)
(define rpc-id (gensym 'network-query/allocate-query-id)) (define rpc-id (gensym 'network-query/allocate-query-id))
(transition w (transition w
(send-message `(request ,rpc-id allocate-query-id)) (send-message `(request ,rpc-id allocate-query-id))
(role (topic-subscriber `(reply ,rpc-id ,(wild))) (endpoint #:subscriber `(reply ,rpc-id ,(wild))
#:name rpc-id #:name rpc-id
#:state w #:state w
[`(reply ,(== rpc-id) ,id) [`(reply ,(== rpc-id) ,id)
(sequence-actions (send-request (struct-copy network-query-state w (sequence-actions (send-request (struct-copy network-query-state w
[remaining-addresses remaining-ips]) [remaining-addresses remaining-ips])
id id
timeout timeout
current-ip) current-ip)
(delete-role rpc-id))]))])) (delete-endpoint rpc-id))]))]))
(define (on-answer w ans server-ip) (define (on-answer w ans server-ip)
(match ans (match ans
@ -301,7 +302,8 @@
(define (send-request w query-id timeout server-ip) (define (send-request w query-id timeout server-ip)
(match-define (network-request s q zone-origin _ _) (network-query-state-request w)) (match-define (network-request s q zone-origin _ _) (network-query-state-request w))
(define query (make-dns-query-message q query-id)) (define query (make-dns-query-message q query-id))
(define subscription-id (list s query-id)) (define reply-wait-id (list s query-id 'reply-wait))
(define timeout-id (list s query-id 'timeout))
(define start-time (current-inexact-milliseconds)) (define start-time (current-inexact-milliseconds))
(log-debug (format "Sending ~v ~v to ~v ~v with ~v seconds of timeout" (log-debug (format "Sending ~v ~v to ~v ~v with ~v seconds of timeout"
q query-id q query-id
@ -309,33 +311,40 @@
timeout)) timeout))
(transition w (transition w
(send-message (dns-request query s server-ip)) (send-message (dns-request query s server-ip))
(send-message (set-timer subscription-id (* timeout 1000) 'relative)) (send-message (set-timer timeout-id (* timeout 1000) 'relative))
(role (set (topic-subscriber (timer-expired subscription-id (wild))) ;; TODO: Restore this to a "join" when proper pattern-unions are implemented
(topic-subscriber (dns-reply (wild) (wild) s))) (endpoint #:subscriber (timer-expired timeout-id (wild))
#:name subscription-id #:name timeout-id
#:state w #:state w
[(timer-expired (== subscription-id) _) [(timer-expired (== timeout-id) _)
(log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds" (begin
q query-id (log-debug (format "Timed out ~v ~v to ~v ~v after ~v seconds"
zone-origin server-ip q query-id
timeout)) zone-origin server-ip
(sequence-actions (try-next-server w) timeout))
(delete-role subscription-id) (sequence-actions (try-next-server w)
(send-message (list 'release-query-id query-id)))] (delete-endpoint timeout-id)
[(dns-reply reply-message source (== s)) (delete-endpoint reply-wait-id)
;; TODO: maybe receive only specifically from the queried IP address? (send-message (list 'release-query-id query-id))))])
(log-debug (endpoint #:subscriber (dns-reply (wild) (wild) s)
(format #:name reply-wait-id
"Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v" #:state w
q zone-origin server-ip [(dns-reply reply-message source (== s))
(inexact->exact (round (- (current-inexact-milliseconds) start-time))) ;; TODO: maybe receive only specifically from the queried IP address?
(dns-message-answers reply-message) (begin
(dns-message-authorities reply-message) (log-debug
(dns-message-additional reply-message))) (format
(if (not (= (dns-message-id reply-message) (dns-message-id query))) "Answer to ~v from ~v ~v in ~v ms~n-- Answers: ~v~n-- Authorities: ~v~n-- Additional: ~v"
(transition w) q zone-origin server-ip
(sequence-actions (on-answer w (inexact->exact (round (- (current-inexact-milliseconds) start-time)))
(filter-dns-reply q reply-message zone-origin) (dns-message-answers reply-message)
server-ip) (dns-message-authorities reply-message)
(delete-role subscription-id) (dns-message-additional reply-message)))
(send-message (list 'release-query-id query-id))))]))) (if (not (= (dns-message-id reply-message) (dns-message-id query)))
(transition w)
(sequence-actions (on-answer w
(filter-dns-reply q reply-message zone-origin)
server-ip)
(delete-endpoint timeout-id)
(delete-endpoint reply-wait-id)
(send-message (list 'release-query-id query-id)))))])))

345
proxy.rkt
View File

@ -11,10 +11,11 @@
(require "zonedb.rkt") (require "zonedb.rkt")
(require "network-query.rkt") (require "network-query.rkt")
(require "resolver.rkt") (require "resolver.rkt")
(require "../racket-matrix/os2.rkt") (require racket-typed-matrix/sugar-untyped)
(require "../racket-matrix/os2-udp.rkt") (require racket-typed-matrix/support/spy)
(require "../racket-matrix/os2-timer.rkt") (require racket-typed-matrix/drivers/timer-untyped)
(require "os2-dns.rkt") (require racket-typed-matrix/drivers/udp-untyped)
(require "tk-dns.rkt")
(require racket/pretty) (require racket/pretty)
@ -32,65 +33,59 @@
(log-info "Ready.") (log-info "Ready.")
(ground-vm (ground-vm
(transition 'no-state (generic-spy 'UDP)
;;(spawn udp-spy) (udp-driver)
(spawn udp-driver #:debug-name 'udp-driver) (timer-driver)
(spawn (timer-driver 'timer-driver) #:debug-name 'timer-driver) (nested-vm #:debug-name 'dns-vm
(spawn (nested-vm #:debug-name 'dns-vm (dns-spy)
(transition 'no-state (timer-relay 'timer-relay:dns)
(spawn dns-spy #:debug-name 'dns-spy) (spawn #:debug-name 'query-id-allocator #:child (query-id-allocator))
(spawn (timer-relay 'timer-relay:dns) #:debug-name 'timer-relay) (spawn #:debug-name 'server-dns-reader #:child (dns-read-driver server-addr))
(spawn (query-id-allocator) #:debug-name 'query-id-allocator) (spawn #:debug-name 'server-dns-writer #:child (dns-write-driver server-addr))
(spawn (dns-read-driver server-addr) #:debug-name 'server-dns-reader) (spawn #:debug-name 'client-dns-reader #:child (dns-read-driver client-addr))
(spawn (dns-write-driver server-addr) #:debug-name 'server-dns-writer) (spawn #:debug-name 'client-dns-writer #:child (dns-write-driver client-addr))
(spawn (dns-read-driver client-addr) #:debug-name 'client-dns-reader) (spawn #:debug-name 'packet-dispatcher #:child (packet-dispatcher server-addr))
(spawn (dns-write-driver client-addr) #:debug-name 'client-dns-writer) (spawn #:debug-name 'question-dispatcher
(spawn (packet-dispatcher server-addr) #:debug-name 'packet-dispatcher) #:child (question-dispatcher zone roots-only client-addr)))))
(spawn (question-dispatcher zone roots-only client-addr)
#:debug-name 'question-dispatcher)))
#:debug-name 'dns-vm))))
(define (query-id-allocator) (define (query-id-allocator)
;; TODO: track how many are allocated and throttle requests if too ;; TODO: track how many are allocated and throttle requests if too
;; many are in flight ;; many are in flight
(transition (set) ;; SetOf<UInt16>, all active query IDs (transition (set) ;; SetOf<UInt16>, all active query IDs
(role (topic-subscriber `(request ,(wild) allocate-query-id)) (endpoint #:subscriber `(request ,(wild) allocate-query-id)
#:state allocated #:state allocated
[`(request ,reply-addr allocate-query-id) [`(request ,reply-addr allocate-query-id)
(let recheck () (let recheck ()
(define n (random 65536)) (define n (random 65536))
(if (set-member? allocated n) (if (set-member? allocated n)
(recheck) (recheck)
(transition (set-add allocated n) (transition (set-add allocated n)
(send-message `(reply ,reply-addr ,n)))))]) (send-message `(reply ,reply-addr ,n)))))])
(role (topic-subscriber `(release-query-id ,(wild))) (endpoint #:subscriber `(release-query-id ,(wild))
#:state allocated #:state allocated
[`(release-query-id ,n) [`(release-query-id ,n)
(transition (set-remove allocated n))]))) (transition (set-remove allocated n))])))
(define (packet-dispatcher s) (define (packet-dispatcher s)
(transition (set) ;; SetOf<ActiveRequest> (transition (set) ;; SetOf<ActiveRequest>
(role (topic-subscriber (bad-dns-packet (wild) (wild) (wild) (wild))) (endpoint #:subscriber (bad-dns-packet (wild) (wild) (wild) (wild))
#:state old-active-requests [p (begin (log-error (pretty-format p)) '())])
[p (endpoint #:subscriber (dns-request (wild) (wild) s)
(log-error (pretty-format p)) #:state old-active-requests
;; TODO: ^ perhaps use metalevel events? perhaps don't bother though [(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket
(transition old-active-requests)]) (let ((req-id (active-request source (dns-message-id m))))
(role (topic-subscriber (dns-request (wild) (wild) s)) ;; TODO: when we have presence/error-handling, remove req-id
#:state old-active-requests ;; from active requests once request-handler pseudothread exits.
[(and r (dns-request m source (== s))) ;; We only listen for requests on our server socket (if (set-member? old-active-requests req-id)
(define req-id (active-request source (dns-message-id m))) (transition old-active-requests) ;; ignore retransmitted duplicates
;; TODO: when we have presence/error-handling, remove req-id (transition (set-add old-active-requests req-id)
;; from active requests once request-handler pseudothread exits. (spawn #:debug-name (list 'packet-relay req-id)
(if (set-member? old-active-requests req-id) #:child (packet-relay req-id r)))))])
(transition old-active-requests) ;; ignore retransmitted duplicates (endpoint #:subscriber (dns-reply (wild) s (wild))
(transition (set-add old-active-requests req-id) #:state old-active-requests
(spawn (packet-relay req-id r) #:debug-name (list 'packet-relay req-id))))]) [(and r (dns-reply m (== s) sink))
(role (topic-subscriber (dns-reply (wild) s (wild))) (let ((req-id (active-request sink (dns-message-id m))))
#:state old-active-requests (transition (set-remove old-active-requests req-id)))])))
[(and r (dns-reply m (== s) sink))
(define req-id (active-request sink (dns-message-id m)))
(transition (set-remove old-active-requests req-id))])))
(define (packet-relay req-id request) (define (packet-relay req-id request)
(match-define (dns-request request-message request-source request-sink) request) (match-define (dns-request request-message request-source request-sink) request)
@ -127,32 +122,34 @@
original-question (dns-message-id request-message))) original-question (dns-message-id request-message)))
(transition 'no-state/packet-relay (transition 'no-state/packet-relay
(send-message original-question) (send-message original-question)
(role/fresh wait-id (topic-subscriber (answered-question original-question (wild))) (endpoint #:subscriber (answered-question original-question (wild))
#:state w #:state w
[(answered-question (== original-question) answer) #:let-name wait-id
(log-debug (format "Final answer to ~v with query id ~v is ~v" [(answered-question (== original-question) answer)
original-question (begin (log-debug (format "Final answer to ~v with query id ~v is ~v"
(dns-message-id request-message) original-question
answer)) (dns-message-id request-message)
(transition w answer))
(delete-role wait-id) (transition w
(send-message (answer->reply original-question answer)))]))])) (delete-endpoint wait-id)
(send-message (answer->reply original-question answer))))]))]))
(define (glueless-question-handler roots-only-zone q client-sock) (define (glueless-question-handler roots-only-zone q client-sock)
;; Restart q, an overly-glueless question, from the roots. ;; Restart q, an overly-glueless question, from the roots.
(define restarted-question (restart-question q)) (define restarted-question (restart-question q))
(transition 'no-state (transition 'no-state
(role/fresh relay (topic-subscriber (answered-question restarted-question (wild))) (endpoint #:subscriber (answered-question restarted-question (wild))
#:state w #:state w
[(answered-question (== restarted-question) ans) #:let-name relay
;; We got the answer to our restarted question; now transform [(answered-question (== restarted-question) ans)
;; it into an answer to the original question, to unblock the ;; We got the answer to our restarted question; now transform
;; original questioner. ;; it into an answer to the original question, to unblock the
(transition w ;; original questioner.
(delete-role relay) (transition w
(send-message (answered-question q ans)))]) (delete-endpoint relay)
(spawn (question-handler roots-only-zone restarted-question client-sock) (send-message (answered-question q ans)))])
#:debug-name (list 'glueless-question-handler-inner restarted-question)))) (spawn #:debug-name (list 'glueless-question-handler-inner restarted-question)
#:child (question-handler roots-only-zone restarted-question client-sock))))
(define (question-dispatcher seed-zone roots-only client-sock) (define (question-dispatcher seed-zone roots-only client-sock)
(define (transition-and-set-timers new-zone timers) (define (transition-and-set-timers new-zone timers)
@ -163,58 +160,59 @@
(define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone)) (define-values (cleaned-seed-zone initial-timers) (zone-expire seed-zone))
(sequence-actions (transition-and-set-timers cleaned-seed-zone initial-timers) (sequence-actions (transition-and-set-timers cleaned-seed-zone initial-timers)
;; TODO: consider deduping questions here too? ;; TODO: consider deduping questions here too?
(role (topic-subscriber `(debug-dump)) (endpoint #:subscriber `(debug-dump)
#:state zone #:state zone
[`(debug-dump) [`(debug-dump)
(with-output-to-file "zone-proxy.zone" (begin
(lambda () (with-output-to-file "zone-proxy.zone"
(write-bytes (bit-string->bytes (zone->bit-string zone)))) (lambda ()
#:mode 'binary (write-bytes (bit-string->bytes (zone->bit-string zone))))
#:exists 'replace) #:mode 'binary
(with-output-to-file "zone-proxy.dump" #:exists 'replace)
(lambda () (with-output-to-file "zone-proxy.dump"
(display "----------------------------------------------------------------------\n") (lambda ()
(display (seconds->date (current-seconds))) (display "----------------------------------------------------------------------\n")
(newline) (display (seconds->date (current-seconds)))
(for* ([(name rrmap) zone] [(rr expiry) rrmap]) (newline)
(write (list rr expiry)) (for* ([(name rrmap) zone] [(rr expiry) rrmap])
(newline)) (write (list rr expiry))
(newline)) (newline))
#:mode 'text (newline))
#:exists 'append) #:mode 'text
(with-output-to-file "zone-proxy.debug" #:exists 'append)
(lambda () ;; (with-output-to-file "zone-proxy.debug"
(display "----------------------------------------------------------------------\n") ;; (lambda ()
(display (seconds->date (current-seconds))) ;; (display "----------------------------------------------------------------------\n")
(newline) ;; (display (seconds->date (current-seconds)))
(pretty-write current-ground-transition)) ;; (newline)
#:mode 'text ;; (pretty-write current-ground-transition))
#:exists 'append) ;; #:mode 'text
(transition zone)]) ;; #:exists 'append)
(role (topic-subscriber (question (wild) (wild) (wild) (wild))) '())])
#:state zone (endpoint #:subscriber (question (wild) (wild) (wild) (wild))
[(? question? q) #:state zone
(transition zone [(? question? q)
(cond (transition zone
[(question-cyclic? q) (cond
(log-warning (format "Cyclic question ~v" q)) [(question-cyclic? q)
(send-message (answered-question q (empty-complete-answer)))] (log-warning (format "Cyclic question ~v" q))
[(question-too-glueless? q) (send-message (answered-question q (empty-complete-answer)))]
(log-warning (format "Overly-glueless question ~v" q)) [(question-too-glueless? q)
(spawn (glueless-question-handler roots-only q client-sock) (log-warning (format "Overly-glueless question ~v" q))
#:debug-name (list 'glueless-question-handler-outer q))] (spawn #:debug-name (list 'glueless-question-handler-outer q)
[else #:child (glueless-question-handler roots-only q client-sock))]
(spawn (question-handler zone q client-sock) [else
#:debug-name (list 'question-handler q))]))]) (spawn #:debug-name (list 'question-handler q)
(role (topic-subscriber (network-reply (wild) (wild))) #:child (question-handler zone q client-sock))]))])
#:state zone (endpoint #:subscriber (network-reply (wild) (wild))
[(network-reply _ answer) #:state zone
(define-values (new-zone timers) (incorporate-complete-answer answer zone)) [(network-reply _ answer)
(transition-and-set-timers new-zone timers)]) (let-values (((new-zone timers) (incorporate-complete-answer answer zone)))
(role (topic-subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild))) (transition-and-set-timers new-zone timers))])
#:state zone (endpoint #:subscriber (timer-expired (list 'check-dns-expiry (wild)) (wild))
[(timer-expired (list 'check-dns-expiry name) now-msec) #:state zone
(transition (zone-expire-name zone name (/ now-msec 1000.0)))]))) [(timer-expired (list 'check-dns-expiry name) now-msec)
(transition (zone-expire-name zone name (/ now-msec 1000.0)))])))
(struct question-state (zone q client-sock nameservers-tried retry-count) #:prefab) (struct question-state (zone q client-sock nameservers-tried retry-count) #:prefab)
(struct expanding-cnames (q accumulator remaining-count) #:prefab) (struct expanding-cnames (q accumulator remaining-count) #:prefab)
@ -244,39 +242,39 @@
q referral-id (domain-labels zone-origin) q referral-id (domain-labels zone-origin)
(map domain-labels (set-map nameserver-rrs rr-rdata)))) (map domain-labels (set-map nameserver-rrs rr-rdata))))
(transition w (transition w
(spawn (network-query client-sock (spawn #:debug-name (list 'network-query q)
q #:child (network-query client-sock
zone-origin q
(map rr-rdata (set->list nameserver-rrs)) zone-origin
referral-id) (map rr-rdata (set->list nameserver-rrs))
#:debug-name (list 'network-query q)) referral-id))
(role (topic-subscriber (network-reply referral-id (wild))) (endpoint #:subscriber (network-reply referral-id (wild))
#:name referral-id #:name referral-id
#:state w #:state w
[(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN [(network-reply (== referral-id) #f) ;; name-error/NXDOMAIN
(transition w (transition w
(delete-role referral-id) (delete-endpoint referral-id)
(send-message (answered-question q #f)))] (send-message (answered-question q #f)))]
[(network-reply (== referral-id) ans) [(network-reply (== referral-id) ans)
(define-values (new-zone ignored-timers) (incorporate-complete-answer ans zone)) (let-values (((new-zone ignored-timers) (incorporate-complete-answer ans zone)))
(when (log-level? (current-logger) 'debug) (when (log-level? (current-logger) 'debug)
(log-debug (format "Referral ~v results in origin ~v:~n" (log-debug (format "Referral ~v results in origin ~v:~n"
referral-id zone-origin)) referral-id zone-origin))
(for ([k (set-union (list->set (hash-keys zone)) (for ([k (set-union (list->set (hash-keys zone))
(list->set (hash-keys new-zone)))] (list->set (hash-keys new-zone)))]
#:when (in-bailiwick? k zone-origin)) #:when (in-bailiwick? k zone-origin))
(log-debug (format "Old ~v ~v~nNew ~v ~v" (log-debug (format "Old ~v ~v~nNew ~v ~v"
k (hash-ref zone k 'missing) k (hash-ref zone k 'missing)
k (hash-ref new-zone k 'missing)))) k (hash-ref new-zone k 'missing))))
(log-debug "=-=-=-=-=-=")) (log-debug "=-=-=-=-=-="))
(define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr))) (define nameserver-names (for/set ([rr nameserver-rrs]) (rr-rdata rr)))
(sequence-actions (sequence-actions
(retry-question (struct-copy question-state w (retry-question (struct-copy question-state w
[nameservers-tried (set-union nameservers-tried [nameservers-tried (set-union nameservers-tried
nameserver-names)] nameserver-names)]
[zone new-zone] [zone new-zone]
[retry-count (+ old-retry-count 1)])) [retry-count (+ old-retry-count 1)]))
(delete-role referral-id))]))] (delete-endpoint referral-id)))]))]
[(? complete-answer? ans) [(? complete-answer? ans)
(transition w (send-message (answered-question q ans)))] (transition w (send-message (answered-question q ans)))]
[(partial-answer base cnames) [(partial-answer base cnames)
@ -285,18 +283,19 @@
;; TODO: record chains of CNAMEs to avoid pathologically-long chains ;; TODO: record chains of CNAMEs to avoid pathologically-long chains
(define cname-q (cname-question cname q)) (define cname-q (cname-question cname q))
(list (send-message cname-q) (list (send-message cname-q)
(role/fresh subscription-id (endpoint #:subscriber (answered-question cname-q (wild))
(topic-subscriber (answered-question cname-q (wild))) #:state (expanding-cnames q acc remaining)
#:state (expanding-cnames q acc remaining) #:let-name subscription-id
[(answered-question (== cname-q) ans) [(answered-question (== cname-q) ans)
(define new-acc (if ans (merge-answers acc ans) acc)) (let ()
(define new-remaining (- remaining 1)) (define new-acc (if ans (merge-answers acc ans) acc))
(define new-w (expanding-cnames q new-acc new-remaining)) (define new-remaining (- remaining 1))
(transition new-w (define new-w (expanding-cnames q new-acc new-remaining))
(delete-role subscription-id) (transition new-w
(if (zero? new-remaining) (delete-endpoint subscription-id)
(send-message (answered-question q new-acc)) (if (zero? new-remaining)
'()))]))) (send-message (answered-question q new-acc))
'())))])))
cnames))])])) cnames))])]))
(require "test-rrs.rkt") (require "test-rrs.rkt")