Introduce subscribe/fresh and use it in some places.

This commit is contained in:
Tony Garnock-Jones 2012-02-08 17:34:44 -05:00
parent fe88c1cbb6
commit a891956867
9 changed files with 97 additions and 94 deletions

View File

@ -42,14 +42,14 @@
(define boot-server
(os-big-bang 'no-state
(send-meta-message `(request create-server-socket (udp new ,port-number 512)))
(subscribe 'wait-for-server-socket
(meta-message-handlers w
[`(reply create-server-socket ,s)
(transition w
(unsubscribe 'wait-for-server-socket)
(spawn (dns-read-driver s))
(spawn (dns-write-driver s))
(subscribe 'packet-handler (packet-handler s)))]))))
(subscribe/fresh wait-id
(meta-message-handlers w
[`(reply create-server-socket ,s)
(transition w
(unsubscribe wait-id)
(spawn (dns-read-driver s))
(spawn (dns-write-driver s))
(subscribe 'packet-handler (packet-handler s)))]))))
(define (packet-handler s)
(message-handlers old-state

View File

@ -1,5 +1,5 @@
(mapcar #'(lambda (x) (put x 'scheme-indent-function 1))
'(transition extend-transition
subscribe unsubscribe
subscribe subscribe/fresh unsubscribe
os-big-bang
message-handlers meta-message-handlers ground-message-handler))

View File

@ -171,11 +171,10 @@
[remaining-addresses (hash-ref known-addresses
current-name)]
[remaining-names remaining-names]))
(let ((subscription-id (list 'nameserver-name-resolution current-name))
(subq (question current-name 'a 'in))) ;; TODO: 'aaaa ?
(let ((subq (question current-name 'a 'in))) ;; TODO: 'aaaa ?
(transition (struct-copy network-query-state w [remaining-names remaining-names])
(send-message subq)
(subscribe subscription-id
(subscribe/fresh subscription-id
(message-handlers w
[(answered-question (== subq) ans)
(define ips

View File

@ -14,16 +14,15 @@
(define read-line-driver-handler
(message-handlers w
[`(request ,reply-addr read-line)
(define sid `(read-line-transaction ,reply-addr))
(transition w
(subscribe sid
(ground-message-handler w
[((list 'read-line reply-addr)
(read-line-evt (current-input-port) 'any)
=> l)
(transition w
(unsubscribe sid)
(send-message `(reply ,reply-addr ,l)))])))]))
(subscribe/fresh sid
(ground-message-handler w
[((list 'read-line reply-addr)
(read-line-evt (current-input-port) 'any)
=> l)
(transition w
(unsubscribe sid)
(send-message `(reply ,reply-addr ,l)))])))]))
;; This should be part of racket
(define (time-evt msecs)
@ -33,14 +32,14 @@
(let loop ((last-tick-time 0) (counter 0))
(define next-time (+ last-tick-time interval))
(subscribe self-sid
(ground-message-handler w
[((list 'timer-alarm next-time)
(time-evt next-time)
=> now)
(transition w
(unsubscribe self-sid)
(send-message `(tick ,counter ,now))
(loop now (+ counter 1)))]))))
(ground-message-handler w
[((list 'timer-alarm next-time)
(time-evt next-time)
=> now)
(transition w
(unsubscribe self-sid)
(send-message `(tick ,counter ,now))
(loop now (+ counter 1)))]))))
(define main
(os-big-bang 'none

View File

@ -8,6 +8,8 @@
(require "os.rkt")
(provide (struct-out subscribe)
subscribe/fresh
(struct-out unsubscribe)
(struct-out send-message)
(struct-out send-meta-message)
@ -68,6 +70,12 @@
;; representation of a suspended world and its active subscriptions.
(struct world (state subscriptions) #:transparent)
(define-syntax subscribe/fresh
(syntax-rules ()
((_ id-binder event-description)
(let ((id-binder (gensym 'id-binder)))
(subscribe id-binder event-description)))))
(define-syntax message-handlers*
(syntax-rules ()
((_ action-constructor old-state-pattern [pattern body ...] ...)

View File

@ -20,18 +20,16 @@
(define (dns-read-driver s)
(os-big-bang 'no-state
(subscribe 'packet-reader
(meta-message-handlers w
[(udp-packet source (== s) body)
(transition w
(send-message
(with-handlers ((exn:fail? (lambda (e)
(bad-dns-packet body source s
'unparseable))))
(define message (packet->dns-message body))
(case (dns-message-direction message)
((request) (dns-request message source s))
((response) (dns-reply message source s))))))]))))
(subscribe 'packet-reader
(meta-message-handlers w
[(udp-packet source (== s) body)
(transition w
(send-message
(with-handlers ((exn:fail? (lambda (e) (bad-dns-packet body source s 'unparseable))))
(define message (packet->dns-message body))
(case (dns-message-direction message)
((request) (dns-request message source s))
((response) (dns-reply message source s))))))]))))
(define (dns-write-driver s)
(define (translate message sink)
@ -39,21 +37,20 @@
(send-message (bad-dns-packet message s sink 'unencodable)))))
(send-meta-message (udp-packet s sink (dns-message->packet message)))))
(os-big-bang 'no-state
(subscribe 'packet-writer
(message-handlers w
[(dns-request message (== s) sink)
(transition w (translate message sink))]
[(dns-reply message (== s) sink)
(transition w (translate message sink))]))))
(subscribe 'packet-writer
(message-handlers w
[(dns-request message (== s) sink) (transition w (translate message sink))]
[(dns-reply message (== s) sink) (transition w (translate message sink))]))))
(require racket/pretty)
(define dns-spy
(os-big-bang 'none
(subscribe 'spy
(message-handlers w
[(dns-request message source sink)
(pretty-display `(DNS (,source asks ,sink)
,@(dns-message-questions message)))]
[(dns-reply message source sink)
(pretty-display `(DNS (,source answers ,sink) ,message))]
[x (write `(DNS ,x)) (newline)]))))
(subscribe 'spy
(message-handlers w
[(dns-request message source sink)
(pretty-display `(DNS (,source asks ,sink) ,@(dns-message-questions message)))]
[(dns-reply message source sink)
(pretty-display `(DNS (,source answers ,sink) ,message))]
[x
(write `(DNS ,x))
(newline)]))))

View File

@ -20,29 +20,30 @@
(define (timer-driver [self-id 'timer-driver])
(os-big-bang 'no-state
(subscribe 'timer-setter
(message-handlers w
[(set-timer label msecs relative?)
(transition w
(subscribe label
(ground-message-handler w
[((list self-id label)
(timer-evt msecs relative?)
=> now)
(transition w
(unsubscribe label)
(send-message (timer-expired label now)))])))]))))
(subscribe 'timer-setter
(message-handlers w
[(set-timer reply-label msecs relative?)
(transition w
(subscribe/fresh label
(ground-message-handler w
[((list self-id label)
(timer-evt msecs relative?)
=> now)
(transition w
(unsubscribe label)
(send-message (timer-expired reply-label now)))])))]))))
(define (timer-relay [self-id 'timer-relay])
(os-big-bang 'no-state
(subscribe 'timer-relay
(message-handlers w
[(set-timer label msecs relative?)
(transition w
(send-meta-message (set-timer (list self-id label) msecs relative?))
(subscribe label
(meta-message-handlers w
[(timer-expired (list (== self-id) (== label)) now)
(transition w
(unsubscribe label)
(send-message (timer-expired label now)))])))]))))
(subscribe 'timer-relay
(message-handlers w
[(set-timer reply-label msecs relative?)
(define timer-id (list self-id reply-label))
(transition w
(send-meta-message (set-timer timer-id msecs relative?))
(subscribe/fresh label
(meta-message-handlers w
[(timer-expired (== timer-id) now)
(transition w
(unsubscribe label)
(send-message (timer-expired reply-label now)))])))]))))

View File

@ -22,17 +22,17 @@
(define echoer
(os-big-bang 'none
(send-message `(request create-echo-socket (udp new 5555 65536)))
(subscribe 'echo-socket-receiver
(message-handlers w
[`(reply create-echo-socket ,sname)
(transition w
(unsubscribe 'echo-socket-receiver)
(subscribe 'packet-handler (packet-handler sname)))]))))
(send-message `(request create-echo-socket (udp new 5555 65536)))
(subscribe/fresh sub
(message-handlers w
[`(reply create-echo-socket ,sname)
(transition w
(unsubscribe sub)
(subscribe 'packet-handler (packet-handler sname)))]))))
(define spy
(os-big-bang 'none
(subscribe 'spy (message-handlers w [x (write `(MESSAGE ,x)) (newline)]))))
(subscribe 'spy (message-handlers w [x (write `(MESSAGE ,x)) (newline)]))))
(define (main)
(ground-vm

View File

@ -35,22 +35,22 @@
(spawn (timer-relay))
(spawn (query-id-allocator))
(send-meta-message `(request create-server-socket (udp new ,port-number 512)))
(subscribe 'wait-for-server-socket
(subscribe/fresh wait-id
(meta-message-handlers w
[`(reply create-server-socket ,s)
(transition w
(unsubscribe 'wait-for-server-socket)
(unsubscribe wait-id)
(send-meta-message
`(request create-client-socket (udp new 0 512)))
(client-socket-waiter s))]))))
(define (client-socket-waiter s)
(subscribe 'wait-for-client-socket
(subscribe/fresh wait-id
(meta-message-handlers w
[`(reply create-client-socket ,c)
(display "Ready.") (newline)
(transition w
(unsubscribe 'wait-for-client-socket)
(unsubscribe wait-id)
(spawn (dns-read-driver s))
(spawn (dns-write-driver s))
(spawn (dns-read-driver c))
@ -134,11 +134,11 @@
with query id ,(dns-message-id request-message))) (newline)
(os-big-bang 'no-state/packet-relay
(send-message original-question)
(subscribe 'wait-for-answer
(subscribe/fresh wait-id
(message-handlers w
[(answered-question (== original-question) answer)
(transition w
(unsubscribe 'wait-for-answer)
(unsubscribe wait-id)
(send-message (answer->reply original-question answer)))])))]))
(define (question-dispatcher zone0 client-sock)
@ -202,10 +202,9 @@
[(partial-answer base cnames)
(transition (expanding-cnames q base (length cnames))
(map (lambda (cname)
(define subscription-id (list 'cname-expander cname))
(define cname-q (question cname (question-type q) (question-class q)))
(list (send-message cname-q)
(subscribe subscription-id
(subscribe/fresh subscription-id
(message-handlers (expanding-cnames q acc remaining)
[(answered-question (== cname-q) ans)
(define new-acc (if ans (merge-answers acc ans) acc))