#lang racket/base ;; Virtualized operating system, this time with presence. (require racket/set) (require racket/match) (require "unify.rkt") ;; Endpoints are the units of deduplication. ;; Flows (in canonical form) are the units of presence. ;;--------------------------------------------------------------------------- ;; Data definitions ;; A PID is an (arbitrary) VM-unique process identifier. Concretely, ;; it's an integer. ;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely, ;; it's a list of two elements, the first being the endpoint's ;; process's PID and the second being an integer. (Except for the ;; ground-vm, where they're different because there aren't any PIDs.) ;; One endpoint, one topic. ;; A Flow is a Topic that comes from the intersection of two dual ;; topics. ;; A QuasiQueue is a list of Xs in *reversed* order. (struct vm (processes ;; Hash endpoints ;; Hash next-process-id ;; PID pending-actions ;; QuasiQueue<(cons PID Action)> ) #:transparent) (struct endpoint (id ;; EID topic ;; Topic handlers ;; Handlers ) #:transparent) (struct process (id ;; PID state next-endpoint-id-number ;; NonnegativeInteger endpoints ;; Set ) #:transparent) (struct topic (role pattern virtual?) #:prefab) ;; InterruptK = State -> Transition ;; TrapK = X -> InterruptK ;; PresenceHandler = TrapK ;; AbsenceHandler = TrapK ;; MessageHandler = TrapK (struct handlers (presence absence message) #:transparent) ;; actions is a plain old List, not a QuasiQueue. (struct transition (state actions) #:transparent) ;; Preactions. ;; Ks are various TrapKs or #f, signifying lack of interest. (struct add-role (topic handlers k) #:prefab) (struct delete-role (eid reason) #:prefab) (struct send-message (topic body) #:prefab) (struct spawn (thunk k) #:prefab) ;; An Action is either a Preaction or an (at-meta-level Preaction). (struct at-meta-level (preaction) #:prefab) ;;--------------------------------------------------------------------------- ;; Topics and roles (define (topic-publisher pattern #:virtual? [virtual? #f]) (topic 'publisher pattern virtual?)) (define (topic-subscriber pattern #:virtual? [virtual? #f]) (topic 'subscriber pattern virtual?)) (define (co-roles r) (case r [(publisher) '(subscriber)] [(subscriber) '(publisher)] [else #f])) (define (co-topics t) (for/list ([co-role (co-roles (topic-role t))]) (struct-copy topic t [role co-role]))) (define (refine-topic remote-topic new-pattern) (struct-copy topic remote-topic [pattern new-pattern])) (define (roles-intersect? l r) (memq l (co-roles r))) ;; Both left and right must be canonicalized. (define (topic-intersection left right) (and (roles-intersect? (topic-role left) (topic-role right)) (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) ;;--------------------------------------------------------------------------- ;; QuasiQueue (define empty-quasi-queue '()) ;; X QuasiQueue -> QuasiQueue (define (quasi-enqueue-one thing existing-quasi-queue) (cons thing existing-quasi-queue)) ;; List QuasiQueue -> QuasiQueue (define (quasi-enqueue-many many-things-in-order existing-quasi-queue) (append (reverse many-things-in-order) existing-quasi-queue)) ;; QuasiQueue -> List (define (quasi-queue->list quasi-queue) (reverse quasi-queue)) ;; List -> QuasiQueue (define (list->quasi-queue xs) (reverse xs)) ;;--------------------------------------------------------------------------- (define (make-vm boot) (vm (hash) (hash) 0 (list->quasi-queue (list (spawn boot #f))))) (define (run-vm state) (let loop ((remaining-actions (quasi-queue->list (vm-pending-actions state))) (state (struct-copy vm state [pending-actions empty-quasi-queue])) (outbound-actions empty-quasi-queue)) (match remaining-actions ['() (transition state (quasi-queue->list outbound-actions))] [(cons (cons pid action) rest) (match action [(at-meta-level preaction) (define transformed-preaction (transform-meta-action pid preaction)) (loop rest state (quasi-enqueue-one transformed-preaction outbound-actions))] [preaction (loop rest (perform-action pid preaction state) outbound-actions)])]))) (define (run-user-code v) ;; TODO: use this hook to find all the bits of code that will need ;; with-handlers and crash compensation. v) (define (perform-action pid preaction state) (match preaction [(add-role topic hs k) (do-subscribe pid topic hs k state)] [(delete-role eid reason) (do-unsubscribe pid eid reason state)] [(send-message topic body) (route-and-deliver topic body state)] [(spawn thunk k) (do-spawn pid thunk k state)])) (define (do-subscribe pid topic hs k state) (define old-process (hash-ref (vm-processes state) pid)) (define eid-number (process-next-endpoint-id-number old-process)) (define new-eid (list pid eid-number)) (struct-copy vm (for*/fold ([state (run-trapk state pid k new-eid)]) ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection topic matching-topic))] #:when flow-pattern) (define inbound-flow (refine-topic matching-topic flow-pattern)) (define outbound-flow (refine-topic topic flow-pattern)) (let* ((state (run-trapk state pid (handlers-presence hs) new-eid inbound-flow)) (state (run-trapk state matching-pid (handlers-presence (endpoint-handlers e)) matching-eid outbound-flow))) state)) [processes (hash-set (vm-processes state) pid (struct-copy process old-process [next-endpoint-id-number (+ eid-number 1)] [endpoints (set-add (process-endpoints old-process) new-eid)]))] [endpoints (hash-set (vm-endpoints state) new-eid (endpoint new-eid topic hs))])) (define (do-unsubscribe pid eid reason state) (define endpoint-to-remove (hash-ref (vm-endpoints state) eid)) (define removed-topic (endpoint-topic endpoint-to-remove)) (define old-process (hash-ref (vm-processes state) pid)) (define new-process (struct-copy process old-process [endpoints (set-remove (process-endpoints old-process) eid)])) (let ((state (struct-copy vm state [endpoints (hash-remove (vm-endpoints state) eid)] [processes (if (set-empty? (process-endpoints new-process)) (hash-remove (vm-processes state) pid) (hash-set (vm-processes state) pid new-process))]))) (for*/fold ([state state]) ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection removed-topic matching-topic))] #:when flow-pattern) (define outbound-flow (refine-topic removed-topic flow-pattern)) (run-trapk state matching-pid (handlers-absence (endpoint-handlers e)) matching-eid outbound-flow reason)))) (define (route-and-deliver message-topic body state) (define pids-and-endpoints (for*/set ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection message-topic matching-topic))] #:when flow-pattern) (cons matching-pid e))) (for/fold ([state state]) ([pid-and-endpoint (in-set pids-and-endpoints)]) (define matching-pid (car pid-and-endpoint)) (define e (cdr pid-and-endpoint)) (run-trapk state matching-pid (handlers-message (endpoint-handlers e)) (endpoint-id e) message-topic body))) (define (run-trapk state pid trap-k . args) (run-ready state pid (run-user-code (apply trap-k args)))) (define (run-ready state pid interrupt-k) (define old-process (hash-ref (vm-processes state) pid)) (match-define (transition new-process-state actions) (run-user-code (interrupt-k (process-state old-process)))) (struct-copy vm (enqueue-actions state pid actions) [processes (hash-set (vm-processes state) pid (struct-copy process old-process [state new-process-state]))])) (define (do-spawn spawning-pid thunk k state) (match-define (transition initial-state initial-actions) (run-user-code (thunk))) (define new-pid (vm-next-process-id state)) (run-trapk (struct-copy vm (enqueue-actions state new-pid initial-actions) [processes (hash-set (vm-processes state) new-pid (process new-pid initial-state 0 (set)))] [next-process-id (+ new-pid 1)]) spawning-pid k new-pid)) (define (enqueue-actions state pid actions) (struct-copy vm state [pending-actions (quasi-enqueue-many (for/list ([a actions]) (cons pid a)) (vm-pending-actions state))])) (define (wrap-trapk pid trapk) (lambda args (lambda (state) (apply run-trapk state pid trapk args)))) (define (transform-meta-action pid preaction) (match preaction [(add-role topic hs k) (add-role topic (handlers (wrap-trapk pid (handlers-presence hs)) (wrap-trapk pid (handlers-absence hs)) (wrap-trapk pid (handlers-message hs))) (wrap-trapk pid k))] [(? delete-role?) preaction] [(? send-message?) preaction] [(spawn thunk k) (spawn thunk (wrap-trapk pid k))])) ;;--------------------------------------------------------------------------- (define (ground-vm boot) (let run-kernel ((transition (run-vm (make-vm boot))) (endpoints (hash)) (next-eid-number 0)) (let integrate-delta ((state (transition-state transition)) (actions (transition-actions transition)) (endpoints endpoints) (next-eid-number 0)) (match actions ['() (define interruptk (apply sync (for/list ([(eid e) (in-hash events)]) (wrap-evt (endpoint-topic e) (lambda (message) (run-user-code ((handlers-message (endpoint-handlers e)) eid (endpoint-topic e) message))))))) (run-kernel (run-user-code (interruptk state)))] [(cons preaction rest) (match preaction [(add-role topic hs k) (integrate-delta (run-user-code ((run-user-code (k next-eid-number)) state)) rest (hash-set endpoints next-eid-number (endpoint next-eid-number [(delete-role eid reason) ...] [(send-message topic body) ...] [(spawn thunk k) ...])])))) (for-each (lambda (thunk) (thunk)) (kernel-mode-transition-messages transition)) (when (not (nested-vm-inert? (kernel-mode-transition-suspension transition))) (match transition [(kernel-mode-transition (suspension new-state polling-k message-handlers '()) _ '() '()) (define inbound-messages (map (match-lambda [(message-handler (ground-event-pattern tag evt) k) (wrap-evt evt (lambda (v) (cons (ground-event-value tag v) k)))]) message-handlers)) (match-define (cons inbound-value inbound-continuation) (apply sync (wrap-evt (if polling-k always-evt never-evt) (lambda (v) (cons (ground-event-value 'idle (void)) (lambda (dummy) polling-k)))) inbound-messages)) (loop ((inbound-continuation inbound-value) new-state))] [_ (error 'ground-vm "Outermost VM may not spawn new siblings or send or receive metamessages")])))