450 lines
15 KiB
Racket
450 lines
15 KiB
Racket
#lang racket/base
|
|
;; Virtualized operating system, this time with presence.
|
|
|
|
(require racket/set)
|
|
(require racket/match)
|
|
(require (only-in racket/list flatten))
|
|
(require "unify.rkt")
|
|
|
|
(provide nested-vm
|
|
ground-vm
|
|
|
|
(struct-out topic)
|
|
topic-publisher
|
|
topic-subscriber
|
|
co-roles
|
|
co-topics
|
|
|
|
(struct-out handlers)
|
|
|
|
(except-out (struct-out transition) transition)
|
|
(rename-out [make-transition transition])
|
|
extend-transition
|
|
|
|
role
|
|
(except-out (struct-out add-role) add-role)
|
|
(rename-out [make-add-role add-role])
|
|
(except-out (struct-out delete-role) delete-role)
|
|
(rename-out [make-delete-role delete-role])
|
|
(struct-out send-message)
|
|
(except-out (struct-out spawn) spawn)
|
|
(rename-out [make-spawn spawn])
|
|
(except-out (struct-out kill) kill)
|
|
(rename-out [make-kill kill])
|
|
|
|
(struct-out at-meta-level)
|
|
|
|
;; Reexports from unify.rkt for convenience
|
|
wild
|
|
wild?
|
|
non-wild?)
|
|
|
|
;; Endpoints are the units of deduplication.
|
|
;; Flows (in canonical form) are the units of presence.
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Data definitions
|
|
|
|
;; A PID is an (arbitrary) VM-unique process identifier. Concretely,
|
|
;; it's an integer.
|
|
|
|
;; An EID is an (arbitrary) VM-unique endpoint identifier. Concretely,
|
|
;; it's a list of two elements, the first being the endpoint's
|
|
;; process's PID and the second being an integer. (Except for the
|
|
;; ground-vm, where they're different because there aren't any PIDs.)
|
|
|
|
;; One endpoint, one topic.
|
|
|
|
;; A Flow is a Topic that comes from the intersection of two dual
|
|
;; topics.
|
|
|
|
;; A QuasiQueue<X> is a list of Xs in *reversed* order.
|
|
|
|
;; A VM is the complete state of one virtual machine.
;;   processes       : Hash<PID, Process>
;;   endpoints       : Hash<EID, Endpoint>
;;   next-process-id : PID, given to the next process that is spawned
;;   pending-actions : QuasiQueue<(cons PID Action)>, actions waiting to
;;                     be performed, each tagged with the issuing PID
(struct vm (processes endpoints next-process-id pending-actions) #:transparent)
|
|
|
|
;; An Endpoint is one registered role of one process.
;;   id       : EID
;;   topic    : Topic
;;   handlers : Handlers
(struct endpoint (id topic handlers) #:transparent)
|
|
|
|
;; A Process is one running process inside a VM.
;;   id                      : PID
;;   state                   : the process's private state, handed to its
;;                             InterruptKs and replaced by their result
;;   next-endpoint-id-number : NonnegativeInteger, second component of the
;;                             next EID allocated for this process
;;   endpoints               : Set<EID>
(struct process (id state next-endpoint-id-number endpoints) #:transparent)
|
|
|
|
;; A Topic is a role (this file constructs 'publisher and 'subscriber), a
;; pattern (intersected with other patterns via unify.rkt), and a virtual?
;; flag consulted by flow-visible?.
(struct topic (role pattern virtual?) #:prefab)
|
|
|
|
;; InterruptK = State -> Transition
|
|
;; TrapK<X> = X -> InterruptK
|
|
|
|
;; PresenceHandler = TrapK<EID * Topic>
|
|
;; AbsenceHandler = TrapK<EID * Topic * Reason>
|
|
;; MessageHandler = TrapK<EID * Topic * Message>
|
|
;; Bundle of the three per-endpoint callbacks. Any field may be #f, in
;; which case the corresponding event is not delivered (see run-trapk).
(struct handlers (presence absence message) #:transparent)
|
|
|
|
;; actions is a plain old ordered ConsTreeOf<Action>, not a
|
|
;; QuasiQueue.
|
|
;; Result of running a process step: the new state plus the actions to
;; enqueue on behalf of the process.
(struct transition (state actions) #:transparent)
|
|
|
|
;; Preactions.
|
|
;; Ks are various TrapKs or #f, signifying lack of interest.
|
|
;; add-role: create an endpoint for `topic` with `handlers`; `k`, if not
;; #f, is called with the new EID.
(struct add-role (topic handlers k) #:prefab)

;; delete-role: remove endpoint `eid`; `reason` is passed on to the
;; absence handlers of matching endpoints.
(struct delete-role (eid reason) #:prefab)

;; send-message: route `body` to every endpoint whose topic intersects
;; `topic`.
(struct send-message (topic body) #:prefab)

;; spawn: start a new process from `thunk`; `k`, if not #f, is called with
;; the new PID.
(struct spawn (thunk k) #:prefab)

;; kill: terminate process `pid` (#f means the issuing process itself),
;; using `reason` for the resulting absence notifications.
(struct kill (pid reason) #:prefab)
|
|
|
|
;; An Action is either a Preaction or an (at-meta-level Preaction).
|
|
;; Wraps a Preaction that run-vm does not perform itself but returns,
;; transformed, among its outbound actions for the enclosing level.
(struct at-meta-level (preaction) #:prefab)
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
(require (for-syntax syntax/parse))
|
|
(require (for-syntax racket/base))
|
|
;; (role topic-expr
;;       #:state state-pattern
;;       [#:on-presence expr] [#:on-absence expr] [#:on-ready expr]
;;       [#:id id] [#:topic id] [#:reason id]
;;       [message-pattern clause-body ...] ...)
;;
;; Expands to an add-role preaction for topic-expr. Each handler that is
;; present becomes a TrapK which matches the process state against
;; state-pattern and then evaluates the given expression. The #:id,
;; #:topic and #:reason identifiers name the handler arguments; when one
;; is omitted a fresh, inaccessible name is used. Message clauses are
;; tried in order against the message body. The #:on-ready handler is
;; supplied as the add-role continuation.
(define-syntax (role stx)
  (syntax-parse stx
    [(_ topic-expr
        #:state state-pattern
        (~or (~optional (~seq #:on-presence presence) #:name "#:on-presence handler")
             (~optional (~seq #:on-absence absence) #:name "#:on-absence handler")
             (~optional (~seq #:on-ready ready) #:name "#:on-ready handler")
             (~optional (~seq #:id eid) #:name "#:id")
             (~optional (~seq #:topic topic) #:name "#:topic")
             (~optional (~seq #:reason reason) #:name "#:reason"))
        ...
        [message-pattern clause-body ...]
        ...)
     ;; Pick the user's binder names where given, placeholders otherwise.
     (with-syntax ([eid (if (attribute eid) #'eid #'unused-eid)]
                   [topic (if (attribute topic) #'topic #'unused-topic)]
                   [reason (if (attribute reason) #'reason #'unused-reason)])
       (with-syntax ([on-presence
                      (if (attribute presence)
                          #'(lambda (eid topic)
                              (lambda (process-state)
                                (match process-state
                                  [state-pattern presence])))
                          #'#f)]
                     [on-absence
                      (if (attribute absence)
                          #'(lambda (eid topic reason)
                              (lambda (process-state)
                                (match process-state
                                  [state-pattern absence])))
                          #'#f)]
                     [on-ready
                      (if (attribute ready)
                          #'(lambda (eid)
                              (lambda (process-state)
                                (match process-state
                                  [state-pattern ready])))
                          #'#f)]
                     [on-message
                      #'(lambda (eid topic incoming-body)
                          (lambda (process-state)
                            (match process-state
                              [state-pattern
                               (match incoming-body
                                 [message-pattern clause-body ...]
                                 ...)])))])
         #'(add-role topic-expr
                     (handlers on-presence on-absence on-message)
                     on-ready)))]))
|
|
|
|
;; Convenience constructors, exported under the plain struct names.

;; State Action ... -> Transition
;; Collects the rest arguments into the transition's action list.
(define (make-transition state . actions)
  (transition state actions))

;; Topic Handlers [TrapK<EID> or #f] -> add-role
(define (make-add-role topic handlers [k #f])
  (add-role topic handlers k))

;; EID [Reason] -> delete-role
(define (make-delete-role eid [reason #f])
  (delete-role eid reason))

;; Thunk [TrapK<PID> or #f] -> spawn
(define (make-spawn thunk [k #f])
  (spawn thunk k))

;; [PID or #f] [Reason] -> kill
;; A pid of #f stands for the process issuing the action.
(define (make-kill [pid #f] [reason #f])
  (kill pid reason))
|
|
|
|
;; (Transition or State) Action ... -> Transition
;; Appends more-actions after whatever actions t already carries. A bare
;; state is treated as a transition with no actions. The result's actions
;; form a cons tree, not a flat list.
(define (extend-transition t . more-actions)
  (if (transition? t)
      (transition (transition-state t)
                  (list (transition-actions t) more-actions))
      (transition t more-actions)))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
;; Topics and roles
|
|
|
|
;; Pattern [#:virtual? Boolean] -> Topic
;; Builds a topic in the 'publisher role.
(define (topic-publisher pattern #:virtual? [virtual? #f])
  (let ([role 'publisher])
    (topic role pattern virtual?)))
|
|
|
|
;; Pattern [#:virtual? Boolean] -> Topic
;; Builds a topic in the 'subscriber role.
(define (topic-subscriber pattern #:virtual? [virtual? #f])
  (let ([role 'subscriber])
    (topic role pattern virtual?)))
|
|
|
|
;; Role -> (Listof Role) or #f
;; The roles that can communicate with role r; #f for an unknown role.
(define (co-roles r)
  (cond
    [(eq? r 'publisher) '(subscriber)]
    [(eq? r 'subscriber) '(publisher)]
    [else #f]))
|
|
|
|
;; Topic -> (Listof Topic)
;; Copies of t, one per role that can communicate with t's role.
;; co-roles answers #f for a role it does not know; that is treated as
;; "no co-roles", giving an empty list instead of a for/list error.
(define (co-topics t)
  (for/list ([co-role (in-list (or (co-roles (topic-role t)) '()))])
    (struct-copy topic t [role co-role])))
|
|
|
|
;; Topic Pattern -> Topic
;; A topic like remote-topic (same role and virtual? flag) but with
;; new-pattern as its pattern.
(define (refine-topic remote-topic new-pattern)
  (topic (topic-role remote-topic)
         new-pattern
         (topic-virtual? remote-topic)))
|
|
|
|
;; Role Role -> (Listof Role) or #f
;; True (a non-empty list tail, as returned by memq) iff role l is one of
;; the co-roles of role r. co-roles answers #f for an unknown role; that
;; is treated as "no co-roles" so the result is #f rather than a memq
;; error.
(define (roles-intersect? l r)
  (memq l (or (co-roles r) '())))
|
|
|
|
;; Both left and right must be canonicalized.
|
|
;; Topic Topic -> result of mgu-canonical, or #f
;; When the two roles can communicate, the canonical unifier of the two
;; (freshened) patterns as computed by unify.rkt; otherwise #f.
(define (topic-intersection left right)
  (cond
    [(roles-intersect? (topic-role left) (topic-role right))
     (mgu-canonical (freshen (topic-pattern left))
                    (freshen (topic-pattern right)))]
    [else #f]))
|
|
|
|
;; True iff the flow between remote-topic and local-topic should be
|
|
;; visible to the local peer. This is the case when either local-topic
|
|
;; is virtual (in which case everything is seen) or otherwise if
|
|
;; remote-topic is also not virtual.
|
|
;; Topic Topic -> truthy or #f
;; A virtual local topic sees every flow; a non-virtual local topic sees
;; only flows whose remote topic is not virtual.
(define (flow-visible? local-topic remote-topic)
  (let ([local-virtual (topic-virtual? local-topic)])
    (if local-virtual
        local-virtual
        (not (topic-virtual? remote-topic)))))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; QuasiQueue<X>
|
|
;; The quasi-queue containing no elements.
(define empty-quasi-queue null)
|
|
|
|
;; QuasiQueue<X> -> Boolean
|
|
;; True iff the quasi-queue holds no elements.
(define (quasi-queue-empty? quasi-queue)
  (null? quasi-queue))
|
|
|
|
;; X QuasiQueue<X> -> QuasiQueue<X>
|
|
;; Adds thing as the newest element. Because the representation is
;; reversed, the newest element sits at the head of the list.
(define (quasi-enqueue-one thing existing-quasi-queue)
  (list* thing existing-quasi-queue))
|
|
|
|
;; List<X> QuasiQueue<X> -> QuasiQueue<X>
|
|
;; Enqueues each element of many-things-in-order in turn, so the last
;; element of that list ends up as the newest element of the queue.
(define (quasi-enqueue-many many-things-in-order existing-quasi-queue)
  (foldl cons existing-quasi-queue many-things-in-order))
|
|
|
|
;; QuasiQueue<X> -> List<X>
|
|
;; Lists the queue's elements oldest first.
(define (quasi-queue->list quasi-queue)
  (for/fold ([oldest-first '()])
            ([item (in-list quasi-queue)])
    (cons item oldest-first)))
|
|
|
|
;; List<X> -> QuasiQueue<X>
|
|
;; Builds a queue from xs, whose first element becomes the oldest entry.
(define (list->quasi-queue xs)
  (foldl cons '() xs))
|
|
|
|
;;---------------------------------------------------------------------------
|
|
|
|
;; Thunk -> VM
;; A fresh VM with no processes or endpoints, whose only pending action
;; is to spawn `boot`. That action is attributed to PID -1 and carries no
;; continuation.
(define (make-vm boot)
  (define boot-pid -1)
  (define boot-action (cons boot-pid (spawn boot #f)))
  (vm (hash)
      (hash)
      0
      (quasi-enqueue-one boot-action empty-quasi-queue)))
|
|
|
|
;; VM -> Transition
;; Performs, oldest first, every action that was pending when run-vm was
;; called. Actions enqueued while doing so stay pending in the resulting
;; VM state. Actions wrapped in at-meta-level are not performed here;
;; they are transformed and returned, in order, as the transition's
;; actions.
(define (run-vm state)
  (define queued (quasi-queue->list (vm-pending-actions state)))
  (define-values (final-state outbound-actions)
    (for/fold ([state (struct-copy vm state [pending-actions empty-quasi-queue])]
               [outbound-actions empty-quasi-queue])
              ([entry (in-list queued)])
      (match-define (cons pid action) entry)
      (match action
        [(at-meta-level preaction)
         (values state
                 (quasi-enqueue-one (transform-meta-action pid preaction)
                                    outbound-actions))]
        [preaction
         (values (perform-action pid preaction state)
                 outbound-actions)])))
  (transition final-state (quasi-queue->list outbound-actions)))
|
|
|
|
;; Single choke point through which the VM calls user-supplied code.
;; Currently it just applies f to args.
(define (send-to-user f . args)
  ;; TODO: use this hook to find all the bits of code that will need
  ;; with-handlers and crash compensation.
  (let ([result-thunk (lambda () (apply f args))])
    (result-thunk)))
|
|
|
|
;; PID Preaction VM -> VM
;; Dispatches one preaction issued by process `pid` to the routine that
;; implements it. A kill with no target PID kills the issuing process.
(define (perform-action pid preaction state)
  (match preaction
    [(add-role role-topic role-handlers ready-k)
     (do-subscribe pid role-topic role-handlers ready-k state)]
    [(delete-role doomed-eid why)
     (do-unsubscribe pid doomed-eid why state)]
    [(send-message message-topic message-body)
     (route-and-deliver message-topic message-body state)]
    [(spawn boot-thunk spawned-k)
     (do-spawn pid boot-thunk spawned-k state)]
    [(kill victim-pid why)
     (do-kill (or victim-pid pid) why state)]))
|
|
|
|
;; PID Topic Handlers (TrapK<EID> or #f) VM -> VM
;; Registers a new endpoint for process `pid`.
;;
;; Steps, in order:
;;  1. Allocate the EID (list pid n) from the process's counter.
;;  2. Call k, if any, with the new EID.
;;  3. For every existing endpoint whose topic intersects `topic`, deliver
;;     presence to the new endpoint and to the existing one, each only if
;;     the flow is visible to it (see flow-visible?).
;;  4. Record the endpoint in the VM and in the process.
;;
;; Step 4 is applied on top of the state produced by steps 2 and 3.
;; Rebuilding the process table from the state as it was on entry would
;; silently discard every process-state change made by the handlers that
;; have just run.
(define (do-subscribe pid topic hs k state)
  (define old-process (hash-ref (vm-processes state) pid))
  (define eid-number (process-next-endpoint-id-number old-process))
  (define new-eid (list pid eid-number))
  (define ready-state (run-trapk state pid k new-eid))
  (define notified-state
    ;; Handlers change process states and pending actions only, so the
    ;; set of processes and endpoints walked here is stable.
    (for*/fold ([state ready-state])
               ([(matching-pid p) (in-hash (vm-processes ready-state))]
                [matching-eid (in-set (process-endpoints p))]
                [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
                [matching-topic (in-value (endpoint-topic e))]
                [flow-pattern (in-value (topic-intersection topic matching-topic))]
                #:when flow-pattern)
      (define inbound-flow (refine-topic matching-topic flow-pattern))
      (define outbound-flow (refine-topic topic flow-pattern))
      (let* ([state (if (flow-visible? topic inbound-flow)
                        (run-trapk state
                                   pid
                                   (handlers-presence hs)
                                   new-eid
                                   inbound-flow)
                        state)]
             [state (if (flow-visible? matching-topic outbound-flow)
                        (run-trapk state
                                   matching-pid
                                   (handlers-presence (endpoint-handlers e))
                                   matching-eid
                                   outbound-flow)
                        state)])
        state)))
  ;; Re-read the process: its state may have been replaced by a handler.
  (define current-process (hash-ref (vm-processes notified-state) pid))
  (struct-copy vm notified-state
               [processes (hash-set (vm-processes notified-state)
                                    pid
                                    (struct-copy process current-process
                                                 [next-endpoint-id-number (+ eid-number 1)]
                                                 [endpoints
                                                  (set-add (process-endpoints current-process)
                                                           new-eid)]))]
               [endpoints (hash-set (vm-endpoints notified-state)
                                    new-eid
                                    (endpoint new-eid
                                              topic
                                              hs))]))
|
|
|
|
;; PID EID Reason VM -> VM
;; Removes endpoint `eid` of process `pid` and tells every remaining
;; endpoint with an intersecting topic, via its absence handler, that the
;; flow has gone.
;;
;; The request is ignored (state returned unchanged) unless `pid` is a
;; live process that actually owns `eid`. Removing an endpoint on behalf
;; of a process that does not own it would leave the EID dangling in the
;; real owner's endpoint set, and a later endpoint lookup would fail.
;;
;; Absence is delivered under the same visibility rule that do-subscribe
;; uses for presence, so a peer is only told about the disappearance of a
;; flow it could have seen appear.
;;
;; NOTE(review): a process is dropped from the VM as soon as its last
;; endpoint is removed. Actions from that process still waiting in the
;; queue may then refer to a PID that no longer exists -- confirm this is
;; intended.
(define (do-unsubscribe pid eid reason state)
  (define old-process (hash-ref (vm-processes state) pid #f))
  (cond
    [(and old-process
          (set-member? (process-endpoints old-process) eid)
          (hash-has-key? (vm-endpoints state) eid))
     (define endpoint-to-remove (hash-ref (vm-endpoints state) eid))
     (define removed-topic (endpoint-topic endpoint-to-remove))
     (define new-process (struct-copy process old-process
                                      [endpoints (set-remove (process-endpoints old-process) eid)]))
     (let ((state (struct-copy vm state
                               [endpoints (hash-remove (vm-endpoints state) eid)]
                               [processes (if (set-empty? (process-endpoints new-process))
                                              (hash-remove (vm-processes state) pid)
                                              (hash-set (vm-processes state) pid new-process))])))
       (for*/fold ([state state])
                  ([(matching-pid p) (in-hash (vm-processes state))]
                   [matching-eid (in-set (process-endpoints p))]
                   [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
                   [matching-topic (in-value (endpoint-topic e))]
                   [flow-pattern (in-value (topic-intersection removed-topic matching-topic))]
                   #:when flow-pattern)
         (define outbound-flow (refine-topic removed-topic flow-pattern))
         (if (flow-visible? matching-topic outbound-flow)
             (run-trapk state
                        matching-pid
                        (handlers-absence (endpoint-handlers e))
                        matching-eid
                        outbound-flow
                        reason)
             state)))]
    [else state]))
|
|
|
|
;; Topic Message VM -> VM
;; Finds every endpoint whose topic intersects message-topic, then calls
;; each one's message handler with its EID, message-topic and body. The
;; recipients are collected as a set of (cons PID Endpoint) before any
;; handler runs.
(define (route-and-deliver message-topic body state)
  (define recipients
    (for*/set ([(matching-pid p) (in-hash (vm-processes state))]
               [matching-eid (in-set (process-endpoints p))]
               [e (in-value (hash-ref (vm-endpoints state) matching-eid))]
               #:when (topic-intersection message-topic (endpoint-topic e)))
      (cons matching-pid e)))
  (for/fold ([state state])
            ([recipient (in-set recipients)])
    (match-define (cons matching-pid e) recipient)
    (run-trapk state
               matching-pid
               (handlers-message (endpoint-handlers e))
               (endpoint-id e)
               message-topic
               body)))
|
|
|
|
;; VM PID (TrapK or #f) Any ... -> VM
;; With a handler: applies it to args to obtain an InterruptK and runs
;; that against process `pid`. With #f: leaves the VM untouched.
(define (run-trapk state pid trap-k . args)
  (cond
    [(not trap-k) state]
    [else
     (define interrupt-k (apply send-to-user trap-k args))
     (run-ready state pid interrupt-k)]))
|
|
|
|
;; (Transition or State) -> Transition
;; Passes a transition through; wraps anything else as a transition with
;; that value as the state and no actions.
(define (maybe-transition->transition t)
  (cond
    [(transition? t) t]
    [else (transition t '())]))
|
|
|
|
;; VM PID InterruptK -> VM
;; Applies interrupt-k to the current state of process `pid`, stores the
;; resulting state back in the process, and enqueues the resulting
;; actions as issued by `pid`. The handler may return a bare state
;; instead of a transition.
(define (run-ready state pid interrupt-k)
  (define old-process (hash-ref (vm-processes state) pid))
  (define outcome
    (maybe-transition->transition
     (send-to-user interrupt-k (process-state old-process))))
  (define updated-process
    (struct-copy process old-process
                 [state (transition-state outcome)]))
  (struct-copy vm (enqueue-actions state pid (transition-actions outcome))
               [processes (hash-set (vm-processes state) pid updated-process)]))
|
|
|
|
;; PID Thunk (TrapK<PID> or #f) VM -> VM
;; Runs `thunk` to obtain the new process's initial state and actions,
;; registers the process under the next free PID with no endpoints,
;; enqueues its initial actions, and finally calls k (if any) in the
;; spawning process with the new PID.
;;
;; The thunk's result is normalised with maybe-transition->transition, so
;; a boot thunk may return either a transition or a bare initial state.
;; run-ready accepts handler results in the same way.
(define (do-spawn spawning-pid thunk k state)
  (match-define (transition initial-state initial-actions)
    (maybe-transition->transition (send-to-user thunk)))
  (define new-pid (vm-next-process-id state))
  (run-trapk (struct-copy vm (enqueue-actions state new-pid initial-actions)
                          [processes (hash-set (vm-processes state) new-pid (process new-pid
                                                                                     initial-state
                                                                                     0
                                                                                     (set)))]
                          [next-process-id (+ new-pid 1)])
             spawning-pid
             k
             new-pid))
|
|
|
|
;; PID Reason VM -> VM
;; Unsubscribes every endpoint the process held when do-kill was called
;; (passing `reason` along), then removes the process from the VM. An
;; unknown PID leaves the VM unchanged.
(define (do-kill pid-to-kill reason state)
  (define victim (hash-ref (vm-processes state) pid-to-kill #f))
  (cond
    [(not victim) state]
    [else
     (define unsubscribed
       (for/fold ([state state])
                 ([eid (in-set (process-endpoints victim))])
         (do-unsubscribe pid-to-kill eid reason state)))
     (struct-copy vm unsubscribed
                  [processes (hash-remove (vm-processes unsubscribed) pid-to-kill)])]))
|
|
|
|
;; VM PID ConsTreeOf<Action> -> VM
;; Flattens `actions` and appends them, in order and each tagged with
;; `pid`, to the VM's pending actions.
(define (enqueue-actions state pid actions)
  (define tagged-actions
    (map (lambda (action) (cons pid action))
         (flatten actions)))
  (struct-copy vm state
               [pending-actions (quasi-enqueue-many tagged-actions
                                                    (vm-pending-actions state))]))
|
|
|
|
;; PID (TrapK or #f) -> (TrapK or #f)
;; Lifts a handler of process `pid` into a handler over the whole VM
;; state: the result takes the same arguments and yields a function from
;; VM state to VM state that runs the original handler inside the VM.
;;
;; A #f handler stays #f, so "no interest" remains visible to whoever
;; receives the wrapped handler. Wrapping #f in a procedure would have
;; the receiver call a handler that can only return the state unchanged.
(define (wrap-trapk pid trapk)
  (and trapk
       (lambda args
         (lambda (state)
           (apply run-trapk state pid trapk args)))))
|
|
|
|
;; PID Preaction -> Preaction
;; Prepares a preaction issued by process `pid` for the enclosing level.
;; The continuations and handlers of add-role and spawn are wrapped with
;; wrap-trapk so that they operate on the VM state; delete-role,
;; send-message and kill pass through unchanged.
(define (transform-meta-action pid preaction)
  (define (lift trapk) (wrap-trapk pid trapk))
  (match preaction
    [(add-role role-topic hs ready-k)
     (add-role role-topic
               (handlers (lift (handlers-presence hs))
                         (lift (handlers-absence hs))
                         (lift (handlers-message hs)))
               (lift ready-k))]
    [(spawn boot-thunk spawned-k)
     (spawn boot-thunk (lift spawned-k))]
    [(or (? delete-role?) (? send-message?) (? kill?))
     preaction]))
|
|
|
|
;; Thunk -> (-> Transition)
;; Returns a thunk that, each time it is called, builds a fresh VM around
;; `boot` and runs it once with run-vm.
(define (nested-vm boot)
  (define (boot-nested-vm)
    (run-vm (make-vm boot)))
  boot-nested-vm)
|
|
|
|
;; Thunk -> 'done
;; Runs a VM at the outermost level until it can make no more progress.
;;
;; Each iteration runs the pending actions. Meta-level actions are an
;; error here. The loop then waits on the Racket events used as patterns
;; of 'subscriber endpoints; when one fires, its result is delivered as a
;; message on the matching publisher topic. If actions are still pending,
;; an always-ready event is included so that the sync cannot block. The
;; loop ends with 'done when nothing is pending and there are no events
;; to wait for.
(define (ground-vm boot)
  (let loop ([vm-state (make-vm boot)])
    (match-define (transition state actions) (run-vm vm-state))
    (unless (null? actions)
      (error 'ground-vm "No meta-actions available in ground-vm: ~v" actions))
    (define waiting? (quasi-queue-empty? (vm-pending-actions state)))
    (define active-events
      (for/list ([(eid e) (in-hash (vm-endpoints state))]
                 #:when (let ([t (endpoint-topic e)])
                          (and (evt? (topic-pattern t))
                               (eq? (topic-role t) 'subscriber))))
        (define evt (topic-pattern (endpoint-topic e)))
        (wrap-evt evt
                  (lambda (message)
                    (lambda (state)
                      (route-and-deliver (topic-publisher evt)
                                         message
                                         state))))))
    (cond
      ;; About to block, and nothing can wake us
      [(and waiting? (null? active-events)) 'done]
      [else
       (define idle-evt
         (if waiting?
             never-evt
             ;; Yields `values`, which acts as the identity on the state.
             (wrap-evt always-evt (lambda (dummy) values))))
       (define interruptk (apply sync idle-evt active-events))
       (loop (interruptk state))])))
|
|
|
|
;;(require racket/trace)
|
|
;;(trace perform-action) |