817 lines
No EOL
30 KiB
Racket
817 lines
No EOL
30 KiB
Racket
#lang racket/base
|
|
;; Virtualized operating system, this time with presence.
|
|
|
|
(require racket/set)
|
|
(require racket/match)
|
|
(require racket/contract)
|
|
(require (only-in racket/list flatten))
|
|
(require "unify.rkt")
|
|
|
|
(provide nested-vm-proc
|
|
nested-vm
|
|
ground-vm
|
|
|
|
(struct-out topic)
|
|
topic-publisher
|
|
topic-subscriber
|
|
co-roles
|
|
co-topics
|
|
topic-union
|
|
|
|
(struct-out boot-specification)
|
|
|
|
(struct-out handlers)
|
|
|
|
(except-out (struct-out transition) transition)
|
|
(rename-out [make-transition transition])
|
|
transition/c
|
|
state/c
|
|
action/c
|
|
action-tree/c
|
|
|
|
(contract-out (sequence-actions (->* (transition/c)
|
|
#:rest (listof (or/c action-tree/c
|
|
(state/c . -> . transition/c)))
|
|
transition/c)))
|
|
|
|
role
|
|
role/fresh
|
|
(except-out (struct-out add-role) add-role)
|
|
(rename-out [make-add-role add-role])
|
|
(except-out (struct-out delete-role) delete-role)
|
|
(rename-out [make-delete-role delete-role])
|
|
(except-out (struct-out send-message) send-message)
|
|
(rename-out [make-send-message send-message])
|
|
send-feedback ;; TODO: maybe call this "send-reply" or "send-as-subscriber"?
|
|
(except-out (struct-out spawn) spawn)
|
|
(rename-out [make-spawn spawn])
|
|
(except-out (struct-out quit) quit)
|
|
(rename-out [make-quit quit])
|
|
(except-out (struct-out yield) yield)
|
|
(rename-out [yield-macro yield])
|
|
(except-out (struct-out at-meta-level) at-meta-level)
|
|
(rename-out [make-at-meta-level at-meta-level])
|
|
|
|
;; Aliases of structures themselves, because of shadowing of
|
|
;; constructors/type-names.
|
|
(rename-out [topic <topic>])
|
|
(rename-out [handlers <handlers>])
|
|
(rename-out [transition <transition>])
|
|
(rename-out [add-role <add-role>])
|
|
(rename-out [delete-role <delete-role>])
|
|
(rename-out [send-message <send-message>])
|
|
(rename-out [spawn <spawn>])
|
|
(rename-out [quit <quit>])
|
|
(rename-out [yield <yield>])
|
|
(rename-out [at-meta-level <at-meta-level>])
|
|
|
|
(except-out (struct-out debug-name) debug-name)
|
|
(rename-out [make-debug-name debug-name])
|
|
(rename-out [debug-name <debug-name>])
|
|
|
|
(struct-out exit-signal)
|
|
|
|
;; Reexports from unify.rkt for convenience
|
|
wild
|
|
wild?
|
|
non-wild?
|
|
ground?
|
|
|
|
;; Identifier-syntax for (wild)
|
|
?
|
|
|
|
;; Reexports from racket/match for convenience
|
|
(all-from-out racket/match)
|
|
|
|
;; For debugging
|
|
current-ground-transition)
|
|
|
|
;; Endpoints are the units of deduplication.
|
|
;; Flows (in canonical form) are the units of presence.
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Data definitions
|
|
|
|
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
|
|
;; it's an integer.
|
|
|
|
;; A PreEID is an arbitrary, process-chosen-and-supplied identifier
|
|
;; for an endpoint. It is to be equal?-comparable. It is to be unique
|
|
;; within the scope of a single process.
|
|
|
|
;; An EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
|
|
;; it's an (eid PID PreEID). As a consequence of the scope of PreEIDs,
|
|
;; EIDs shouldn't be visible outside the scope of the owning process.
|
|
(struct eid (pid      ;; PID of the owning process
             pre-eid) ;; PreEID supplied by that process
  #:transparent)
|
|
|
|
;; One endpoint, one topic (which may be changed over time!), with the
|
|
;; caveat that as we are at present unable to represent true topic
|
|
;; unions, we actually store a *set* of topics against each
|
|
;; endpoint. The (current!) topic for the endpoint is to be taken as
|
|
;; the union of all the members in the set.
|
|
|
|
;; A Flow is a Topic that comes from the intersection of two dual
|
|
;; topics.
|
|
|
|
;; A sent message includes a "body" and a "role", and is equivalent to
|
|
;; a non-monitor topic with that role and with the given "body" as a
|
|
;; pattern. In a sense, topics quite literally are patterns over
|
|
;; entire messages.
|
|
|
|
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
|
|
|
|
;; A VM is a (vm Any Hash<PID,Process> Hash<EID,Endpoint> PID
;; QuasiQueue<(cons PID Action)>): the complete state of one virtual
;; machine.
(struct vm (name ;; Any - for debugging complex VM trees
            processes ;; Hash<PID, Process>
            endpoints ;; Hash<EID, Endpoint>
            next-process-id ;; PID - the id do-spawn gives to the next process
            pending-actions ;; QuasiQueue<(cons PID Action)> - most recently enqueued first
            ) #:transparent)
|
|
|
|
;; An Endpoint is an (endpoint EID Set<Topic> Handlers), representing a
|
|
;; facet of a process responsible for playing a particular role (the
|
|
;; topic) in a conversation.
|
|
(struct endpoint (id        ;; EID
                  topics    ;; Set<Topic>, read as the union of its members
                  handlers) ;; Handlers
  #:transparent)
|
|
|
|
;; A Process is an Exists State . (process Any PID State Contract
|
|
;; ContractParty Set<EID> Set<EID>), representing a VM process and its
|
|
;; collection of active endpoints at this level and at the VM's
|
|
;; container's level. The (responsible-party) is the party that
|
|
;; generated the (state).
|
|
(struct process (name ;; Any - debug name, or the PID when none was given
                 id ;; PID
                 state ;; State
                 state-contract ;; Contract - applied to state before each handler runs
                 responsible-party ;; ContractParty - the party that produced state
                 endpoints ;; Set<EID> - endpoints at this VM's level
                 meta-endpoints ;; Set<EID> - endpoints at the containing level
                 ) #:transparent)
|
|
|
|
;; An InterestType is one of
|
|
;; -- #f, representing an ordinary *participant* in a conversation;
|
|
;; -- #t, representing a *monitor* or *observer* of a conversation; or
|
|
;; -- 'everything, representing a monitor that is also interested in
|
|
;; the existence of other monitors of the conversation.
|
|
|
|
;; A Topic is a (topic Role Pattern InterestType), describing an Endpoint's
|
|
;; role in a conversation.
|
|
(struct topic (role      ;; Role, e.g. 'publisher or 'subscriber
               pattern   ;; Pattern
               monitor?) ;; InterestType: #f, #t or 'everything
  #:prefab)
|
|
|
|
;; BootSpecification = BootK or (boot-specification BootK Contract)
|
|
(struct boot-specification (proc      ;; BootK
                            contract) ;; Contract for the booted process's state
  #:transparent)
|
|
|
|
;; BootK = (PID -> Transition) or Transition
|
|
;; InterruptK = State -> Transition
|
|
;; TrapK<X> = X -> InterruptK
|
|
|
|
;; (handlers Maybe<TrapK<Topic>>
|
|
;; Maybe<TrapK<Topic * Reason>>
|
|
;; Maybe<TrapK<Topic * Message>>)
|
|
(struct handlers (presence ;; Maybe<TrapK<Topic>>
                  absence  ;; Maybe<TrapK<Topic * Reason>>
                  message) ;; Maybe<TrapK<Topic * Message>>
  #:transparent)
|
|
|
|
;; Transition = State or (transition State ConsTreeOf<Action>)
|
|
;;
|
|
;; actions is a plain old ordered ConsTreeOf<Action>, not a
|
|
;; QuasiQueue.
|
|
(struct transition (state    ;; State
                    actions) ;; ConsTreeOf<Action>, in emission order
  #:transparent)
|
|
|
|
;; Any -> (transition State ConsTreeOf<Action>)
;; A transition struct is passed through unchanged. Any other value is
;; replaced by a transition with state #f whose single action is a quit
;; carrying an exn:fail:contract that describes the offending value.
(define (maybe-transition->transition t)
  (if (transition? t)
      t
      (let ([complaint
             (format "maybe-transition->transition: Expected transition; got ~v" t)])
        (transition #f
                    (quit #f
                          (exn:fail:contract complaint
                                             (current-continuation-marks)))))))
|
|
|
|
;; Preactions.
|
|
;; Ks are various TrapKs or #f, signifying lack of interest.
|
|
;;
|
|
;; (add-role PreEID (or Topic Set<Topic>) Handlers)
|
|
(struct add-role (pre-eid   ;; PreEID
                  topics    ;; Topic or Set<Topic>
                  handlers) ;; Handlers
  #:prefab)
|
|
;;
|
|
;; (delete-role PreEID Any)
|
|
(struct delete-role (pre-eid ;; PreEID of the endpoint to remove
                     reason) ;; Any - passed on to absence handlers
  #:prefab)
|
|
;;
|
|
;; (send-message Any Role)
|
|
(struct send-message (body  ;; Any
                      role) ;; Role the sender is playing
  #:prefab)
|
|
;;
|
|
;; (spawn BootSpecification Maybe<TrapK<PID>> Any)
|
|
(struct spawn (spec        ;; BootSpecification
               k           ;; Maybe<TrapK<PID>> - receives the new process's PID
               debug-name) ;; Any
  #:prefab)
|
|
;;
|
|
;; (quit Maybe<PID> Any)
|
|
(struct quit (pid     ;; Maybe<PID> - #f means the process issuing the action
              reason) ;; Any - #f means a normal exit
  #:prefab)
|
|
|
|
;; Any -> Boolean
;; True iff a is one of the five Preaction structs.
(define (preaction? a)
  (and (ormap (lambda (is-kind?) (is-kind? a))
              (list add-role? delete-role? send-message? spawn? quit?))
       #t))
|
|
|
|
;; An Action is either a Preaction or a (yield InterruptK) or an
|
|
;; (at-meta-level Preaction) or an ignored placeholder (namely #f or
|
|
;; (void)).
|
|
(struct yield (k) ;; InterruptK
  #:prefab)
|
|
(struct at-meta-level (preaction) ;; Preaction to perform at the containing level
  #:prefab)
|
|
|
|
;; Any -> Boolean
;; True for the ignored placeholders (#f and void), for yields, for
;; Preactions, and for at-meta-level wrappers around a Preaction.
(define (action? a)
  (cond
    [(not a) #t]
    [(void? a) #t]
    [(yield? a) #t]
    [(at-meta-level? a) (preaction? (at-meta-level-preaction a))]
    [else (preaction? a)]))
|
|
|
|
;; A DebugName is a simple prefab struct that holds a collection of
|
|
;; arbitrary values used to help programmers identify processes by
|
|
;; something more mnemonic than a PID.
|
|
(struct debug-name (pieces) ;; Listof<Any>, as collected by make-debug-name
  #:prefab)
|
|
|
|
;; Any ... -> DebugName
;; Variadic constructor: collects all of its arguments into one debug-name.
(define make-debug-name
  (lambda pieces
    (debug-name pieces)))
|
|
|
|
;; An ExitSignal instance describes the presence of a whole process, as a
|
|
;; convention.
|
|
;;
|
|
;; TODO: revisit the idea of points-of-attachment. There's an
|
|
;; intermediate network between the processes and the kernel, and
|
|
;; pid-level presence could be seen as object-level presence on that
|
|
;; network somehow.
|
|
(struct exit-signal (pid         ;; PID of the process whose presence is signalled
                     debug-name) ;; Any - the debug name given to make-spawn
  #:prefab)
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; role & yield macros
|
|
|
|
(require (for-syntax syntax/parse))
|
|
(require (for-syntax racket/base))
|
|
|
|
;; (role topics-expr
;;       [#:name pre-eid] [#:state state-pattern]
;;       [#:on-presence presence] [#:on-absence absence]
;;       [#:topic topic-id] [#:reason reason-id]
;;       [message-pattern clause-body ...] ...)
;;
;; Expands to an add-role preaction for topics-expr.
;;  - #:name supplies the PreEID; without it a fresh
;;    (gensym 'anonymous-role) is used.
;;  - #:topic and #:reason name the variables bound to the handler
;;    arguments; each defaults to a macro-introduced identifier.
;;  - Without #:state, the presence/absence expression and the matching
;;    message clause supply the *actions* of a transition that keeps the
;;    process state unchanged; an unmatched message yields no actions.
;;  - With #:state, the process state is matched against state-pattern
;;    and the expression or clause body is used directly as the handler
;;    result; an unmatched message yields (make-transition state).
;;  - An omitted #:on-presence / #:on-absence becomes a #f handler. The
;;    message handler is always a procedure, even with no clauses.
(define-syntax role
  (lambda (stx)
    (syntax-parse stx
      [(_ topics-expr
          (~or (~optional (~seq #:name pre-eid) #:name "#:name of role")
               (~optional (~seq #:state state-pattern) #:name "#:state pattern")
               (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
               (~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
               (~optional (~seq #:topic topic) #:defaults ([topic #'t0]) #:name "#:topic")
               (~optional (~seq #:reason reason) #:defaults ([reason #'r0]) #:name "#:reason"))
          ...
          [message-pattern clause-body ...]
          ...)
       ;; Builds the syntax of one TrapK taking args, or #'#f when the
       ;; corresponding keyword was not supplied.
       (define-syntax-rule (build-handler args e-attr)
         (cond
           [(not (attribute e-attr))
            #'#f]
           [(not (attribute state-pattern))
            ;; No #:state: keep the state, use the expression as the actions.
            #`(lambda args (match-lambda [state (transition state e-attr)]))]
           [else
            ;; #:state given: the expression is the whole handler result.
            #`(lambda args (match-lambda [state-pattern e-attr]))]))
       (with-syntax ([presence-handler (build-handler (topic) presence)]
                     [absence-handler (build-handler (topic reason) absence)]
                     [message-handler
                      (if (not (attribute state-pattern))
                          #'(lambda (topic message-body)
                              (lambda (state)
                                (transition state
                                            (match message-body
                                              [message-pattern clause-body ...]
                                              ...
                                              [_ '()]))))
                          #'(lambda (topic message-body)
                              (lambda (state)
                                (match state
                                  [state-pattern
                                   (match message-body
                                     [message-pattern clause-body ...]
                                     ...
                                     [_ (make-transition state)])]))))])
         #`(add-role #,(if (attribute pre-eid)
                           #'pre-eid
                           #'(gensym 'anonymous-role))
                     topics-expr
                     (handlers presence-handler absence-handler message-handler)))])))
|
|
|
|
;; (role/fresh pre-eid-var topics-expr rest ...)
;; Like role, but generates the role's name with (gensym 'role) and
;; binds it to pre-eid-var for the rest of the form.
(define-syntax role/fresh
  (syntax-rules ()
    [(_ pre-eid-var topics-expr rest ...)
     (let ([pre-eid-var (gensym 'role)])
       (role topics-expr #:name pre-eid-var rest ...))]))
|
|
|
|
;; (yield #:state state-pattern body ...)
;; Builds a yield action whose InterruptK matches the process state
;; against state-pattern and then evaluates body.
(define-syntax yield-macro
  (syntax-rules ()
    [(_ #:state state-pattern body ...)
     (yield (match-lambda
              [state-pattern body ...]))]))
|
|
|
|
;; A fresh unification variable, as identifier-syntax: each bare use of
;; ? expands to a new (wild) call. Only the bare-identifier form is
;; given a rule here.
(define-syntax ? (syntax-id-rules () (_ (wild))))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Smarter constructors for transitions and preactions.
|
|
|
|
;; State Action-tree ... -> transition
;; Variadic constructor: the rest arguments become the transition's
;; action tree.
(define make-transition
  (lambda (state . actions)
    (transition state actions)))
|
|
|
|
;; Transition = State or (transition ...). Because any/c is one of the
;; alternatives, this contract accepts every value; it serves as
;; documentation of intent.
(define transition/c (or/c transition? any/c))
;; A State is any value that is not itself a transition struct.
(define state/c (not/c transition?))
;; Contract for a single Action; see action?.
(define action/c action?)
;; ConsTreeOf<Action>: an action, the empty list, or a pair of action trees.
(define action-tree/c (flat-rec-contract action-tree/c
                        (or/c action/c
                              null?
                              (cons/c action-tree/c action-tree/c))))
|
|
|
|
(define make-add-role add-role) ;; no special treatment required at present
;; reason is optional and defaults to #f.
(define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason))
;; role is optional; messages are sent as 'publisher unless told otherwise.
(define (make-send-message body [role 'publisher]) (send-message body role))
;; With no pid, the quit applies to the process that issues the action
;; (perform-action substitutes the acting pid for #f).
(define (make-quit [pid #f] #:reason [reason #f]) (quit pid reason))
|
|
|
|
;; Preaction ... -> at-meta-level or Listof<at-meta-level>
;; Exactly one argument yields a single wrapped action; any other
;; argument count yields a list with each argument wrapped.
(define (make-at-meta-level . actions)
  (if (and (pair? actions) (null? (cdr actions)))
      (at-meta-level (car actions))
      (map at-meta-level actions)))
|
|
|
|
;; Sends body in the 'subscriber role (make-send-message alone defaults
;; to 'publisher).
(define (send-feedback body) (make-send-message body 'subscriber))
|
|
|
|
;; BootSpecification [Maybe<TrapK<PID>>] -> spawn
;; Smart constructor for spawn actions.
;;  #:exit-signal?   when true, the boot procedure is wrapped so that the
;;                   new process also takes on a publisher role for an
;;                   (exit-signal self-pid debug-name) value.
;;  #:debug-name     stored in the spawn action and in the exit-signal.
;;  #:state-contract contract for the process state; it is an error to
;;                   give one when raw-spec already carries a contract
;;                   other than any/c.
(define (make-spawn raw-spec [k #f]
                    #:exit-signal? [exit-signal? #f]
                    #:debug-name [debug-name #f]
                    #:state-contract [state-contract any/c])
  ;; Split the spec into its boot part and its contract part.
  (define-values (raw-main raw-contract)
    (if (boot-specification? raw-spec)
        (values (boot-specification-proc raw-spec)
                (boot-specification-contract raw-spec))
        (values raw-spec any/c)))
  ;; A BootK may be a bare transition rather than a procedure of the PID.
  (define (boot-unmonitored self-pid)
    (if (procedure? raw-main)
        (raw-main self-pid)
        raw-main))
  ;; Boots as usual, then appends the "canary" role announcing the process.
  (define (boot-with-canary self-pid)
    (define m (exit-signal self-pid debug-name))
    (sequence-actions (boot-unmonitored self-pid)
                      (role (topic-publisher m) #:name (list 'canary m))))
  (define maybe-monitored-main
    (if exit-signal? boot-with-canary raw-main))
  (define final-contract
    (cond [(eq? raw-contract any/c) state-contract]
          [(eq? state-contract any/c) raw-contract]
          [else (error 'spawn
                       "Cannot apply #:state-contract to already-contracted boot-specification")]))
  (spawn (if (eq? final-contract any/c)
             maybe-monitored-main
             (boot-specification maybe-monitored-main final-contract))
         k
         debug-name))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Topics and roles
|
|
|
|
;; Pattern [#:monitor? InterestType] -> Topic
;; A topic in the 'publisher role; monitor? defaults to #f.
(define (topic-publisher pattern #:monitor? [monitor? #f])
  (topic 'publisher pattern monitor?))
|
|
|
|
;; Pattern [#:monitor? InterestType] -> Topic
;; A topic in the 'subscriber role; monitor? defaults to #f.
(define (topic-subscriber pattern #:monitor? [monitor? #f])
  (topic 'subscriber pattern monitor?))
|
|
|
|
;; Role -> (or/c (Listof Role) #f)
;; The roles that converse with r, or #f when r is not a known role.
;;
;; TODO: Ideally this would be extensible; roles like 'debug-listener,
;; 'logger etc exist.
(define (co-roles r)
  (cond
    [(eq? r 'publisher) '(subscriber)]
    [(eq? r 'subscriber) '(publisher)]
    [else #f]))
|
|
|
|
;; Topic -> Listof<Topic>
;; The topics dual to t: copies of t with the role replaced by each of
;; its co-roles. co-roles answers #f for a role it does not know; that
;; is treated as "no co-roles" so the result is '() instead of a
;; for/list error on a non-sequence.
(define (co-topics t)
  (for/list ([co-role (in-list (or (co-roles (topic-role t)) '()))])
    (struct-copy topic t [role co-role])))
|
|
|
|
;; Topic ... -> Set<Topic>
;; Collects its arguments into a set standing for their union; raises
;; an error if any argument is not a topic.
(define (topic-union . ts)
  (when (ormap (lambda (t) (not (topic? t))) ts)
    (error 'topic-union "Expects topics as arguments, but given ~v" ts))
  (for/set ([t (in-list ts)])
    t))
|
|
|
|
;; Topic Pattern -> Topic
;; remote-topic with its pattern replaced by new-pattern; role and
;; interest type are kept.
(define (refine-topic remote-topic new-pattern)
  (topic (topic-role remote-topic)
         new-pattern
         (topic-monitor? remote-topic)))
|
|
|
|
;; Role Role -> (or/c (Listof Role) #f)
;; Truthy iff l is one of the co-roles of r. For the known roles this
;; is the same memq result as before. When r is unknown, co-roles
;; answers #f; the answer is then #f rather than a "not a proper list"
;; error from memq, so a message or topic with an unrecognised role
;; simply matches nothing.
(define (roles-intersect? l r)
  (define candidates (co-roles r))
  (and candidates
       (memq l candidates)))
|
|
|
|
;; Topic Topic -> (or/c Pattern #f)
;; Both left and right must be canonicalized.
;; Answers #f when the roles are not dual. Otherwise answers whatever
;; mgu-canonical (from unify.rkt) produces for freshened copies of the
;; two patterns; callers treat a false result as "no flow".
(define (topic-intersection left right)
  (if (roles-intersect? (topic-role left) (topic-role right))
      (let ([left-pattern (freshen (topic-pattern left))]
            [right-pattern (freshen (topic-pattern right))])
        (mgu-canonical left-pattern right-pattern))
      #f))
|
|
|
|
;; True iff the flow between remote-topic and local-topic should be
|
|
;; visible to the local peer. This is the case when either local-topic
|
|
;; is monitoring 'everything or otherwise if remote-topic is an
|
|
;; ordinary topic only.
|
|
;;
|
|
;; |--------------+--------------+------------------------|
|
|
;; | local-topic | remote-topic | visible to local peer? |
|
|
;; |--------------+--------------+------------------------|
|
|
;; | #f | #f | yes |
|
|
;; | #f | #t | no |
|
|
;; | #f | 'everything | no |
|
|
;; | #t | #f | yes |
|
|
;; | #t | #t | no |
|
|
;; | #t | 'everything | no |
|
|
;; | 'everything | #f | yes |
|
|
;; | 'everything | #t | yes |
|
|
;; | 'everything | 'everything | yes |
|
|
;; |--------------+--------------+------------------------|
|
|
;;
|
|
(define (flow-visible? local-topic remote-topic)
  (cond
    ;; A monitoring remote is only shown to locals watching 'everything.
    [(topic-monitor? remote-topic)
     (eq? (topic-monitor? local-topic) 'everything)]
    ;; Ordinary participants are visible to every local peer.
    [else #t]))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Composing state transitions and action emissions.
|
|
|
|
;; transition (or/c Action-tree (State -> transition)) ... -> transition
;; Threads a state through the given items, left to right. A procedure
;; item is applied to the current state and must answer a transition:
;; its state becomes current and its actions are appended. Any other
;; item is appended as-is as further actions. Actions accumulate as a
;; cons tree, earlier actions on the left.
(define (sequence-actions t . more-actions-and-transformers)
  (match-define (transition initial-state initial-actions) t)
  (define-values (final-state final-actions)
    (for/fold ([state initial-state]
               [actions initial-actions])
              ([item (in-list more-actions-and-transformers)])
      (cond
        [(procedure? item)
         (match-define (transition new-state more-actions) (item state))
         (values new-state (cons actions more-actions))]
        [else
         (values state (cons actions item))])))
  (transition final-state final-actions))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Core virtualizable virtual machine.
|
|
|
|
;; Listof<Any> -> BootK
;; A one-element list holding a transition or a procedure denotes that
;; element itself; any other list is taken as the actions of a
;; transition whose state is (void).
(define (vm-boot-parameters->boot-specification params)
  (match params
    [(list (and sole-param (or (? transition?) (? procedure?))))
     sole-param]
    [(? list? actions)
     (transition (void) actions)]))
|
|
|
|
;; Any BootSpecification -> vm
;; A VM with no processes or endpoints, next PID 0, and a single pending
;; action: spawning boot on behalf of the pseudo-process -1.
(define (make-vm name boot)
  (define primordial-spawn (spawn boot #f 'primordial-process))
  (vm name
      (hash)
      (hash)
      0
      (list (cons -1 primordial-spawn))))
|
|
|
|
;; vm -> (transition vm ConsTreeOf<Action>)
;; One scheduling pass. Takes the actions pending on entry, processes
;; them oldest-first, and answers a transition whose state is the
;; updated VM and whose actions are those destined for the containing
;; level. Actions enqueued during the pass land in the (emptied)
;; pending queue; if any remain at the end, a (yield run-vm) is added
;; as the last outbound action to request another pass.
(define (run-vm state)
  (let loop ((remaining-actions (reverse (vm-pending-actions state))) ;; oldest first
             (state (struct-copy vm state [pending-actions '()]))
             (outbound-actions '())) ;; newest first; reversed on exit
    (match remaining-actions
      ['()
       (let ((state (collect-dead-processes state)))
         (transition state (reverse (if (vm-idle? state)
                                        outbound-actions
                                        (cons (yield run-vm) outbound-actions)))))]
      [(cons (cons pid action) rest)
       (match action
         [(at-meta-level preaction)
          ;; Rewritten for, and forwarded to, the containing level.
          (let-values (((transformed-preaction state) (transform-meta-action pid preaction state)))
            (loop rest state (cons transformed-preaction outbound-actions)))]
         [(yield k)
          ;; Yields from processes that no longer exist are dropped.
          (loop rest
                (if (hash-has-key? (vm-processes state) pid)
                    (run-ready state pid 'yield k)
                    state)
                outbound-actions)]
         [preaction
          ;; perform-action answers its outbound actions newest-first.
          (let-values (((new-outbound-actions-rev state) (perform-action pid preaction state)))
            (loop rest state (append new-outbound-actions-rev outbound-actions)))])])))
|
|
|
|
;; vm -> Boolean
;; True iff the VM has no pending actions.
(define (vm-idle? state)
  (null? (vm-pending-actions state)))
|
|
|
|
;; vm -> vm
;; Drops every process that has no endpoints, no meta-level endpoints
;; and no pending actions, logging each removal.
(define (collect-dead-processes state)
  ;; Original author's note: dns-read-driver is being collected because
  ;; it only has a metarole.
  (define (has-pending-action? pid)
    (ormap (lambda (entry) (= (car entry) pid))
           (vm-pending-actions state)))
  (define (process-alive? pid p)
    (cond
      [(not (set-empty? (process-endpoints p))) #t]
      [(not (set-empty? (process-meta-endpoints p))) #t]
      [else (has-pending-action? pid)]))
  (define survivors
    (for/fold ([kept (hash)])
              ([(pid p) (in-hash (vm-processes state))])
      (cond
        [(process-alive? pid p)
         (hash-set kept pid p)]
        [else
         (log-info
          (format "~a PID ~v (~a) garbage-collected"
                  (vm-name state)
                  pid
                  (process-name p)))
         kept])))
  (struct-copy vm state [processes survivors]))
|
|
|
|
;; (exn:fail -> X) (Any ... -> X) Any ... -> X
;; Applies f to args. If that raises an exn:fail, the answer is
;; (failure-proc exn) instead.
(define (send-to-user failure-proc f . args)
  (define (call-user-code)
    (apply f args))
  (with-handlers ([exn:fail? (lambda (e) (failure-proc e))])
    (call-user-code)))
|
|
|
|
;; (or/c Topic Set<Topic>) -> Set<Topic>
;; Wraps a lone topic in a singleton set and passes sets through;
;; anything else is an error.
(define (ensure-topic-union t)
  (match t
    [(? topic?) (set t)]
    [(? set?) t]
    [_ (error 'ensure-topic-union
              "Expected either a single topic or a set of topics; got ~v"
              t)]))
|
|
|
|
;; PID Preaction vm -> (values QuasiQueue<Action> vm)
;; Carries out one preaction on behalf of pid. The first result holds
;; actions for the containing level (newest first); only quit produces
;; any.
(define (perform-action pid preaction state)
  (define (no-outbound new-state)
    (values '() new-state))
  (match preaction
    [(add-role pre-eid topics hs)
     (no-outbound (do-subscribe pid pre-eid (ensure-topic-union topics) hs state))]
    [(delete-role pre-eid reason)
     (no-outbound (do-unsubscribe pid pre-eid reason state))]
    [(send-message body role)
     (no-outbound (route-and-deliver role body state))]
    [(spawn spec k debug-name)
     (no-outbound (do-spawn pid spec k debug-name state))]
    [(quit pid-to-quit reason)
     ;; A quit without an explicit target applies to the acting process.
     (do-quit (or pid-to-quit pid) reason state)]))
|
|
|
|
;; Set<Topic> Set<Topic> -> Boolean
;; Placeholder: currently answers #t for every pair of arguments, so
;; do-subscribe's "topics must be equal" check can never fail.
(define (topics-equal? ta tb)
  ;; TODO: OK, if we had a couple of simple topics here, we'd be done
  ;; by asking (and (specialization? ta tb) (specialization? tb ta)),
  ;; but because we have sets of implicitly-unioned topics, things get
  ;; jolly awkward. For now, we punt, trusting the user to not supply
  ;; an incompatible set of topics on an endpoint update. This is
  ;; definitely an interim position: full presence will require a
  ;; serious treatment of topic unions via anti-unification.
  #t)
|
|
|
|
;; PID PreEID Set<Topic> Handlers vm -> vm
;; Installs an endpoint for pid, or replaces the handlers of an existing
;; endpoint with the same EID. Only a brand-new endpoint triggers
;; presence notifications. Does nothing if pid is not a live process.
(define (do-subscribe pid pre-eid topics hs state)
  (cond
    [(not (hash-has-key? (vm-processes state) pid))
     state]
    [else
     (define new-eid (eid pid pre-eid))
     (define old-endpoint (hash-ref (vm-endpoints state) new-eid #f))
     (define new-endpoint (endpoint new-eid topics hs))
     (cond
       [(not old-endpoint)
        ;; We are installing a *new* endpoint: notify peers first, then
        ;; record the endpoint against the process and the VM.
        (let* ([notified (notify-route-additions state new-endpoint)]
               [recorded (generic-update-process notified pid (add-process-eid new-eid))])
          (install-endpoint recorded new-eid new-endpoint))]
       [(topics-equal? (endpoint-topics old-endpoint)
                       (endpoint-topics new-endpoint))
        ;; We are *updating* an existing endpoint's behaviour.
        (install-endpoint state new-eid new-endpoint)]
       [else
        (error 'do-subscribe
               "Topics must be equal when updating an endpoint: ~v vs ~v"
               old-endpoint
               new-endpoint)])]))
|
|
|
|
;; vm PID (Process -> Process) -> vm
;; Replaces pid's process record with (updater record). hash-update is
;; called without a failure result, so pid must already be present.
(define (generic-update-process state pid updater)
  (struct-copy vm state [processes (hash-update (vm-processes state) pid updater)]))
|
|
|
|
;; EID -> (Process -> Process)
;; Each of these builds an updater for generic-update-process that adds
;; or removes one EID in a process's endpoint set (this level) or
;; meta-endpoint set (containing level).

(define (add-process-eid new-eid)
  (lambda (p)
    (define updated (set-add (process-endpoints p) new-eid))
    (struct-copy process p [endpoints updated])))

(define (add-process-meta-eid new-eid)
  (lambda (p)
    (define updated (set-add (process-meta-endpoints p) new-eid))
    (struct-copy process p [meta-endpoints updated])))

(define (remove-process-eid old-eid)
  (lambda (p)
    (define updated (set-remove (process-endpoints p) old-eid))
    (struct-copy process p [endpoints updated])))

(define (remove-process-meta-eid old-eid)
  (lambda (p)
    (define updated (set-remove (process-meta-endpoints p) old-eid))
    (struct-copy process p [meta-endpoints updated])))
|
|
|
|
;; vm EID Endpoint -> vm
;; Records new-endpoint under new-eid, replacing any previous entry.
(define (install-endpoint state new-eid new-endpoint)
  (struct-copy vm state [endpoints (hash-set (vm-endpoints state) new-eid new-endpoint)]))

;; vm EID -> vm
;; Forgets the endpoint stored under old-eid, if any.
(define (uninstall-endpoint state old-eid)
  (struct-copy vm state [endpoints (hash-remove (vm-endpoints state) old-eid)]))
|
|
|
|
;; vm Endpoint -> vm
;; For every flow between a topic of new-endpoint and a topic of an
;; endpoint already held by some process, runs the presence handlers on
;; both sides, each subject to flow-visible?.
;;
;; The new endpoint's own EID is bound as new-eid and passed to
;; run-trapk as the responsible party. Previously the bare identifier
;; `eid` was passed; the match pattern does not bind it, so it denoted
;; the eid struct constructor, not this endpoint's EID.
(define (notify-route-additions state new-endpoint)
  (match-define (endpoint (and new-eid (eid pid _)) topics (handlers presence-handler _ _))
    new-endpoint)
  (for*/fold ([state state])
      ([(matching-pid p) (in-hash (vm-processes state))]
       [matching-eid (in-set (process-endpoints p))]
       [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
       [matching-topics (in-value (endpoint-topics e))]
       [matching-topic (in-set matching-topics)]
       [topic (in-set topics)]
       [flow-pattern (in-value (topic-intersection topic matching-topic))]
       #:when flow-pattern)
    ;; inbound: the existing peer as seen by the new endpoint;
    ;; outbound: the new endpoint as seen by the existing peer.
    (define inbound-flow (refine-topic matching-topic flow-pattern))
    (define outbound-flow (refine-topic topic flow-pattern))
    (define e-presence-handler (handlers-presence (endpoint-handlers e)))
    (let* ((state (if (flow-visible? topic inbound-flow)
                      (run-trapk state pid new-eid presence-handler inbound-flow)
                      state))
           (state (if (flow-visible? matching-topic outbound-flow)
                      (run-trapk state matching-pid matching-eid e-presence-handler outbound-flow)
                      state)))
      state)))
|
|
|
|
;; PID PreEID Any vm -> vm
;; Removes pid's endpoint named pre-eid, if it exists: detaches it from
;; the process, uninstalls it from the VM, then notifies peers of the
;; absence with the given reason.
(define (do-unsubscribe pid pre-eid reason state)
  (define old-eid (eid pid pre-eid))
  (define old-endpoint (hash-ref (vm-endpoints state) old-eid #f))
  (if old-endpoint
      (let* ([detached (generic-update-process state pid (remove-process-eid old-eid))]
             [uninstalled (uninstall-endpoint detached old-eid)])
        (notify-route-deletions uninstalled old-endpoint reason))
      state))
|
|
|
|
;; vm Endpoint Any -> vm
;; For every flow between a topic of the removed endpoint and a topic
;; of an endpoint still held by some process, runs that peer's absence
;; handler with the flow and the reason.
;;
;; Absence is delivered under the same flow-visible? rule that
;; notify-route-additions applies to presence, so a peer is not told
;; about the departure of a flow whose arrival was hidden from it.
(define (notify-route-deletions state old-endpoint reason)
  (define removed-topics (endpoint-topics old-endpoint))
  (for*/fold ([state state])
      ([(matching-pid p) (in-hash (vm-processes state))]
       [matching-eid (in-set (process-endpoints p))]
       [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
       [matching-topics (in-value (endpoint-topics e))]
       [matching-topic (in-set matching-topics)]
       [removed-topic (in-set removed-topics)]
       [flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
       #:when flow-pattern)
    (define outbound-flow (refine-topic removed-topic flow-pattern))
    (define absence-handler (handlers-absence (endpoint-handlers e)))
    (if (flow-visible? matching-topic outbound-flow)
        (run-trapk state matching-pid matching-eid absence-handler outbound-flow reason)
        state)))
|
|
|
|
;; Role Any vm -> vm
;; Delivers a message sent in the given role. Collects the set of
;; endpoints having at least one topic that intersects the message's
;; topic (so each endpoint receives the message once), then runs each
;; one's message handler.
;;
;; Each receiving endpoint's own EID is bound as target-eid and passed
;; to run-trapk as the responsible party. Previously the bare
;; identifier `eid` was passed; the match pattern does not bind it, so
;; it denoted the eid struct constructor.
(define (route-and-deliver role body state)
  (define message-topic (topic role body #f))
  (define endpoints
    (for*/set ([(matching-pid p) (in-hash (vm-processes state))]
               [matching-eid (in-set (process-endpoints p))]
               [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
               [matching-topics (in-value (endpoint-topics e))]
               [matching-topic (in-set matching-topics)]
               [flow-pattern (in-value (topic-intersection message-topic matching-topic))]
               #:when flow-pattern)
      e))
  (for/fold ([state state]) ([e (in-set endpoints)])
    (match-define (endpoint (and target-eid (eid pid _)) _ (handlers _ _ message-handler)) e)
    (run-trapk state pid target-eid message-handler message-topic body)))
|
|
|
|
;; PID BootSpecification Maybe<TrapK<PID>> Any vm -> vm
;; Creates a process with the next free PID, boots it (exceptions raised
;; by the boot procedure become a quit action for the new process),
;; enqueues its initial actions, and finally hands the new PID to the
;; spawner's continuation k.
;;
;; k is only run if the spawning process still exists. A spawn action
;; can be processed after its issuer has quit (e.g. a quit followed by a
;; spawn in one transition); run-trapk would then fail in run-ready's
;; hash-ref and take down the whole VM. Other entry points (yield,
;; do-subscribe, wrap-trapk) already guard in this way.
(define (do-spawn spawning-pid spec k debug-name state)
  (define new-pid (vm-next-process-id state))
  (define new-name (or debug-name new-pid))
  (define-values (main state-contract)
    (match spec
      [(boot-specification main state-contract) (values main state-contract)]
      [main (values main any/c)]))
  (match-define (transition initial-state initial-actions)
    (maybe-transition->transition
     (cond
       [(procedure? main) (send-to-user (lambda (e) (transition #f (quit #f e))) main new-pid)]
       [else main])))
  (define initial-process (process new-name
                                   new-pid
                                   initial-state
                                   state-contract
                                   (list 'spawn 'main debug-name) ;; TODO: use contract-regions
                                   (set)
                                   (set)))
  (define spawned-state
    (struct-copy vm (enqueue-actions state new-pid initial-actions)
                 [processes (hash-set (vm-processes state) new-pid initial-process)]
                 [next-process-id (+ new-pid 1)]))
  (log-info (format "~a PID ~v (~a) started" (vm-name state) new-pid new-name))
  (if (hash-has-key? (vm-processes spawned-state) spawning-pid)
      (run-trapk spawned-state spawning-pid (list 'spawn 'k debug-name) k new-pid)
      spawned-state))
|
|
|
|
;; Any PID Any Any -> Void
;; Reports a process exit. A #f reason is logged as a normal exit; an
;; exception is shown through the current error display handler; any
;; other reason is logged as-is.
(define (print-quit vm-name pid-to-quit process-name reason)
  (cond
    [(not reason)
     (log-info (format "~a PID ~v (~a) exited normally"
                       vm-name
                       pid-to-quit
                       process-name))]
    [(exn? reason)
     (define display-error (error-display-handler))
     (display-error (format "~a PID ~v (~a) exited with exception~n~a"
                            vm-name
                            pid-to-quit
                            process-name
                            (exn-message reason))
                    reason)
     (flush-output (current-output-port))]
    [else
     (log-info (format "~a PID ~v (~a) exited with reason: ~a"
                       vm-name
                       pid-to-quit
                       process-name
                       reason))]))
|
|
|
|
;; PID Any vm -> (values Listof<Action> vm)
;; Terminates pid-to-quit if it exists: reports the exit, unsubscribes
;; each of its endpoints at this level, and removes the process. The
;; first result holds one delete-role per meta-level endpoint, for the
;; containing level to perform.
(define (do-quit pid-to-quit reason state)
  (define dying-process (hash-ref (vm-processes state) pid-to-quit #f))
  (cond
    [(not dying-process)
     (values '() state)]
    [else
     (print-quit (vm-name state) pid-to-quit (process-name dying-process) reason)
     (define unsubscribed
       (for/fold ([st state])
                 ([dead-eid (in-set (process-endpoints dying-process))])
         (do-unsubscribe pid-to-quit (eid-pre-eid dead-eid) reason st)))
     ;; At the containing level the whole EID served as the PreEID.
     (define meta-deletions
       (for/list ([meta-eid (in-set (process-meta-endpoints dying-process))])
         (delete-role meta-eid reason)))
     (values meta-deletions
             (struct-copy vm unsubscribed
                          [processes (hash-remove (vm-processes unsubscribed) pid-to-quit)]))]))
|
|
|
|
;; vm PID ContractParty Maybe<TrapK> Any ... -> vm
;; Applies trap-k to args to get an InterruptK and runs it against pid's
;; state. A #f trap-k means "not interested" and leaves the VM as it is.
;; If trap-k itself raises, the process keeps its state and is asked to
;; quit with the exception.
(define (run-trapk state pid new-party trap-k . args)
  (cond
    [(not trap-k) state]
    [else
     (define ((quit-on-failure e) process-state)
       (transition process-state (quit #f e)))
     (define interrupt-k
       (apply send-to-user quit-on-failure trap-k args))
     (run-ready state pid new-party interrupt-k)]))
|
|
|
|
;; vm PID ContractParty InterruptK -> vm
;; Runs interrupt-k on pid's current state (first checked against the
;; process's state contract), enqueues the resulting actions, and stores
;; the new state with new-party as the party responsible for it. An
;; exception keeps the old state and enqueues a quit carrying it.
(define (run-ready state pid new-party interrupt-k)
  (define proc (hash-ref (vm-processes state) pid))
  (define old-state (process-state proc))
  (define (resume)
    (interrupt-k (contract (process-state-contract proc)
                           old-state
                           (process-responsible-party proc)
                           new-party
                           (list (vm-name state) 'pid pid 'state)
                           #f)))
  (define (quit-on-failure e)
    (transition old-state (quit #f e)))
  (define outcome
    (maybe-transition->transition (send-to-user quit-on-failure resume)))
  (define new-state (transition-state outcome))
  (define actions (transition-actions outcome))
  (generic-update-process (enqueue-actions state pid actions)
                          pid
                          (lambda (p)
                            (struct-copy process p
                                         [state new-state]
                                         [responsible-party new-party]))))
|
|
|
|
;; PID Any -> Boolean
;; True iff a should be enqueued. The #f and void placeholders are
;; skipped silently; any other non-action is skipped with a warning.
(define (valid-action? pid a)
  (cond
    [(or (not a) (void? a)) #f] ;; placeholders in action ConsTrees
    [(action? a) #t]
    [else
     (log-warning (format "Illegal action ~v from pid ~v" a pid))
     #f]))
|
|
|
|
;; vm PID ConsTreeOf<Action> -> vm
;; Flattens the action tree and pushes each valid action, tagged with
;; pid, onto the pending QuasiQueue in order (so the last action ends up
;; at the head).
(define (enqueue-actions state pid actions)
  (define new-pending
    (for/fold ([queue (vm-pending-actions state)])
              ([a (in-list (flatten actions))]
               #:when (valid-action? pid a))
      (cons (cons pid a) queue)))
  (struct-copy vm state [pending-actions new-pending]))
|
|
|
|
;; PID ContractParty Maybe<TrapK> -> TrapK over VM states
;; Lifts a process-level TrapK into one usable at the containing level:
;; given the trap arguments and then a VM state, it runs trapk for pid
;; inside that VM and continues with run-vm. If pid is gone, the VM is
;; left unchanged.
(define (wrap-trapk pid new-party trapk)
  (lambda args
    (lambda (state)
      (cond
        [(hash-has-key? (vm-processes state) pid)
         (run-vm (apply run-trapk state pid new-party trapk args))]
        [else
         (make-transition state)]))))
|
|
|
|
;; PID Preaction vm -> (values Preaction vm)
;; Rewrites a preaction that pid wants performed at the containing
;; level. Role names become full EIDs, handlers and spawn continuations
;; are wrapped with wrap-trapk, and the process's meta-endpoint set is
;; kept in step. Messages and quits pass through unchanged.
(define (transform-meta-action pid preaction state)
  ;; Applies updater to pid's record if the process still exists.
  (define (update-if-alive updater)
    (if (hash-has-key? (vm-processes state) pid)
        (generic-update-process state pid updater)
        state))
  (match preaction
    [(add-role pre-eid topics hs)
     (define new-eid (eid pid pre-eid))
     (define (lift handler) (wrap-trapk pid new-eid handler))
     (values (add-role new-eid
                       topics
                       (handlers (lift (handlers-presence hs))
                                 (lift (handlers-absence hs))
                                 (lift (handlers-message hs))))
             (update-if-alive (add-process-meta-eid new-eid)))]
    [(delete-role pre-eid reason)
     (define old-eid (eid pid pre-eid))
     (values (delete-role old-eid reason)
             (update-if-alive (remove-process-meta-eid old-eid)))]
    [(spawn spec k debug-name)
     (values (spawn spec
                    (wrap-trapk pid (list 'meta-spawn 'k debug-name) k)
                    debug-name)
             state)]
    [(or (? send-message? p) (? quit? p))
     (values p state)]))
|
|
|
|
;; (nested-vm [#:debug-name debug-name] [#:pid pid] boot-param ...)
;;
;; Expands to a nested-vm-proc call. The VM's name is debug-name, or a
;; fresh (gensym 'nested-vm) when omitted. The boot parameters are
;; evaluated inside a one-argument procedure whose parameter is named by
;; #:pid (a macro-introduced dummy name when omitted), and are then
;; converted with vm-boot-parameters->boot-specification.
(define-syntax nested-vm
  (lambda (stx)
    (syntax-parse stx
      [(_ (~or (~optional (~seq #:debug-name debug-name) #:name "#:debug-name of nested-vm")
               (~optional (~seq #:pid pid) #:name "#:pid variable name"))
          ...
          boot-param ...)
       #`(nested-vm-proc #,(if (attribute debug-name)
                               #'debug-name
                               #'(gensym 'nested-vm))
                         (lambda (#,(if (attribute pid) #'pid #'dummy-pid))
                           (vm-boot-parameters->boot-specification (list boot-param ...))))])))
|
|
|
|
;; Any BootSpecification -> boot-specification
;; A boot specification for a process that *is* a VM: its boot procedure
;; ignores its own PID, builds a VM named name that boots boot, and runs
;; the first scheduling pass. The process state is contracted with vm?.
(define (nested-vm-proc name boot)
  (define (boot-nested-vm self-pid)
    (run-vm (make-vm name boot)))
  (boot-specification boot-nested-vm vm?))
|
|
|
|
;; Any ... -> 'done
;; Runs a top-level VM until it can make no further progress. After
;; every run-vm pass the resulting transition is stored in
;; current-ground-transition. Subscriber topics whose pattern is a pair
;; headed by a Racket event are waited on with sync; when such an event
;; fires, (cons evt result) is routed into the VM as a 'publisher
;; message. Any outbound action other than the single trailing yield is
;; an error, since there is no containing level to perform it.
(define (ground-vm . boot-params)
  (let loop ((state (make-vm 'ground-vm (vm-boot-parameters->boot-specification boot-params))))
    (match (let ((t (run-vm state))) (set! current-ground-transition t) t)
      [(transition state actions)
       (define is-blocking?
         (match actions
           ['() #t] ;; no "yield" action -> certainly blocking
           [(list (yield _)) #f] ;; single "yield", with k statically known to be run-vm -> poll
           [_ (error 'ground-vm
                     "Cannot process meta-actions ~v because no further metalevel exists"
                     actions)]))
       ;; One wrapped event per non-monitoring subscriber topic of the
       ;; form (cons evt _); its sync result is an InterruptK on the VM.
       (define active-events
         (for*/fold ([acc '()])
             ([(eid e) (in-hash (vm-endpoints state))]
              [active-topic (in-set (endpoint-topics e))])
           (match active-topic
             [(topic 'subscriber (cons (? evt? evt) _) #f)
              (define ((evt-handler message) state)
                (route-and-deliver 'publisher (cons evt message) state))
              (cons (wrap-evt evt evt-handler) acc)]
             [_ acc])))
       (if (and is-blocking? (null? active-events))
           'done ;; Not polling, and no events that could wake us from blocking, so quit
           ;; When polling, an always-ready event whose result is the
           ;; identity lets sync return at once if nothing else is ready.
           (let ((interruptk (apply sync
                                    (if is-blocking?
                                        never-evt
                                        (wrap-evt always-evt (lambda (dummy) values)))
                                    active-events)))
             (loop (interruptk state))))])))
|
|
|
|
;; For debugging: the transition most recently returned by run-vm inside
;; ground-vm's loop; #f until ground-vm has completed a pass.
;; (The stray trailing `|` on this line was removed: an unterminated
;; |-quoted symbol is a reader error.)
(define current-ground-transition #f)