#lang racket/base
;; Virtualized operating system, this time with presence.

(require racket/match)

;;---------------------------------------------------------------------------
;; Data definitions

;; A PID is an (arbitrary) VM-unique process identifier.
;; A SID is an (arbitrary) process-unique subscription identifier.

;; A QuasiQueue<X> is a list of Xs in *reversed* order.

;; The complete state of one virtual machine.
(struct vm (processes ;; PID -> Process
            topics ;; Topic -> Set
            flows ;; Flow -> Set
            next-process-id ;; PID
            pending-actions ;; QuasiQueue<(cons PID Action)>
            ) #:transparent)

;; (route PID SID Handlers)
(struct route (process-id sid handlers) #:transparent)

;; One process running inside a VM.
(struct process (id ;; PID
                 state
                 interests ;; SID -> List
                 meta-interests ;; SID -> List
                 ) #:transparent)

;; A Topic pairs a role symbol with a pattern and a virtual? flag.
;; The roles constructed in this file are 'publisher and 'subscriber.
(struct topic (role pattern virtual?) #:prefab)

;; A Flow is a Topic that comes from the intersection of two dual
;; topics.

;; PresenceHandler = Topic -> State -> Transition
;; AbsenceHandler = Topic * Reason -> State -> Transition
;; MessageHandler = Topic * Message -> State -> Transition
(struct handlers (topic presence absence message) #:transparent)

;; actions is a plain old List, not a QuasiQueue.
(struct transition (state actions) #:transparent)

;; Preactions
(struct add-role (sid handlers) #:prefab)
(struct delete-roles (sid) #:prefab)
(struct send-message (topic body) #:prefab)
(struct spawn (thunk) #:prefab)

;; An Action is either a Preaction or an (at-meta-level Preaction).
(struct at-meta-level (preaction) #:prefab)

;;---------------------------------------------------------------------------
;; Topics and roles

;; Builds a Topic with role 'publisher for `pattern`.
(define (topic-publisher pattern #:virtual? [virtual? #f])
  (topic 'publisher pattern virtual?))

;; Builds a Topic with role 'subscriber for `pattern`.
(define (topic-subscriber pattern #:virtual? [virtual? #f])
  (topic 'subscriber pattern virtual?))

;; Role -> (Listof Role) or #f
;; Maps a role to the list of roles it pairs with. Any symbol other
;; than 'publisher or 'subscriber yields #f, not the empty list.
(define (co-roles r)
  (case r
    [(publisher) '(subscriber)]
    [(subscriber) '(publisher)]
    [else #f]))

;; Topic -> (Listof Topic)
;; Produces one copy of `t` per co-role of its role, identical to `t`
;; except for the role field.
;; NOTE(review): for a topic whose role is neither 'publisher nor
;; 'subscriber, co-roles returns #f and the for/list below raises
;; because #f is not a sequence -- confirm that is the intended failure.
(define (co-topics t)
  (for/list ([role (co-roles (topic-role t))])
    ;; struct-copy takes the bare field name (`role`), not the accessor
    ;; name (`topic-role`); it prepends the struct name itself.
    (struct-copy topic t [role role])))

;;---------------------------------------------------------------------------
;; QuasiQueue

(define empty-quasi-queue '())

;; X QuasiQueue<X> -> QuasiQueue<X>
;; Adds a single item at the tail of the queue (the head of the
;; underlying reversed list).
(define (quasi-enqueue-one thing existing-quasi-queue)
  (cons thing existing-quasi-queue))

;; List<X> QuasiQueue<X> -> QuasiQueue<X>
;; Adds several items, given in queue order, at the tail of the queue.
(define (quasi-enqueue-many many-things-in-order existing-quasi-queue)
  (append (reverse many-things-in-order) existing-quasi-queue))

;; QuasiQueue<X> -> List<X>
;; Converts to an ordinary list, oldest item first.
(define (quasi-queue->list quasi-queue)
  (reverse quasi-queue))

;; List<X> -> QuasiQueue<X>
;; Converts an ordinary list (oldest item first) to a QuasiQueue.
(define (list->quasi-queue xs)
  (reverse xs))

;;---------------------------------------------------------------------------

;; (-> Transition) -> VM
;; Builds a VM with no processes, topics or flows, whose only pending
;; action is to spawn `boot`.
;;
;; Every entry of pending-actions must be a (cons PID Action) pair,
;; because run-vm destructures entries with (cons pid action). The boot
;; spawn has no originating process, so it is attributed to PID -1,
;; which cannot collide with a real PID (those are allocated upward
;; from 0 via next-process-id).
(define (make-vm boot)
  (vm (hash)
      (hash)
      (hash)
      0
      (list->quasi-queue (list (cons -1 (spawn boot))))))

;; VM -> Transition
;; Performs every action that was pending when run-vm was called, in
;; queue order, and returns a transition holding the resulting VM state
;; plus the list of actions produced by meta-level actions.
;; Actions enqueued while this pass runs stay in the returned state's
;; pending-actions; they are not performed in the same pass.
;; NOTE(review): perform-meta-action is not defined in the visible part
;; of this file.
(define (run-vm state)
  (let loop ((remaining-actions (quasi-queue->list (vm-pending-actions state)))
             (state (struct-copy vm state [pending-actions empty-quasi-queue]))
             (outbound-actions empty-quasi-queue))
    (match remaining-actions
      ['() (transition state (quasi-queue->list outbound-actions))]
      [(cons (cons pid action) rest)
       (if (at-meta-level? action)
           (let-values (((state new-actions)
                         (perform-meta-action pid (at-meta-level-preaction action) state)))
             (loop rest state (quasi-enqueue-many new-actions outbound-actions)))
           (loop rest (perform-action pid action state) outbound-actions))])))

;; Identity wrapper around the result of calling user-supplied code.
(define (run-user-code v)
  ;; TODO: use this hook to find all the bits of code that will need
  ;; with-handlers and crash compensation.
  v)

;; PID Preaction VM -> VM
;; Applies one non-meta action issued by process `pid` to `state`.
;; NOTE(review): the add-role, delete-roles and send-message clauses are
;; still `...` placeholders; `...` is not a valid expression, so this
;; definition does not compile until they are filled in.
(define (perform-action pid action state)
  (match action
    [(add-role sid handlers) ...]
    [(delete-roles sid) ...]
    [(send-message topic body) ...]
    [(spawn thunk)
     ;; Run the boot thunk to obtain the new process's initial state and
     ;; actions, queue those actions under a freshly allocated PID, then
     ;; register the process and advance the PID counter.
     (match-define (transition initial-state initial-actions) (run-user-code (thunk)))
     (define new-pid (vm-next-process-id state))
     (struct-copy vm (enqueue-actions state new-pid initial-actions)
                  [processes (hash-set (vm-processes state) new-pid
                                       (process new-pid initial-state (hash) (hash)))]
                  [next-process-id (+ new-pid 1)])]))

;; VM PID List<Action> -> VM
;; Appends `actions`, each paired with `pid`, to the pending-action queue.
(define (enqueue-actions state pid actions)
  (struct-copy vm state
               [pending-actions (quasi-enqueue-many (for/list ([a actions]) (cons pid a))
                                                    (vm-pending-actions state))]))

;; NOTE(review): everything below the separator is written against vm
;; fields (suspensions, pending-processes, pending-messages,
;; pending-meta-messages, pattern predicates) and structs (suspension,
;; kernel-mode-transition, message-handler) that are not defined in the
;; visible part of this file; the vm struct above has none of those
;; fields. The separator is a bare token, not a comment. The let* right
;; after it has no enclosing definition and carries one more closing
;; parenthesis than it opens -- presumably the tail of an older run-vm
;; whose header was removed; confirm before deleting or porting.
;; append-map comes from racket/list, which this file does not require.
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

  (let* ((state (requeue-pollers state))
         (state (run-runnables state))
         (state (dispatch-messages state))
         (meta-messages (reverse (vm-pending-meta-messages state)))
         (meta-handlers (append-map extract-downward-meta-message-handlers (vm-suspensions state)))
         (poller-k (and (should-poll? state) run-vm)) ;; only block if there's nothing left to do
         (state (struct-copy vm state [pending-meta-messages (list)])))
    (kernel-mode-transition (suspension state poller-k meta-handlers '())
                            meta-messages
                            '()
                            '())))

;; Rebuilds the suspension list from scratch: each suspension for which
;; suspension-polling? holds becomes a runnable thunk that applies its k
;; to its state; every other suspension is re-enqueued.
(define (requeue-pollers state)
  (foldl (lambda (susp state)
           (if (suspension-polling? susp)
               (enqueue-runnable (lambda () ((suspension-k susp) (suspension-state susp))) state)
               (enqueue-suspension susp state)))
         (struct-copy vm state [suspensions '()])
         (vm-suspensions state)))

;; Clears pending-processes, then calls each previously pending runnable
;; (in the reverse of stored order) and folds its result into the state
;; with perform-transition.
(define (run-runnables state)
  (foldl (lambda (r state) (perform-transition (r) state))
         (struct-copy vm state [pending-processes (list)])
         (reverse (vm-pending-processes state))))

;; Clears pending-messages, then dispatches each previously pending
;; message (in the reverse of stored order) with dispatch-message.
(define (dispatch-messages state)
  (foldl dispatch-message
         (struct-copy vm state [pending-messages (list)])
         (reverse (vm-pending-messages state))))

;; For each meta-message handler of `susp`, builds a message-handler with
;; the same pattern whose continuation is dispatch-meta-message.
(define (extract-downward-meta-message-handlers susp)
  (for/list ([mmh (suspension-meta-message-handlers susp)])
    (message-handler (message-handler-pattern mmh) dispatch-meta-message)))

;; Curried: given a message, returns a function of the VM state that
;; offers the message to every suspension's meta-message handlers and
;; then runs the VM on the resulting state.
(define ((dispatch-meta-message message) state)
  (run-vm (foldl (match-suspension message
                                   (vm-meta-pattern-predicate state)
                                   suspension-meta-message-handlers)
                 (struct-copy vm state [suspensions '()])
                 (vm-suspensions state))))

;; Folds the contents of a kernel-mode-transition into `state`: messages,
;; then new processes (as runnables), then the new suspension, then
;; meta-messages. Raises an error for any other value.
(define (perform-transition transition state)
  (match transition
    [(kernel-mode-transition new-suspension messages meta-messages new-processes)
     (let* ((state (foldl enqueue-message state messages))
            (state (foldl enqueue-runnable state new-processes))
            (state (enqueue-suspension new-suspension state))
            (state (foldl enqueue-meta-message state meta-messages)))
       state)]
    [other
     (error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)]))

;; Pushes `message` onto the front of pending-messages.
(define (enqueue-message message state)
  (struct-copy vm state [pending-messages (cons message (vm-pending-messages state))]))

;; Pushes runnable `r` onto the front of pending-processes.
(define (enqueue-runnable r state)
  (struct-copy vm state [pending-processes (cons r (vm-pending-processes state))]))

;; Pushes `susp` onto the front of suspensions, unless its last three
;; fields are #f, '() and '(), in which case the state is returned
;; unchanged.
(define (enqueue-suspension susp state)
  (match susp
    [(suspension _ #f '() '())
     ;; dead process because no continuations offered
     state]
    [(suspension _ _ _ _)
     (struct-copy vm state [suspensions (cons susp (vm-suspensions state))])]))

;; Pushes `message` onto the front of pending-meta-messages.
(define (enqueue-meta-message message state)
  (struct-copy vm state [pending-meta-messages (cons message (vm-pending-meta-messages state))]))
;; Offers `message` to every suspension in `state`, using the state's
;; pattern predicate and each suspension's message handlers. The
;; suspension list is emptied first and rebuilt by the fold.
;; NOTE(review): vm-pattern-predicate and vm-suspensions are not fields
;; of the vm struct defined earlier in this file -- confirm which vm
;; definition this code targets.
(define (dispatch-message message state)
  (foldl (match-suspension message
                           (vm-pattern-predicate state)
                           suspension-message-handlers)
         (struct-copy vm state [suspensions '()])
         (vm-suspensions state)))

;; Curried fold step. Searches the handlers that `handlers-getter`
;; extracts from `susp`, in order, for the first whose pattern is
;; accepted by `apply-pattern` for `message`. On a hit, the handler's k
;; is applied to the message, the result is applied to the suspension's
;; state, and the resulting transition is folded into `state`.
(define ((match-suspension message apply-pattern handlers-getter) susp state)
  (let search-handlers ((message-handlers (handlers-getter susp)))
    (cond
     [(null? message-handlers)
      ;; No handler matched this message. Put the suspension
      ;; back on the list for some future message.
      (enqueue-suspension susp state)]
     [(apply-pattern (message-handler-pattern (car message-handlers)) message)
      (define trapk (message-handler-k (car message-handlers)))
      (define interruptk (trapk message))
      (perform-transition (interruptk (suspension-state susp)) state)]
     [else
      (search-handlers (cdr message-handlers))])))

;; True when the suspension's k field is anything other than #f.
(define (suspension-polling? susp)
  (not (eq? (suspension-k susp) #f)))

;; True when there is a pending process, a pending message, or at least
;; one polling suspension.
(define (should-poll? state)
  (or (not (null? (vm-pending-processes state)))
      (not (null? (vm-pending-messages state)))
      (ormap suspension-polling? (vm-suspensions state))))

;; Returns a thunk that builds a fresh VM from `boot` and runs it.
;; NOTE(review): make-vm as defined earlier in this file accepts only
;; `boot`; the keyword arguments passed here are not part of that
;; signature.
(define (nested-vm boot
                   #:pattern-predicate [pattern-predicate default-pattern-predicate]
                   #:meta-pattern-predicate [meta-pattern-predicate default-pattern-predicate])
  (lambda () (run-vm (make-vm boot
                              #:pattern-predicate pattern-predicate
                              #:meta-pattern-predicate meta-pattern-predicate))))

;; Default predicate: treats the pattern `p` as a procedure and applies
;; it to the message `m`.
(define default-pattern-predicate
  (lambda (p m) (p m)))

;;---------------------------------------------------------------------------

;; True only for a suspension matching the single pattern below.
;; NOTE(review): the vm sub-pattern has six positions, but the vm struct
;; defined earlier in this file has five fields.
(define (nested-vm-inert? susp)
  (match susp
    [(suspension (vm _ '() '() '() _ _) #f '() '())
     ;; Inert iff not waiting for any messages or metamessages, and
     ;; with no internal work left to do.
     #t]
    [_ #f]))

;; Pattern side of a ground event: a tag plus the event to wait on.
(struct ground-event-pattern (tag evt) #:transparent)
;; Value side of a ground event: a tag plus the value delivered.
(struct ground-event-value (tag val) #:transparent)

;; A ground-event-pattern matches a ground-event-value when their tags
;; are equal?.
(define (match-ground-event p m)
  (equal? (ground-event-pattern-tag p) (ground-event-value-tag m)))

;; Runs `boot` as the outermost VM. Each iteration calls every thunk in
;; the transition's messages and, unless the suspension is inert, waits
;; with sync on the events named by the suspension's message handlers,
;; then resumes the VM with the chosen continuation. The extra event
;; passed to sync is always-evt when polling-k is non-#f and never-evt
;; otherwise; when it is chosen, the VM is resumed through polling-k.
;; Raises an error if the transition does not have the expected shape.
;; NOTE(review): make-vm as defined earlier in this file accepts only
;; `boot`; the keyword arguments passed here are not part of that
;; signature.
(define (ground-vm boot
                   #:pattern-predicate [pattern-predicate default-pattern-predicate])
  (let loop ((transition (run-vm (make-vm boot
                                          #:pattern-predicate pattern-predicate
                                          #:meta-pattern-predicate match-ground-event))))
    (for-each (lambda (thunk) (thunk)) (kernel-mode-transition-messages transition))
    (when (not (nested-vm-inert? (kernel-mode-transition-suspension transition)))
      (match transition
        [(kernel-mode-transition (suspension new-state polling-k message-handlers '()) _ '() '())
         ;; One event per handler; each yields (cons value continuation).
         (define inbound-messages
           (map (match-lambda [(message-handler (ground-event-pattern tag evt) k)
                               (wrap-evt evt (lambda (v) (cons (ground-event-value tag v) k)))])
                message-handlers))
         (match-define (cons inbound-value inbound-continuation)
           (apply sync
                  (wrap-evt (if polling-k always-evt never-evt)
                            (lambda (v) (cons (ground-event-value 'idle (void))
                                              (lambda (dummy) polling-k))))
                  inbound-messages))
         (loop ((inbound-continuation inbound-value) new-state))]
        [_
         (error 'ground-vm
                "Outermost VM may not spawn new siblings or send or receive metamessages")]))))