diff --git a/os2.rkt b/os2.rkt index f8e9328..7d261e9 100644 --- a/os2.rkt +++ b/os2.rkt @@ -1,8 +1,8 @@ #lang racket/base ;; Virtualized operating system, this time with presence. +(require racket/set) (require racket/match) -(require "relation.rkt") (require "unify.rkt") ;; Endpoints are the units of deduplication. @@ -18,19 +18,21 @@ ;; it's a list of two elements, the first being the endpoint's ;; process's PID and the second being an integer. +;; One endpoint, one topic. + +;; A Flow is a Topic that comes from the intersection of two dual +;; topics. + ;; A QuasiQueue is a list of Xs in *reversed* order. (struct vm (processes ;; Hash endpoints ;; Hash - - topic-flows ;; Relation - flow-topics ;; Relation - active-handlers ;; Relation next-process-id ;; PID pending-actions ;; QuasiQueue<(cons PID Action)> ) #:transparent) (struct endpoint (id ;; EID + topic ;; Topic handlers ;; Handlers ) #:transparent) @@ -42,21 +44,21 @@ (struct topic (role pattern virtual?) #:prefab) -;; A Flow is a Topic that comes from the intersection of two dual -;; topics. - ;; InterruptK = State -> Transition -;; PresenceHandler = EID * Topic -> InterruptK -;; AbsenceHandler = EID * Topic * Reason -> InterruptK -;; MessageHandler = EID * Topic * Message -> InterruptK +;; TrapK = X -> InterruptK + +;; PresenceHandler = TrapK +;; AbsenceHandler = TrapK +;; MessageHandler = TrapK (struct handlers (presence absence message) #:transparent) ;; actions is a plain old List, not a QuasiQueue. (struct transition (state actions) #:transparent) ;; Preactions. +;; Ks are various TrapKs or #f, signifying lack of interest. (struct add-role (topic handlers k) #:prefab) -(struct delete-role (eid) #:prefab) +(struct delete-role (eid reason) #:prefab) (struct send-message (topic body) #:prefab) (struct spawn (thunk k) #:prefab) @@ -79,8 +81,8 @@ [else #f])) (define (co-topics t) - (for/list ([role (co-roles (topic-role t))]) - (struct-copy topic t [topic-role role]))) + (for/list ([co-role (co-roles (topic-role t))]) + (struct-copy topic t [role co-role]))) (define (refine-topic remote-topic new-pattern) (struct-copy topic remote-topic [pattern new-pattern])) @@ -93,14 +95,6 @@ (and (roles-intersect? (topic-role left) (topic-role right)) (mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right))))) -;; True iff the flow between remote-topic and local-topic should be -;; visible to the local peer. This is the case when either local-topic -;; is virtual (in which case everything is seen) or otherwise if -;; remote-topic is also not virtual. -(define (flow-visible? local-topic remote-topic) - (or (topic-virtual? local-topic) - (not (topic-virtual? remote-topic)))) - ;;--------------------------------------------------------------------------- ;; QuasiQueue @@ -126,11 +120,9 @@ (define (make-vm boot) (vm (hash) - (relation) - (relation) - (relation) + (hash) 0 - (list->quasi-queue (list (spawn boot))))) + (list->quasi-queue (list (spawn boot #f))))) (define (run-vm state) (let loop ((remaining-actions (quasi-queue->list (vm-pending-actions state))) @@ -152,265 +144,122 @@ (define (perform-action pid action state) (match action - [(add-role eid topic handlers) (do-subscribe pid eid topic handlers state)] - [(delete-roles eid) (do-unsubscribe pid eid state)] + [(add-role topic handlers k) (do-subscribe pid topic handlers k state)] + [(delete-role eid reason) (do-unsubscribe pid eid reason state)] [(send-message topic body) (route-and-deliver topic body state)] - [(spawn thunk) (do-spawn thunk state)])) + [(spawn thunk k) (do-spawn pid thunk k state)])) -(define (install-flow state0 source-flow target-topic) - (define state - (struct-copy vm state0 - [topic-flows (relation-add (vm-topic-flows state0) target-topic source-flow)] - [flow-topics (relation-add (vm-flow-topics state0) source-flow target-topic)])) - (if (and (flow-visible? target-topic source-flow) - ;; Only notify if not previously notified, i.e., the routes were - ;; absent in state0. - (not (relation-domain-member? (vm-flow-topics state0) source-flow))) - (for/fold ([state state]) - ([e (in-set (relation-ref (vm-active-handlers state) target-topic))]) - (run-ready state - (endpoint-process-id e) - ((handlers-presence (endpoint-handlers e)) source-flow))) - state)) +(define (do-subscribe pid topic handlers k state) + (define old-process (hash-ref (vm-processes state) pid)) + (define eid-number (process-next-endpoint-id-number old-process)) + (define new-eid (list pid eid-number)) + (struct-copy vm (for*/fold ([state (run-trapk state pid k new-eid)]) + ([(matching-pid p) (in-hash (vm-processes state))] + [matching-eid (in-set (process-endpoints p))] + [e (in-value (hash-ref (vm-endpoints state) matching-eid))] + [matching-topic (in-value (endpoint-topic e))] + [flow-pattern (in-value (topic-intersection topic matching-topic))] + #:when flow-pattern) + (define inbound-flow (refine-topic matching-topic flow-pattern)) + (define outbound-flow (refine-topic topic flow-pattern)) + (let* ((state (run-trapk state + pid + (handlers-presence handlers) + new-eid + inbound-flow)) + (state (run-trapk state + matching-pid + (handlers-presence (endpoint-handlers e)) + matching-eid + outbound-flow))) + state)) + [processes (hash-set (vm-processes state) + pid + (struct-copy process old-process + [next-endpoint-id-number (+ eid-number 1)] + [endpoints + (set-add (process-endpoints old-process) + new-eid)]))] + [endpoints (hash-set (vm-endpoints state) + new-eid + (endpoint new-eid + topic + handlers))])) -(define ((add-interest eid topic) p) - (struct-copy process p [interests (relation-add (process-interests p) eid topic)])) - -(define (do-subscribe pid eid topic handlers state) - (define e (endpoint pid eid handlers)) - (define topic-previously-known? (relation-domain-member? (vm-active-handlers state))) - ;; Install the handler. - ;; Update the process. - (let ((state - (struct-copy vm state - [active-handlers (relation-add (vm-active-handlers state) topic e)] - [processes (hash-update (vm-processes state) pid (add-interest eid topic))]))) - ;; Add topic <--> flow mappings and fire the appropriate presence handlers. - (if topic-previously-known? - ;; Just tell the local end. The other ends have already heard about this topic. - (for/fold ([state state]) - ([matching-flow (in-set (relation-ref (vm-topic-flows state) topic))]) - (install-flow state matching-flow topic)) - ;; Compute intersections, and tell both ends. - (for/fold ([state state]) - ([matching-topic (in-set (vm-known-topics state))] - [flow-pattern (in-value (topic-intersection topic matching-topic))] - #:when flow-pattern) ;; We know that topic intersects matching-topic. - (let* ((state (install-flow state (refine-topic topic flow-pattern) matching-topic)) - (state (install-flow state (refine-topic matching-topic flow-pattern) topic))) - state))))) - -(define (do-unsubscribe pid eid state) - ;; For each topic in the process's interests, - ;; - for each appropriate endpoint in active-handlers, - ;; - fire the absence handler - ;; - remove the endpoint - ;; - if no handlers remain in active-handlers for that topic, - ;; - for each flow in topic-flows for the topic, - ;; - remove the topic from flow-topics for the flow - ;; - if no topics remain for the flow, - ;; - dualize it using our source topic - ;; - remember it may have virtual duals ??? - ;; - for each dual, - ;; - if it has NONE of its own duals left, - ;; - - ;; OK at this point this is getting far too complex. - ;; Back to TSTTCPW: O(n^2) full-table-scans. +(define (do-unsubscribe pid eid reason state) + (define endpoint-to-remove (hash-ref (vm-endpoints state) eid)) + (define removed-topic (endpoint-topic endpoint-to-remove)) + (define old-process (hash-ref (vm-processes state) pid)) + (define new-process (struct-copy process old-process + [endpoints (set-remove (process-endpoints old-process) eid)])) + (let ((state (struct-copy vm state + [endpoints (hash-remove (vm-endpoints state) eid)] + [processes (if (set-empty? (process-endpoints new-process)) + (hash-remove (vm-processes state) pid) + (hash-set (vm-processes state) pid new-process))]))) + (for*/fold ([state state]) + ([(matching-pid p) (in-hash (vm-processes state))] + [matching-eid (in-set (process-endpoints p))] + [e (in-value (hash-ref (vm-endpoints state) matching-eid))] + [matching-topic (in-value (endpoint-topic e))] + [flow-pattern (in-value (topic-intersection removed-topic matching-topic))] + #:when flow-pattern) + (define outbound-flow (refine-topic removed-topic flow-pattern)) + (run-trapk state + matching-pid + (handlers-absence (endpoint-handlers e)) + matching-eid + outbound-flow + reason)))) (define (route-and-deliver message-topic body state) - (define endpoints - (for/set ([flow (in-relation-domain (vm-flow-topics state))] - #:when (specialization? message-topic flow) - [matching-flow (co-topics flow)] - [matching-topic (in-set (relation-ref (vm-flow-topics state) matching-flow))] - [matching-endpoint (in-set (relation-ref (vm-active-handlers state) matching-topic))]) - matching-endpoint)) - (for/fold ([state state]) ([e (in-set endpoints)]) - (run-ready state - (endpoint-process-id e) - ((handlers-message (endpoint-handlers e)) message-topic body)))) + (define pids-and-endpoints + (for*/set ([(matching-pid p) (in-hash (vm-processes state))] + [matching-eid (in-set (process-endpoints p))] + [e (in-value (hash-ref (vm-endpoints state) matching-eid))] + [matching-topic (in-value (endpoint-topic e))] + [flow-pattern (in-value (topic-intersection message-topic matching-topic))] + #:when flow-pattern) + (cons matching-pid e))) + (for/fold ([state state]) ([pid-and-endpoint (in-set pids-and-endpoints)]) + (define matching-pid (car pid-and-endpoint)) + (define e (cdr pid-and-endpoint)) + (run-trapk state + matching-pid + (handlers-message (endpoint-handlers e)) + (endpoint-id e) + message-topic + body))) + +(define (run-trapk state pid trap-k . args) + (run-ready state pid (run-user-code (apply trap-k args)))) (define (run-ready state pid interrupt-k) (define old-process (hash-ref (vm-processes state) pid)) (match-define (transition new-process-state actions) - (interrupt-k (process-state old-process-state))) + (run-user-code (interrupt-k (process-state old-process)))) (struct-copy vm (enqueue-actions state pid actions) [processes (hash-set (vm-processes state) pid (struct-copy process old-process [state new-process-state]))])) -(define (do-spawn thunk state) +(define (do-spawn spawning-pid thunk k state) (match-define (transition initial-state initial-actions) (run-user-code (thunk))) (define new-pid (vm-next-process-id state)) - (struct-copy vm (enqueue-actions state new-pid initial-actions) - [processes (hash-set (vm-processes state) new-pid (process new-pid - initial-state - (relation) - (relation)))] - [next-process-id (+ new-pid 1)])) + (run-trapk (struct-copy vm (enqueue-actions state new-pid initial-actions) + [processes (hash-set (vm-processes state) new-pid (process new-pid + initial-state + 0 + (set)))] + [next-process-id (+ new-pid 1)]) + spawning-pid + k + new-pid)) (define (enqueue-actions state pid actions) (struct-copy vm state [pending-actions (quasi-enqueue-many (for/list ([a actions]) (cons pid a)) (vm-pending-actions state))])) - --=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - - (let* ((state (requeue-pollers state)) - (state (run-runnables state)) - (state (dispatch-messages state)) - (meta-messages (reverse (vm-pending-meta-messages state))) - (meta-handlers (append-map extract-downward-meta-message-handlers (vm-suspensions state))) - (poller-k (and (should-poll? state) run-vm)) ;; only block if there's nothing left to do - (state (struct-copy vm state [pending-meta-messages (list)]))) - (kernel-mode-transition (suspension state poller-k meta-handlers '()) - meta-messages - '() - '()))) - -(define (requeue-pollers state) - (foldl (lambda (susp state) - (if (suspension-polling? susp) - (enqueue-runnable (lambda () ((suspension-k susp) (suspension-state susp))) state) - (enqueue-suspension susp state))) - (struct-copy vm state [suspensions '()]) - (vm-suspensions state))) - -(define (run-runnables state) - (foldl (lambda (r state) (perform-transition (r) state)) - (struct-copy vm state [pending-processes (list)]) - (reverse (vm-pending-processes state)))) - -(define (dispatch-messages state) - (foldl dispatch-message - (struct-copy vm state [pending-messages (list)]) - (reverse (vm-pending-messages state)))) - -(define (extract-downward-meta-message-handlers susp) - (for/list ([mmh (suspension-meta-message-handlers susp)]) - (message-handler (message-handler-pattern mmh) dispatch-meta-message))) - -(define ((dispatch-meta-message message) state) - (run-vm - (foldl (match-suspension message - (vm-meta-pattern-predicate state) - suspension-meta-message-handlers) - (struct-copy vm state [suspensions '()]) - (vm-suspensions state)))) - -(define (perform-transition transition state) - (match transition - [(kernel-mode-transition new-suspension - messages - meta-messages - new-processes) - (let* ((state (foldl enqueue-message state messages)) - (state (foldl enqueue-runnable state new-processes)) - (state (enqueue-suspension new-suspension state)) - (state (foldl enqueue-meta-message state meta-messages))) - state)] - [other - (error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)])) - -(define (enqueue-message message state) - (struct-copy vm state [pending-messages (cons message (vm-pending-messages state))])) - -(define (enqueue-runnable r state) - (struct-copy vm state [pending-processes (cons r (vm-pending-processes state))])) - -(define (enqueue-suspension susp state) - (match susp - [(suspension _ #f '() '()) - ;; dead process because no continuations offered - state] - [(suspension _ _ _ _) - (struct-copy vm state [suspensions (cons susp (vm-suspensions state))])])) - -(define (enqueue-meta-message message state) - (struct-copy vm state [pending-meta-messages (cons message (vm-pending-meta-messages state))])) - -(define (dispatch-message message state) - (foldl (match-suspension message - (vm-pattern-predicate state) - suspension-message-handlers) - (struct-copy vm state [suspensions '()]) - (vm-suspensions state))) - -(define ((match-suspension message apply-pattern handlers-getter) susp state) - (let search-handlers ((message-handlers (handlers-getter susp))) - (cond - [(null? message-handlers) - ;; No handler matched this message. Put the suspension - ;; back on the list for some future message. - (enqueue-suspension susp state)] - [(apply-pattern (message-handler-pattern (car message-handlers)) message) - (define trapk (message-handler-k (car message-handlers))) - (define interruptk (trapk message)) - (perform-transition (interruptk (suspension-state susp)) state)] - [else - (search-handlers (cdr message-handlers))]))) - -(define (suspension-polling? susp) - (not (eq? (suspension-k susp) #f))) - -(define (should-poll? state) - (or (not (null? (vm-pending-processes state))) - (not (null? (vm-pending-messages state))) - (ormap suspension-polling? (vm-suspensions state)))) - -(define (nested-vm boot - #:pattern-predicate [pattern-predicate default-pattern-predicate] - #:meta-pattern-predicate [meta-pattern-predicate default-pattern-predicate]) - (lambda () (run-vm (make-vm boot - #:pattern-predicate pattern-predicate - #:meta-pattern-predicate meta-pattern-predicate)))) - -(define default-pattern-predicate - (lambda (p m) (p m))) - -;;--------------------------------------------------------------------------- - -(define (nested-vm-inert? susp) - (match susp - [(suspension (vm _ '() '() '() _ _) #f '() '()) - ;; Inert iff not waiting for any messages or metamessages, and - ;; with no internal work left to do. - #t] - [_ #f])) - -(struct ground-event-pattern (tag evt) #:transparent) -(struct ground-event-value (tag val) #:transparent) - -(define (match-ground-event p m) - (equal? (ground-event-pattern-tag p) (ground-event-value-tag m))) - -(define (ground-vm boot - #:pattern-predicate [pattern-predicate default-pattern-predicate]) - (let loop ((transition (run-vm (make-vm boot - #:pattern-predicate pattern-predicate - #:meta-pattern-predicate match-ground-event)))) - (for-each (lambda (thunk) (thunk)) (kernel-mode-transition-messages transition)) - (when (not (nested-vm-inert? (kernel-mode-transition-suspension transition))) - (match transition - [(kernel-mode-transition (suspension new-state - polling-k - message-handlers - '()) - _ - '() - '()) - (define inbound-messages - (map (match-lambda [(message-handler (ground-event-pattern tag evt) k) - (wrap-evt evt (lambda (v) (cons (ground-event-value tag v) k)))]) - message-handlers)) - (match-define (cons inbound-value inbound-continuation) - (apply sync - (wrap-evt (if polling-k always-evt never-evt) - (lambda (v) (cons (ground-event-value 'idle (void)) - (lambda (dummy) polling-k)))) - inbound-messages)) - (loop ((inbound-continuation inbound-value) new-state))] - [_ - (error 'ground-vm - "Outermost VM may not spawn new siblings or send or receive metamessages")])))) +(define (perform-meta-action pid preaction state) + (error 'perform-meta-action "%%% Not implemented"))