racket-matrix-2012/os2.rkt

453 lines
16 KiB
Racket
Raw Normal View History

2012-03-19 18:28:34 +00:00
#lang racket/base
;; Virtualized operating system, this time with presence.
(require racket/set)
2012-03-19 18:28:34 +00:00
(require racket/match)
2012-03-24 19:57:32 +00:00
(require (only-in racket/list flatten))
2012-03-20 15:33:54 +00:00
(require "unify.rkt")
2012-03-24 20:01:15 +00:00
(provide nested-vm
ground-vm
(struct-out topic)
topic-publisher
topic-subscriber
co-roles
co-topics
topic-union
2012-03-24 20:01:15 +00:00
(struct-out handlers)
(except-out (struct-out transition) transition)
(rename-out [make-transition transition])
extend-transition
2012-03-24 23:13:45 +00:00
role
2012-03-24 20:01:15 +00:00
(except-out (struct-out add-role) add-role)
(rename-out [make-add-role add-role])
(except-out (struct-out delete-role) delete-role)
(rename-out [make-delete-role delete-role])
(except-out (struct-out send-message) send-message)
(rename-out [make-send-message send-message])
2012-03-24 20:01:15 +00:00
(except-out (struct-out spawn) spawn)
(rename-out [make-spawn spawn])
(except-out (struct-out kill) kill)
(rename-out [make-kill kill])
(struct-out at-meta-level)
;; Reexports from unify.rkt for convenience
wild
wild?
non-wild?)
2012-03-24 20:01:15 +00:00
2012-03-20 15:33:54 +00:00
;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Data definitions
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
;; it's an integer.
;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
;; it's a list of two elements, the first being the endpoint's
2012-03-24 19:02:15 +00:00
;; process's PID and the second being an integer. (Except for the
;; ground-vm, where they're different because there aren't any PIDs.)
2012-03-19 18:28:34 +00:00
;; One endpoint, one topic, with the caveat that as we are at present
;; unable to represent true topic unions, we actually store a *set* of
;; topics against each endpoint. The topic for the endpoint is to be
;; taken as the union of all the members in the set.
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
2012-03-19 18:28:34 +00:00
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
2012-03-20 15:33:54 +00:00
(struct vm (processes ;; Hash<PID, Process>
endpoints ;; Hash<EID, Endpoint>
2012-03-19 18:28:34 +00:00
next-process-id ;; PID
pending-actions ;; QuasiQueue<(cons PID Action)>
) #:transparent)
;; An Endpoint is an (endpoint EID Set<Topic> Handlers), representing a
2012-03-24 23:41:04 +00:00
;; facet of a process responsible for playing a particular role (the
;; topic) in a conversation.
(struct endpoint (id topics handlers) #:transparent)
2012-03-24 23:41:04 +00:00
;; A Process is an Exists State . (process PID State
;; NonnegativeInteger Set<EID>), representing a VM process and its
;; collection of active endpoints.
(struct process (id state next-eid-number endpoints) #:transparent)
;; A Topic is a (topic Role Pattern Boolean), describing an Endpoint's
;; role in a conversation.
2012-03-19 18:28:34 +00:00
(struct topic (role pattern virtual?) #:prefab)
2012-04-12 19:17:15 +00:00
;; BootK = PID -> Transition
;; InterruptK = State -> Transition
;; TrapK<X> = X -> InterruptK
;; PresenceHandler = TrapK<EID * Topic>
;; AbsenceHandler = TrapK<EID * Topic * Reason>
;; MessageHandler = TrapK<EID * Topic * Message>
2012-03-20 15:33:54 +00:00
(struct handlers (presence absence message) #:transparent)
2012-03-19 18:28:34 +00:00
2012-03-24 19:57:32 +00:00
;; actions is a plain old ordered ConsTreeOf<Action>, not a
;; QuasiQueue.
2012-03-19 18:28:34 +00:00
(struct transition (state actions) #:transparent)
;; Preactions.
;; Ks are various TrapKs or #f, signifying lack of interest.
(struct add-role (topics handlers k) #:prefab)
(struct delete-role (eid reason) #:prefab)
(struct send-message (body topic) #:prefab)
(struct spawn (main k) #:prefab)
2012-03-24 19:58:45 +00:00
(struct kill (pid reason) #:prefab)
2012-03-19 18:28:34 +00:00
;; An Action is either a Preaction or an (at-meta-level Preaction).
(struct at-meta-level (preaction) #:prefab)
2012-03-24 20:01:15 +00:00
;;---------------------------------------------------------------------------
;; role macro
2012-03-24 20:01:15 +00:00
2012-03-24 23:13:45 +00:00
(require (for-syntax syntax/parse))
(require (for-syntax racket/base))
(define-syntax role
(lambda (stx)
(syntax-parse stx
[(_ topics-expr
2012-03-24 23:13:45 +00:00
#:state state-pattern
(~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
(~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
(~optional (~seq #:on-ready ready) #:name "#:on-ready handler")
(~optional (~seq #:id eid) #:defaults ([eid #'e0]) #:name "#:id")
(~optional (~seq #:topic topic) #:defaults ([topic #'t0]) #:name "#:topic")
(~optional (~seq #:reason reason) #:defaults ([reason #'r0]) #:name "#:reason"))
2012-03-24 23:13:45 +00:00
...
[message-pattern clause-body ...]
...)
(define-syntax-rule (build-handler args e-attr)
(if (not (attribute e-attr))
#'#f
#`(lambda (eid . args) (match-lambda [state-pattern e-attr]))))
(with-syntax ([presence-handler (build-handler (topic) presence)]
[absence-handler (build-handler (topic reason) absence)]
[ready-handler (build-handler () ready)]
[message-handler #'(lambda (eid topic message-body)
(lambda (state)
(match state
[state-pattern
(match message-body
[message-pattern clause-body ...]
...
[_ state])])))])
#'(add-role topics-expr
(handlers presence-handler absence-handler message-handler)
ready-handler))])))
2012-03-24 23:13:45 +00:00
;;---------------------------------------------------------------------------
;; Smarter constructors for transitions and preactions.
2012-03-24 20:01:15 +00:00
(define (make-transition state . actions) (transition state actions))
(define (make-add-role topics handlers [k #f]) (add-role topics handlers k))
2012-03-24 20:01:15 +00:00
(define (make-delete-role eid [reason #f]) (delete-role eid reason))
(define (make-send-message body [topic (topic-publisher body)]) (send-message body topic))
(define (make-spawn main [k #f]) (spawn main k))
2012-03-24 20:01:15 +00:00
(define (make-kill [pid #f] [reason #f]) (kill pid reason))
(define (extend-transition t . more-actions)
(match t
[(transition state actions) (transition state (list actions more-actions))]
[state (transition state more-actions)]))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Topics and roles
(define (topic-publisher pattern #:virtual? [virtual? #f])
(topic 'publisher pattern virtual?))
(define (topic-subscriber pattern #:virtual? [virtual? #f])
(topic 'subscriber pattern virtual?))
(define (co-roles r)
(case r
[(publisher) '(subscriber)]
[(subscriber) '(publisher)]
[else #f]))
(define (co-topics t)
(for/list ([co-role (co-roles (topic-role t))])
(struct-copy topic t [role co-role])))
2012-03-19 18:28:34 +00:00
(define (topic-union . ts)
(unless (andmap topic? ts)
(error 'topic-union "Expects topics as arguments, but given ~v" ts))
(list->set ts))
2012-03-20 15:33:54 +00:00
(define (refine-topic remote-topic new-pattern)
(struct-copy topic remote-topic [pattern new-pattern]))
(define (roles-intersect? l r)
(memq l (co-roles r)))
;; Both left and right must be canonicalized.
(define (topic-intersection left right)
(and (roles-intersect? (topic-role left) (topic-role right))
(mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right)))))
;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is virtual (in which case everything is seen) or otherwise if
;; remote-topic is also not virtual.
(define (flow-visible? local-topic remote-topic)
(or (topic-virtual? local-topic)
(not (topic-virtual? remote-topic))))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Core virtualizable virtual machine.
2012-03-19 18:28:34 +00:00
(define (make-vm boot)
(vm (hash)
(hash)
2012-03-19 18:28:34 +00:00
0
(list (cons -1 (spawn boot #f)))))
2012-03-19 18:28:34 +00:00
(define (run-vm state)
(let loop ((remaining-actions (reverse (vm-pending-actions state)))
(state (struct-copy vm state [pending-actions '()]))
(outbound-actions '()))
2012-03-19 18:28:34 +00:00
(match remaining-actions
['() (transition (collect-dead-processes state) (reverse outbound-actions))]
2012-03-19 18:28:34 +00:00
[(cons (cons pid action) rest)
2012-03-24 19:02:15 +00:00
(match action
[(at-meta-level preaction)
(define transformed-preaction (transform-meta-action pid preaction))
(loop rest state (cons transformed-preaction outbound-actions))]
2012-03-24 19:02:15 +00:00
[preaction
(loop rest (perform-action pid preaction state) outbound-actions)])])))
2012-03-19 18:28:34 +00:00
(define (collect-dead-processes state)
(struct-copy vm state
[processes (for/hash ([(pid p) (in-hash (vm-processes state))]
#:when (or (not (set-empty? (process-endpoints p)))
(ormap (lambda (entry) (= (car entry) pid))
(vm-pending-actions state))))
(values pid p))]))
2012-03-24 23:29:00 +00:00
(define (send-to-user failure-proc f . args)
(with-handlers ([exn:fail? failure-proc])
(apply f args)))
2012-03-19 18:28:34 +00:00
(define (ensure-topic-union t)
(cond [(topic? t) (set t)]
[(set? t) t]
[else
(error 'ensure-topic-union
"Expected either a single topic or a set of topics; got ~v"
t)]))
2012-03-24 19:02:15 +00:00
(define (perform-action pid preaction state)
(match preaction
[(add-role topics hs k) (do-subscribe pid (ensure-topic-union topics) hs k state)]
[(delete-role eid reason) (do-unsubscribe pid eid reason state)]
[(send-message body topic) (route-and-deliver topic body state)]
[(spawn main k) (do-spawn pid main k state)]
2012-03-24 19:58:45 +00:00
[(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)]))
(define (do-subscribe pid topics hs k state)
(cond
[(hash-has-key? (vm-processes state) pid)
(define old-process (hash-ref (vm-processes state) pid))
(define eid-number (process-next-eid-number old-process))
(define new-eid (list pid eid-number))
(struct-copy vm (for*/fold ([state (run-trapk state pid k new-eid)])
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[topic (in-set topics)]
[flow-pattern (in-value (topic-intersection topic matching-topic))]
#:when flow-pattern)
(define inbound-flow (refine-topic matching-topic flow-pattern))
(define outbound-flow (refine-topic topic flow-pattern))
(let* ((state (if (flow-visible? topic inbound-flow)
(run-trapk state
pid
(handlers-presence hs)
new-eid
inbound-flow)
state))
(state (if (flow-visible? matching-topic outbound-flow)
(run-trapk state
matching-pid
(handlers-presence (endpoint-handlers e))
matching-eid
outbound-flow)
state)))
state))
[processes (hash-set (vm-processes state)
pid
(struct-copy process old-process
[next-eid-number (+ eid-number 1)]
[endpoints
(set-add (process-endpoints old-process)
new-eid)]))]
[endpoints (hash-set (vm-endpoints state)
new-eid
(endpoint new-eid
topics
hs))])]
[else state]))
(define (do-unsubscribe pid eid reason state)
2012-03-24 19:59:52 +00:00
(cond
[(hash-has-key? (vm-endpoints state) eid)
(define endpoint-to-remove (hash-ref (vm-endpoints state) eid))
(define removed-topics (endpoint-topics endpoint-to-remove))
2012-03-24 19:59:52 +00:00
(define old-process (hash-ref (vm-processes state) pid))
(define new-process (struct-copy process old-process
[endpoints (set-remove (process-endpoints old-process) eid)]))
(let ((state (struct-copy vm state
[endpoints (hash-remove (vm-endpoints state) eid)]
[processes (hash-set (vm-processes state) pid new-process)])))
2012-03-24 19:59:52 +00:00
(for*/fold ([state state])
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[removed-topic (in-set removed-topics)]
2012-03-24 19:59:52 +00:00
[flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
#:when flow-pattern)
(define outbound-flow (refine-topic removed-topic flow-pattern))
(run-trapk state
matching-pid
(handlers-absence (endpoint-handlers e))
matching-eid
outbound-flow
reason)))]
[else state]))
2012-03-20 15:33:54 +00:00
(define (route-and-deliver message-topic body state)
(define pids-and-endpoints
(for*/set ([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[flow-pattern (in-value (topic-intersection message-topic matching-topic))]
#:when flow-pattern)
(cons matching-pid e)))
(for/fold ([state state]) ([pid-and-endpoint (in-set pids-and-endpoints)])
(define matching-pid (car pid-and-endpoint))
(define e (cdr pid-and-endpoint))
(run-trapk state
matching-pid
(handlers-message (endpoint-handlers e))
(endpoint-id e)
message-topic
body)))
(define (do-spawn spawning-pid main k state)
(define new-pid (vm-next-process-id state))
2012-03-25 01:18:19 +00:00
(match-define (transition initial-state initial-actions)
(cond
[(procedure? main) (send-to-user (lambda (e) (transition #f (kill #f e))) main new-pid)]
[(transition? main) main]))
2012-03-25 01:18:19 +00:00
(define spawned-state
(struct-copy vm (enqueue-actions state new-pid initial-actions)
[processes (hash-set (vm-processes state)
new-pid
(process new-pid initial-state 0 (set)))]
[next-process-id (+ new-pid 1)]))
(run-trapk spawned-state spawning-pid k new-pid))
(define (do-kill pid-to-kill reason state)
(cond
[(hash-has-key? (vm-processes state) pid-to-kill)
(let ((state (for/fold ([state state])
([eid (in-set (process-endpoints
(hash-ref (vm-processes state) pid-to-kill)))])
(do-unsubscribe pid-to-kill eid reason state))))
(struct-copy vm state
[processes (hash-remove (vm-processes state) pid-to-kill)]))]
[else state]))
(define (run-trapk state pid trap-k . args)
(if trap-k
2012-03-24 23:31:30 +00:00
(let ((failure-proc (lambda (e) (lambda (process-state)
2012-03-24 23:41:04 +00:00
(transition process-state (kill #f e))))))
2012-03-24 23:31:30 +00:00
(run-ready state pid (apply send-to-user failure-proc trap-k args)))
state))
(define (maybe-transition->transition t)
(if (transition? t)
t
(transition t '())))
2012-03-20 15:33:54 +00:00
(define (run-ready state pid interrupt-k)
(define old-process (hash-ref (vm-processes state) pid))
2012-03-24 23:41:04 +00:00
(define old-state (process-state old-process))
(match-define (transition new-state actions)
2012-03-24 23:31:30 +00:00
(maybe-transition->transition
2012-03-24 23:41:04 +00:00
(send-to-user (lambda (e) (transition old-state (kill #f e))) interrupt-k old-state)))
2012-03-20 15:33:54 +00:00
(struct-copy vm (enqueue-actions state pid actions)
[processes (hash-set (vm-processes state) pid
(struct-copy process old-process
2012-03-24 23:41:04 +00:00
[state new-state]))]))
2012-03-20 15:33:54 +00:00
2012-03-19 18:28:34 +00:00
(define (enqueue-actions state pid actions)
(struct-copy vm state
[pending-actions (append (reverse (for/list ([a (flatten actions)]) (cons pid a)))
(vm-pending-actions state))]))
2012-03-19 18:28:34 +00:00
2012-03-24 23:41:04 +00:00
(define (((wrap-trapk pid trapk) . args) state)
(apply run-trapk state pid trapk args))
2012-03-24 19:02:15 +00:00
(define (transform-meta-action pid preaction)
(match preaction
[(add-role topics hs k)
(add-role topics
2012-03-24 19:02:15 +00:00
(handlers (wrap-trapk pid (handlers-presence hs))
(wrap-trapk pid (handlers-absence hs))
(wrap-trapk pid (handlers-message hs)))
(wrap-trapk pid k))]
[(? delete-role?) preaction]
[(? send-message?) preaction]
[(spawn main k)
(spawn main (wrap-trapk pid k))]
2012-03-24 19:58:45 +00:00
[(? kill?) preaction]))
2012-03-24 19:02:25 +00:00
2012-03-24 20:01:15 +00:00
(define (nested-vm boot)
(lambda () (run-vm (make-vm boot))))
2012-03-24 19:02:25 +00:00
(define (ground-vm boot)
(let loop ((state (make-vm boot)))
(match (run-vm state)
[(transition state actions)
(when (not (null? actions))
(error 'ground-vm "No meta-actions available in ground-vm: ~v" actions))
(define waiting? (null? (vm-pending-actions state)))
(define active-events (for*/list ([(eid e) (in-hash (vm-endpoints state))]
[topic (in-set (endpoint-topics e))]
#:when (and (evt? (topic-pattern topic))
(eq? (topic-role topic)
'subscriber)))
(define evt (topic-pattern topic))
(wrap-evt evt (lambda (message)
(lambda (state)
(route-and-deliver (topic-publisher evt)
message
state))))))
(if (and waiting? (null? active-events))
'done ;; About to block, and nothing can wake us
(let ((interruptk (apply sync
(if waiting?
never-evt
(wrap-evt always-evt (lambda (dummy) values)))
active-events)))
(loop (interruptk state))))])))