racket-matrix-2012/os2.rkt

817 lines
30 KiB
Racket
Raw Normal View History

2012-03-19 18:28:34 +00:00
#lang racket/base
;; Virtualized operating system, this time with presence.
(require racket/set)
2012-03-19 18:28:34 +00:00
(require racket/match)
(require racket/contract)
2012-03-24 19:57:32 +00:00
(require (only-in racket/list flatten))
2012-03-20 15:33:54 +00:00
(require "unify.rkt")
(provide nested-vm-proc
nested-vm
2012-03-24 20:01:15 +00:00
ground-vm
(struct-out topic)
topic-publisher
topic-subscriber
co-roles
co-topics
topic-union
2012-03-24 20:01:15 +00:00
(struct-out boot-specification)
2012-03-24 20:01:15 +00:00
(struct-out handlers)
(except-out (struct-out transition) transition)
(rename-out [make-transition transition])
transition/c
state/c
action/c
action-tree/c
(contract-out (sequence-actions (->* (transition/c)
#:rest (listof (or/c action-tree/c
(state/c . -> . transition/c)))
transition/c)))
2012-03-24 20:01:15 +00:00
2012-03-24 23:13:45 +00:00
role
2012-05-03 20:31:05 +00:00
role/fresh
2012-03-24 20:01:15 +00:00
(except-out (struct-out add-role) add-role)
(rename-out [make-add-role add-role])
(except-out (struct-out delete-role) delete-role)
(rename-out [make-delete-role delete-role])
(except-out (struct-out send-message) send-message)
(rename-out [make-send-message send-message])
2012-07-03 19:49:58 +00:00
send-feedback ;; TODO: maybe call this "send-reply" or "send-as-subscriber"?
2012-03-24 20:01:15 +00:00
(except-out (struct-out spawn) spawn)
(rename-out [make-spawn spawn])
2012-08-13 20:49:24 +00:00
(except-out (struct-out quit) quit)
(rename-out [make-quit quit])
(except-out (struct-out yield) yield)
(rename-out [yield-macro yield])
(except-out (struct-out at-meta-level) at-meta-level)
(rename-out [make-at-meta-level at-meta-level])
;; Aliases of structures themselves, because of shadowing of
;; constructors/type-names.
(rename-out [topic <topic>])
(rename-out [handlers <handlers>])
(rename-out [transition <transition>])
(rename-out [add-role <add-role>])
(rename-out [delete-role <delete-role>])
(rename-out [send-message <send-message>])
(rename-out [spawn <spawn>])
2012-08-13 20:49:24 +00:00
(rename-out [quit <quit>])
(rename-out [yield <yield>])
(rename-out [at-meta-level <at-meta-level>])
2012-07-04 21:51:40 +00:00
(except-out (struct-out debug-name) debug-name)
(rename-out [make-debug-name debug-name])
(rename-out [debug-name <debug-name>])
(struct-out exit-signal)
2012-06-19 15:37:01 +00:00
;; Reexports from unify.rkt for convenience
wild
wild?
non-wild?
2012-06-19 21:27:17 +00:00
ground?
;; Identifier-syntax for (wild)
?
;; Reexports from racket/match for convenience
(all-from-out racket/match)
;; For debugging
current-ground-transition)
2012-03-24 20:01:15 +00:00
2012-03-20 15:33:54 +00:00
;; Endpoints are the units of deduplication.
;; Flows (in canonical form) are the units of presence.
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Data definitions
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
;; it's an integer.
;; A PreEID is an arbitrary, process-chosen-and-supplied identifier
;; for an endpoint. It is to be equal?-comparable. It is to be unique
;; within the scope of a single process.
;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
;; it's an (eid PID PreEID). As a consequence of the scope of PreEIDs,
;; EIDs shouldn't be visible outside the scope of the owning process.
(struct eid (pid pre-eid) #:transparent)
2012-03-19 18:28:34 +00:00
;; One endpoint, one topic (which may be changed over time!), with the
;; caveat that as we are at present unable to represent true topic
;; unions, we actually store a *set* of topics against each
;; endpoint. The (current!) topic for the endpoint is to be taken as
;; the union of all the members in the set.
;; A Flow is a Topic that comes from the intersection of two dual
;; topics.
;; A sent message includes a "body" and a "role", and is equivalent to
;; a non-monitor topic with that role and with the given "body" as a
;; pattern. In a sense, topics quite literally are patterns over
;; entire messages.
2012-03-19 18:28:34 +00:00
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
(struct vm (name ;; Any - for debugging complex VM trees
processes ;; Hash<PID, Process>
endpoints ;; Hash<EID, Endpoint>
2012-03-19 18:28:34 +00:00
next-process-id ;; PID
pending-actions ;; QuasiQueue<(cons PID Action)>
) #:transparent)
;; An Endpoint is an (endpoint EID Set<Topic> Handlers), representing a
2012-03-24 23:41:04 +00:00
;; facet of a process responsible for playing a particular role (the
;; topic) in a conversation.
(struct endpoint (id topics handlers) #:transparent)
2012-03-24 23:41:04 +00:00
;; A Process is an Exists State . (process Any PID State Contract
;; ContractParty Set<EID> Set<EID>), representing a VM process and its
;; collection of active endpoints at this level and at the VM's
;; container's level. The (responsible-party) is the party that
;; generated the (state).
(struct process (name ;; Any
id ;; PID
state ;; State
state-contract ;; Contract
responsible-party ;; ContractParty
endpoints ;; Set<EID>
meta-endpoints ;; Set<EID>
) #:transparent)
2012-03-24 23:41:04 +00:00
;; An InterestType is one of
;; -- #f, representing an ordinary *participant* in a conversation;
;; -- #t, representing a *monitor* or *observer* of a conversation; or
;; -- 'everything, representing a monitor that is also interested in
;; the existence of other monitors of the conversation.
;; A Topic is a (topic Role Pattern InterestType), describing an Endpoint's
2012-03-24 23:41:04 +00:00
;; role in a conversation.
(struct topic (role pattern monitor?) #:prefab)
2012-03-19 18:28:34 +00:00
;; BootSpecification = BootK or (boot-specification BootK Contract)
(struct boot-specification (proc contract) #:transparent)
;; BootK = (PID -> Transition) or Transition
;; InterruptK = State -> Transition
;; TrapK<X> = X -> InterruptK
;; (handlers Maybe<TrapK<Topic>>
;; Maybe<TrapK<Topic * Reason>>
;; Maybe<TrapK<Topic * Message>>)
2012-03-20 15:33:54 +00:00
(struct handlers (presence absence message) #:transparent)
2012-03-19 18:28:34 +00:00
;; Transition = State or (transition State ConsTreeOf<Action>)
;;
2012-03-24 19:57:32 +00:00
;; actions is a plain old ordered ConsTreeOf<Action>, not a
;; QuasiQueue.
2012-03-19 18:28:34 +00:00
(struct transition (state actions) #:transparent)
;; Transition -> (transition ConsTreeOf<Action>)
(define (maybe-transition->transition t)
(cond [(transition? t) t]
[else
(define message (format "maybe-transition->transition: Expected transition; got ~v" t))
2012-08-13 20:49:24 +00:00
(transition #f (quit #f (exn:fail:contract message (current-continuation-marks))))]))
;; Preactions.
;; Ks are various TrapKs or #f, signifying lack of interest.
;;
;; (add-role PreEID (or Topic Set<Topic>) Handlers)
(struct add-role (pre-eid topics handlers) #:prefab)
;;
;; (delete-role PreEID Any)
(struct delete-role (pre-eid reason) #:prefab)
;;
;; (send-message Any Role)
(struct send-message (body role) #:prefab)
;;
;; (spawn BootSpecification Maybe<TrapK<PID>> Any)
(struct spawn (spec k debug-name) #:prefab)
;;
2012-08-13 20:49:24 +00:00
;; (quit Maybe<PID> Any)
(struct quit (pid reason) #:prefab)
2012-03-19 18:28:34 +00:00
(define (preaction? a)
(or (add-role? a)
(delete-role? a)
(send-message? a)
(spawn? a)
2012-08-13 20:49:24 +00:00
(quit? a)))
;; An Action is either a Preaction or a (yield InterruptK) or an
;; (at-meta-level Preaction) or an ignored placeholder (namely #f or
;; (void)).
(struct yield (k) #:prefab)
2012-03-19 18:28:34 +00:00
(struct at-meta-level (preaction) #:prefab)
(define (action? a)
(or (preaction? a)
(yield? a)
(eq? a #f)
(void? a)
(and (at-meta-level? a)
(preaction? (at-meta-level-preaction a)))))
2012-07-04 21:51:40 +00:00
;; A DebugName is a simple prefab struct that holds a collection of
;; arbitrary values used to help programmers identify processes by
;; something more mnemonic than a PID.
(struct debug-name (pieces) #:prefab)
(define (make-debug-name . pieces)
(debug-name pieces))
;; An ExitSignal instance describes the presence of a whole process, as a
;; convention.
;;
;; TODO: revisit the idea of points-of-attachment. There's an
;; intermediate network between the processes and the kernel, and
;; pid-level presence could be seen as object-level presence on that
;; network somehow.
(struct exit-signal (pid debug-name) #:prefab)
2012-03-24 20:01:15 +00:00
;;---------------------------------------------------------------------------
;; role & yield macros
2012-03-24 20:01:15 +00:00
2012-03-24 23:13:45 +00:00
(require (for-syntax syntax/parse))
(require (for-syntax racket/base))
2012-03-24 23:13:45 +00:00
(define-syntax role
(lambda (stx)
(syntax-parse stx
[(_ topics-expr
(~or (~optional (~seq #:name pre-eid) #:name "#:name of role")
(~optional (~seq #:state state-pattern) #:name "#:state pattern")
2012-07-16 21:01:35 +00:00
(~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
2012-03-24 23:13:45 +00:00
(~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
(~optional (~seq #:topic topic) #:defaults ([topic #'t0]) #:name "#:topic")
(~optional (~seq #:reason reason) #:defaults ([reason #'r0]) #:name "#:reason"))
2012-03-24 23:13:45 +00:00
...
[message-pattern clause-body ...]
...)
(define-syntax-rule (build-handler args e-attr)
2012-07-16 21:01:35 +00:00
(cond
[(not (attribute e-attr))
#'#f]
[(not (attribute state-pattern))
#`(lambda args (match-lambda [state (transition state e-attr)]))]
[else
#`(lambda args (match-lambda [state-pattern e-attr]))]))
(with-syntax ([presence-handler (build-handler (topic) presence)]
[absence-handler (build-handler (topic reason) absence)]
2012-07-16 21:01:35 +00:00
[message-handler
(if (not (attribute state-pattern))
#'(lambda (topic message-body)
(lambda (state)
(transition state
(match message-body
[message-pattern clause-body ...]
...
[_ '()]))))
#'(lambda (topic message-body)
(lambda (state)
(match state
[state-pattern
(match message-body
[message-pattern clause-body ...]
...
[_ (make-transition state)])]))))])
#`(add-role #,(if (attribute pre-eid)
#'pre-eid
#'(gensym 'anonymous-role))
topics-expr
(handlers presence-handler absence-handler message-handler)))])))
2012-03-24 23:13:45 +00:00
2012-07-16 21:01:16 +00:00
;; Invents a role name for you, and binds it to pre-eid-var.
(define-syntax-rule (role/fresh pre-eid-var topics-expr rest ...)
2012-05-03 20:31:05 +00:00
(let ((pre-eid-var (gensym 'role)))
(role topics-expr #:name pre-eid-var rest ...)))
2012-05-03 20:31:05 +00:00
(define-syntax-rule (yield-macro #:state state-pattern body ...)
(yield (match-lambda [state-pattern body ...])))
;; A fresh unification variable, as identifier-syntax.
(define-syntax ? (syntax-id-rules () (_ (wild))))
;;---------------------------------------------------------------------------
;; Smarter constructors for transitions and preactions.
2012-03-24 20:01:15 +00:00
(define (make-transition state . actions) (transition state actions))
(define transition/c (or/c transition? any/c))
(define state/c (not/c transition?))
(define action/c action?)
(define action-tree/c (flat-rec-contract action-tree/c
2012-07-04 20:28:19 +00:00
(or/c action/c
null?
(cons/c action-tree/c action-tree/c))))
(define make-add-role add-role) ;; no special treatment required at present
(define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason))
(define (make-send-message body [role 'publisher]) (send-message body role))
2012-08-13 20:49:24 +00:00
(define (make-quit [pid #f] #:reason [reason #f]) (quit pid reason))
2012-03-24 20:01:15 +00:00
(define (make-at-meta-level . actions)
(match actions
[(cons action '()) (at-meta-level action)]
[_ (map at-meta-level actions)]))
2012-07-03 19:49:58 +00:00
(define (send-feedback body) (make-send-message body 'subscriber))
(define (make-spawn raw-spec [k #f]
#:exit-signal? [exit-signal? #f]
#:debug-name [debug-name #f]
#:state-contract [state-contract any/c])
(match-define (boot-specification raw-main raw-contract)
(cond [(boot-specification? raw-spec) raw-spec]
[else (boot-specification raw-spec any/c)]))
(define maybe-monitored-main
(if exit-signal?
(let ((unmonitored-main (if (procedure? raw-main) raw-main (lambda (self-pid) raw-main))))
(lambda (self-pid)
(define m (exit-signal self-pid debug-name))
(sequence-actions (unmonitored-main self-pid)
(role (topic-publisher m) #:name (list 'canary m)))))
raw-main))
(define final-contract
(cond [(eq? raw-contract any/c) state-contract]
[(eq? state-contract any/c) raw-contract]
[else (error 'spawn
"Cannot apply #:state-contract to already-contracted boot-specification")]))
(define spec
(if (eq? final-contract any/c)
maybe-monitored-main
(boot-specification maybe-monitored-main final-contract)))
(spawn spec k debug-name))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Topics and roles
(define (topic-publisher pattern #:monitor? [monitor? #f])
(topic 'publisher pattern monitor?))
2012-03-19 18:28:34 +00:00
(define (topic-subscriber pattern #:monitor? [monitor? #f])
(topic 'subscriber pattern monitor?))
2012-03-19 18:28:34 +00:00
2012-06-19 15:37:10 +00:00
;; TODO: Ideally this would be extensible; roles like 'debug-listener,
;; 'logger etc exist.
2012-03-19 18:28:34 +00:00
(define (co-roles r)
(case r
[(publisher) '(subscriber)]
[(subscriber) '(publisher)]
[else #f]))
(define (co-topics t)
(for/list ([co-role (co-roles (topic-role t))])
(struct-copy topic t [role co-role])))
2012-03-19 18:28:34 +00:00
(define (topic-union . ts)
(unless (andmap topic? ts)
(error 'topic-union "Expects topics as arguments, but given ~v" ts))
(list->set ts))
2012-03-20 15:33:54 +00:00
(define (refine-topic remote-topic new-pattern)
(struct-copy topic remote-topic [pattern new-pattern]))
(define (roles-intersect? l r)
(memq l (co-roles r)))
;; Both left and right must be canonicalized.
(define (topic-intersection left right)
(and (roles-intersect? (topic-role left) (topic-role right))
(mgu-canonical (freshen (topic-pattern left)) (freshen (topic-pattern right)))))
;; True iff the flow between remote-topic and local-topic should be
;; visible to the local peer. This is the case when either local-topic
;; is monitoring 'everything or otherwise if remote-topic is an
;; ordinary topic only.
;;
;; |--------------+--------------+------------------------|
;; | local-topic | remote-topic | visible to local peer? |
;; |--------------+--------------+------------------------|
;; | #f | #f | yes |
;; | #f | #t | no |
;; | #f | 'everything | no |
;; | #t | #f | yes |
;; | #t | #t | no |
;; | #t | 'everything | no |
;; | 'everything | #f | yes |
;; | 'everything | #t | yes |
;; | 'everything | 'everything | yes |
;; |--------------+--------------+------------------------|
;;
(define (flow-visible? local-topic remote-topic)
(or (not (topic-monitor? remote-topic))
(eq? (topic-monitor? local-topic) 'everything)))
;;---------------------------------------------------------------------------
;; Composing state transitions and action emissions.
(define (sequence-actions t . more-actions-and-transformers)
(match-define (transition initial-state initial-actions) t)
(let loop ((state initial-state)
(actions initial-actions)
(items more-actions-and-transformers))
(match items
['()
(transition state actions)]
[(cons (? procedure? transformer) remaining-items)
(match-define (transition new-state more-actions) (transformer state))
(loop new-state
(cons actions more-actions)
remaining-items)]
[(cons additional-action-or-actions remaining-items)
(loop state
(cons actions additional-action-or-actions)
remaining-items)])))
2012-03-19 18:28:34 +00:00
;;---------------------------------------------------------------------------
;; Core virtualizable virtual machine.
2012-03-19 18:28:34 +00:00
(define (vm-boot-parameters->boot-specification params)
(match params
[(list (? transition? t)) t]
[(list (? procedure? p)) p]
[(? list? actions) (transition (void) actions)]))
(define (make-vm name boot)
(vm name
(hash)
(hash)
2012-03-19 18:28:34 +00:00
0
(list (cons -1 (spawn boot #f 'primordial-process)))))
2012-03-19 18:28:34 +00:00
(define (run-vm state)
(let loop ((remaining-actions (reverse (vm-pending-actions state)))
(state (struct-copy vm state [pending-actions '()]))
(outbound-actions '()))
2012-03-19 18:28:34 +00:00
(match remaining-actions
['()
(let ((state (collect-dead-processes state)))
(transition state (reverse (if (vm-idle? state)
outbound-actions
(cons (yield run-vm) outbound-actions)))))]
2012-03-19 18:28:34 +00:00
[(cons (cons pid action) rest)
2012-03-24 19:02:15 +00:00
(match action
[(at-meta-level preaction)
(let-values (((transformed-preaction state) (transform-meta-action pid preaction state)))
(loop rest state (cons transformed-preaction outbound-actions)))]
[(yield k)
(loop rest
(if (hash-has-key? (vm-processes state) pid)
(run-ready state pid 'yield k)
state)
outbound-actions)]
2012-03-24 19:02:15 +00:00
[preaction
(let-values (((new-outbound-actions-rev state) (perform-action pid preaction state)))
(loop rest state (append new-outbound-actions-rev outbound-actions)))])])))
2012-03-19 18:28:34 +00:00
(define (vm-idle? state)
(null? (vm-pending-actions state)))
(define (collect-dead-processes state)
;; dns-read-driver is being collected because it only has a metarole.%%%
(define (process-alive? pid p)
(or (not (set-empty? (process-endpoints p)))
(not (set-empty? (process-meta-endpoints p)))
(ormap (lambda (entry) (= (car entry) pid))
(vm-pending-actions state))))
(struct-copy vm state
[processes (for/hash ([(pid p) (in-hash (vm-processes state))]
#:when (or (process-alive? pid p)
(begin
(log-info
(format "~a PID ~v (~a) garbage-collected"
(vm-name state)
pid
(process-name p)))
#f)))
(values pid p))]))
2012-03-24 23:29:00 +00:00
(define (send-to-user failure-proc f . args)
(with-handlers ([exn:fail? failure-proc])
(apply f args)))
2012-03-19 18:28:34 +00:00
(define (ensure-topic-union t)
(cond [(topic? t) (set t)]
[(set? t) t]
[else
(error 'ensure-topic-union
"Expected either a single topic or a set of topics; got ~v"
t)]))
2012-03-24 19:02:15 +00:00
(define (perform-action pid preaction state)
(match preaction
[(add-role pre-eid topics hs)
(values '() (do-subscribe pid pre-eid (ensure-topic-union topics) hs state))]
[(delete-role pre-eid reason)
(values '() (do-unsubscribe pid pre-eid reason state))]
[(send-message body role)
(values '() (route-and-deliver role body state))]
[(spawn spec k debug-name)
(values '() (do-spawn pid spec k debug-name state))]
2012-08-13 20:49:24 +00:00
[(quit pid-to-quit reason)
(do-quit (or pid-to-quit pid) reason state)]))
(define (topics-equal? ta tb)
;; TODO: OK, if we had a couple of simple topics here, we'd be done
;; by asking (and (specialization? ta tb) (specialization? tb ta)),
;; but because we have sets of implicitly-unioned topics, things get
;; jolly awkward. For now, we punt, trusting the user to not supply
;; an incompatible set of topics on an endpoint update. This is
;; definitely an interim position: full presence will require a
;; serious treatment of topic unions via anti-unification.
#t)
(define (do-subscribe pid pre-eid topics hs state)
(cond
[(hash-has-key? (vm-processes state) pid)
(define new-eid (eid pid pre-eid))
(define old-endpoint (hash-ref (vm-endpoints state) new-eid #f))
(define new-endpoint (endpoint new-eid topics hs))
(if old-endpoint
;; We are *updating* an existing endpoint's behaviour.
(if (topics-equal? (endpoint-topics old-endpoint)
(endpoint-topics new-endpoint))
(let* ((state (install-endpoint state new-eid new-endpoint)))
state)
(error 'do-subscribe
"Topics must be equal when updating an endpoint: ~v vs ~v"
old-endpoint
new-endpoint))
;; We are installing a *new* endpoint.
(let* ((state (notify-route-additions state new-endpoint))
(state (generic-update-process state pid (add-process-eid new-eid)))
(state (install-endpoint state new-eid new-endpoint)))
state))]
[else state]))
(define (generic-update-process state pid updater)
(struct-copy vm state [processes (hash-update (vm-processes state) pid updater)]))
(define ((add-process-eid new-eid) p)
(struct-copy process p [endpoints (set-add (process-endpoints p) new-eid)]))
(define ((add-process-meta-eid new-eid) p)
(struct-copy process p [meta-endpoints (set-add (process-meta-endpoints p) new-eid)]))
(define ((remove-process-eid old-eid) p)
(struct-copy process p [endpoints (set-remove (process-endpoints p) old-eid)]))
(define ((remove-process-meta-eid old-eid) p)
(struct-copy process p [meta-endpoints (set-remove (process-meta-endpoints p) old-eid)]))
(define (install-endpoint state new-eid new-endpoint)
(struct-copy vm state [endpoints (hash-set (vm-endpoints state) new-eid new-endpoint)]))
(define (uninstall-endpoint state old-eid)
(struct-copy vm state [endpoints (hash-remove (vm-endpoints state) old-eid)]))
(define (notify-route-additions state new-endpoint)
(match-define (endpoint (eid pid _) topics (handlers presence-handler _ _)) new-endpoint)
(for*/fold ([state state])
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[topic (in-set topics)]
[flow-pattern (in-value (topic-intersection topic matching-topic))]
#:when flow-pattern)
(define inbound-flow (refine-topic matching-topic flow-pattern))
(define outbound-flow (refine-topic topic flow-pattern))
(define e-presence-handler (handlers-presence (endpoint-handlers e)))
(let* ((state (if (flow-visible? topic inbound-flow)
(run-trapk state pid eid presence-handler inbound-flow)
state))
(state (if (flow-visible? matching-topic outbound-flow)
(run-trapk state matching-pid matching-eid e-presence-handler outbound-flow)
state)))
state)))
(define (do-unsubscribe pid pre-eid reason state)
(define old-eid (eid pid pre-eid))
2012-03-24 19:59:52 +00:00
(cond
[(hash-has-key? (vm-endpoints state) old-eid)
(define old-endpoint (hash-ref (vm-endpoints state) old-eid))
(let* ((state (generic-update-process state pid (remove-process-eid old-eid)))
(state (uninstall-endpoint state old-eid))
(state (notify-route-deletions state old-endpoint reason)))
state)]
2012-03-24 19:59:52 +00:00
[else state]))
2012-03-20 15:33:54 +00:00
(define (notify-route-deletions state old-endpoint reason)
(define removed-topics (endpoint-topics old-endpoint))
(for*/fold ([state state])
([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[removed-topic (in-set removed-topics)]
[flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
#:when flow-pattern)
(define outbound-flow (refine-topic removed-topic flow-pattern))
(define absence-handler (handlers-absence (endpoint-handlers e)))
(run-trapk state matching-pid matching-eid absence-handler outbound-flow reason)))
(define (route-and-deliver role body state)
(define message-topic (topic role body #f))
(define endpoints
(for*/set ([(matching-pid p) (in-hash (vm-processes state))]
[matching-eid (in-set (process-endpoints p))]
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
[matching-topics (in-value (endpoint-topics e))]
[matching-topic (in-set matching-topics)]
[flow-pattern (in-value (topic-intersection message-topic matching-topic))]
#:when flow-pattern)
e))
(for/fold ([state state]) ([e (in-set endpoints)])
(match-define (endpoint (eid pid _) _ (handlers _ _ message-handler)) e)
(run-trapk state pid eid message-handler message-topic body)))
(define (do-spawn spawning-pid spec k debug-name state)
(define new-pid (vm-next-process-id state))
(define new-name (or debug-name new-pid))
(define-values (main state-contract)
(match spec
[(boot-specification main state-contract) (values main state-contract)]
[main (values main any/c)]))
2012-03-25 01:18:19 +00:00
(match-define (transition initial-state initial-actions)
(maybe-transition->transition
(cond
2012-08-13 20:49:24 +00:00
[(procedure? main) (send-to-user (lambda (e) (transition #f (quit #f e))) main new-pid)]
[else main])))
(define initial-process (process new-name
new-pid
initial-state
state-contract
(list 'spawn 'main debug-name) ;; TODO: use contract-regions
(set)
(set)))
2012-03-25 01:18:19 +00:00
(define spawned-state
(struct-copy vm (enqueue-actions state new-pid initial-actions)
[processes (hash-set (vm-processes state) new-pid initial-process)]
2012-03-25 01:18:19 +00:00
[next-process-id (+ new-pid 1)]))
(log-info (format "~a PID ~v (~a) started" (vm-name state) new-pid new-name))
(run-trapk spawned-state spawning-pid (list 'spawn 'k debug-name) k new-pid))
2012-03-25 01:18:19 +00:00
2012-08-13 20:49:24 +00:00
(define (print-quit vm-name pid-to-quit process-name reason)
2012-05-02 17:53:52 +00:00
(cond
[(eq? reason #f) (log-info (format "~a PID ~v (~a) exited normally"
vm-name
2012-08-13 20:49:24 +00:00
pid-to-quit
process-name))]
[(exn? reason) (begin ((error-display-handler)
(format "~a PID ~v (~a) exited with exception~n~a"
vm-name
2012-08-13 20:49:24 +00:00
pid-to-quit
process-name
(exn-message reason))
reason)
(flush-output (current-output-port)))]
[else (log-info (format "~a PID ~v (~a) exited with reason: ~a"
vm-name
2012-08-13 20:49:24 +00:00
pid-to-quit
process-name
reason))]))
2012-05-02 17:53:52 +00:00
2012-08-13 20:49:24 +00:00
(define (do-quit pid-to-quit reason state)
2012-03-25 01:18:19 +00:00
(cond
2012-08-13 20:49:24 +00:00
[(hash-has-key? (vm-processes state) pid-to-quit)
(define dying-process (hash-ref (vm-processes state) pid-to-quit))
(print-quit (vm-name state) pid-to-quit (process-name dying-process) reason)
(let* ((state (for/fold ([state state]) ([eid (in-set (process-endpoints dying-process))])
2012-08-13 20:49:24 +00:00
(do-unsubscribe pid-to-quit (eid-pre-eid eid) reason state)))
(new-outbound-actions (for/list ([eid (in-set (process-meta-endpoints dying-process))])
(delete-role eid reason))))
(values new-outbound-actions
2012-08-13 20:49:24 +00:00
(struct-copy vm state [processes (hash-remove (vm-processes state) pid-to-quit)])))]
[else (values '() state)]))
2012-03-25 01:18:19 +00:00
(define (run-trapk state pid new-party trap-k . args)
(if trap-k
2012-03-24 23:31:30 +00:00
(let ((failure-proc (lambda (e) (lambda (process-state)
2012-08-13 20:49:24 +00:00
(transition process-state (quit #f e))))))
(run-ready state pid new-party (apply send-to-user failure-proc trap-k args)))
state))
(define (run-ready state pid new-party interrupt-k)
(match-define (process _ _ old-state state-contract old-party _ _)
(hash-ref (vm-processes state) pid))
2012-03-24 23:41:04 +00:00
(match-define (transition new-state actions)
2012-03-24 23:31:30 +00:00
(maybe-transition->transition
2012-08-13 20:49:24 +00:00
(send-to-user (lambda (e) (transition old-state (quit #f e)))
(lambda () (interrupt-k (contract state-contract
old-state
old-party
new-party
(list (vm-name state) 'pid pid 'state)
#f))))))
(generic-update-process (enqueue-actions state pid actions)
pid
(lambda (p) (struct-copy process p
[state new-state]
[responsible-party new-party]))))
2012-03-20 15:33:54 +00:00
(define (valid-action? pid a)
(cond
[(eq? a #f) #f] ;; skip falses in action ConsTrees
[(void? a) #f] ;; skip voids in action ConsTrees
[(action? a)]
[else (log-warning (format "Illegal action ~v from pid ~v" a pid))
#f]))
2012-03-19 18:28:34 +00:00
(define (enqueue-actions state pid actions)
(define flat-actions (for/list ([a (flatten actions)] #:when (valid-action? pid a)) (cons pid a)))
2012-03-19 18:28:34 +00:00
(struct-copy vm state
[pending-actions (append (reverse flat-actions) (vm-pending-actions state))]))
2012-03-19 18:28:34 +00:00
(define (((wrap-trapk pid new-party trapk) . args) state)
(if (hash-has-key? (vm-processes state) pid)
(run-vm (apply run-trapk state pid new-party trapk args))
(make-transition state)))
2012-03-24 19:02:15 +00:00
(define (transform-meta-action pid preaction state)
2012-03-24 19:02:15 +00:00
(match preaction
[(add-role pre-eid topics hs)
(define new-eid (eid pid pre-eid))
(values (add-role new-eid
topics
(handlers (wrap-trapk pid new-eid (handlers-presence hs))
(wrap-trapk pid new-eid (handlers-absence hs))
(wrap-trapk pid new-eid (handlers-message hs))))
(if (hash-has-key? (vm-processes state) pid)
(generic-update-process state pid (add-process-meta-eid new-eid))
state))]
[(delete-role pre-eid reason)
(define old-eid (eid pid pre-eid))
(values (delete-role old-eid reason)
(if (hash-has-key? (vm-processes state) pid)
(generic-update-process state pid (remove-process-meta-eid old-eid))
state))]
[(? send-message? p)
(values p state)]
[(spawn spec k debug-name)
(values (spawn spec
(wrap-trapk pid (list 'meta-spawn 'k debug-name) k)
debug-name)
state)]
2012-08-13 20:49:24 +00:00
[(? quit? p)
(values p state)]))
(define-syntax nested-vm
(lambda (stx)
(syntax-parse stx
[(_ (~or (~optional (~seq #:debug-name debug-name) #:name "#:debug-name of nested-vm")
(~optional (~seq #:pid pid) #:name "#:pid variable name"))
...
boot-param ...)
#`(nested-vm-proc #,(if (attribute debug-name)
#'debug-name
#'(gensym 'nested-vm))
(lambda (#,(if (attribute pid) #'pid #'dummy-pid))
(vm-boot-parameters->boot-specification (list boot-param ...))))])))
(define (nested-vm-proc name boot)
(boot-specification (lambda (self-pid) (run-vm (make-vm name boot)))
vm?))
2012-03-24 19:02:25 +00:00
(define (ground-vm . boot-params)
(let loop ((state (make-vm 'ground-vm (vm-boot-parameters->boot-specification boot-params))))
(match (let ((t (run-vm state))) (set! current-ground-transition t) t)
[(transition state actions)
(define is-blocking?
(match actions
['() #t] ;; no "yield" action -> certainly blocking
[(list (yield _)) #f] ;; single "yield", with k statically known to be run-vm -> poll
[_ (error 'ground-vm
"Cannot process meta-actions ~v because no further metalevel exists"
actions)]))
(define active-events
(for*/fold ([acc '()])
([(eid e) (in-hash (vm-endpoints state))]
[active-topic (in-set (endpoint-topics e))])
(match active-topic
[(topic 'subscriber (cons (? evt? evt) _) #f)
(define ((evt-handler message) state)
(route-and-deliver 'publisher (cons evt message) state))
(cons (wrap-evt evt evt-handler) acc)]
[_ acc])))
(if (and is-blocking? (null? active-events))
'done ;; Not polling, and no events that could wake us from blocking, so quit
(let ((interruptk (apply sync
(if is-blocking?
never-evt
(wrap-evt always-evt (lambda (dummy) values)))
active-events)))
(loop (interruptk state))))])))
(define current-ground-transition #f)