Make handlers etc *required* to return a transition structure.

This commit is contained in:
Tony Garnock-Jones 2012-07-23 17:21:12 -04:00
parent 7d7cfc9738
commit 1c42aea271
12 changed files with 52 additions and 72 deletions

View File

@ -13,7 +13,7 @@
#:name id #:name id
#:state state #:state state
[(timer-expired (== id) now) [(timer-expired (== id) now)
(sequence-actions state (sequence-actions (transition state)
k k
(delete-role id))]))) (delete-role id))])))
@ -32,7 +32,7 @@
(define spy (define spy
(lambda (spy-pid) (lambda (spy-pid)
(define (hs label) (define (hs label)
(define ((w kind) . args) (write `(,label ,kind ,@args)) (newline) values) (define ((w kind) . args) (write `(,label ,kind ,@args)) (newline) transition)
(handlers (w 'presence) (handlers (w 'presence)
(w 'absence) (w 'absence)
(w 'message))) (w 'message)))

View File

@ -7,7 +7,7 @@
(define (spy spy-label) (define (spy spy-label)
(lambda (spy-pid) (lambda (spy-pid)
(define (hs label) (define (hs label)
(define ((w kind) . args) (write `(,spy-label ,label ,kind ,@args)) (newline) values) (define ((w kind) . args) (write `(,spy-label ,label ,kind ,@args)) (newline) transition)
(handlers (w 'presence) (handlers (w 'presence)
(w 'absence) (w 'absence)
(w 'message))) (w 'message)))

View File

@ -11,14 +11,11 @@
(define ((connection-handler local-addr remote-addr) self-pid) (define ((connection-handler local-addr remote-addr) self-pid)
(define (reader-role) (define (reader-role)
(role (topic-subscriber (tcp-channel remote-addr local-addr (wild))) (role (topic-subscriber (tcp-channel remote-addr local-addr (wild)))
#:state state
[(tcp-channel remote _ (? bytes? bs)) [(tcp-channel remote _ (? bytes? bs))
(transition state (list (send-tcp-credit remote-addr local-addr 16)
(send-tcp-credit remote-addr local-addr 16) (send-message (tcp-channel local-addr remote-addr bs)))]
(send-message (tcp-channel local-addr remote-addr bs)))]
[(tcp-channel remote _ (? eof-object?)) [(tcp-channel remote _ (? eof-object?))
(transition state (kill)]))
(kill))]))
(transition 'no-state (transition 'no-state
(send-tcp-credit remote-addr local-addr 16) (send-tcp-credit remote-addr local-addr 16)
@ -27,20 +24,18 @@
(define (listener local-addr) (define (listener local-addr)
(transition 'no-state (transition 'no-state
(role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t) (role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t)
#:state state
#:topic t #:topic t
#:on-presence (match t #:on-presence (match t
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
(transition state (spawn (connection-handler local-addr remote-addr)))])))) (spawn (connection-handler local-addr remote-addr))]))))
(define (main port) (define (main port)
(define (arm-timer) (define (arm-timer)
(role (topic-subscriber (timer-expired (wild) (wild))) (role (topic-subscriber (timer-expired (wild) (wild)))
#:name 'waiter #:name 'waiter
#:state state #:on-presence (send-message (set-timer 'label 500 'relative))
#:on-presence (transition state (send-message (set-timer 'label 500 'relative))) [(timer-expired _ _) (list (delete-role 'waiter)
[(timer-expired _ _) (arm-timer))]))
(transition state (delete-role 'waiter) (arm-timer))]))
(ground-vm (ground-vm
(transition 'none (transition 'none
(spawn tcp-spy) (spawn tcp-spy)

View File

@ -23,13 +23,14 @@
(define (connection-handler local-addr) (define (connection-handler local-addr)
(transition (set) ;; of remote TcpAddresses (transition (set) ;; of remote TcpAddresses
(role (topic-publisher (tcp-channel local-addr (wild) (wild))))
(role (topic-subscriber (tcp-channel (wild) local-addr (wild))) (role (topic-subscriber (tcp-channel (wild) local-addr (wild)))
#:state active-remotes #:state active-remotes
#:topic t #:topic t
#:on-presence (match t #:on-presence (match t
[(topic 'publisher (tcp-channel (== local-addr) _ _) #f) [(topic 'publisher (tcp-channel (== local-addr) _ _) #f)
;; Ignore loopback flow. ;; Ignore loopback flow.
active-remotes] (transition active-remotes)]
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
(write `(arrived ,remote-addr)) (newline) (write `(arrived ,remote-addr)) (newline)
(transition (set-add active-remotes remote-addr) (transition (set-add active-remotes remote-addr)
@ -40,11 +41,11 @@
(send-tcp-credit remote-addr local-addr 1))]) (send-tcp-credit remote-addr local-addr 1))])
#:on-absence (match t #:on-absence (match t
[(topic 'publisher (tcp-channel (== local-addr) _ _) #f) [(topic 'publisher (tcp-channel (== local-addr) _ _) #f)
;; Ignore loopback flow. ;; Ignore loopback flow.
active-remotes] (transition active-remotes)]
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
(write `(departed ,remote-addr)) (newline) (write `(departed ,remote-addr)) (newline)
(set-remove active-remotes remote-addr)]) (transition (set-remove active-remotes remote-addr))])
[(tcp-channel remote-addr (== local-addr) (? eof-object?)) [(tcp-channel remote-addr (== local-addr) (? eof-object?))
(define new-active-remotes (set-remove active-remotes remote-addr)) (define new-active-remotes (set-remove active-remotes remote-addr))
(transition new-active-remotes (transition new-active-remotes
@ -53,9 +54,7 @@
[(tcp-channel remote-addr (== local-addr) (? bytes? bs)) [(tcp-channel remote-addr (== local-addr) (? bytes? bs))
(transition active-remotes (transition active-remotes
(send-tcp-credit remote-addr local-addr 1) (send-tcp-credit remote-addr local-addr 1)
(send-to-all local-addr active-remotes remote-addr bs))]) (send-to-all local-addr active-remotes remote-addr bs))])))
(role (topic-publisher (tcp-channel local-addr (wild) (wild)))
#:state active-remotes)))
(define (main port) (define (main port)
(ground-vm (ground-vm

View File

@ -10,10 +10,7 @@
(define ((connection-handler local-addr remote-addr) self-pid) (define ((connection-handler local-addr remote-addr) self-pid)
(transition 'no-state (transition 'no-state
(role (topic-publisher (tcp-channel local-addr remote-addr (wild))) (role (topic-publisher (tcp-channel local-addr remote-addr (wild)))
#:state state [(tcp-channel _ _ (tcp-credit _)) (kill)])
[(tcp-channel _ _ (tcp-credit _))
(transition state
(kill))])
(send-message (tcp-channel local-addr remote-addr (send-message (tcp-channel local-addr remote-addr
(string->bytes/utf-8 (string->bytes/utf-8
(format "~a\n" (current-inexact-milliseconds))))))) (format "~a\n" (current-inexact-milliseconds)))))))
@ -21,11 +18,10 @@
(define (listener local-addr) (define (listener local-addr)
(transition 'no-state (transition 'no-state
(role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t) (role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t)
#:state state
#:topic t #:topic t
#:on-presence (match t #:on-presence (match t
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
(transition state (spawn (connection-handler local-addr remote-addr)))])))) (spawn (connection-handler local-addr remote-addr))]))))
(define (main port) (define (main port)
(ground-vm (ground-vm

View File

@ -12,24 +12,20 @@
(send-tcp-mode remote-addr local-addr 'lines) (send-tcp-mode remote-addr local-addr 'lines)
(send-tcp-credit remote-addr local-addr 1) (send-tcp-credit remote-addr local-addr 1)
(role (topic-subscriber (tcp-channel remote-addr local-addr (wild))) (role (topic-subscriber (tcp-channel remote-addr local-addr (wild)))
#:state state
[(tcp-channel remote _ (? bytes? line)) [(tcp-channel remote _ (? bytes? line))
(transition state (list (send-tcp-credit remote-addr local-addr 1)
(send-tcp-credit remote-addr local-addr 1) (send-message
(send-message (tcp-channel local-addr remote-addr (bytes-append #"You said: " line #"\n"))))]
(tcp-channel local-addr remote-addr (bytes-append #"You said: " line #"\n"))))]
[(tcp-channel remote _ (? eof-object?)) [(tcp-channel remote _ (? eof-object?))
(transition state (kill)])))
(kill))])))
(define (listener local-addr) (define (listener local-addr)
(transition 'no-state (transition 'no-state
(role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t) (role (topic-subscriber (tcp-channel (wild) local-addr (wild)) #:monitor? #t)
#:state state
#:topic t #:topic t
#:on-presence (match t #:on-presence (match t
[(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f) [(topic 'publisher (tcp-channel remote-addr (== local-addr) _) #f)
(transition state (spawn (connection-handler local-addr remote-addr)))])))) (spawn (connection-handler local-addr remote-addr))]))))
(define (main port) (define (main port)
(ground-vm (ground-vm

View File

@ -104,9 +104,9 @@
[(or (topic 'publisher (tcp-channel local-addr remote-addr _) _) [(or (topic 'publisher (tcp-channel local-addr remote-addr _) _)
(topic 'subscriber (tcp-channel remote-addr local-addr _) _)) (topic 'subscriber (tcp-channel remote-addr local-addr _) _))
(cond (cond
[(ground? remote-addr) active-handles] [(ground? remote-addr) (transition active-handles)]
[(not (ground? local-addr)) active-handles] [(not (ground? local-addr)) (transition active-handles)]
[(set-member? active-handles (cons local-addr remote-addr)) active-handles] [(set-member? active-handles (cons local-addr remote-addr)) (transition active-handles)]
[else [else
(transition (set-add active-handles (cons local-addr remote-addr)) (transition (set-add active-handles (cons local-addr remote-addr))
(spawn (driver-fun local-addr remote-addr) (spawn (driver-fun local-addr remote-addr)
@ -118,9 +118,9 @@
[(or (topic 'publisher (tcp-channel local-addr remote-addr _) _) [(or (topic 'publisher (tcp-channel local-addr remote-addr _) _)
(topic 'subscriber (tcp-channel remote-addr local-addr _) _)) (topic 'subscriber (tcp-channel remote-addr local-addr _) _))
(cond (cond
[(ground? remote-addr) active-handles] [(ground? remote-addr) (transition active-handles)]
[(not (ground? local-addr)) active-handles] [(not (ground? local-addr)) (transition active-handles)]
[else (set-remove active-handles local-addr)])])) [else (transition (set-remove active-handles local-addr))])]))
;; TcpAddress TcpAddress -> BootK ;; TcpAddress TcpAddress -> BootK
(define ((tcp-listener-manager local-addr dummy-remote-addr) self-pid) (define ((tcp-listener-manager local-addr dummy-remote-addr) self-pid)
@ -140,7 +140,7 @@
[(or (topic 'publisher (tcp-channel (== local-addr) remote-addr _) _) [(or (topic 'publisher (tcp-channel (== local-addr) remote-addr _) _)
(topic 'subscriber (tcp-channel remote-addr (== local-addr) _) _)) (topic 'subscriber (tcp-channel remote-addr (== local-addr) _) _))
(if (ground? remote-addr) (if (ground? remote-addr)
state (transition state)
(transition 'listener-is-closed (transition 'listener-is-closed
(kill) (kill)
(when (eq? state 'listener-is-running) (when (eq? state 'listener-is-running)
@ -231,10 +231,10 @@
[(tcp-channel (== remote-addr) (== local-addr) subpacket) [(tcp-channel (== remote-addr) (== local-addr) subpacket)
(match subpacket (match subpacket
[(tcp-credit amount) [(tcp-credit amount)
(and state (adjust-credit state amount))] (if state (adjust-credit state amount) (transition state))]
[(tcp-mode new-mode) [(tcp-mode new-mode)
;; Also resets credit to zero. ;; Also resets credit to zero.
(and state (adjust-credit (tcp-connection-state new-mode 0) 0))] (if state (adjust-credit (tcp-connection-state new-mode 0) 0) (transition state))]
[_ [_
(error 'tcp-connection-manager* (error 'tcp-connection-manager*
"Subscriber on a channel may only send channel control messages")])]))) "Subscriber on a channel may only send channel control messages")])])))
@ -247,17 +247,16 @@
(transition 'no-state (transition 'no-state
(role (set (topic-subscriber (wild) #:monitor? #t) (role (set (topic-subscriber (wild) #:monitor? #t)
(topic-publisher (wild) #:monitor? #t)) (topic-publisher (wild) #:monitor? #t))
#:state state
[(tcp-channel source dest (? bytes? body)) [(tcp-channel source dest (? bytes? body))
(write `(TCPDATA ,source --> ,dest)) (newline) (write `(TCPDATA ,source --> ,dest)) (newline)
(dump-bytes! body (bytes-length body)) (dump-bytes! body (bytes-length body))
state] (void)]
[(tcp-channel source dest (? eof-object?)) [(tcp-channel source dest (? eof-object?))
(write `(TCPEOF ,source --> ,dest)) (newline) (write `(TCPEOF ,source --> ,dest)) (newline)
state] (void)]
[(tcp-channel source dest (tcp-credit amount)) [(tcp-channel source dest (tcp-credit amount))
(write `(TCPCREDIT ,source --> ,dest ,amount)) (newline) (write `(TCPCREDIT ,source --> ,dest ,amount)) (newline)
state] (void)]
[other [other
(write `(TCPOTHER ,other)) (newline) (write `(TCPOTHER ,other)) (newline)
state]))) (void)])))

View File

@ -9,9 +9,7 @@
(provide check-role) (provide check-role)
(define (flatten-transition t) (define (flatten-transition t)
(if (transition? t) (apply transition (transition-state t) (flatten (transition-actions t))))
(apply transition (transition-state t) (flatten (transition-actions t)))
(transition t)))
;; AddRole World SendMessage World ConsTreeOf<Action> -> Void ;; AddRole World SendMessage World ConsTreeOf<Action> -> Void
;; Passes the given message to the given message-handler, with the ;; Passes the given message to the given message-handler, with the

View File

@ -110,7 +110,7 @@
[(cons (? evt?) now) [(cons (? evt?) now)
(define to-send (fire-timers! (driver-state-heap state) now)) (define to-send (fire-timers! (driver-state-heap state) now))
;; Note: compute to-send before recursing, because of side-effects on heap ;; Note: compute to-send before recursing, because of side-effects on heap
(sequence-actions state (sequence-actions (transition state)
update-time-listener! update-time-listener!
to-send)])))) to-send)]))))

View File

@ -10,14 +10,11 @@
(define (packet-handler local-addr) (define (packet-handler local-addr)
(role (set (topic-publisher (udp-packet local-addr (wild) (wild))) (role (set (topic-publisher (udp-packet local-addr (wild) (wild)))
(topic-subscriber (udp-packet (wild) local-addr (wild)))) (topic-subscriber (udp-packet (wild) local-addr (wild))))
#:state state
[(udp-packet source _ #"quit\n") [(udp-packet source _ #"quit\n")
(transition state (list (send-message (udp-packet local-addr source #"OK, quitting\n"))
(send-message (udp-packet local-addr source #"OK, quitting\n")) (kill #:reason "Asked to quit"))]
(kill #:reason "Asked to quit"))]
[(udp-packet source sink body) [(udp-packet source sink body)
(transition state (send-message (udp-packet sink source body))]))
(send-message (udp-packet sink source body)))]))
(check-role (packet-handler (udp-listener 5555)) (check-role (packet-handler (udp-listener 5555))
'arbitrary 'arbitrary

View File

@ -58,7 +58,7 @@
#:on-presence (match t #:on-presence (match t
[(topic _ (udp-packet _ local-addr _) #f) [(topic _ (udp-packet _ local-addr _) #f)
(cond (cond
[(set-member? active-handles local-addr) active-handles] [(set-member? active-handles local-addr) (transition active-handles)]
[else [else
(transition (set-add active-handles local-addr) (transition (set-add active-handles local-addr)
(spawn (udp-socket-manager local-addr) (spawn (udp-socket-manager local-addr)
@ -102,7 +102,7 @@
#:debug-name (list 'udp-socket-closer local-addr)))) #:debug-name (list 'udp-socket-closer local-addr))))
[(udp-packet (== local-addr) (udp-address remote-host remote-port) body) [(udp-packet (== local-addr) (udp-address remote-host remote-port) body)
(udp-send-to s remote-host remote-port body) (udp-send-to s remote-host remote-port body)
state]) (transition state)])
;; Listen for messages arriving on the actual socket using a ;; Listen for messages arriving on the actual socket using a
;; ground event, and relay them at this level. ;; ground event, and relay them at this level.
(role (topic-subscriber (cons (udp-receive!-evt s buffer) (wild))) (role (topic-subscriber (cons (udp-receive!-evt s buffer) (wild)))
@ -119,11 +119,10 @@
(define udp-spy (define udp-spy
(transition 'no-state (transition 'no-state
(role (topic-subscriber (wild) #:monitor? #t) (role (topic-subscriber (wild) #:monitor? #t)
#:state state
[(udp-packet source dest body) [(udp-packet source dest body)
(write `(UDP ,source --> ,dest)) (newline) (write `(UDP ,source --> ,dest)) (newline)
(dump-bytes! body (bytes-length body)) (dump-bytes! body (bytes-length body))
state] (void)]
[other [other
(write `(UDP ,other)) (newline) (write `(UDP ,other)) (newline)
state]))) (void)])))

13
os2.rkt
View File

@ -174,7 +174,9 @@
;; Transition -> (transition ConsTreeOf<Action>) ;; Transition -> (transition ConsTreeOf<Action>)
(define (maybe-transition->transition t) (define (maybe-transition->transition t)
(cond [(transition? t) t] (cond [(transition? t) t]
[else (transition t '())])) [else
(define message (format "maybe-transition->transition: Expected transition; got ~v" t))
(transition #f (kill #f (exn:fail:contract message (current-continuation-marks))))]))
;; Preactions. ;; Preactions.
;; Ks are various TrapKs or #f, signifying lack of interest. ;; Ks are various TrapKs or #f, signifying lack of interest.
@ -277,7 +279,7 @@
(match message-body (match message-body
[message-pattern clause-body ...] [message-pattern clause-body ...]
... ...
[_ state])]))))]) [_ (make-transition state)])]))))])
#`(add-role #,(if (attribute pre-eid) #`(add-role #,(if (attribute pre-eid)
#'pre-eid #'pre-eid
#'(gensym 'anonymous-role)) #'(gensym 'anonymous-role))
@ -407,7 +409,7 @@
;; Composing state transitions and action emissions. ;; Composing state transitions and action emissions.
(define (sequence-actions t . more-actions-and-transformers) (define (sequence-actions t . more-actions-and-transformers)
(match-define (transition initial-state initial-actions) (maybe-transition->transition t)) (match-define (transition initial-state initial-actions) t)
(let loop ((state initial-state) (let loop ((state initial-state)
(actions initial-actions) (actions initial-actions)
(items more-actions-and-transformers)) (items more-actions-and-transformers))
@ -415,8 +417,7 @@
['() ['()
(transition state actions)] (transition state actions)]
[(cons (? procedure? transformer) remaining-items) [(cons (? procedure? transformer) remaining-items)
(match-define (transition new-state more-actions) (match-define (transition new-state more-actions) (transformer state))
(maybe-transition->transition (transformer state)))
(loop new-state (loop new-state
(cons actions more-actions) (cons actions more-actions)
remaining-items)] remaining-items)]
@ -724,7 +725,7 @@
(define (((wrap-trapk pid new-party trapk) . args) state) (define (((wrap-trapk pid new-party trapk) . args) state)
(if (hash-has-key? (vm-processes state) pid) (if (hash-has-key? (vm-processes state) pid)
(run-vm (apply run-trapk state pid new-party trapk args)) (run-vm (apply run-trapk state pid new-party trapk args))
state)) (make-transition state)))
(define (transform-meta-action pid preaction state) (define (transform-meta-action pid preaction state)
(match preaction (match preaction