#lang racket/base ;; Virtualized operating system, this time with presence. (require racket/set) (require racket/match) (require (only-in racket/list flatten)) (require "unify.rkt") (provide nested-vm ground-vm (struct-out topic) topic-publisher topic-subscriber co-roles co-topics (struct-out handlers) (except-out (struct-out transition) transition) (rename-out [make-transition transition]) extend-transition role (except-out (struct-out add-role) add-role) (rename-out [make-add-role add-role]) (except-out (struct-out delete-role) delete-role) (rename-out [make-delete-role delete-role]) (struct-out send-message) (except-out (struct-out spawn) spawn) (rename-out [make-spawn spawn]) (except-out (struct-out kill) kill) (rename-out [make-kill kill]) (struct-out at-meta-level) ;; Reexports from unify.rkt for convenience wild wild? non-wild?) ;; Endpoints are the units of deduplication. ;; Flows (in canonical form) are the units of presence. ;;--------------------------------------------------------------------------- ;; Data definitions ;; A PID is an (arbitrary) VM-unique process identifier. Concretely, ;; it's an integer. ;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely, ;; it's a list of two elements, the first being the endpoint's ;; process's PID and the second being an integer. (Except for the ;; ground-vm, where they're different because there aren't any PIDs.) ;; One endpoint, one topic. ;; A Flow is a Topic that comes from the intersection of two dual ;; topics. ;; A QuasiQueue is a list of Xs in *reversed* order. (struct vm (processes ;; Hash endpoints ;; Hash next-process-id ;; PID pending-actions ;; QuasiQueue<(cons PID Action)> ) #:transparent) ;; An Endpoint is an (endpoint EID Topic Handlers), representing a ;; facet of a process responsible for playing a particular role (the ;; topic) in a conversation. (struct endpoint (id topic handlers) #:transparent) ;; A Process is an Exists State . (process PID State ;; NonnegativeInteger Set), representing a VM process and its ;; collection of active endpoints. (struct process (id state next-eid-number endpoints) #:transparent) ;; A Topic is a (topic Role Pattern Boolean), describing an Endpoint's ;; role in a conversation. (struct topic (role pattern virtual?) #:prefab) ;; InterruptK = State -> Transition ;; TrapK = X -> InterruptK ;; PresenceHandler = TrapK ;; AbsenceHandler = TrapK ;; MessageHandler = TrapK (struct handlers (presence absence message) #:transparent) ;; actions is a plain old ordered ConsTreeOf, not a ;; QuasiQueue. (struct transition (state actions) #:transparent) ;; Preactions. ;; Ks are various TrapKs or #f, signifying lack of interest. (struct add-role (topic handlers k) #:prefab) (struct delete-role (eid reason) #:prefab) (struct send-message (topic body) #:prefab) (struct spawn (thunk k) #:prefab) (struct kill (pid reason) #:prefab) ;; An Action is either a Preaction or an (at-meta-level Preaction). (struct at-meta-level (preaction) #:prefab) ;;--------------------------------------------------------------------------- ;; role macro (require (for-syntax syntax/parse)) (require (for-syntax racket/base)) (define-syntax role (lambda (stx) (syntax-parse stx [(_ topic-expr #:state state-pattern (~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler") (~optional (~seq #:on-absence absence) #:name "#:on-absence handler") (~optional (~seq #:on-ready ready) #:name "#:on-ready handler") (~optional (~seq #:id eid) #:name "#:id") (~optional (~seq #:topic topic) #:name "#:topic") (~optional (~seq #:reason reason) #:name "#:reason")) ... [message-pattern clause-body ...] ...) (with-syntax ([eid (if (attribute eid) #'eid #'dummy-eid)] [topic (if (attribute topic) #'topic #'dummy-topic)] [reason (if (attribute reason) #'reason #'dummy-reason)]) (with-syntax ([presence-handler (if (not (attribute presence)) #'#f #'(lambda (eid topic) (lambda (state) (match state [state-pattern presence]))))] [absence-handler (if (not (attribute absence)) #'#f #'(lambda (eid topic reason) (lambda (state) (match state [state-pattern absence]))))] [ready-handler (if (not (attribute ready)) #'#f #'(lambda (eid) (lambda (state) (match state [state-pattern ready]))))] [message-handler #'(lambda (eid topic message-body) (lambda (state) (match state [state-pattern (match message-body [message-pattern clause-body ...] ... [_ state])])))]) #'(add-role topic-expr (handlers presence-handler absence-handler message-handler) ready-handler)))]))) ;;--------------------------------------------------------------------------- ;; Smarter constructors for transitions and preactions. (define (make-transition state . actions) (transition state actions)) (define (make-add-role topic handlers [k #f]) (add-role topic handlers k)) (define (make-delete-role eid [reason #f]) (delete-role eid reason)) (define (make-spawn thunk [k #f]) (spawn thunk k)) (define (make-kill [pid #f] [reason #f]) (kill pid reason)) (define (extend-transition t . more-actions) (match t [(transition state actions) (transition state (list actions more-actions))] [state (transition state more-actions)])) ;;--------------------------------------------------------------------------- ;; Topics and roles (define (topic-publisher pattern #:virtual? [virtual? #f]) (topic 'publisher pattern virtual?)) (define (topic-subscriber pattern #:virtual? [virtual? #f]) (topic 'subscriber pattern virtual?)) (define (co-roles r) (case r [(publisher) '(subscriber)] [(subscriber) '(publisher)] [else #f])) (define (co-topics t) (for/list ([co-role (co-roles (topic-role t))]) (struct-copy topic t [role co-role]))) (define (refine-topic remote-topic new-pattern) (struct-copy topic remote-topic [pattern new-pattern])) (define (roles-intersect? l r) (memq l (co-roles r))) ;; Both left and right must be canonicalized. (define (topic-intersection left right) (and (roles-intersect? (topic-role left) (topic-role right)) (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) ;; True iff the flow between remote-topic and local-topic should be ;; visible to the local peer. This is the case when either local-topic ;; is virtual (in which case everything is seen) or otherwise if ;; remote-topic is also not virtual. (define (flow-visible? local-topic remote-topic) (or (topic-virtual? local-topic) (not (topic-virtual? remote-topic)))) ;;--------------------------------------------------------------------------- ;; Core virtualizable virtual machine. (define (make-vm boot) (vm (hash) (hash) 0 (list (cons -1 (spawn boot #f))))) (define (run-vm state) (let loop ((remaining-actions (reverse (vm-pending-actions state))) (state (struct-copy vm state [pending-actions '()])) (outbound-actions '())) (match remaining-actions ['() (transition (collect-dead-processes state) (reverse outbound-actions))] [(cons (cons pid action) rest) (match action [(at-meta-level preaction) (define transformed-preaction (transform-meta-action pid preaction)) (loop rest state (cons transformed-preaction outbound-actions))] [preaction (loop rest (perform-action pid preaction state) outbound-actions)])]))) (define (collect-dead-processes state) (struct-copy vm state [processes (for/hash ([(pid p) (in-hash (vm-processes state))] #:when (or (not (set-empty? (process-endpoints p))) (ormap (lambda (entry) (= (car entry) pid)) (vm-pending-actions state)))) (values pid p))])) (define (send-to-user failure-proc f . args) (with-handlers ([exn:fail? failure-proc]) (apply f args))) (define (perform-action pid preaction state) (match preaction [(add-role topic hs k) (do-subscribe pid topic hs k state)] [(delete-role eid reason) (do-unsubscribe pid eid reason state)] [(send-message topic body) (route-and-deliver topic body state)] [(spawn thunk k) (do-spawn pid thunk k state)] [(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)])) (define (do-subscribe pid topic hs k state) (cond [(hash-has-key? (vm-processes state) pid) (define old-process (hash-ref (vm-processes state) pid)) (define eid-number (process-next-eid-number old-process)) (define new-eid (list pid eid-number)) (struct-copy vm (for*/fold ([state (run-trapk state pid k new-eid)]) ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection topic matching-topic))] #:when flow-pattern) (define inbound-flow (refine-topic matching-topic flow-pattern)) (define outbound-flow (refine-topic topic flow-pattern)) (let* ((state (if (flow-visible? topic inbound-flow) (run-trapk state pid (handlers-presence hs) new-eid inbound-flow) state)) (state (if (flow-visible? matching-topic outbound-flow) (run-trapk state matching-pid (handlers-presence (endpoint-handlers e)) matching-eid outbound-flow) state))) state)) [processes (hash-set (vm-processes state) pid (struct-copy process old-process [next-eid-number (+ eid-number 1)] [endpoints (set-add (process-endpoints old-process) new-eid)]))] [endpoints (hash-set (vm-endpoints state) new-eid (endpoint new-eid topic hs))])] [else state])) (define (do-unsubscribe pid eid reason state) (cond [(hash-has-key? (vm-endpoints state) eid) (define endpoint-to-remove (hash-ref (vm-endpoints state) eid)) (define removed-topic (endpoint-topic endpoint-to-remove)) (define old-process (hash-ref (vm-processes state) pid)) (define new-process (struct-copy process old-process [endpoints (set-remove (process-endpoints old-process) eid)])) (let ((state (struct-copy vm state [endpoints (hash-remove (vm-endpoints state) eid)] [processes (hash-set (vm-processes state) pid new-process)]))) (for*/fold ([state state]) ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection removed-topic matching-topic))] #:when flow-pattern) (define outbound-flow (refine-topic removed-topic flow-pattern)) (run-trapk state matching-pid (handlers-absence (endpoint-handlers e)) matching-eid outbound-flow reason)))] [else state])) (define (route-and-deliver message-topic body state) (define pids-and-endpoints (for*/set ([(matching-pid p) (in-hash (vm-processes state))] [matching-eid (in-set (process-endpoints p))] [e (in-value (hash-ref (vm-endpoints state) matching-eid))] [matching-topic (in-value (endpoint-topic e))] [flow-pattern (in-value (topic-intersection message-topic matching-topic))] #:when flow-pattern) (cons matching-pid e))) (for/fold ([state state]) ([pid-and-endpoint (in-set pids-and-endpoints)]) (define matching-pid (car pid-and-endpoint)) (define e (cdr pid-and-endpoint)) (run-trapk state matching-pid (handlers-message (endpoint-handlers e)) (endpoint-id e) message-topic body))) (define (run-trapk state pid trap-k . args) (if trap-k (let ((failure-proc (lambda (e) (lambda (process-state) (transition process-state (kill #f e)))))) (run-ready state pid (apply send-to-user failure-proc trap-k args))) state)) (define (maybe-transition->transition t) (if (transition? t) t (transition t '()))) (define (run-ready state pid interrupt-k) (define old-process (hash-ref (vm-processes state) pid)) (define old-state (process-state old-process)) (match-define (transition new-state actions) (maybe-transition->transition (send-to-user (lambda (e) (transition old-state (kill #f e))) interrupt-k old-state))) (struct-copy vm (enqueue-actions state pid actions) [processes (hash-set (vm-processes state) pid (struct-copy process old-process [state new-state]))])) (define (do-spawn spawning-pid thunk k state) (match-define (transition initial-state initial-actions) (send-to-user (lambda (e) (transition #f (kill #f e))) thunk)) (define new-pid (vm-next-process-id state)) (define spawned-state (struct-copy vm (enqueue-actions state new-pid initial-actions) [processes (hash-set (vm-processes state) new-pid (process new-pid initial-state 0 (set)))] [next-process-id (+ new-pid 1)])) (run-trapk spawned-state spawning-pid k new-pid)) (define (do-kill pid-to-kill reason state) (cond [(hash-has-key? (vm-processes state) pid-to-kill) (let ((state (for/fold ([state state]) ([eid (in-set (process-endpoints (hash-ref (vm-processes state) pid-to-kill)))]) (do-unsubscribe pid-to-kill eid reason state)))) (struct-copy vm state [processes (hash-remove (vm-processes state) pid-to-kill)]))] [else state])) (define (enqueue-actions state pid actions) (struct-copy vm state [pending-actions (append (reverse (for/list ([a (flatten actions)]) (cons pid a))) (vm-pending-actions state))])) (define (((wrap-trapk pid trapk) . args) state) (apply run-trapk state pid trapk args)) (define (transform-meta-action pid preaction) (match preaction [(add-role topic hs k) (add-role topic (handlers (wrap-trapk pid (handlers-presence hs)) (wrap-trapk pid (handlers-absence hs)) (wrap-trapk pid (handlers-message hs))) (wrap-trapk pid k))] [(? delete-role?) preaction] [(? send-message?) preaction] [(spawn thunk k) (spawn thunk (wrap-trapk pid k))] [(? kill?) preaction])) (define (nested-vm boot) (lambda () (run-vm (make-vm boot)))) (define (ground-vm boot) (let loop ((state (make-vm boot))) (match (run-vm state) [(transition state actions) (when (not (null? actions)) (error 'ground-vm "No meta-actions available in ground-vm: ~v" actions)) (define waiting? (null? (vm-pending-actions state))) (define active-events (for/list ([(eid e) (in-hash (vm-endpoints state))] #:when (and (evt? (topic-pattern (endpoint-topic e))) (eq? (topic-role (endpoint-topic e)) 'subscriber))) (define evt (topic-pattern (endpoint-topic e))) (wrap-evt evt (lambda (message) (lambda (state) (route-and-deliver (topic-publisher evt) message state)))))) (if (and waiting? (null? active-events)) 'done ;; About to block, and nothing can wake us (let ((interruptk (apply sync (if waiting? never-evt (wrap-evt always-evt (lambda (dummy) values))) active-events))) (loop (interruptk state))))])))