Fix metamessage dispatch

This commit is contained in:
Tony Garnock-Jones 2012-01-10 12:22:31 -05:00
parent dea5eeb3e9
commit be4bd64d2c
1 changed files with 87 additions and 79 deletions

166
os.rkt
View File

@ -70,21 +70,18 @@
;; - the type of messages to other VMs, MetaMessage ;; - the type of messages to other VMs, MetaMessage
;; - the type of patterns over MetaMessages, MetaMessagePattern ;; - the type of patterns over MetaMessages, MetaMessagePattern
;; A VTable is a (vtable (MessagePattern Message -> Boolean) ;; A PatternPredicate is a (MessagePattern Message -> Boolean), used
;; (MetaMessagePattern MetaMessage -> boolean)) ;; to match a message against a pattern.
;; supplying the behavioural aspects of the type-parameters of VMs.
(struct vtable (apply-message-pattern
apply-meta-message-pattern) #:transparent)
;; A VM is a (vm ListBagOf<Subscription> ;; A VM is a (vm ListBagOf<Suspension>
;; QueueOf<Message> ;; TODO: make unordered? ;; QueueOf<Message> ;; TODO: make unordered?
;; QueueOf<MetaMessage> ;; TODO: make unordered? ;; QueueOf<MetaMessage> ;; TODO: make unordered?
;; QueueOf<Runnable>). ;; QueueOf<Runnable>).
(struct vm (subscriptions (struct vm (suspensions
pending-messages pending-messages
pending-meta-messages pending-meta-messages
pending-processes pending-processes
vtable) #:transparent) pattern-predicate) #:transparent)
;; A TrapK<X> is a X -> InterruptK, representing a suspended process ;; A TrapK<X> is a X -> InterruptK, representing a suspended process
;; waiting for some information from the VM before it can continue. ;; waiting for some information from the VM before it can continue.
@ -111,6 +108,21 @@
message-handlers message-handlers
meta-message-handlers) #:transparent) meta-message-handlers) #:transparent)
;; A Suspension is a
;; (suspension ProcessState
;; Maybe<InterruptK>
;; ListBagOf<MessageHandler>
;; Map<HID,MetaMessageHandler>).
(struct suspension (state
k
message-handlers
meta-message-handlers) #:transparent)
;; A HID is a per-VM unique value, used to identify specific
;; MetaMessageHandlers. Here, we use gensyms, though an alternative
;; (and purer) approach would be to keep a counter in the VM and use
;; that to construct IDs.
;; A MessageHandler is one of ;; A MessageHandler is one of
;; -- (message-handler MessagePattern TrapK<Message>) ;; -- (message-handler MessagePattern TrapK<Message>)
(struct message-handler (pattern k) #:transparent) (struct message-handler (pattern k) #:transparent)
@ -137,13 +149,13 @@
;; TODO: enforce user-mode restrictions ;; TODO: enforce user-mode restrictions
;; TODO: timeouts ;; TODO: timeouts
;; ( -> KernelModeTransition ) -> VM ;; PatternPredicate ( -> KernelModeTransition ) -> VM
(define (make-vm vtable boot) (define (make-vm pattern-predicate boot)
(vm (list) (vm (list)
(make-queue) (make-queue)
(make-queue) (make-queue)
(enqueue (make-queue) (runnable (void) (lambda (dummy) (boot)))) (enqueue (make-queue) (runnable (void) (lambda (dummy) (boot))))
vtable)) pattern-predicate))
;; VM -> KernelModeTransition ;; VM -> KernelModeTransition
;; (A kind of Meta-InterruptK) ;; (A kind of Meta-InterruptK)
@ -155,14 +167,14 @@
(trap-to-metalevel state))) (trap-to-metalevel state)))
(define (requeue-pollers state) (define (requeue-pollers state)
(foldl (lambda (sub state) (foldl (lambda (susp state)
(if (subscription-polling? sub) (if (suspension-polling? susp)
(enqueue-runnable (runnable (subscription-state sub) (enqueue-runnable (runnable (suspension-state susp)
(subscription-k sub)) (suspension-k susp))
state) state)
(subscribe-process sub state))) (enqueue-suspension susp state)))
(struct-copy vm state [subscriptions '()]) (struct-copy vm state [suspensions '()])
(vm-subscriptions state))) (vm-suspensions state)))
(define (run-runnables state) (define (run-runnables state)
(foldl (lambda (r state) (foldl (lambda (r state)
@ -178,7 +190,6 @@
;; KernelModeTransition VM -> VM ;; KernelModeTransition VM -> VM
(define (perform-transition transition state) (define (perform-transition transition state)
(pretty-print `(--> perform-transition ,transition ,state))
(match transition (match transition
[(kernel-mode-transition new-subscription [(kernel-mode-transition new-subscription
messages messages
@ -186,70 +197,69 @@
new-processes) new-processes)
(let* ((state (foldl enqueue-message state messages)) (let* ((state (foldl enqueue-message state messages))
(state (foldl enqueue-runnable state new-processes)) (state (foldl enqueue-runnable state new-processes))
(state (subscribe-process new-subscription state)) (state (enqueue-suspension (subscription->suspension new-subscription) state))
(state (foldl enqueue-meta-message state meta-messages))) (state (foldl enqueue-meta-message state meta-messages)))
(pretty-print `(<-- ,state))
state)] state)]
[other [other
(error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)])) (error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)]))
(define (subscription->suspension sub)
(match-define (subscription ps k mhs mmhs) sub)
(suspension ps k mhs (for/hash ([mmh mmhs]) (values (gensym 'hid) mmh))))
(define (enqueue-message message state) (define (enqueue-message message state)
(struct-copy vm state [pending-messages (enqueue (vm-pending-messages state) message)])) (struct-copy vm state [pending-messages (enqueue (vm-pending-messages state) message)]))
(define (enqueue-runnable r state) (define (enqueue-runnable r state)
(struct-copy vm state [pending-processes (enqueue (vm-pending-processes state) r)])) (struct-copy vm state [pending-processes (enqueue (vm-pending-processes state) r)]))
(define (subscribe-process sub state) (define (enqueue-suspension susp state)
(match sub (match susp
[(subscription _ #f '() '()) [(suspension _ #f '() (? (lambda (h) (zero? (hash-count h)))))
;; dead process because no continuations offered ;; dead process because no continuations offered
state] state]
[_ [(suspension _ _ _ _)
(struct-copy vm state [subscriptions (cons sub (vm-subscriptions state))])])) (struct-copy vm state [suspensions (cons susp (vm-suspensions state))])]))
(define (enqueue-meta-message message state) (define (enqueue-meta-message message state)
(struct-copy vm state [pending-meta-messages (enqueue (vm-pending-meta-messages state) message)])) (struct-copy vm state [pending-meta-messages (enqueue (vm-pending-meta-messages state) message)]))
(define (dispatch-message message state) (define (dispatch-message message state)
(foldl (match-subscription message (foldl (match-suspension message
vtable-apply-message-pattern (vm-pattern-predicate state)
subscription-message-handlers) suspension-message-handlers)
(struct-copy vm state [subscriptions '()]) (struct-copy vm state [suspensions '()])
(vm-subscriptions state))) (vm-suspensions state)))
(define ((match-subscription message apply-getter handlers-getter) sub state) (define ((match-suspension message apply-pattern handlers-getter) susp state)
(let search-handlers ((message-handlers (handlers-getter sub))) (let search-handlers ((message-handlers (handlers-getter susp)))
(cond (cond
[(null? message-handlers) [(null? message-handlers)
;; No handler matched this message. Put the subscription ;; No handler matched this message. Put the suspension
;; back on the list for some future message. ;; back on the list for some future message.
(subscribe-process sub state)] (enqueue-suspension susp state)]
[((apply-getter (vm-vtable state)) [(apply-pattern (message-handler-pattern (car message-handlers)) message)
(message-handler-pattern (car message-handlers))
message)
(define trapk (message-handler-k (car message-handlers))) (define trapk (message-handler-k (car message-handlers)))
(define interruptk (trapk message)) (define interruptk (trapk message))
(perform-transition (interruptk (subscription-state sub)) state)] (perform-transition (interruptk (suspension-state susp)) state)]
[else [else
(search-handlers (cdr message-handlers))]))) (search-handlers (cdr message-handlers))])))
(define (subscription-polling? sub) (define (suspension-polling? susp)
(not (eq? (subscription-k sub) #f))) (not (eq? (suspension-k susp) #f)))
;; VM -> Boolean ;; VM -> Boolean
;; When should a VM block? When it has no runnables, no pending ;; When should a VM block? When it has no runnables, no pending
;; messages, and no polling subscriptions. Otherwise, it should poll. ;; messages, and no polling suspensions. Otherwise, it should poll.
(define (should-poll? state) (define (should-poll? state)
(or (not (queue-empty? (vm-pending-processes state))) (or (not (queue-empty? (vm-pending-processes state)))
(not (queue-empty? (vm-pending-messages state))) (not (queue-empty? (vm-pending-messages state)))
(ormap subscription-polling? (vm-subscriptions state)))) (ormap suspension-polling? (vm-suspensions state))))
(define (trap-to-metalevel state) (define (trap-to-metalevel state)
(define meta-messages (queue->list (vm-pending-meta-messages state))) (define meta-messages (queue->list (vm-pending-meta-messages state)))
(define meta-handlers (append-map (lambda (sub) (define meta-handlers (append-map extract-downward-meta-message-handlers
(map wrap-meta-message-handler (vm-suspensions state)))
(subscription-meta-message-handlers sub)))
(vm-subscriptions state)))
(define final-state (struct-copy vm state [pending-meta-messages (make-queue)])) (define final-state (struct-copy vm state [pending-meta-messages (make-queue)]))
(kernel-mode-transition (subscription final-state (kernel-mode-transition (subscription final-state
(and (should-poll? final-state) (and (should-poll? final-state)
@ -261,16 +271,21 @@
'() '()
'())) '()))
(define (wrap-meta-message-handler mh) (define (extract-downward-meta-message-handlers susp)
(message-handler (message-handler-pattern mh) dispatch-meta-message)) (for/list ([(hid mmh) (in-hash (suspension-meta-message-handlers susp))])
(message-handler (message-handler-pattern mmh) (dispatch-meta-message hid))))
(define ((dispatch-meta-message message) state) (define (extract-upward-meta-message-handlers susp)
(for/list ([(hid mmh) (in-hash (suspension-meta-message-handlers susp))])
(message-handler hid (message-handler-k mmh))))
(define (((dispatch-meta-message hid) message) state)
(run-vm (run-vm
(foldl (match-subscription message (foldl (match-suspension message
vtable-apply-meta-message-pattern (lambda (handler-hid message) (equal? hid handler-hid))
subscription-meta-message-handlers) extract-upward-meta-message-handlers)
(struct-copy vm state [subscriptions '()]) (struct-copy vm state [suspensions '()])
(vm-subscriptions state)))) (vm-suspensions state))))
;;--------------------------------------------------------------------------- ;;---------------------------------------------------------------------------
@ -293,16 +308,14 @@
#t] #t]
[_ #f])) [_ #f]))
;; (MessagePattern Message -> Boolean) ( -> KernelModeTransition ) -> Void ;; PatternPredicate ( -> KernelModeTransition ) -> Void
;; In this context, ;; In this context,
;; Message = a thunk ;; Message = a thunk
;; MessagePattern = evt? ;; MessagePattern = evt?
;; MetaMessage, MetaMessagePattern = not defined because there's no outer level ;; MetaMessage, MetaMessagePattern = not defined because there's no outer level
;; Runs its argument VM until it becomes (provably) inert. ;; Runs its argument VM until it becomes (provably) inert.
(define (ground-vm apply-message-pattern boot) (define (ground-vm pattern-predicate boot)
(let loop ((transition (run-vm (make-vm (vtable apply-message-pattern (let loop ((transition (run-vm (make-vm pattern-predicate boot))))
match-ground-message)
boot))))
(when (not (nested-vm-inert? transition)) (when (not (nested-vm-inert? transition))
(match transition (match transition
[(kernel-mode-transition (subscription new-state [(kernel-mode-transition (subscription new-state
@ -314,23 +327,15 @@
'()) '())
(for-each (lambda (thunk) (thunk)) outbound-messages) (for-each (lambda (thunk) (thunk)) outbound-messages)
(define inbound-messages (define inbound-messages
(map (match-lambda [(message-handler e k) (wrap-evt e (lambda (v) (list v e k)))]) (map (match-lambda [(message-handler e k) (wrap-evt e (lambda (v) (cons v k)))])
message-handlers)) message-handlers))
(match-define (cons inbound-value inbound-evt inbound-continuation) (match-define (cons inbound-value inbound-continuation)
;;
;; Plan: make the processes metatalking to the ground-vm supply unique
;; identifiers, and use those unique identifiers for matching up the
;; specific events with specific event handlers. So when a
;; metamessagepattern is built, use some constructor procedure to
;; build an opaque metamessagepattern containing a unique ID; and when
;; the incoming event comes in, label it with the appropriate ID.
;;
(apply sync (apply sync
(wrap-evt (if polling-k always-evt never-evt) (wrap-evt (if polling-k always-evt never-evt)
(lambda (v) (cons (void) (lambda (v) (cons (void)
(lambda (dummy) polling-k)))) (lambda (dummy) polling-k))))
inbound-messages)) inbound-messages))
(loop ((inbound-message-continuation inbound-message-value) new-state))] (loop ((inbound-continuation inbound-value) new-state))]
[_ [_
(error 'ground-vm (error 'ground-vm
"Outermost VM may not spawn new siblings or send or receive metamessages")])))) "Outermost VM may not spawn new siblings or send or receive metamessages")]))))
@ -360,16 +365,19 @@
'() '()
(list (message-handler (list (message-handler
(super-alarm (+ (current-inexact-milliseconds) n)) (super-alarm (+ (current-inexact-milliseconds) n))
(lambda (_) (k))))) (lambda (_message)
(lambda (_state)
(k))))))
'() '()
'() '()
'())) '()))
(ground-vm #f ;; TODO - fix this - what's a better way of correlating (ground-vm (lambda (p m) (p m))
;; metamessages with their metamessagepatterns??
(lambda () (lambda ()
(sleep 1000 (print "SLEEPING"
(lambda () (lambda ()
(yield (sleep 2000
(lambda () (lambda ()
(print "HELLO" (yield
quit))))))) (lambda ()
(print "HELLO"
quit)))))))))