#lang racket/base
;; Virtualized operating system, this time with presence.

(require racket/match)
(require "relation.rkt")
(require "unify.rkt")
;; NOTE(review): this file uses `in-set`, `for/set`/`for*/set` and (in the
;; older code near the end) `append-map`. Those are not provided by
;; racket/base; confirm whether "relation.rkt" re-exports them or whether
;; racket/set and racket/list need to be required here.

;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.

;;---------------------------------------------------------------------------
;; Data definitions

;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
;; it's an integer.

;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
;; it's a list of two elements, the first being the endpoint's
;; process's PID and the second being an integer.

;; A QuasiQueue is a list of Xs in *reversed* order.

(struct vm (processes       ;; Hash
            endpoints       ;; Hash
            topic-flows     ;; Relation
            flow-topics     ;; Relation
            active-handlers ;; Relation
            next-process-id ;; PID
            pending-actions ;; QuasiQueue<(cons PID Action)>
            )
  #:transparent)

(struct endpoint (id       ;; EID
                  handlers ;; Handlers
                  )
  #:transparent)

(struct process (id                      ;; PID
                 state
                 next-endpoint-id-number ;; NonnegativeInteger
                 endpoints               ;; Set
                 )
  #:transparent)

(struct topic (role pattern virtual?) #:prefab)

;; A Flow is a Topic that comes from the intersection of two dual
;; topics.

;; InterruptK = State -> Transition
;; PresenceHandler = EID * Topic -> InterruptK
;; AbsenceHandler = EID * Topic * Reason -> InterruptK
;; MessageHandler = EID * Topic * Message -> InterruptK
(struct handlers (presence absence message) #:transparent)

;; actions is a plain old List, not a QuasiQueue.
(struct transition (state actions) #:transparent)

;; Preactions.
(struct add-role (topic handlers k) #:prefab)
(struct delete-role (eid) #:prefab)
(struct send-message (topic body) #:prefab)
(struct spawn (thunk k) #:prefab)

;; An Action is either a Preaction or an (at-meta-level Preaction).
(struct at-meta-level (preaction) #:prefab)

;;---------------------------------------------------------------------------
;; Topics and roles

;; Builds a topic with role 'publisher for the given pattern.
(define (topic-publisher pattern #:virtual? [virtual? #f])
  (topic 'publisher pattern virtual?))

;; Builds a topic with role 'subscriber for the given pattern.
(define (topic-subscriber pattern #:virtual? [virtual? #f])
  (topic 'subscriber pattern virtual?))

;; Role -> (Listof Role) or #f
;; The roles that are dual to r; #f when r is not a known role.
(define (co-roles r)
  (case r
    [(publisher) '(subscriber)]
    [(subscriber) '(publisher)]
    [else #f]))

;; Topic -> (Listof Topic)
;; One copy of t per co-role of t's role, differing from t only in role.
(define (co-topics t)
  (for/list ([role (co-roles (topic-role t))])
    ;; struct-copy takes the *field* name (role), not the accessor
    ;; name (topic-role).
    (struct-copy topic t [role role])))

;; Copies remote-topic, replacing only its pattern with new-pattern.
(define (refine-topic remote-topic new-pattern)
  (struct-copy topic remote-topic [pattern new-pattern]))

;; True (a non-#f memq result) iff role l is among the co-roles of role r.
(define (roles-intersect? l r)
  (memq l (co-roles r)))

;; Both left and right must be canonicalized.
;; Yields #f when the roles do not intersect; otherwise the result of
;; mgu-canonical on freshened copies of the two patterns.
(define (topic-intersection left right)
  (and (roles-intersect? (topic-role left) (topic-role right))
       (mgu-canonical (freshen (topic-pattern left))
                      (freshen (topic-pattern right)))))

;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is virtual (in which case everything is seen) or otherwise if
;; remote-topic is also not virtual.
(define (flow-visible? local-topic remote-topic)
  (or (topic-virtual? local-topic)
      (not (topic-virtual? remote-topic))))

;;---------------------------------------------------------------------------
;; QuasiQueue

(define empty-quasi-queue '())

;; X QuasiQueue -> QuasiQueue
(define (quasi-enqueue-one thing existing-quasi-queue)
  (cons thing existing-quasi-queue))

;; List QuasiQueue -> QuasiQueue
;; The incoming list is in order, so it is reversed before being placed
;; in front of the (reversed) queue contents.
(define (quasi-enqueue-many many-things-in-order existing-quasi-queue)
  (append (reverse many-things-in-order) existing-quasi-queue))

;; QuasiQueue -> List
(define (quasi-queue->list quasi-queue)
  (reverse quasi-queue))

;; List -> QuasiQueue
(define (list->quasi-queue xs)
  (reverse xs))

;;---------------------------------------------------------------------------

;; Builds an initial VM whose only pending work is spawning `boot`.
;; NOTE(review): the vm struct above declares seven fields but only six
;; values are supplied here; `spawn` declares two fields (thunk k) but is
;; given one; and pending-actions is documented as holding
;; (cons PID Action) pairs while a bare action is enqueued. Left as
;; written because the intended values cannot be determined from this file.
(define (make-vm boot)
  (vm (hash)
      (relation)
      (relation)
      (relation)
      0
      (list->quasi-queue (list (spawn boot)))))

;; Drains the VM's pending actions in order. Actions wrapped in
;; at-meta-level are handed to perform-meta-action, and the actions that
;; returns are collected as outbound actions; all other actions are
;; performed against the VM state. Yields a transition carrying the final
;; state and the outbound actions (as a plain list).
;; NOTE(review): perform-meta-action is not defined in the visible code.
(define (run-vm state)
  (let loop ((remaining-actions (quasi-queue->list (vm-pending-actions state)))
             (state (struct-copy vm state [pending-actions empty-quasi-queue]))
             (outbound-actions empty-quasi-queue))
    (match remaining-actions
      ['() (transition state (quasi-queue->list outbound-actions))]
      [(cons (cons pid action) rest)
       (if (at-meta-level? action)
           (let-values (((state new-actions)
                         (perform-meta-action pid (at-meta-level-preaction action) state)))
             (loop rest state (quasi-enqueue-many new-actions outbound-actions)))
           (loop rest (perform-action pid action state) outbound-actions))])))

(define (run-user-code v)
  ;; TODO: use this hook to find all the bits of code that will need
  ;; with-handlers and crash compensation.
  v)

;; Dispatches a single (non-meta) action issued by process pid.
;; NOTE(review): add-role is declared as (topic handlers k), so this
;; pattern binds `eid` to the topic field, `topic` to the handlers field
;; and `handlers` to k; and `spawn` is declared with two fields but is
;; matched with one sub-pattern. Both are left as written because the
;; intended shapes cannot be determined from this file.
(define (perform-action pid action state)
  (match action
    [(add-role eid topic handlers) (do-subscribe pid eid topic handlers state)]
    ;; The struct is named delete-role (singular).
    [(delete-role eid) (do-unsubscribe pid eid state)]
    [(send-message topic body) (route-and-deliver topic body state)]
    [(spawn thunk) (do-spawn thunk state)]))

;; Records source-flow <-> target-topic in both relations, then, when the
;; flow is visible to target-topic and source-flow was not already in the
;; domain of flow-topics, runs the presence handler of every endpoint
;; registered in active-handlers under target-topic.
;; NOTE(review): `endpoint-process-id` is not an accessor of the endpoint
;; struct declared above (fields: id, handlers), and the presence handler
;; is documented as taking EID * Topic but is applied to one argument.
(define (install-flow state0 source-flow target-topic)
  (define state
    (struct-copy vm state0
                 [topic-flows (relation-add (vm-topic-flows state0) target-topic source-flow)]
                 [flow-topics (relation-add (vm-flow-topics state0) source-flow target-topic)]))
  (if (and (flow-visible? target-topic source-flow)
           ;; Only notify if not previously notified, i.e., the routes were
           ;; absent in state0.
           (not (relation-domain-member? (vm-flow-topics state0) source-flow)))
      (for/fold ([state state])
          ([e (in-set (relation-ref (vm-active-handlers state) target-topic))])
        (run-ready state
                   (endpoint-process-id e)
                   ((handlers-presence (endpoint-handlers e)) source-flow)))
      state))

;; Curried process updater: adds the (eid, topic) pair to p's interests.
;; NOTE(review): the process struct declared above has no `interests`
;; field, so `process-interests` is not defined by the visible code.
(define ((add-interest eid topic) p)
  (struct-copy process p
               [interests (relation-add (process-interests p) eid topic)]))

;; Registers an endpoint for `topic` on behalf of process pid, then wires
;; up flows and fires presence handlers.
;; NOTE(review): `endpoint` is declared with two fields but constructed
;; with three values, and `vm-known-topics` is not a field of the vm
;; struct declared above.
(define (do-subscribe pid eid topic handlers state)
  (define e (endpoint pid eid handlers))
  ;; Must be computed against the state *before* the handler is installed.
  (define topic-previously-known?
    (relation-domain-member? (vm-active-handlers state) topic))
  ;; Install the handler.
  ;; Update the process.
  (let ((state (struct-copy vm state
                            [active-handlers (relation-add (vm-active-handlers state) topic e)]
                            [processes (hash-update (vm-processes state)
                                                    pid
                                                    (add-interest eid topic))])))
    ;; Add topic <--> flow mappings and fire the appropriate presence handlers.
    (if topic-previously-known?
        ;; Just tell the local end. The other ends have already heard about this topic.
        (for/fold ([state state])
            ([matching-flow (in-set (relation-ref (vm-topic-flows state) topic))])
          (install-flow state matching-flow topic))
        ;; Compute intersections, and tell both ends.
        ;; for*/fold nests the clauses, so flow-pattern is computed once
        ;; per matching-topic and can refer to it.
        (for*/fold ([state state])
            ([matching-topic (in-set (vm-known-topics state))]
             [flow-pattern (in-value (topic-intersection topic matching-topic))]
             #:when flow-pattern)
          ;; We know that topic intersects matching-topic.
          (let* ((state (install-flow state (refine-topic topic flow-pattern) matching-topic))
                 (state (install-flow state (refine-topic matching-topic flow-pattern) topic)))
            state)))))

;; NOTE(review): do-unsubscribe is unfinished -- it has no body and its
;; opening parenthesis is never closed in the source. It is preserved
;; exactly as found rather than guessed at.
(define (do-unsubscribe pid eid state)
  ;; For each topic in the process's interests,
  ;;  - for each appropriate endpoint in active-handlers,
  ;;     - fire the absence handler
  ;;     - remove the endpoint
  ;;  - if no handlers remain in active-handlers for that topic,
  ;;     - for each flow in topic-flows for the topic,
  ;;        - remove the topic from flow-topics for the flow
  ;;        - if no topics remain for the flow,
  ;;           - dualize it using our source topic
  ;;           - remember it may have virtual duals ???
  ;;           - for each dual,
  ;;              - if it has NONE of its own duals left,
  ;;                 -

;; OK at this point this is getting far too complex.
;; Back to TSTTCPW: O(n^2) full-table-scans.

;; Collects every endpoint reachable from a flow that message-topic
;; specializes (flow -> co-topics of the flow -> topics recorded for that
;; co-topic -> endpoints registered for that topic), then runs each
;; endpoint's message handler once.
;; NOTE(review): `endpoint-process-id` is not an accessor of the endpoint
;; struct declared above, and the message handler is documented as taking
;; EID * Topic * Message but is applied to two arguments.
(define (route-and-deliver message-topic body state)
  (define endpoints
    ;; for*/set nests the clauses so that each one can refer to the
    ;; variable bound by the clause before it.
    (for*/set ([flow (in-relation-domain (vm-flow-topics state))]
               #:when (specialization? message-topic flow)
               [matching-flow (co-topics flow)]
               [matching-topic (in-set (relation-ref (vm-flow-topics state) matching-flow))]
               [matching-endpoint (in-set (relation-ref (vm-active-handlers state) matching-topic))])
      matching-endpoint))
  (for/fold ([state state])
      ([e (in-set endpoints)])
    (run-ready state
               (endpoint-process-id e)
               ((handlers-message (endpoint-handlers e)) message-topic body))))

;; Applies interrupt-k to the stored state of process pid, stores the
;; resulting state back into the process table, and enqueues the actions
;; the interrupt produced on behalf of pid.
(define (run-ready state pid interrupt-k)
  (define old-process (hash-ref (vm-processes state) pid))
  (match-define (transition new-process-state actions)
    (interrupt-k (process-state old-process)))
  (struct-copy vm (enqueue-actions state pid actions)
               [processes (hash-set (vm-processes state)
                                    pid
                                    (struct-copy process old-process
                                                 [state new-process-state]))]))

;; Runs thunk to obtain an initial transition, registers a new process
;; under the next free PID, enqueues its initial actions and bumps
;; next-process-id.
;; NOTE(review): the process struct documents next-endpoint-id-number as
;; a NonnegativeInteger and endpoints as a Set, but both are initialised
;; with (relation) here -- confirm the intended initial values.
(define (do-spawn thunk state)
  (match-define (transition initial-state initial-actions)
    (run-user-code (thunk)))
  (define new-pid (vm-next-process-id state))
  (struct-copy vm (enqueue-actions state new-pid initial-actions)
               [processes (hash-set (vm-processes state)
                                    new-pid
                                    (process new-pid initial-state (relation) (relation)))]
               [next-process-id (+ new-pid 1)]))

;; Tags each action with pid and appends them, in order, to the VM's
;; pending-actions QuasiQueue.
(define (enqueue-actions state pid actions)
  (struct-copy vm state
               [pending-actions (quasi-enqueue-many (for/list ([a actions]) (cons pid a))
                                                    (vm-pending-actions state))]))

;; NOTE(review): the code below the separator uses vm fields (suspensions,
;; pending-processes, pending-messages, pending-meta-messages) and structs
;; (suspension, kernel-mode-transition, message-handler) that are not
;; declared in the visible code above. The `let*` form immediately below
;; has no enclosing definition header and is followed by one extra closing
;; parenthesis. Both the separator and the fragment are preserved as found.
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

  (let* ((state (requeue-pollers state))
         (state (run-runnables state))
         (state (dispatch-messages state))
         (meta-messages (reverse (vm-pending-meta-messages state)))
         (meta-handlers (append-map extract-downward-meta-message-handlers
                                    (vm-suspensions state)))
         (poller-k (and (should-poll? state) run-vm)) ;; only block if there's nothing left to do
         (state (struct-copy vm state [pending-meta-messages (list)])))
    (kernel-mode-transition (suspension state poller-k meta-handlers '())
                            meta-messages
                            '()
                            '())))

;; Empties the suspension list and re-files each suspension: polling ones
;; become runnables that apply the suspension's k to its state; the rest
;; go back through enqueue-suspension.
(define (requeue-pollers state)
  (foldl (lambda (susp state)
           (if (suspension-polling? susp)
               (enqueue-runnable (lambda () ((suspension-k susp) (suspension-state susp)))
                                 state)
               (enqueue-suspension susp state)))
         (struct-copy vm state [suspensions '()])
         (vm-suspensions state)))

;; Clears pending-processes, then calls each runnable in the reverse of
;; its stored order and performs the transition it returns.
(define (run-runnables state)
  (foldl (lambda (r state) (perform-transition (r) state))
         (struct-copy vm state [pending-processes (list)])
         (reverse (vm-pending-processes state))))

;; Clears pending-messages, then dispatches each message in the reverse of
;; its stored order.
(define (dispatch-messages state)
  (foldl dispatch-message
         (struct-copy vm state [pending-messages (list)])
         (reverse (vm-pending-messages state))))

;; For each meta-message handler of susp, builds a message-handler with
;; the same pattern whose continuation is dispatch-meta-message.
(define (extract-downward-meta-message-handlers susp)
  (for/list ([mmh (suspension-meta-message-handlers susp)])
    (message-handler (message-handler-pattern mmh) dispatch-meta-message)))

;; Curried: given a message, yields a State -> result function that offers
;; the message to every suspension's meta-message handlers and then runs
;; the VM.
(define ((dispatch-meta-message message) state)
  (run-vm (foldl (match-suspension message
                                   (vm-meta-pattern-predicate state)
                                   suspension-meta-message-handlers)
                 (struct-copy vm state [suspensions '()])
                 (vm-suspensions state))))

;; Folds the contents of a kernel-mode-transition into the VM state:
;; messages, then new processes, then the new suspension, then
;; meta-messages. Anything else is an error.
(define (perform-transition transition state)
  (match transition
    [(kernel-mode-transition new-suspension messages meta-messages new-processes)
     (let* ((state (foldl enqueue-message state messages))
            (state (foldl enqueue-runnable state new-processes))
            (state (enqueue-suspension new-suspension state))
            (state (foldl enqueue-meta-message state meta-messages)))
       state)]
    [other
     (error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)]))

;; Pushes message onto the front of pending-messages.
(define (enqueue-message message state)
  (struct-copy vm state [pending-messages (cons message (vm-pending-messages state))]))

;; Pushes runnable r onto the front of pending-processes.
(define (enqueue-runnable r state)
  (struct-copy vm state [pending-processes (cons r (vm-pending-processes state))]))

;; Pushes susp onto the front of suspensions, unless it offers no
;; continuations at all, in which case it is dropped.
(define (enqueue-suspension susp state)
  (match susp
    [(suspension _ #f '() '())
     ;; dead process because no continuations offered
     state]
    [(suspension _ _ _ _)
     (struct-copy vm state [suspensions (cons susp (vm-suspensions state))])]))

;; Pushes message onto the front of pending-meta-messages.
(define (enqueue-meta-message message state)
  (struct-copy vm state
               [pending-meta-messages (cons message (vm-pending-meta-messages state))]))
;; Offers `message` to every suspension currently held by the VM, starting
;; from a copy of the state whose suspension list has been emptied.
(define (dispatch-message message state)
  (define waiting (vm-suspensions state))
  (define offer
    (match-suspension message (vm-pattern-predicate state) suspension-message-handlers))
  (foldl offer
         (struct-copy vm state [suspensions '()])
         waiting))

;; Curried folder used when dispatching a message. Picks the first of
;; susp's handlers (as selected by handlers-getter) whose pattern accepts
;; the message according to apply-pattern, and performs the transition
;; obtained by resuming the suspension through that handler.
(define ((match-suspension message apply-pattern handlers-getter) susp state)
  (define chosen
    (for/first ([candidate (in-list (handlers-getter susp))]
                #:when (apply-pattern (message-handler-pattern candidate) message))
      candidate))
  (cond
    [chosen
     (define make-interrupt (message-handler-k chosen))
     (define resume (make-interrupt message))
     (perform-transition (resume (suspension-state susp)) state)]
    [else
     ;; No handler matched this message. Put the suspension
     ;; back on the list for some future message.
     (enqueue-suspension susp state)]))

;; A suspension is polling iff its k field is anything other than #f.
(define (suspension-polling? susp)
  (if (eq? (suspension-k susp) #f) #f #t))

;; True when there are pending processes, pending messages, or at least
;; one polling suspension.
(define (should-poll? state)
  (cond
    [(not (null? (vm-pending-processes state))) #t]
    [(not (null? (vm-pending-messages state))) #t]
    [else (ormap suspension-polling? (vm-suspensions state))]))

;; Yields a thunk that, when called, builds a VM from `boot` with the
;; given pattern predicates and runs it.
(define (nested-vm boot
                   #:pattern-predicate [pattern-predicate default-pattern-predicate]
                   #:meta-pattern-predicate [meta-pattern-predicate default-pattern-predicate])
  (lambda ()
    (let ((fresh-vm (make-vm boot
                             #:pattern-predicate pattern-predicate
                             #:meta-pattern-predicate meta-pattern-predicate)))
      (run-vm fresh-vm))))

;; The default way of testing a pattern against a message: the pattern is
;; itself a procedure and is applied to the message.
(define (default-pattern-predicate p m)
  (p m))

;;---------------------------------------------------------------------------

(define nested-vm-inert?
  (match-lambda
    [(suspension (vm _ '() '() '() _ _) #f '() '())
     ;; Inert iff not waiting for any messages or metamessages, and
     ;; with no internal work left to do.
     #t]
    [_ #f]))

(struct ground-event-pattern (tag evt) #:transparent)
(struct ground-event-value (tag val) #:transparent)

;; A ground event value matches a ground event pattern when their tags are
;; equal?.
(define (match-ground-event p m)
  (let ((wanted (ground-event-pattern-tag p))
        (actual (ground-event-value-tag m)))
    (equal? wanted actual)))

;; Runs `boot` in an outermost VM. After each VM step, every entry of the
;; transition's messages is called as a thunk; then, unless the VM is
;; inert, the loop blocks on the events named by the VM's handlers (plus
;; an 'idle event that is ready only when the VM wants to poll) and
;; resumes the VM with whichever fires.
(define (ground-vm boot #:pattern-predicate [pattern-predicate default-pattern-predicate])
  (define first-step
    (run-vm (make-vm boot
                     #:pattern-predicate pattern-predicate
                     #:meta-pattern-predicate match-ground-event)))
  (let drive ((step first-step))
    (for ([fire (in-list (kernel-mode-transition-messages step))])
      (fire))
    (unless (nested-vm-inert? (kernel-mode-transition-suspension step))
      (match step
        [(kernel-mode-transition (suspension vm-state poll-k ground-handlers '()) _ '() '())
         ;; One event per handler; each delivers the tagged value paired
         ;; with the handler's continuation.
         (define handler-events
           (for/list ([h (in-list ground-handlers)])
             (match h
               [(message-handler (ground-event-pattern tag evt) k)
                (wrap-evt evt
                          (lambda (v) (cons (ground-event-value tag v) k)))])))
         ;; Ready immediately when the VM asked to poll, never otherwise.
         (define idle-event
           (wrap-evt (if poll-k always-evt never-evt)
                     (lambda (v)
                       (cons (ground-event-value 'idle (void))
                             (lambda (dummy) poll-k)))))
         (match-define (cons fired-value fired-k)
           (apply sync idle-event handler-events))
         (drive ((fired-k fired-value) vm-state))]
        [_
         (error 'ground-vm
                "Outermost VM may not spawn new siblings or send or receive metamessages")]))))