;; racket-matrix-2012/os2.rkt
;; (Repository-viewer residue, kept as a comment: 409 lines, 15 KiB, Racket;
;; "Raw / Normal View / History"; blame timestamp 2012-03-19 18:28:34 +00:00.)
#lang racket/base
;; Virtualized operating system, this time with presence.
(require racket/match)
;; blame: 2012-03-20 15:33:54 +00:00
(require "relation.rkt")
(require "unify.rkt")
;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.
;; blame: 2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Data definitions
;; A PID is an (arbitrary) VM-unique process identifier.
;; A SID is an (arbitrary) process-unique subscription identifier.
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
;; blame: 2012-03-20 15:33:54 +00:00
;; A vm is the complete state of one virtualized operating system
;; instance. It has exactly six fields; the stray blame-timestamp tokens
;; that had crept into the field list (and would have been read as extra
;; field names) are removed.
(struct vm (processes       ;; Hash<PID, Process>
            topic-flows     ;; Relation<Topic, Flow>
            flow-topics     ;; Relation<Flow, Topic>
            active-handlers ;; Relation<Topic, Endpoint>
            next-process-id ;; PID
            pending-actions ;; QuasiQueue<(cons PID Action)>
            ) #:transparent)
;; blame: 2012-03-20 15:33:54 +00:00
;; (endpoint PID SID Handlers)
;; Associates one subscription (SID) of one process (PID) with the
;; handlers registered for it.
(struct endpoint (process-id ;; PID
                  sid        ;; SID
                  handlers   ;; Handlers
                  ) #:transparent)
;; blame: 2012-03-19 18:28:34 +00:00
;; A process is one entry of the VM's process table. It has exactly four
;; fields; the stray blame-timestamp tokens that had crept into the field
;; list are removed.
(struct process (id             ;; PID
                 state          ;; State handed to, and replaced by, handler transitions
                 interests      ;; Relation<SID, Topic>
                 meta-interests ;; Relation<SID, Topic>
                 ) #:transparent)
;; (topic Role Pattern Boolean)
(struct topic (role     ;; 'publisher or 'subscriber (see topic-publisher / topic-subscriber)
               pattern  ;; the pattern this topic covers
               virtual? ;; #f unless the constructor was given #:virtual?
               ) #:prefab)
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
;; PresenceHandler = Topic -> State -> Transition
;; AbsenceHandler = Topic * Reason -> State -> Transition
;; MessageHandler = Topic * Message -> State -> Transition
;; blame: 2012-03-20 15:33:54 +00:00
;; (handlers PresenceHandler AbsenceHandler MessageHandler)
;; The three callbacks an endpoint registers for a topic.
(struct handlers (presence ;; PresenceHandler
                  absence  ;; AbsenceHandler
                  message  ;; MessageHandler
                  ) #:transparent)

;; (transition State List<Action>)
;; actions is a plain old List<Action>, not a QuasiQueue.
(struct transition (state actions) #:transparent)
;; Preactions
;; blame: 2012-03-20 15:33:54 +00:00
;; (add-role SID Topic Handlers): subscribe; handled by do-subscribe.
(struct add-role (sid topic handlers) #:prefab)
;; (delete-roles SID): unsubscribe; handled by do-unsubscribe.
(struct delete-roles (sid) #:prefab)
;; (send-message Topic Body): handled by route-and-deliver.
(struct send-message (topic body) #:prefab)
;; (spawn Thunk): thunk is called with no arguments and must return a
;; transition; handled by do-spawn.
(struct spawn (thunk) #:prefab)
;; An Action is either a Preaction or an (at-meta-level Preaction).
(struct at-meta-level (preaction) #:prefab)
;;---------------------------------------------------------------------------
;; Topics and roles
;; Pattern [#:virtual? Boolean] -> Topic
;; Builds a topic whose role is 'publisher over the given pattern.
;; #:virtual? defaults to #f and is stored in the topic's virtual? field.
(define (topic-publisher pattern #:virtual? [virtual? #f])
(topic 'publisher pattern virtual?))
;; Pattern [#:virtual? Boolean] -> Topic
;; Builds a topic whose role is 'subscriber over the given pattern.
;; #:virtual? defaults to #f and is stored in the topic's virtual? field.
(define (topic-subscriber pattern #:virtual? [virtual? #f])
(topic 'subscriber pattern virtual?))
;; Role -> List<Role>
;; Returns the roles that are dual to r: publishers pair with
;; subscribers and vice versa. An unrecognised role has no duals, so the
;; result is the empty list (not #f): callers iterate over the result
;; with for/list and search it with memq, both of which need a list.
(define (co-roles r)
  (case r
    [(publisher) '(subscriber)]
    [(subscriber) '(publisher)]
    [else '()]))
;; Topic -> List<Topic>
;; Returns one copy of t per dual role of t's role, identical to t
;; except for the role field.
(define (co-topics t)
  (for/list ([co-role (co-roles (topic-role t))])
    ;; struct-copy takes the bare field name (role), not the accessor
    ;; name (topic-role).
    (struct-copy topic t [role co-role])))
;; blame: 2012-03-20 15:33:54 +00:00
;; Topic Pattern -> Topic
;; Returns a topic with remote-topic's role and virtual? flag but with
;; new-pattern as its pattern.
(define (refine-topic remote-topic new-pattern)
  (topic (topic-role remote-topic)
         new-pattern
         (topic-virtual? remote-topic)))
;; Role Role -> (or/c #f List<Role>)
;; Truthy iff l is one of the dual roles of r. The result is whatever
;; memq returns: the tail of (co-roles r) starting at l, or #f.
(define (roles-intersect? l r)
(memq l (co-roles r)))
;; Topic Topic -> (or/c #f <result of mgu-canonical>)
;; Both left and right must be canonicalized.
;; Returns #f when the two topics' roles are not dual; otherwise returns
;; mgu-canonical applied to freshened copies of the two patterns.
(define (topic-intersection left right)
  (cond
    [(roles-intersect? (topic-role left) (topic-role right))
     (define fresh-left (freshen (topic-pattern left)))
     (define fresh-right (freshen (topic-pattern right)))
     (mgu-canonical fresh-left fresh-right)]
    [else #f]))
;; Topic Topic -> Boolean
;; Decides whether the flow between remote-topic and local-topic should
;; be shown to the local peer: a virtual local-topic sees everything; a
;; non-virtual local-topic sees only non-virtual remote topics.
(define (flow-visible? local-topic remote-topic)
  (cond
    [(topic-virtual? local-topic) => values]
    [else (not (topic-virtual? remote-topic))]))
;; blame: 2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; QuasiQueue<X>
;; QuasiQueue<X>: the queue with nothing in it.
(define empty-quasi-queue (list))
;; X QuasiQueue<X> -> QuasiQueue<X>
;; Adds a single item. The queue is stored newest-first, so the item
;; goes on the front of the underlying list.
(define (quasi-enqueue-one thing existing-quasi-queue)
  (list* thing existing-quasi-queue))
;; List<X> QuasiQueue<X> -> QuasiQueue<X>
;; Adds several items, given oldest-first, so that the last of them ends
;; up at the front of the (reversed) underlying list.
(define (quasi-enqueue-many many-things-in-order existing-quasi-queue)
  (for/fold ([queue existing-quasi-queue])
            ([thing (in-list many-things-in-order)])
    (cons thing queue)))
;; QuasiQueue<X> -> List<X>
;; Yields the queued items oldest-first.
(define (quasi-queue->list quasi-queue)
  (for/fold ([in-order '()])
            ([item (in-list quasi-queue)])
    (cons item in-order)))
;; List<X> -> QuasiQueue<X>
;; Builds a queue from items given oldest-first.
(define (list->quasi-queue xs)
  (for/fold ([queue '()])
            ([x (in-list xs)])
    (cons x queue)))
;;---------------------------------------------------------------------------
;; (-> Transition) -> vm
;; Builds a fresh VM with no processes, empty routing relations, PID
;; counter 0, and a single pending action that spawns boot.
;;
;; pending-actions holds (cons PID Action) pairs, and run-vm destructures
;; every entry that way, so the boot action must be paired with a PID
;; too. No process exists yet, so the placeholder -1 is used; the spawn
;; action never looks at the PID of its requester.
(define (make-vm boot)
  (vm (hash)
      (relation)
      (relation)
      (relation)
      0
      (list->quasi-queue (list (cons -1 (spawn boot))))))
;; vm -> Transition
;; Processes, oldest first, every action that was pending when run-vm was
;; called. Actions enqueued while doing so stay pending in the returned
;; VM. Meta-level actions are passed to perform-meta-action, and the
;; actions it returns are collected, in order, as the transition's
;; actions.
;; NOTE(review): perform-meta-action is not defined in the visible part
;; of this file; it is expected to return two values, the new VM and a
;; list of outbound actions.
(define (run-vm state)
  (let loop ([todo (quasi-queue->list (vm-pending-actions state))]
             [current (struct-copy vm state [pending-actions empty-quasi-queue])]
             [outbound empty-quasi-queue])
    (match todo
      ['()
       (transition current (quasi-queue->list outbound))]
      [(cons (cons pid action) later)
       (cond
         [(at-meta-level? action)
          (define-values (next new-actions)
            (perform-meta-action pid (at-meta-level-preaction action) current))
          (loop later next (quasi-enqueue-many new-actions outbound))]
         [else
          (loop later (perform-action pid action current) outbound)])])))
;; X -> X
;; Currently the identity function. It marks the places where the result
;; of user-supplied code enters the VM.
(define (run-user-code v)
;; TODO: use this hook to find all the bits of code that will need
;; with-handlers and crash compensation.
v)
;; PID Preaction vm -> vm
;; Dispatches one (non-meta-level) action requested by process pid to the
;; routine that implements it. An action of any other shape raises a
;; match failure.
(define (perform-action pid action state)
  (match action
    [(add-role sid role-topic role-handlers)
     (do-subscribe pid sid role-topic role-handlers state)]
    [(delete-roles sid)
     (do-unsubscribe pid sid state)]
    [(send-message message-topic body)
     (route-and-deliver message-topic body state)]
    [(spawn thunk)
     (do-spawn thunk state)]))
;; vm Flow Topic -> vm
;; Records the source-flow <-> target-topic pairing in both routing
;; relations. Then, if the flow is visible to target-topic and
;; source-flow was not already in the domain of flow-topics before this
;; call, runs the presence handler of every endpoint registered on
;; target-topic.
(define (install-flow state0 source-flow target-topic)
  (define routed
    (struct-copy vm state0
      [topic-flows (relation-add (vm-topic-flows state0) target-topic source-flow)]
      [flow-topics (relation-add (vm-flow-topics state0) source-flow target-topic)]))
  (cond
    [(not (flow-visible? target-topic source-flow)) routed]
    ;; Already routed in state0, hence already announced: stay quiet.
    [(relation-domain-member? (vm-flow-topics state0) source-flow) routed]
    [else
     (for/fold ([acc routed])
               ([e (in-set (relation-ref (vm-active-handlers routed) target-topic))])
       (define on-presence (handlers-presence (endpoint-handlers e)))
       (run-ready acc (endpoint-process-id e) (on-presence source-flow)))]))
;; SID Topic -> (Process -> Process)
;; Returns a process updater that records (sid, topic) in the process's
;; interests relation.
(define (add-interest sid topic)
  (lambda (p)
    (define extended (relation-add (process-interests p) sid topic))
    (struct-copy process p [interests extended])))
;; PID SID Topic Handlers vm -> vm
;; Registers an endpoint for topic on behalf of process pid, records the
;; interest in the process, installs the resulting flows, and fires
;; presence handlers via install-flow.
;;
;; Two defects are fixed here:
;;  - relation-domain-member? was called without the topic to look up;
;;  - the second loop's flow-pattern clause depends on matching-topic, so
;;    the clauses must nest (for*/fold) rather than run in parallel.
;; NOTE(review): vm-known-topics is not defined in the visible part of
;; this file; it is assumed to return a set of topics -- confirm.
(define (do-subscribe pid sid topic handlers state)
  (define e (endpoint pid sid handlers))
  ;; Must be computed before the endpoint is added below.
  (define topic-previously-known?
    (relation-domain-member? (vm-active-handlers state) topic))
  ;; Install the handler.
  ;; Update the process.
  (let ([state
         (struct-copy vm state
           [active-handlers (relation-add (vm-active-handlers state) topic e)]
           [processes (hash-update (vm-processes state) pid (add-interest sid topic))])])
    ;; Add topic <--> flow mappings and fire the appropriate presence handlers.
    (if topic-previously-known?
        ;; Just tell the local end. The other ends have already heard about this topic.
        (for/fold ([state state])
                  ([matching-flow (in-set (relation-ref (vm-topic-flows state) topic))])
          (install-flow state matching-flow topic))
        ;; Compute intersections, and tell both ends.
        (for*/fold ([state state])
                   ([matching-topic (in-set (vm-known-topics state))]
                    [flow-pattern (in-value (topic-intersection topic matching-topic))]
                    #:when flow-pattern) ;; We know that topic intersects matching-topic.
          (let* ([state (install-flow state (refine-topic topic flow-pattern) matching-topic)]
                 [state (install-flow state (refine-topic matching-topic flow-pattern) topic)])
            state)))))
;; PID SID vm -> vm
;; NOT YET IMPLEMENTED. The definition previously had no body and no
;; closing parenthesis, which swallowed the rest of the file. It now
;; fails loudly so that a delete-roles action cannot be silently ignored.
;;
;; Design notes carried over from the original sketch:
;; For each topic in the process's interests,
;; - for each appropriate endpoint in active-handlers,
;; - fire the absence handler
;; - remove the endpoint
;; - if no handlers remain in active-handlers for that topic,
;; - for each flow in topic-flows for the topic,
;; - remove the topic from flow-topics for the flow
;; - if no topics remain for the flow,
;; - dualize it using our source topic
;; - remember it may have virtual duals ???
;; - for each dual,
;; - if it has NONE of its own duals left,
;; -
;; OK at this point this is getting far too complex.
;; Back to TSTTCPW: O(n^2) full-table-scans.
(define (do-unsubscribe pid sid state)
  (error 'do-unsubscribe
         "not yet implemented (pid ~v, sid ~v)"
         pid
         sid))
;; blame: 2012-03-20 15:33:54 +00:00
;; Topic Body vm -> vm
;; Delivers body to every endpoint reachable from message-topic:
;; for each known flow accepted by (specialization? message-topic flow),
;; each dual of that flow, each topic recorded for the dual in
;; flow-topics, and each endpoint registered on that topic. Endpoints are
;; gathered into a set first, so each one receives the message once.
;;
;; Each later clause depends on the binding made by the one before it, so
;; the iteration must nest (for*/set); with for/set the clauses after
;; #:when ran in parallel and could not see each other's variables.
;; NOTE(review): specialization? is not defined in the visible part of
;; this file -- confirm its argument order.
(define (route-and-deliver message-topic body state)
  (define endpoints
    (for*/set ([flow (in-relation-domain (vm-flow-topics state))]
               #:when (specialization? message-topic flow)
               [matching-flow (co-topics flow)]
               [matching-topic (in-set (relation-ref (vm-flow-topics state) matching-flow))]
               [matching-endpoint (in-set (relation-ref (vm-active-handlers state) matching-topic))])
      matching-endpoint))
  (for/fold ([state state]) ([e (in-set endpoints)])
    (run-ready state
               (endpoint-process-id e)
               ((handlers-message (endpoint-handlers e)) message-topic body))))
;; vm PID (State -> Transition) -> vm
;; Applies interrupt-k to the current state of process pid, stores the
;; resulting state back into the process table, and enqueues the
;; resulting actions on behalf of pid. Raises (from hash-ref) if pid is
;; not in the process table.
;;
;; Fix: the process state is read from old-process; the previous code
;; referred to an unbound name, old-process-state.
(define (run-ready state pid interrupt-k)
  (define old-process (hash-ref (vm-processes state) pid))
  (match-define (transition new-process-state actions)
    (interrupt-k (process-state old-process)))
  (struct-copy vm (enqueue-actions state pid actions)
    [processes (hash-set (vm-processes state) pid
                         (struct-copy process old-process
                           [state new-process-state]))]))
;; (-> Transition) vm -> vm
;; Runs thunk to obtain the new process's initial state and actions,
;; gives the process the next free PID, enqueues its initial actions
;; under that PID, and advances the PID counter by one. The new process
;; starts with empty interests and meta-interests.
(define (do-spawn thunk state)
  (match-define (transition initial-state initial-actions) (run-user-code (thunk)))
  (let* ([pid (vm-next-process-id state)]
         [queued (enqueue-actions state pid initial-actions)]
         [newborn (process pid initial-state (relation) (relation))])
    (struct-copy vm queued
      [processes (hash-set (vm-processes state) pid newborn)]
      [next-process-id (+ pid 1)])))
;; blame: 2012-03-19 18:28:34 +00:00
;; vm PID List<Action> -> vm
;; Appends actions, each paired with pid, to the VM's pending-actions
;; queue, preserving their order.
(define (enqueue-actions state pid actions)
  (define tagged
    (for/list ([action actions])
      (cons pid action)))
  (define queue (quasi-enqueue-many tagged (vm-pending-actions state)))
  (struct-copy vm state [pending-actions queue]))
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
(let* ((state (requeue-pollers state))
(state (run-runnables state))
(state (dispatch-messages state))
(meta-messages (reverse (vm-pending-meta-messages state)))
(meta-handlers (append-map extract-downward-meta-message-handlers (vm-suspensions state)))
(poller-k (and (should-poll? state) run-vm)) ;; only block if there's nothing left to do
(state (struct-copy vm state [pending-meta-messages (list)])))
(kernel-mode-transition (suspension state poller-k meta-handlers '())
meta-messages
'()
'())))
;; vm -> vm
;; Rebuilds the suspension list from scratch. A polling suspension (one
;; with a continuation k) becomes a runnable thunk that applies k to the
;; suspension's state; every other suspension goes back through
;; enqueue-suspension.
(define (requeue-pollers state)
  (for/fold ([acc (struct-copy vm state [suspensions '()])])
            ([susp (in-list (vm-suspensions state))])
    (cond
      [(suspension-polling? susp)
       (enqueue-runnable (lambda () ((suspension-k susp) (suspension-state susp)))
                         acc)]
      [else
       (enqueue-suspension susp acc)])))
;; vm -> vm
;; Clears pending-processes, then calls each formerly pending runnable
;; (in reverse of stored order) and folds its transition into the VM.
(define (run-runnables state)
  (for/fold ([acc (struct-copy vm state [pending-processes (list)])])
            ([runnable (in-list (reverse (vm-pending-processes state)))])
    (perform-transition (runnable) acc)))
;; vm -> vm
;; Clears pending-messages, then dispatches each formerly pending
;; message (in reverse of stored order) with dispatch-message.
(define (dispatch-messages state)
  (for/fold ([acc (struct-copy vm state [pending-messages (list)])])
            ([message (in-list (reverse (vm-pending-messages state)))])
    (dispatch-message message acc)))
;; Suspension -> List<message-handler>
;; For each meta-message handler of susp, builds a message-handler with
;; the same pattern whose continuation is dispatch-meta-message.
(define (extract-downward-meta-message-handlers susp)
  (for/list ([mmh (suspension-meta-message-handlers susp)])
    (define pattern (message-handler-pattern mmh))
    (message-handler pattern dispatch-meta-message)))
;; Message -> (vm -> <result of run-vm>)
;; Returns a procedure that offers message to the meta-message handlers
;; of every suspension in the given VM (using the VM's meta pattern
;; predicate) and then runs the VM.
(define (dispatch-meta-message message)
  (lambda (state)
    (define deliver
      (match-suspension message
                        (vm-meta-pattern-predicate state)
                        suspension-meta-message-handlers))
    (run-vm
     (for/fold ([acc (struct-copy vm state [suspensions '()])])
               ([susp (in-list (vm-suspensions state))])
       (deliver susp acc)))))
;; kernel-mode-transition vm -> vm
;; Folds one process transition into the VM, in this order: its messages,
;; its new processes, its suspension, and finally its meta-messages.
;; Anything that is not a kernel-mode-transition raises an error.
(define (perform-transition transition state)
  (match transition
    [(kernel-mode-transition new-suspension messages meta-messages new-processes)
     (define with-messages
       (for/fold ([acc state]) ([m (in-list messages)])
         (enqueue-message m acc)))
     (define with-processes
       (for/fold ([acc with-messages]) ([p (in-list new-processes)])
         (enqueue-runnable p acc)))
     (define with-suspension
       (enqueue-suspension new-suspension with-processes))
     (for/fold ([acc with-suspension]) ([mm (in-list meta-messages)])
       (enqueue-meta-message mm acc))]
    [other
     (error 'vm "Processes must return a kernel-mode-transition struct; got ~v" other)]))
;; Message vm -> vm
;; Pushes message onto the front of the VM's pending-messages list.
(define (enqueue-message message state)
  (define queued (cons message (vm-pending-messages state)))
  (struct-copy vm state [pending-messages queued]))
;; Runnable vm -> vm
;; Pushes r onto the front of the VM's pending-processes list.
(define (enqueue-runnable r state)
  (define queued (cons r (vm-pending-processes state)))
  (struct-copy vm state [pending-processes queued]))
;; Suspension vm -> vm
;; Pushes susp onto the VM's suspension list, unless susp offers no
;; continuation and no handlers at all, in which case the process is dead
;; and the VM is returned unchanged.
(define (enqueue-suspension susp state)
  (match susp
    ;; dead process because no continuations offered
    [(suspension _ #f (list) (list)) state]
    [(suspension _ _ _ _)
     (define remaining (cons susp (vm-suspensions state)))
     (struct-copy vm state [suspensions remaining])]))
;; Message vm -> vm
;; Pushes message onto the front of the VM's pending-meta-messages list.
(define (enqueue-meta-message message state)
  (define queued (cons message (vm-pending-meta-messages state)))
  (struct-copy vm state [pending-meta-messages queued]))
;; Message vm -> vm
;; Offers message to the message handlers of every suspension in the VM,
;; using the VM's pattern predicate, rebuilding the suspension list as it
;; goes.
(define (dispatch-message message state)
  (define deliver
    (match-suspension message
                      (vm-pattern-predicate state)
                      suspension-message-handlers))
  (for/fold ([acc (struct-copy vm state [suspensions '()])])
            ([susp (in-list (vm-suspensions state))])
    (deliver susp acc)))
;; Message (Pattern Message -> Any) (Suspension -> List<message-handler>)
;;   -> (Suspension vm -> vm)
;; Returns a folding procedure. Given a suspension and a VM, it tries the
;; suspension's handlers in order; the first whose pattern accepts
;; message is invoked and its transition is folded into the VM. If none
;; accepts, the suspension is re-enqueued for some future message.
(define (match-suspension message apply-pattern handlers-getter)
  (lambda (susp state)
    (define (try-handlers candidates)
      (cond
        [(null? candidates)
         ;; Nothing matched: keep the suspension waiting.
         (enqueue-suspension susp state)]
        [(apply-pattern (message-handler-pattern (car candidates)) message)
         (let* ([trapk (message-handler-k (car candidates))]
                [interruptk (trapk message)])
           (perform-transition (interruptk (suspension-state susp)) state))]
        [else
         (try-handlers (cdr candidates))]))
    (try-handlers (handlers-getter susp))))
;; Suspension -> Boolean
;; #t iff the suspension carries a continuation k (anything other than #f).
(define (suspension-polling? susp)
  (if (suspension-k susp) #t #f))
;; vm -> Boolean
;; True when the VM still has work it could do without outside input:
;; pending processes, pending messages, or at least one polling
;; suspension.
(define (should-poll? state)
  (cond
    [(not (null? (vm-pending-processes state))) #t]
    [(not (null? (vm-pending-messages state))) #t]
    [else (ormap suspension-polling? (vm-suspensions state))]))
;; Boot [#:pattern-predicate P] [#:meta-pattern-predicate P] -> (-> <result of run-vm>)
;; Returns a thunk that, when called, builds a VM from boot with the
;; given predicates (both default to default-pattern-predicate) and runs
;; it.
(define (nested-vm boot
                   #:pattern-predicate [pattern-predicate default-pattern-predicate]
                   #:meta-pattern-predicate [meta-pattern-predicate default-pattern-predicate])
  (define (start-nested-vm)
    (define fresh-vm
      (make-vm boot
               #:pattern-predicate pattern-predicate
               #:meta-pattern-predicate meta-pattern-predicate))
    (run-vm fresh-vm))
  start-nested-vm)
;; (Message -> Any) Message -> Any
;; The default way to test a pattern against a message: the pattern is
;; itself a procedure, and it is applied to the message.
(define (default-pattern-predicate p m)
  (p m))
;;---------------------------------------------------------------------------
;; Suspension -> Boolean
;; A suspension is inert iff it offers no continuation and no handlers
;; (so it is not waiting for any messages or metamessages) and the VM it
;; holds has empty lists in its second, third and fourth fields (no
;; internal work left to do).
(define (nested-vm-inert? susp)
  (match susp
    [(suspension (vm _ (list) (list) (list) _ _) #f (list) (list)) #t]
    [_ #f]))
;; (ground-event-pattern Tag Evt)
;; A subscription to a ground-level event, identified by tag.
(struct ground-event-pattern (tag ;; compared with equal? against a value's tag
                              evt ;; the event that ground-vm waits on
                              ) #:transparent)
;; (ground-event-value Tag Any)
;; The result of a ground-level event, labelled with the tag of the
;; pattern that asked for it.
(struct ground-event-value (tag ;; tag copied from the matching pattern
                            val ;; the value the event produced
                            ) #:transparent)
;; ground-event-pattern ground-event-value -> Boolean
;; A ground event value matches a pattern when their tags are equal?.
(define (match-ground-event p m)
  (let ([wanted (ground-event-pattern-tag p)]
        [actual (ground-event-value-tag m)])
    (equal? wanted actual)))
;; Boot [#:pattern-predicate P] -> Void
;; Runs a VM as the outermost ("ground") level. Each time round the loop
;; it invokes the transition's messages as thunks; then, unless the VM's
;; suspension is inert, it blocks on the events the VM is waiting for and
;; resumes the VM with whichever one fires. Meta-messages are matched
;; with match-ground-event. Raises an error if the VM's transition has
;; any shape other than the one matched below.
(define (ground-vm boot
#:pattern-predicate [pattern-predicate default-pattern-predicate])
(let loop ((transition (run-vm (make-vm boot
#:pattern-predicate pattern-predicate
#:meta-pattern-predicate match-ground-event))))
;; Every message of the transition is called as a zero-argument procedure.
(for-each (lambda (thunk) (thunk)) (kernel-mode-transition-messages transition))
;; When the suspension is inert the `when` body is skipped and the loop ends.
(when (not (nested-vm-inert? (kernel-mode-transition-suspension transition)))
(match transition
;; Accepted shape: the suspension's last field and the transition's
;; last two fields must all be empty lists.
[(kernel-mode-transition (suspension new-state
polling-k
message-handlers
'())
_
'()
'())
;; One wrapped event per handler; when it fires it yields
;; (cons ground-event-value handler-continuation).
(define inbound-messages
(map (match-lambda [(message-handler (ground-event-pattern tag evt) k)
(wrap-evt evt (lambda (v) (cons (ground-event-value tag v) k)))])
message-handlers))
;; Wait for the first ready event. If the VM wants to poll
;; (polling-k is not #f) an always-ready 'idle event is offered too,
;; whose continuation ignores its argument and yields polling-k;
;; otherwise that slot is never-evt and only real events can fire.
(match-define (cons inbound-value inbound-continuation)
(apply sync
(wrap-evt (if polling-k always-evt never-evt)
(lambda (v) (cons (ground-event-value 'idle (void))
(lambda (dummy) polling-k))))
inbound-messages))
;; Resume the VM: the continuation applied to the event value gives a
;; procedure over the VM state, whose result is the next transition.
(loop ((inbound-continuation inbound-value) new-state))]
[_
(error 'ground-vm
"Outermost VM may not spawn new siblings or send or receive metamessages")]))))