;; racket-matrix-2012/os2.rkt
;; (506 lines, 18 KiB, Racket)
#lang racket/base
;; Virtualized operating system, this time with presence.
(require racket/set)
2012-03-19 18:28:34 +00:00
(require racket/match)
2012-03-24 19:57:32 +00:00
(require (only-in racket/list flatten))
2012-03-20 15:33:54 +00:00
(require "unify.rkt")
2012-03-24 20:01:15 +00:00
(provide nested-vm
ground-vm
(struct-out topic)
topic-publisher
topic-subscriber
co-roles
co-topics
topic-union
2012-03-24 20:01:15 +00:00
(struct-out handlers)
(except-out (struct-out transition) transition)
(rename-out [make-transition transition])
extend-transition
2012-03-24 23:13:45 +00:00
role
2012-03-24 20:01:15 +00:00
(except-out (struct-out add-role) add-role)
(rename-out [make-add-role add-role])
(except-out (struct-out delete-role) delete-role)
(rename-out [make-delete-role delete-role])
(except-out (struct-out send-message) send-message)
(rename-out [make-send-message send-message])
2012-03-24 20:01:15 +00:00
(except-out (struct-out spawn) spawn)
(rename-out [make-spawn spawn])
(except-out (struct-out kill) kill)
(rename-out [make-kill kill])
(struct-out at-meta-level)
;; Aliases of structures themselves, because of shadowing of
;; constructors/type-names.
(rename-out [topic <topic>])
(rename-out [handlers <handlers>])
(rename-out [transition <transition>])
(rename-out [add-role <add-role>])
(rename-out [delete-role <delete-role>])
(rename-out [send-message <send-message>])
(rename-out [spawn <spawn>])
(rename-out [kill <kill>])
(rename-out [at-meta-level <at-meta-level>])
;; Reexports from unify.rkt for convenience
wild
wild?
non-wild?
;; Reexports from racket/match for convenience
(all-from-out racket/match))
2012-03-24 20:01:15 +00:00
2012-03-20 15:33:54 +00:00
;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Data definitions
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
;; it's an integer.
;; A PreEID is an arbitrary, process-chosen-and-supplied identifier
;; for an endpoint. It is to be equal?-comparable. It is to be unique
;; within the scope of a single process.
;; An EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
;; it's an (eid PID PreEID). As a consequence of the scope of PreEIDs,
;; EIDs shouldn't be visible outside the scope of the owning process.
;; The struct is #:transparent, so two EIDs with equal? fields are
;; themselves equal?; EIDs are used as keys of the VM's endpoint table.
(struct eid (pid pre-eid) #:transparent)
2012-03-19 18:28:34 +00:00
;; One endpoint, one topic (which may be changed over time!), with the
;; caveat that as we are at present unable to represent true topic
;; unions, we actually store a *set* of topics against each
;; endpoint. The (current!) topic for the endpoint is to be taken as
;; the union of all the members in the set.
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
;; A sent message includes a "body" and a "role", and is equivalent to
;; a non-virtual topic with that role and with the given "body" as a
;; pattern. In a sense, topics quite literally are patterns over
;; entire messages.
2012-03-19 18:28:34 +00:00
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
2012-03-20 15:33:54 +00:00
;; A VM is a (vm Hash<PID,Process> Hash<EID,Endpoint> PID
;;               QuasiQueue<(cons PID Action)>).
(struct vm
  (processes        ;; Hash<PID, Process>
   endpoints        ;; Hash<EID, Endpoint>
   next-process-id  ;; PID
   pending-actions) ;; QuasiQueue<(cons PID Action)>
  #:transparent)
;; An Endpoint is an (endpoint EID Set<Topic> Handlers), representing a
;; facet of a process responsible for playing a particular role (the
;; topic) in a conversation.
(struct endpoint
  (id        ;; EID
   topics    ;; Set<Topic>
   handlers) ;; Handlers
  #:transparent)
2012-03-24 23:41:04 +00:00
;; A Process is an Exists State . (process PID State Set<EID>),
;; representing a VM process and its collection of active endpoints.
;; The endpoints field holds only the EIDs; the Endpoint records
;; themselves live in the VM's endpoint table.
(struct process (id state endpoints) #:transparent)
2012-03-24 23:41:04 +00:00
;; A Topic is a (topic Role Pattern Boolean), describing an Endpoint's
;; role in a conversation.
(struct topic
  (role      ;; Role
   pattern   ;; Pattern
   virtual?) ;; Boolean
  #:prefab)
2012-04-17 16:47:01 +00:00
;; BootK = either (PID -> Transition) or Transition
;; InterruptK = State -> Transition
;; TrapK<X> = X -> InterruptK
;; (handlers Maybe<TrapK<Topic>>
;;           Maybe<TrapK<Topic * Reason>>
;;           Maybe<TrapK<Topic * Message>>)
(struct handlers
  (presence ;; Maybe<TrapK<Topic>>
   absence  ;; Maybe<TrapK<Topic * Reason>>
   message) ;; Maybe<TrapK<Topic * Message>>
  #:transparent)
2012-03-19 18:28:34 +00:00
2012-03-24 19:57:32 +00:00
;; A Transition is a (transition State ConsTreeOf<Action>).
;; actions is a plain old ordered ConsTreeOf<Action>, not a
;; QuasiQueue.
(struct transition
  (state    ;; State
   actions) ;; ConsTreeOf<Action>
  #:transparent)
;; Preactions.
;; Ks are various TrapKs or #f, signifying lack of interest.
;;
;; (add-role PreEID (or Topic Set<Topic>) Handlers)
;; Asks the VM to install an endpoint for the issuing process under
;; the given PreEID, with the given topic(s) and handlers.
(struct add-role (pre-eid topics handlers) #:prefab)
;;
;; (delete-role PreEID Any)
;; Asks the VM to remove the issuing process's endpoint named by the
;; PreEID; the reason is passed on to absence handlers of its peers.
(struct delete-role (pre-eid reason) #:prefab)
;;
;; (send-message Any Role)
;; Asks the VM to route body, sent in the given role, to every
;; endpoint whose topic intersects it.
(struct send-message (body role) #:prefab)
;;
;; (spawn BootK Maybe<TrapK<PID>>)
;; Asks the VM to create a new process from main; k, when not #f, is
;; run in the spawning process with the new process's PID.
(struct spawn (main k) #:prefab)
;;
;; (kill Maybe<PID> Any)
;; A pid of #f means "the process issuing the action".
(struct kill
  (pid     ;; Maybe<PID>
   reason) ;; Any
  #:prefab)
2012-03-19 18:28:34 +00:00
;; An Action is either a Preaction or an (at-meta-level Preaction).
;; Wrapping a Preaction in at-meta-level makes the VM hand it to the
;; enclosing level instead of performing it itself.
(struct at-meta-level (preaction) #:prefab)
2012-03-24 20:01:15 +00:00
;;---------------------------------------------------------------------------
;; role macro
2012-03-24 20:01:15 +00:00
2012-03-24 23:13:45 +00:00
(require (for-syntax syntax/parse))
(require (for-syntax racket/base))
;; (role pre-eid topics-expr
;;       #:state state-pattern
;;       [#:on-presence expr] [#:on-absence expr]
;;       [#:topic id] [#:reason id]
;;       [message-pattern body ...] ...)
;;
;; Expands to an add-role preaction whose handlers are built from the
;; clauses. The optional keyword arguments may appear in any order.
;; #:topic and #:reason name the variables bound to the flow topic and
;; the absence reason inside the handler expressions; when omitted,
;; macro-introduced names are used. A message matching no
;; message-pattern leaves the process state unchanged.
(define-syntax (role stx)
  (syntax-parse stx
    [(_ pre-eid topics-expr
        #:state state-pattern
        (~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
             (~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
             (~optional (~seq #:topic topic) #:defaults ([topic #'t0]) #:name "#:topic")
             (~optional (~seq #:reason reason) #:defaults ([reason #'r0]) #:name "#:reason"))
        ...
        [message-pattern clause-body ...]
        ...)
     ;; Produces syntax for a TrapK taking `args`, or syntax for #f when
     ;; the corresponding optional clause was not supplied.
     (define-syntax-rule (build-handler args e-attr)
       (if (attribute e-attr)
           #`(lambda args (match-lambda [state-pattern e-attr]))
           #'#f))
     (with-syntax ([presence-handler (build-handler (topic) presence)]
                   [absence-handler (build-handler (topic reason) absence)]
                   [message-handler
                    #'(lambda (topic message-body)
                        (lambda (state)
                          (match state
                            [state-pattern
                             (match message-body
                               [message-pattern clause-body ...]
                               ...
                               [_ state])])))])
       #'(add-role pre-eid
                   topics-expr
                   (handlers presence-handler absence-handler message-handler)))]))
2012-03-24 23:13:45 +00:00
;;---------------------------------------------------------------------------
;; Smarter constructors for transitions and preactions.
2012-03-24 20:01:15 +00:00
;; make-transition : State Action ... -> Transition
;; Collects the rest arguments as the transition's action list.
(define make-transition
  (lambda (state . actions)
    (transition state actions)))

;; make-add-role : no special treatment required at present.
(define make-add-role add-role)

;; make-delete-role : PreEID [Any] -> delete-role; reason defaults to #f.
(define make-delete-role
  (lambda (pre-eid [reason #f])
    (delete-role pre-eid reason)))

;; make-send-message : Any [Role] -> send-message; role defaults to 'publisher.
(define make-send-message
  (lambda (body [role 'publisher])
    (send-message body role)))

;; make-spawn : BootK [Maybe<TrapK<PID>>] -> spawn; k defaults to #f.
(define make-spawn
  (lambda (main [k #f])
    (spawn main k)))

;; make-kill : [Maybe<PID>] #:reason [Any] -> kill; both default to #f.
(define make-kill
  (lambda ([pid #f] #:reason [reason #f])
    (kill pid reason)))
2012-03-24 20:01:15 +00:00
;; extend-transition : (or Transition State) Action ... -> Transition
;; Appends more-actions after the actions already in t. A t that is
;; not a transition is treated as a bare state with no prior actions.
(define (extend-transition t . more-actions)
  (if (transition? t)
      (transition (transition-state t)
                  (list (transition-actions t) more-actions))
      (transition t more-actions)))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Topics and roles
;; topic-publisher : Pattern #:virtual? [Boolean] -> Topic
;; Builds a topic in the 'publisher role; not virtual by default.
(define topic-publisher
  (lambda (pattern #:virtual? [virtual? #f])
    (topic 'publisher pattern virtual?)))
;; topic-subscriber : Pattern #:virtual? [Boolean] -> Topic
;; Builds a topic in the 'subscriber role; not virtual by default.
(define topic-subscriber
  (lambda (pattern #:virtual? [virtual? #f])
    (topic 'subscriber pattern virtual?)))
;; co-roles : Role -> (or (Listof Role) #f)
;; The roles that can converse with r, or #f when r is not a known role.
(define (co-roles r)
  (cond
    [(eq? r 'publisher) '(subscriber)]
    [(eq? r 'subscriber) '(publisher)]
    [else #f]))
;; co-topics : Topic -> (Listof Topic)
;; One copy of t per co-role of t's role, differing from t only in
;; the role field. co-roles yields #f for a role it does not know;
;; that is treated as "no co-roles" so the result is the empty list
;; instead of an error from iterating over #f.
(define (co-topics t)
  (for/list ([co-role (in-list (or (co-roles (topic-role t)) '()))])
    (struct-copy topic t [role co-role])))
2012-03-19 18:28:34 +00:00
;; topic-union : Topic ... -> Set<Topic>
;; Packs the given topics into a set; raises an error mentioning the
;; whole argument list if any argument is not a topic.
(define (topic-union . ts)
  (for ([t (in-list ts)])
    (unless (topic? t)
      (error 'topic-union "Expects topics as arguments, but given ~v" ts)))
  (list->set ts))
2012-03-20 15:33:54 +00:00
;; refine-topic : Topic Pattern -> Topic
;; A topic with remote-topic's role and virtual? flag but new-pattern
;; as its pattern.
(define (refine-topic remote-topic new-pattern)
  (topic (topic-role remote-topic)
         new-pattern
         (topic-virtual? remote-topic)))
;; roles-intersect? : Role Role -> (or list #f)
;; True (a non-empty list tail, as returned by memq) iff l is one of
;; the co-roles of r. co-roles yields #f for an unknown role; memq
;; would raise on that, so an unknown r simply never intersects.
(define (roles-intersect? l r)
  (define candidates (co-roles r))
  (and candidates (memq l candidates)))
;; Both left and right must be canonicalized.
;; Returns #f when the roles of the two topics do not intersect;
;; otherwise returns whatever mgu-canonical yields for the freshened
;; patterns of the two topics.
(define (topic-intersection left right)
  (if (roles-intersect? (topic-role left) (topic-role right))
      (mgu-canonical (freshen (topic-pattern left))
                     (freshen (topic-pattern right)))
      #f))
;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is virtual (in which case everything is seen) or otherwise if
;; remote-topic is also not virtual.
(define (flow-visible? local-topic remote-topic)
  (cond
    ;; Pass the virtual? flag itself through, exactly as `or` would.
    [(topic-virtual? local-topic) => values]
    [else (not (topic-virtual? remote-topic))]))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Core virtualizable virtual machine.
2012-03-19 18:28:34 +00:00
;; make-vm : BootK -> VM
;; An empty VM (no processes, no endpoints, next PID 0) whose only
;; pending action is the spawn of boot, attributed to pseudo-PID -1.
(define (make-vm boot)
  (define boot-action (cons -1 (spawn boot #f)))
  (vm (hash)
      (hash)
      0
      (list boot-action)))
2012-03-19 18:28:34 +00:00
;; run-vm : VM -> Transition
;; Processes, oldest first, the actions that were pending when run-vm
;; was called. Actions wrapped in at-meta-level are transformed and
;; collected for the enclosing level; all others are performed on the
;; VM. Actions enqueued while doing so stay pending in the resulting
;; VM state. The result carries the VM (after collecting dead
;; processes) and the meta-level actions in the order encountered.
(define (run-vm state)
  (define-values (final-state outbound-actions)
    (for/fold ([st (struct-copy vm state [pending-actions '()])]
               [outbound '()])
              ([entry (in-list (reverse (vm-pending-actions state)))])
      (match-define (cons pid action) entry)
      (match action
        [(at-meta-level preaction)
         (values st (cons (transform-meta-action pid preaction) outbound))]
        [preaction
         (values (perform-action pid preaction st) outbound)])))
  (transition (collect-dead-processes final-state)
              (reverse outbound-actions)))
2012-03-19 18:28:34 +00:00
;; collect-dead-processes : VM -> VM
;; Drops every process that has neither endpoints nor pending actions.
;; The set of PIDs with pending actions is computed once up front
;; rather than rescanning the queue for each endpoint-less process.
(define (collect-dead-processes state)
  (define pids-with-pending-actions
    (for/set ([entry (in-list (vm-pending-actions state))])
      (car entry)))
  (define (live? pid p)
    (or (not (set-empty? (process-endpoints p)))
        (set-member? pids-with-pending-actions pid)))
  (struct-copy vm state
               [processes (for/hash ([(pid p) (in-hash (vm-processes state))]
                                     #:when (live? pid p))
                            (values pid p))]))
2012-03-24 23:29:00 +00:00
;; send-to-user : (Any -> X) (Y ... -> X) Y ... -> X
;; Applies user-supplied f to args. Anything raised by f is handed to
;; failure-proc, whose result becomes the result of the call. Racket
;; allows raising arbitrary values, so non-exception values are
;; trapped too; otherwise user code could take down the whole VM with
;; (raise 'x). Breaks are deliberately not trapped, so the VM stays
;; interruptible.
(define (send-to-user failure-proc f . args)
  (with-handlers ([(lambda (e) (not (exn:break? e))) failure-proc])
    (apply f args)))
2012-03-19 18:28:34 +00:00
;; ensure-topic-union : (or Topic Set<Topic>) -> Set<Topic>
;; Wraps a lone topic in a singleton set, passes a set through
;; unchanged, and rejects anything else.
(define (ensure-topic-union t)
  (match t
    [(? topic?) (set t)]
    [(? set?) t]
    [_ (error 'ensure-topic-union
              "Expected either a single topic or a set of topics; got ~v"
              t)]))
2012-03-24 19:02:15 +00:00
;; perform-action : PID Preaction VM -> VM
;; Dispatches one preaction issued by process pid to its
;; implementation. A kill without an explicit target kills the issuer.
(define (perform-action pid preaction state)
  (match preaction
    [(add-role pre-eid topics hs)
     (do-subscribe pid pre-eid (ensure-topic-union topics) hs state)]
    [(delete-role pre-eid reason)
     (do-unsubscribe pid pre-eid reason state)]
    [(send-message body role)
     (route-and-deliver role body state)]
    [(spawn main k)
     (do-spawn pid main k state)]
    [(kill #f reason)
     (do-kill pid reason state)]
    [(kill pid-to-kill reason)
     (do-kill pid-to-kill reason state)]))
;; do-subscribe : PID PreEID Set<Topic> Handlers VM -> VM
;; Installs a new endpoint for process pid. Presence notifications are
;; issued first, then the EID is recorded against the process, then
;; the endpoint is stored. Does nothing if pid is not a known process.
(define (do-subscribe pid pre-eid topics hs state)
  (if (not (hash-has-key? (vm-processes state) pid))
      state
      (let* ([new-eid (eid pid pre-eid)]
             [new-endpoint (endpoint new-eid topics hs)]
             [notified (notify-route-additions state new-endpoint)]
             [registered (generic-update-process notified pid (add-process-eid new-eid))])
        (install-endpoint registered new-eid new-endpoint))))
;; generic-update-process : VM PID (Process -> Process) -> VM
;; Replaces the process stored under pid by (updater process).
;; hash-update is called without a default, so pid must be present.
(define (generic-update-process state pid updater)
  (define updated-processes
    (hash-update (vm-processes state) pid updater))
  (struct-copy vm state [processes updated-processes]))
;; add-process-eid : EID -> (Process -> Process)
;; An updater that adds new-eid to a process's endpoint set.
(define (add-process-eid new-eid)
  (lambda (p)
    (define extended (set-add (process-endpoints p) new-eid))
    (struct-copy process p [endpoints extended])))
;; remove-process-eid : EID -> (Process -> Process)
;; An updater that removes old-eid from a process's endpoint set.
(define (remove-process-eid old-eid)
  (lambda (p)
    (define reduced (set-remove (process-endpoints p) old-eid))
    (struct-copy process p [endpoints reduced])))
;; install-endpoint : VM EID Endpoint -> VM
;; Stores new-endpoint under new-eid, replacing any previous entry.
(define (install-endpoint state new-eid new-endpoint)
  (define table (hash-set (vm-endpoints state) new-eid new-endpoint))
  (struct-copy vm state [endpoints table]))
;; uninstall-endpoint : VM EID -> VM
;; Removes the entry for old-eid from the endpoint table.
(define (uninstall-endpoint state old-eid)
  (define table (hash-remove (vm-endpoints state) old-eid))
  (struct-copy vm state [endpoints table]))
;; notify-route-additions : VM Endpoint -> VM
;; For every (existing endpoint topic, new endpoint topic) pair whose
;; intersection is non-#f, runs the presence handlers on both sides:
;; the new endpoint's handler (in its owning process) with the peer's
;; topic refined to the flow pattern, and the existing endpoint's
;; handler (in its process) with the new topic refined likewise. Each
;; side is notified only if the flow is visible to it.
(define (notify-route-additions state new-endpoint)
  (match-define (endpoint (eid pid _) topics (handlers presence-handler _ _)) new-endpoint)
  (for*/fold ([state state])
             ([(matching-pid p) (in-hash (vm-processes state))]
              [matching-eid (in-set (process-endpoints p))]
              [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
              [matching-topics (in-value (endpoint-topics e))]
              [matching-topic (in-set matching-topics)]
              [new-topic (in-set topics)]
              [flow-pattern (in-value (topic-intersection new-topic matching-topic))]
              #:when flow-pattern)
    (define inbound-flow (refine-topic matching-topic flow-pattern))
    (define outbound-flow (refine-topic new-topic flow-pattern))
    (define e-presence-handler (handlers-presence (endpoint-handlers e)))
    ;; First tell the new endpoint about its peer ...
    (define after-local-notification
      (if (flow-visible? new-topic inbound-flow)
          (run-trapk state pid presence-handler inbound-flow)
          state))
    ;; ... then tell the peer about the new endpoint.
    (if (flow-visible? matching-topic outbound-flow)
        (run-trapk after-local-notification matching-pid e-presence-handler outbound-flow)
        after-local-notification)))
;; do-unsubscribe : PID PreEID Any VM -> VM
;; Removes the endpoint (eid pid pre-eid): first from the owning
;; process's EID set, then from the endpoint table, and finally
;; notifies peers of its absence. Does nothing if no such endpoint
;; is installed.
(define (do-unsubscribe pid pre-eid reason state)
  (define old-eid (eid pid pre-eid))
  (define old-endpoint (hash-ref (vm-endpoints state) old-eid #f))
  (if old-endpoint
      (notify-route-deletions
       (uninstall-endpoint
        (generic-update-process state pid (remove-process-eid old-eid))
        old-eid)
       old-endpoint
       reason)
      state))
2012-03-20 15:33:54 +00:00
;; notify-route-deletions : VM Endpoint Any -> VM
;; For every remaining endpoint topic that intersects one of the
;; removed endpoint's topics, runs that endpoint's absence handler
;; with the removed topic refined to the flow pattern, and the reason.
;; NOTE(review): unlike notify-route-additions, no flow-visible? check
;; is made here, so an absence may be reported for a flow whose
;; presence was never reported -- confirm whether that is intended.
(define (notify-route-deletions state old-endpoint reason)
  (define removed-topics (endpoint-topics old-endpoint))
  (for*/fold ([state state])
             ([(matching-pid p) (in-hash (vm-processes state))]
              [matching-eid (in-set (process-endpoints p))]
              [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
              [matching-topics (in-value (endpoint-topics e))]
              [matching-topic (in-set matching-topics)]
              [removed-topic (in-set removed-topics)]
              [flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
              #:when flow-pattern)
    (run-trapk state
               matching-pid
               (handlers-absence (endpoint-handlers e))
               (refine-topic removed-topic flow-pattern)
               reason)))
;; route-and-deliver : Role Any VM -> VM
;; Treats the message as a non-virtual topic with the given role and
;; the body as its pattern, gathers the set of endpoints having at
;; least one topic that intersects it, and runs each such endpoint's
;; message handler once with the message topic and the body.
(define (route-and-deliver role body state)
  (define message-topic (topic role body #f))
  (define endpoints
    (for*/set ([(matching-pid p) (in-hash (vm-processes state))]
               [matching-eid (in-set (process-endpoints p))]
               [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
               [matching-topics (in-value (endpoint-topics e))]
               [matching-topic (in-set matching-topics)]
               [flow-pattern (in-value (topic-intersection message-topic matching-topic))]
               #:when flow-pattern)
      e))
  (for/fold ([state state]) ([e (in-set endpoints)])
    (define receiver-pid (eid-pid (endpoint-id e)))
    (define message-handler (handlers-message (endpoint-handlers e)))
    (run-trapk state receiver-pid message-handler message-topic body)))
;; do-spawn : PID BootK Maybe<TrapK<PID>> VM -> VM
;; Creates a new process with the next free PID, enqueues its initial
;; actions, and then runs k (if any) in the spawning process with the
;; new PID.
;;
;; main is either a procedure, called with the new PID, or the boot
;; value itself. If the procedure raises, the new process starts with
;; state #f and a kill action carrying the raised value. The boot
;; value is normalized with maybe-transition->transition, as run-ready
;; does for handler results, so a bare state (no actions) is accepted
;; instead of making match-define fail and crash the whole VM.
(define (do-spawn spawning-pid main k state)
  (define new-pid (vm-next-process-id state))
  (match-define (transition initial-state initial-actions)
    (maybe-transition->transition
     (if (procedure? main)
         (send-to-user (lambda (e) (transition #f (kill #f e))) main new-pid)
         main)))
  (define initial-process (process new-pid initial-state (set)))
  (define spawned-state
    (struct-copy vm (enqueue-actions state new-pid initial-actions)
                 [processes (hash-set (vm-processes state) new-pid initial-process)]
                 [next-process-id (+ new-pid 1)]))
  (run-trapk spawned-state spawning-pid k new-pid))
2012-05-02 17:53:52 +00:00
;; print-kill : PID Any -> Void
;; Reports the termination of a process: a normal exit when reason is
;; #f, via the current error display handler when reason is an
;; exception, and as a plain printed reason otherwise.
(define (print-kill pid-to-kill reason)
  (cond
    [(not reason)
     (printf "PID ~v exited normally~n" pid-to-kill)]
    [(exn? reason)
     (let ([display-error (error-display-handler)]
           [text (format "PID ~v exited with exception~n~a"
                         pid-to-kill
                         (exn-message reason))])
       (display-error text reason))]
    [else
     (printf "PID ~v exited with reason: ~a~n" pid-to-kill reason)]))
2012-03-25 01:18:19 +00:00
;; do-kill : PID Any VM -> VM
;; Terminates process pid-to-kill: reports the kill, unsubscribes each
;; of its endpoints (passing reason along), then removes the process
;; itself. Does nothing if the process does not exist.
(define (do-kill pid-to-kill reason state)
  (define victim (hash-ref (vm-processes state) pid-to-kill #f))
  (cond
    [(not victim) state]
    [else
     (print-kill pid-to-kill reason)
     (define unsubscribed
       (for/fold ([state state]) ([dying-eid (in-set (process-endpoints victim))])
         (do-unsubscribe pid-to-kill (eid-pre-eid dying-eid) reason state)))
     (struct-copy vm unsubscribed
                  [processes (hash-remove (vm-processes unsubscribed) pid-to-kill)])]))
;; run-trapk : VM PID Maybe<TrapK<X ...>> X ... -> VM
;; Applies trap-k to args to obtain an InterruptK and runs it against
;; process pid. If trap-k itself raises, the InterruptK used instead
;; keeps the process state and issues a kill carrying the raised
;; value. A trap-k of #f means no interest: state is returned as is.
(define (run-trapk state pid trap-k . args)
  (cond
    [(not trap-k) state]
    [else
     (define (failure-proc e)
       (lambda (process-state)
         (transition process-state (kill #f e))))
     (run-ready state pid (apply send-to-user failure-proc trap-k args))]))
;; maybe-transition->transition : (or Transition State) -> Transition
;; Passes a transition through; wraps anything else as a state with no
;; actions.
(define (maybe-transition->transition t)
  (if (transition? t)
      t
      (transition t '())))
2012-03-20 15:33:54 +00:00
;; run-ready : VM PID InterruptK -> VM
;; Runs interrupt-k on the current state of process pid, stores the
;; resulting state and enqueues the resulting actions. If interrupt-k
;; raises, the process keeps its old state and a kill action carrying
;; the raised value is enqueued for it.
;;
;; The target process may no longer exist (for instance it was killed
;; by an earlier action in the same batch, or it was collected before
;; a continuation handed to the meta-level fired). In that case there
;; is nothing to run, and the VM is returned unchanged rather than
;; failing in hash-ref.
(define (run-ready state pid interrupt-k)
  (define target (hash-ref (vm-processes state) pid #f))
  (cond
    [(not target) state]
    [else
     (define old-state (process-state target))
     (match-define (transition new-state actions)
       (maybe-transition->transition
        (send-to-user (lambda (e) (transition old-state (kill #f e)))
                      interrupt-k
                      old-state)))
     (generic-update-process (enqueue-actions state pid actions)
                             pid
                             (lambda (p) (struct-copy process p [state new-state])))]))
2012-03-20 15:33:54 +00:00
;; preaction? : Any -> Boolean
;; True iff a is one of the five preaction structures.
(define (preaction? a)
  (cond
    [(add-role? a) #t]
    [(delete-role? a) #t]
    [(send-message? a) #t]
    [(spawn? a) #t]
    [else (kill? a)]))
;; action? : Any -> Boolean
;; True iff a is a preaction, or an at-meta-level wrapper around one.
(define (action? a)
  (if (at-meta-level? a)
      (preaction? (at-meta-level-preaction a))
      (preaction? a)))
;; valid-action? : PID Any -> Boolean
;; True for actions. #f and void entries in an action ConsTree are
;; skipped silently; anything else is rejected with a logged warning.
(define (valid-action? pid a)
  (or (action? a)
      (begin
        (unless (or (eq? a #f) (void? a))
          (log-warning (format "Illegal action ~v from pid ~v" a pid)))
        #f)))
2012-03-19 18:28:34 +00:00
;; enqueue-actions : VM PID ConsTreeOf<Action> -> VM
;; Flattens actions, discards invalid entries, and pushes each
;; remaining action (tagged with pid) onto the pending QuasiQueue.
;; The queue is kept in reversed order, so consing in flattened order
;; leaves the most recently issued action at the head.
(define (enqueue-actions state pid actions)
  (define new-pending
    (for/fold ([queue (vm-pending-actions state)])
              ([a (in-list (flatten actions))]
               #:when (valid-action? pid a))
      (cons (cons pid a) queue)))
  (struct-copy vm state [pending-actions new-pending]))
2012-03-19 18:28:34 +00:00
2012-03-24 23:41:04 +00:00
;; wrap-trapk : PID Maybe<TrapK<X ...>> -> (X ... -> (VM -> VM))
;; Lifts a process-level trapk into a handler over a whole VM state:
;; applied to args and then to a VM, it runs trapk with those args
;; against process pid inside that VM.
(define (wrap-trapk pid trapk)
  (lambda args
    (lambda (state)
      (apply run-trapk state pid trapk args))))
2012-03-24 19:02:15 +00:00
;; transform-meta-action : PID Preaction -> Preaction
;; Rewrites a preaction issued at-meta-level by process pid into the
;; form handed to the enclosing level: PreEIDs are qualified into
;; (eid pid pre-eid), and handlers/continuations are wrapped so that
;; they run against process pid inside this VM. send-message and kill
;; pass through unchanged.
(define (transform-meta-action pid preaction)
  (define (wrap k) (wrap-trapk pid k))
  (define (qualify pre-eid) (eid pid pre-eid))
  (match preaction
    [(add-role pre-eid topics hs)
     (add-role (qualify pre-eid)
               topics
               (handlers (wrap (handlers-presence hs))
                         (wrap (handlers-absence hs))
                         (wrap (handlers-message hs))))]
    [(delete-role pre-eid reason)
     (delete-role (qualify pre-eid) reason)]
    [(spawn main k)
     (spawn main (wrap k))]
    [(or (? send-message?) (? kill?))
     preaction]))
2012-03-24 19:02:25 +00:00
2012-03-24 20:01:15 +00:00
;; nested-vm : BootK -> (PID -> Transition)
;; A boot procedure for running a fresh VM as a process: when called
;; (the PID argument is ignored) it builds a VM around boot and
;; returns the transition produced by running it once.
(define ((nested-vm boot) self-pid)
  (run-vm (make-vm boot)))
2012-03-24 19:02:25 +00:00
;; ground-vm : BootK -> 'done
;; Runs a VM at the outermost level. After each run-vm step it is an
;; error for any meta-level action to remain. Every non-virtual
;; subscriber topic whose pattern is a pair headed by a Racket event
;; becomes an external event source: when the event fires, the message
;; (cons evt result) is delivered in the 'publisher role. The loop
;; returns 'done once no actions are pending and no such event exists.
(define (ground-vm boot)
  (let loop ((state (make-vm boot)))
    (match-define (transition stepped-state actions) (run-vm state))
    (unless (null? actions)
      (error 'ground-vm "Cannot process meta-actions because no further meta-level exists: ~v"
             actions))
    (define waiting? (null? (vm-pending-actions stepped-state)))
    (define active-events
      (for*/fold ([acc '()])
                 ([(an-eid e) (in-hash (vm-endpoints stepped-state))]
                  [active-topic (in-set (endpoint-topics e))])
        (match active-topic
          [(topic 'subscriber (cons (? evt? evt) _) #f)
           (cons (wrap-evt evt
                           (lambda (message)
                             (lambda (state)
                               (route-and-deliver 'publisher
                                                  (cons evt message)
                                                  state))))
                 acc)]
          [_ acc])))
    (cond
      ;; About to block, and nothing can wake us
      [(and waiting? (null? active-events)) 'done]
      [else
       ;; With work still pending, include an always-ready event whose
       ;; result is the identity on VM states, so sync never blocks.
       (define idle-evt
         (if waiting?
             never-evt
             (wrap-evt always-evt (lambda (dummy) values))))
       (define interruptk (apply sync idle-evt active-events))
       (loop (interruptk stepped-state))])))