Use process-allocated endpoint-identifiers. Add a timer driver and example.
This commit is contained in:
parent
2e7aa6ee1f
commit
bb24f19317
|
@ -1,3 +1,6 @@
|
|||
;; Emacs indent settings
|
||||
(mapcar #'(lambda (x) (put x 'scheme-indent-function 1))
|
||||
'(transition extend-transition role))
|
||||
(progn
|
||||
(mapcar #'(lambda (x) (put x 'scheme-indent-function 1))
|
||||
'(transition extend-transition))
|
||||
(mapcar #'(lambda (x) (put x 'scheme-indent-function 2))
|
||||
'(role)))
|
|
@ -2,28 +2,29 @@
|
|||
;; Trivial demonstration of an os2.rkt virtual machine.
|
||||
|
||||
(require "os2.rkt")
|
||||
(require "os2-timer.rkt")
|
||||
(require racket/pretty)
|
||||
|
||||
(define (super-alarm msecs)
|
||||
(wrap-evt (alarm-evt msecs) (lambda (_) (current-inexact-milliseconds))))
|
||||
|
||||
(define (sleep n k)
|
||||
(transition k
|
||||
(role (topic-subscriber (super-alarm (+ (current-inexact-milliseconds) n)))
|
||||
#:state k
|
||||
#:id id
|
||||
#:on-ready (begin (write `(ready ,id)) (newline) k)
|
||||
[now (extend-transition (k) (delete-role id))])))
|
||||
(define (sleep state n k)
|
||||
(define id (gensym 'sleep-id))
|
||||
(transition state
|
||||
(send-message (set-timer id n 'relative))
|
||||
(role 'sleeper (topic-subscriber (timer-expired id (wild)))
|
||||
#:state state
|
||||
[(timer-expired (== id) now)
|
||||
(extend-transition (k state) (delete-role id))])))
|
||||
|
||||
(define (example-process delay)
|
||||
(write `(sleeping for ,delay milliseconds))
|
||||
(newline)
|
||||
(sleep delay (lambda ()
|
||||
(if (> delay 1500)
|
||||
(error 'example-process "Oh noes!")
|
||||
(begin (write `(awoke after ,delay milliseconds))
|
||||
(newline)
|
||||
(transition 'no-state (kill)))))))
|
||||
(sleep 'no-state
|
||||
delay
|
||||
(lambda (state)
|
||||
(if (> delay 1500)
|
||||
(error 'example-process "Oh noes!")
|
||||
(begin (write `(awoke after ,delay milliseconds))
|
||||
(newline)
|
||||
(transition state (kill)))))))
|
||||
|
||||
(define spy
|
||||
(lambda (spy-pid)
|
||||
|
@ -33,11 +34,12 @@
|
|||
(w 'absence)
|
||||
(w 'message)))
|
||||
(transition 'spy-state
|
||||
(add-role (topic-publisher (wild) #:virtual? #t) (hs 'subscriber->publisher))
|
||||
(add-role (topic-subscriber (wild) #:virtual? #t) (hs 'publisher->subscriber)))))
|
||||
(add-role 's->p (topic-publisher (wild) #:virtual? #t) (hs 'subscriber->publisher))
|
||||
(add-role 'p->s (topic-subscriber (wild) #:virtual? #t) (hs 'publisher->subscriber)))))
|
||||
|
||||
(ground-vm (lambda (boot-pid)
|
||||
(transition 'no-state
|
||||
(spawn (timer-driver 'example-timer-driver))
|
||||
(spawn spy)
|
||||
(spawn (lambda (pid) (example-process 1000)))
|
||||
(spawn (lambda (pid) (example-process 2000)))
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
#lang racket/base
|
||||
;; Timer drivers for os2.rkt.
|
||||
|
||||
;; Uses mutable state internally, but because the scope of the
|
||||
;; mutation is limited to each timer process alone, it's easy to show
|
||||
;; correct linear use of the various pointers.
|
||||
|
||||
(require data/heap)
|
||||
(require racket/match)
|
||||
(require "os2.rkt")
|
||||
|
||||
(provide (struct-out set-timer)
|
||||
(struct-out timer-expired)
|
||||
timer-driver
|
||||
timer-relay)
|
||||
|
||||
;; (set-timer Any Number (or 'relative 'absolute))
|
||||
;; The timer driver and timer relays listen for messages of this type,
|
||||
;; and when they hear one, they set an alarm that will later send a
|
||||
;; corresponding timer-expired message.
|
||||
(struct set-timer (label msecs kind) #:prefab)
|
||||
|
||||
;; (timer-expired Any Number)
|
||||
;; Message sent by the timer driver or a timer relay upon expiry of a
|
||||
;; timer. Contains the label specified in the corresponding set-timer
|
||||
;; message, and also the current absolute time from the outside world.
|
||||
(struct timer-expired (label msecs) #:prefab)
|
||||
|
||||
;; (pending-timer AbsoluteSeconds Any Boolean)
|
||||
;; An outstanding timer being managed by the timer-driver.
|
||||
(struct pending-timer (deadline label) #:transparent)
|
||||
|
||||
;; (driver-state Symbol Maybe<EID> Heap<PendingTimer>)
|
||||
;; State of a timer-driver, including the identifier of the driver,
|
||||
;; the currently-active subscription to ground time events (if any),
|
||||
;; and the heap of all remaining timers.
|
||||
(struct driver-state (self-id heap) #:transparent)
|
||||
|
||||
;; (relay-state ExactPositiveInteger Hash<ExactPositiveInteger,Any>)
|
||||
;; State of a timer-relay, including the next timer number and a
|
||||
;; mapping from timer number to timer label.
|
||||
(struct relay-state (next-counter active-timers) #:transparent)
|
||||
|
||||
;; Note that (set-timer 'current-time 0 #f) causes an immediate reply
|
||||
;; of (timer-expired 'current-time (current-inexact-milliseconds)),
|
||||
;; which can be used for an event-oriented interface to reading the
|
||||
;; system clock.
|
||||
|
||||
;; Racket's alarm-evt is almost the right design for timeouts: its
|
||||
;; synchronisation value should be the (or some) value of the clock
|
||||
;; after the asked-for time. That way it serves as timeout and
|
||||
;; clock-reader in one.
|
||||
(define (timer-evt msecs)
|
||||
(wrap-evt (alarm-evt msecs)
|
||||
(lambda (_) (current-inexact-milliseconds))))
|
||||
|
||||
;; -> Heap<PendingTimer>
|
||||
(define (make-timer-heap)
|
||||
(make-heap (lambda (t1 t2) (<= (pending-timer-deadline t1) (pending-timer-deadline t2)))))
|
||||
|
||||
;; Heap<PendingTimer> -> Maybe<PendingTimer>
|
||||
;; Retrieves the earliest-deadline timer from the heap, if there is
|
||||
;; one.
|
||||
(define (next-timer! heap)
|
||||
(if (zero? (heap-count heap))
|
||||
#f
|
||||
(heap-min heap)))
|
||||
|
||||
;; Heap<PendingTimer> AbsoluteSeconds -> ListOf<TimerExpired>
|
||||
;; Retrieves (and removes) all timers from the heap that have deadline
|
||||
;; earlier or equal to the time passed in.
|
||||
(define (fire-timers! heap now)
|
||||
(if (zero? (heap-count heap))
|
||||
'()
|
||||
(let ((m (heap-min heap)))
|
||||
(if (<= (pending-timer-deadline m) now)
|
||||
(begin (heap-remove-min! heap)
|
||||
(cons (send-message (timer-expired (pending-timer-label m) now))
|
||||
(fire-timers! heap now)))
|
||||
'()))))
|
||||
|
||||
;; [Symbol] -> BootK
|
||||
;; Process for mapping this-level timer requests to ground-level timer
|
||||
;; events and back.
|
||||
(define (timer-driver self-id)
|
||||
(transition (driver-state self-id (make-timer-heap))
|
||||
(role 'relay-down (topic-subscriber (set-timer (wild) (wild) (wild)))
|
||||
#:state state
|
||||
[(set-timer label msecs 'relative)
|
||||
(install-timer! state label (+ (current-inexact-milliseconds) msecs))]
|
||||
[(set-timer label msecs 'absolute)
|
||||
(install-timer! state label msecs)])))
|
||||
|
||||
;; DriverState Any AbsoluteSeconds -> Transition
|
||||
(define (install-timer! state label deadline)
|
||||
(heap-add! (driver-state-heap state) (pending-timer deadline label))
|
||||
(update-time-listener! state))
|
||||
|
||||
;; DriverState -> Transition
|
||||
(define (update-time-listener! state)
|
||||
(define next (next-timer! (driver-state-heap state)))
|
||||
(transition state
|
||||
(delete-role 'time-listener)
|
||||
(and next
|
||||
(role 'time-listener (topic-subscriber (timer-evt (pending-timer-deadline next)))
|
||||
#:state state
|
||||
[now
|
||||
(define to-send (fire-timers! (driver-state-heap state) now))
|
||||
;; Note: compute to-send before recursing, because of side-effects on heap
|
||||
(extend-transition (update-time-listener! state) to-send)]))))
|
||||
|
||||
;; Symbol -> BootK
|
||||
;; Process for mapping this-level timer requests to meta-level timer
|
||||
;; requests. Useful when running nested VMs: essentially extends timer
|
||||
;; support up the branches of the VM tree toward the leaves.
|
||||
(define (timer-relay self-id)
|
||||
(transition (relay-state 0 (hash))
|
||||
(at-meta-level
|
||||
(role 'relay-up (topic-subscriber (timer-expired (wild) (wild)))
|
||||
#:state (relay-state next-counter active-timers)
|
||||
[(timer-expired (list (== self-id) counter) now)
|
||||
(transition (relay-state next-counter (hash-remove active-timers counter))
|
||||
(and (hash-has-key? active-timers counter)
|
||||
(send-message (timer-expired (hash-ref active-timers counter) now))))]))
|
||||
(role 'relay-down (topic-subscriber (set-timer (wild) (wild) (wild)))
|
||||
#:state (relay-state next-counter active-timers)
|
||||
[(set-timer label msecs kind)
|
||||
(transition (relay-state (+ next-counter 1) (hash-set active-timers next-counter label))
|
||||
(at-meta-level
|
||||
(send-message (set-timer (list self-id next-counter) msecs kind))))])))
|
278
os2.rkt
278
os2.rkt
|
@ -53,15 +53,20 @@
|
|||
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
|
||||
;; it's an integer.
|
||||
|
||||
;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
|
||||
;; it's a list of two elements, the first being the endpoint's
|
||||
;; process's PID and the second being an integer. (Except for the
|
||||
;; ground-vm, where they're different because there aren't any PIDs.)
|
||||
;; A PreEID is an arbitrary, process-chosen-and-supplied identifier
|
||||
;; for an endpoint. It is to be equal?-comparable. It is to be unique
|
||||
;; within the scope of a single process.
|
||||
|
||||
;; One endpoint, one topic, with the caveat that as we are at present
|
||||
;; unable to represent true topic unions, we actually store a *set* of
|
||||
;; topics against each endpoint. The topic for the endpoint is to be
|
||||
;; taken as the union of all the members in the set.
|
||||
;; A EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
|
||||
;; it's an (eid PID PreEID). As a consequence of the scope of PreEIDs,
|
||||
;; EIDs shouldn't be visible outside the scope of the owning process.
|
||||
(struct eid (pid pre-eid) #:transparent)
|
||||
|
||||
;; One endpoint, one topic (which may be changed over time!), with the
|
||||
;; caveat that as we are at present unable to represent true topic
|
||||
;; unions, we actually store a *set* of topics against each
|
||||
;; endpoint. The (current!) topic for the endpoint is to be taken as
|
||||
;; the union of all the members in the set.
|
||||
|
||||
;; A Flow is a Topic that comes from the intersection of two dual
|
||||
;; topics.
|
||||
|
@ -79,10 +84,9 @@
|
|||
;; topic) in a conversation.
|
||||
(struct endpoint (id topics handlers) #:transparent)
|
||||
|
||||
;; A Process is an Exists State . (process PID State
|
||||
;; NonnegativeInteger Set<EID>), representing a VM process and its
|
||||
;; collection of active endpoints.
|
||||
(struct process (id state next-eid-number endpoints) #:transparent)
|
||||
;; A Process is an Exists State . (process PID State Set<EID>),
|
||||
;; representing a VM process and its collection of active endpoints.
|
||||
(struct process (id state endpoints) #:transparent)
|
||||
|
||||
;; A Topic is a (topic Role Pattern Boolean), describing an Endpoint's
|
||||
;; role in a conversation.
|
||||
|
@ -92,9 +96,9 @@
|
|||
;; InterruptK = State -> Transition
|
||||
;; TrapK<X> = X -> InterruptK
|
||||
|
||||
;; PresenceHandler = TrapK<EID * Topic>
|
||||
;; AbsenceHandler = TrapK<EID * Topic * Reason>
|
||||
;; MessageHandler = TrapK<EID * Topic * Message>
|
||||
;; (handlers Maybe<TrapK<Topic>>
|
||||
;; Maybe<TrapK<Topic * Reason>>
|
||||
;; Maybe<TrapK<Topic * Message>>)
|
||||
(struct handlers (presence absence message) #:transparent)
|
||||
|
||||
;; actions is a plain old ordered ConsTreeOf<Action>, not a
|
||||
|
@ -103,10 +107,20 @@
|
|||
|
||||
;; Preactions.
|
||||
;; Ks are various TrapKs or #f, signifying lack of interest.
|
||||
(struct add-role (topics handlers k) #:prefab)
|
||||
(struct delete-role (eid reason) #:prefab)
|
||||
;;
|
||||
;; (add-role PreEID (or Topic Set<Topic>) Handlers)
|
||||
(struct add-role (pre-eid topics handlers) #:prefab)
|
||||
;;
|
||||
;; (delete-role PreEID Any)
|
||||
(struct delete-role (pre-eid reason) #:prefab)
|
||||
;;
|
||||
;; (send-message Any Topic)
|
||||
(struct send-message (body topic) #:prefab)
|
||||
;;
|
||||
;; (spawn BootK Maybe<TrapK<PID>>)
|
||||
(struct spawn (main k) #:prefab)
|
||||
;;
|
||||
;; (kill Maybe<PID> Any)
|
||||
(struct kill (pid reason) #:prefab)
|
||||
|
||||
;; An Action is either a Preaction or an (at-meta-level Preaction).
|
||||
|
@ -120,12 +134,10 @@
|
|||
(define-syntax role
|
||||
(lambda (stx)
|
||||
(syntax-parse stx
|
||||
[(_ topics-expr
|
||||
[(_ pre-eid topics-expr
|
||||
#:state state-pattern
|
||||
(~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
|
||||
(~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
|
||||
(~optional (~seq #:on-ready ready) #:name "#:on-ready handler")
|
||||
(~optional (~seq #:id eid) #:defaults ([eid #'e0]) #:name "#:id")
|
||||
(~optional (~seq #:topic topic) #:defaults ([topic #'t0]) #:name "#:topic")
|
||||
(~optional (~seq #:reason reason) #:defaults ([reason #'r0]) #:name "#:reason"))
|
||||
...
|
||||
|
@ -134,11 +146,10 @@
|
|||
(define-syntax-rule (build-handler args e-attr)
|
||||
(if (not (attribute e-attr))
|
||||
#'#f
|
||||
#`(lambda (eid . args) (match-lambda [state-pattern e-attr]))))
|
||||
#`(lambda args (match-lambda [state-pattern e-attr]))))
|
||||
(with-syntax ([presence-handler (build-handler (topic) presence)]
|
||||
[absence-handler (build-handler (topic reason) absence)]
|
||||
[ready-handler (build-handler () ready)]
|
||||
[message-handler #'(lambda (eid topic message-body)
|
||||
[message-handler #'(lambda (topic message-body)
|
||||
(lambda (state)
|
||||
(match state
|
||||
[state-pattern
|
||||
|
@ -146,16 +157,16 @@
|
|||
[message-pattern clause-body ...]
|
||||
...
|
||||
[_ state])])))])
|
||||
#'(add-role topics-expr
|
||||
(handlers presence-handler absence-handler message-handler)
|
||||
ready-handler))])))
|
||||
#'(add-role pre-eid
|
||||
topics-expr
|
||||
(handlers presence-handler absence-handler message-handler)))])))
|
||||
|
||||
;;---------------------------------------------------------------------------
|
||||
;; Smarter constructors for transitions and preactions.
|
||||
|
||||
(define (make-transition state . actions) (transition state actions))
|
||||
(define (make-add-role topics handlers [k #f]) (add-role topics handlers k))
|
||||
(define (make-delete-role eid [reason #f]) (delete-role eid reason))
|
||||
(define make-add-role add-role) ;; no special treatment required at present
|
||||
(define (make-delete-role pre-eid [reason #f]) (delete-role pre-eid reason))
|
||||
(define (make-send-message body [topic (topic-publisher body)]) (send-message body topic))
|
||||
(define (make-spawn main [k #f]) (spawn main k))
|
||||
(define (make-kill [pid #f] [reason #f]) (kill pid reason))
|
||||
|
@ -222,7 +233,8 @@
|
|||
(state (struct-copy vm state [pending-actions '()]))
|
||||
(outbound-actions '()))
|
||||
(match remaining-actions
|
||||
['() (transition (collect-dead-processes state) (reverse outbound-actions))]
|
||||
['()
|
||||
(transition (collect-dead-processes state) (reverse outbound-actions))]
|
||||
[(cons (cons pid action) rest)
|
||||
(match action
|
||||
[(at-meta-level preaction)
|
||||
|
@ -253,89 +265,88 @@
|
|||
|
||||
(define (perform-action pid preaction state)
|
||||
(match preaction
|
||||
[(add-role topics hs k) (do-subscribe pid (ensure-topic-union topics) hs k state)]
|
||||
[(delete-role eid reason) (do-unsubscribe pid eid reason state)]
|
||||
[(add-role pre-eid topics hs) (do-subscribe pid pre-eid (ensure-topic-union topics) hs state)]
|
||||
[(delete-role pre-eid reason) (do-unsubscribe pid pre-eid reason state)]
|
||||
[(send-message body topic) (route-and-deliver topic body state)]
|
||||
[(spawn main k) (do-spawn pid main k state)]
|
||||
[(kill pid-to-kill reason) (do-kill (or pid-to-kill pid) reason state)]))
|
||||
|
||||
(define (do-subscribe pid topics hs k state)
|
||||
(define (do-subscribe pid pre-eid topics hs state)
|
||||
(cond
|
||||
[(hash-has-key? (vm-processes state) pid)
|
||||
(define old-process (hash-ref (vm-processes state) pid))
|
||||
(define eid-number (process-next-eid-number old-process))
|
||||
(define new-eid (list pid eid-number))
|
||||
(struct-copy vm (for*/fold ([state (run-trapk state pid k new-eid)])
|
||||
([(matching-pid p) (in-hash (vm-processes state))]
|
||||
[matching-eid (in-set (process-endpoints p))]
|
||||
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
|
||||
[matching-topics (in-value (endpoint-topics e))]
|
||||
[matching-topic (in-set matching-topics)]
|
||||
[topic (in-set topics)]
|
||||
[flow-pattern (in-value (topic-intersection topic matching-topic))]
|
||||
#:when flow-pattern)
|
||||
(define inbound-flow (refine-topic matching-topic flow-pattern))
|
||||
(define outbound-flow (refine-topic topic flow-pattern))
|
||||
(let* ((state (if (flow-visible? topic inbound-flow)
|
||||
(run-trapk state
|
||||
pid
|
||||
(handlers-presence hs)
|
||||
new-eid
|
||||
inbound-flow)
|
||||
state))
|
||||
(state (if (flow-visible? matching-topic outbound-flow)
|
||||
(run-trapk state
|
||||
matching-pid
|
||||
(handlers-presence (endpoint-handlers e))
|
||||
matching-eid
|
||||
outbound-flow)
|
||||
state)))
|
||||
state))
|
||||
[processes (hash-set (vm-processes state)
|
||||
pid
|
||||
(struct-copy process old-process
|
||||
[next-eid-number (+ eid-number 1)]
|
||||
[endpoints
|
||||
(set-add (process-endpoints old-process)
|
||||
new-eid)]))]
|
||||
[endpoints (hash-set (vm-endpoints state)
|
||||
new-eid
|
||||
(endpoint new-eid
|
||||
topics
|
||||
hs))])]
|
||||
(define new-eid (eid pid pre-eid))
|
||||
(define new-endpoint (endpoint new-eid topics hs))
|
||||
(let* ((state (notify-route-additions state new-endpoint))
|
||||
(state (generic-update-process state pid (add-process-eid new-eid)))
|
||||
(state (install-endpoint state new-eid new-endpoint)))
|
||||
state)]
|
||||
[else state]))
|
||||
|
||||
(define (do-unsubscribe pid eid reason state)
|
||||
(define (generic-update-process state pid updater)
|
||||
(struct-copy vm state [processes (hash-update (vm-processes state) pid updater)]))
|
||||
|
||||
(define ((add-process-eid new-eid) p)
|
||||
(struct-copy process p [endpoints (set-add (process-endpoints p) new-eid)]))
|
||||
|
||||
(define ((remove-process-eid old-eid) p)
|
||||
(struct-copy process p [endpoints (set-remove (process-endpoints p) old-eid)]))
|
||||
|
||||
(define (install-endpoint state new-eid new-endpoint)
|
||||
(struct-copy vm state [endpoints (hash-set (vm-endpoints state) new-eid new-endpoint)]))
|
||||
|
||||
(define (uninstall-endpoint state old-eid)
|
||||
(struct-copy vm state [endpoints (hash-remove (vm-endpoints state) old-eid)]))
|
||||
|
||||
(define (notify-route-additions state new-endpoint)
|
||||
(match-define (endpoint (eid pid _) topics (handlers presence-handler _ _)) new-endpoint)
|
||||
(for*/fold ([state state])
|
||||
([(matching-pid p) (in-hash (vm-processes state))]
|
||||
[matching-eid (in-set (process-endpoints p))]
|
||||
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
|
||||
[matching-topics (in-value (endpoint-topics e))]
|
||||
[matching-topic (in-set matching-topics)]
|
||||
[topic (in-set topics)]
|
||||
[flow-pattern (in-value (topic-intersection topic matching-topic))]
|
||||
#:when flow-pattern)
|
||||
(define inbound-flow (refine-topic matching-topic flow-pattern))
|
||||
(define outbound-flow (refine-topic topic flow-pattern))
|
||||
(define e-presence-handler (handlers-presence (endpoint-handlers e)))
|
||||
(let* ((state (if (flow-visible? topic inbound-flow)
|
||||
(run-trapk state pid presence-handler inbound-flow)
|
||||
state))
|
||||
(state (if (flow-visible? matching-topic outbound-flow)
|
||||
(run-trapk state matching-pid e-presence-handler outbound-flow)
|
||||
state)))
|
||||
state)))
|
||||
|
||||
(define (do-unsubscribe pid pre-eid reason state)
|
||||
(define old-eid (eid pid pre-eid))
|
||||
(cond
|
||||
[(hash-has-key? (vm-endpoints state) eid)
|
||||
(define endpoint-to-remove (hash-ref (vm-endpoints state) eid))
|
||||
(define removed-topics (endpoint-topics endpoint-to-remove))
|
||||
(define old-process (hash-ref (vm-processes state) pid))
|
||||
(define new-process (struct-copy process old-process
|
||||
[endpoints (set-remove (process-endpoints old-process) eid)]))
|
||||
(let ((state (struct-copy vm state
|
||||
[endpoints (hash-remove (vm-endpoints state) eid)]
|
||||
[processes (hash-set (vm-processes state) pid new-process)])))
|
||||
(for*/fold ([state state])
|
||||
([(matching-pid p) (in-hash (vm-processes state))]
|
||||
[matching-eid (in-set (process-endpoints p))]
|
||||
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
|
||||
[matching-topics (in-value (endpoint-topics e))]
|
||||
[matching-topic (in-set matching-topics)]
|
||||
[removed-topic (in-set removed-topics)]
|
||||
[flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
|
||||
#:when flow-pattern)
|
||||
(define outbound-flow (refine-topic removed-topic flow-pattern))
|
||||
(run-trapk state
|
||||
matching-pid
|
||||
(handlers-absence (endpoint-handlers e))
|
||||
matching-eid
|
||||
outbound-flow
|
||||
reason)))]
|
||||
[(hash-has-key? (vm-endpoints state) old-eid)
|
||||
(define old-endpoint (hash-ref (vm-endpoints state) old-eid))
|
||||
(let* ((state (generic-update-process state pid (remove-process-eid old-eid)))
|
||||
(state (uninstall-endpoint state old-eid))
|
||||
(state (notify-route-deletions state old-endpoint reason)))
|
||||
state)]
|
||||
[else state]))
|
||||
|
||||
(define (notify-route-deletions state old-endpoint reason)
|
||||
(define removed-topics (endpoint-topics old-endpoint))
|
||||
(for*/fold ([state state])
|
||||
([(matching-pid p) (in-hash (vm-processes state))]
|
||||
[matching-eid (in-set (process-endpoints p))]
|
||||
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
|
||||
[matching-topics (in-value (endpoint-topics e))]
|
||||
[matching-topic (in-set matching-topics)]
|
||||
[removed-topic (in-set removed-topics)]
|
||||
[flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
|
||||
#:when flow-pattern)
|
||||
(define outbound-flow (refine-topic removed-topic flow-pattern))
|
||||
(define absence-handler (handlers-absence (endpoint-handlers e)))
|
||||
(run-trapk state matching-pid absence-handler outbound-flow reason)))
|
||||
|
||||
(define (route-and-deliver message-topic body state)
|
||||
(define pids-and-endpoints
|
||||
(define endpoints
|
||||
(for*/set ([(matching-pid p) (in-hash (vm-processes state))]
|
||||
[matching-eid (in-set (process-endpoints p))]
|
||||
[e (in-value (hash-ref (vm-endpoints state) matching-eid))]
|
||||
|
@ -343,16 +354,10 @@
|
|||
[matching-topic (in-set matching-topics)]
|
||||
[flow-pattern (in-value (topic-intersection message-topic matching-topic))]
|
||||
#:when flow-pattern)
|
||||
(cons matching-pid e)))
|
||||
(for/fold ([state state]) ([pid-and-endpoint (in-set pids-and-endpoints)])
|
||||
(define matching-pid (car pid-and-endpoint))
|
||||
(define e (cdr pid-and-endpoint))
|
||||
(run-trapk state
|
||||
matching-pid
|
||||
(handlers-message (endpoint-handlers e))
|
||||
(endpoint-id e)
|
||||
message-topic
|
||||
body)))
|
||||
e))
|
||||
(for/fold ([state state]) ([e (in-set endpoints)])
|
||||
(match-define (endpoint (eid pid _) _ (handlers _ _ message-handler)) e)
|
||||
(run-trapk state pid message-handler message-topic body)))
|
||||
|
||||
(define (do-spawn spawning-pid main k state)
|
||||
(define new-pid (vm-next-process-id state))
|
||||
|
@ -360,23 +365,20 @@
|
|||
(cond
|
||||
[(procedure? main) (send-to-user (lambda (e) (transition #f (kill #f e))) main new-pid)]
|
||||
[(transition? main) main]))
|
||||
(define initial-process (process new-pid initial-state (set)))
|
||||
(define spawned-state
|
||||
(struct-copy vm (enqueue-actions state new-pid initial-actions)
|
||||
[processes (hash-set (vm-processes state)
|
||||
new-pid
|
||||
(process new-pid initial-state 0 (set)))]
|
||||
[processes (hash-set (vm-processes state) new-pid initial-process)]
|
||||
[next-process-id (+ new-pid 1)]))
|
||||
(run-trapk spawned-state spawning-pid k new-pid))
|
||||
|
||||
(define (do-kill pid-to-kill reason state)
|
||||
(cond
|
||||
[(hash-has-key? (vm-processes state) pid-to-kill)
|
||||
(let ((state (for/fold ([state state])
|
||||
([eid (in-set (process-endpoints
|
||||
(hash-ref (vm-processes state) pid-to-kill)))])
|
||||
(do-unsubscribe pid-to-kill eid reason state))))
|
||||
(struct-copy vm state
|
||||
[processes (hash-remove (vm-processes state) pid-to-kill)]))]
|
||||
(define dying-endpoints (process-endpoints (hash-ref (vm-processes state) pid-to-kill)))
|
||||
(let* ((state (for/fold ([state state]) ([eid (in-set dying-endpoints)])
|
||||
(do-unsubscribe pid-to-kill (eid-pre-eid eid) reason state))))
|
||||
(struct-copy vm state [processes (hash-remove (vm-processes state) pid-to-kill)]))]
|
||||
[else state]))
|
||||
|
||||
(define (run-trapk state pid trap-k . args)
|
||||
|
@ -387,20 +389,17 @@
|
|||
state))
|
||||
|
||||
(define (maybe-transition->transition t)
|
||||
(if (transition? t)
|
||||
t
|
||||
(transition t '())))
|
||||
(cond [(transition? t) t]
|
||||
[else (transition t '())]))
|
||||
|
||||
(define (run-ready state pid interrupt-k)
|
||||
(define old-process (hash-ref (vm-processes state) pid))
|
||||
(define old-state (process-state old-process))
|
||||
(define old-state (process-state (hash-ref (vm-processes state) pid)))
|
||||
(match-define (transition new-state actions)
|
||||
(maybe-transition->transition
|
||||
(send-to-user (lambda (e) (transition old-state (kill #f e))) interrupt-k old-state)))
|
||||
(struct-copy vm (enqueue-actions state pid actions)
|
||||
[processes (hash-set (vm-processes state) pid
|
||||
(struct-copy process old-process
|
||||
[state new-state]))]))
|
||||
(generic-update-process (enqueue-actions state pid actions)
|
||||
pid
|
||||
(lambda (p) (struct-copy process p [state new-state]))))
|
||||
|
||||
(define (preaction? a)
|
||||
(or (add-role? a)
|
||||
|
@ -423,31 +422,30 @@
|
|||
#f]))
|
||||
|
||||
(define (enqueue-actions state pid actions)
|
||||
(define flat-actions (for/list ([a (flatten actions)] #:when (valid-action? pid a)) (cons pid a)))
|
||||
(struct-copy vm state
|
||||
[pending-actions (append (reverse (for/list ([a (flatten actions)]
|
||||
#:when (valid-action? pid a))
|
||||
(cons pid a)))
|
||||
(vm-pending-actions state))]))
|
||||
[pending-actions (append (reverse flat-actions) (vm-pending-actions state))]))
|
||||
|
||||
(define (((wrap-trapk pid trapk) . args) state)
|
||||
(apply run-trapk state pid trapk args))
|
||||
|
||||
(define (transform-meta-action pid preaction)
|
||||
(match preaction
|
||||
[(add-role topics hs k)
|
||||
(add-role topics
|
||||
[(add-role pre-eid topics hs)
|
||||
(add-role (eid pid pre-eid)
|
||||
topics
|
||||
(handlers (wrap-trapk pid (handlers-presence hs))
|
||||
(wrap-trapk pid (handlers-absence hs))
|
||||
(wrap-trapk pid (handlers-message hs)))
|
||||
(wrap-trapk pid k))]
|
||||
[(? delete-role?) preaction]
|
||||
[(? send-message?) preaction]
|
||||
(wrap-trapk pid (handlers-message hs))))]
|
||||
[(delete-role pre-eid reason)
|
||||
(delete-role (eid pid pre-eid) reason)]
|
||||
[(? send-message? p) p]
|
||||
[(spawn main k)
|
||||
(spawn main (wrap-trapk pid k))]
|
||||
[(? kill?) preaction]))
|
||||
[(? kill? p) p]))
|
||||
|
||||
(define (nested-vm boot)
|
||||
(lambda () (run-vm (make-vm boot))))
|
||||
(lambda (self-pid) (run-vm (make-vm boot))))
|
||||
|
||||
(define (ground-vm boot)
|
||||
(let loop ((state (make-vm boot)))
|
||||
|
|
Loading…
Reference in New Issue