Factored out mux.rkt

This commit is contained in:
Tony Garnock-Jones 2015-03-16 10:38:32 -04:00
parent bf316f792b
commit ec2eea9e25
4 changed files with 244 additions and 186 deletions

View File

@ -6,7 +6,6 @@
(rename-out [quit <quit>])
(except-out (struct-out spawn) spawn)
(rename-out [spawn <spawn>])
(struct-out process)
(struct-out transition)
(struct-out world)
@ -52,7 +51,9 @@
sequence-transitions
world-handle-event
clean-transition)
clean-transition
pretty-print-world)
(require racket/set)
(require racket/match)
@ -61,6 +62,7 @@
(require "route.rkt")
(require "patch.rkt")
(require "trace.rkt")
(require "mux.rkt")
(module+ test (require rackunit))
;; Events = Patches Messages
@ -69,14 +71,6 @@
;; Actions ⊃ Events
(struct spawn (boot) #:prefab)
;; Processes (machine states): (process Matcher (U Behavior (disabled Behavior)) Any)
(struct process (interests behavior state) #:transparent)
;; Disabled Behaviors, when found in a Process, indicate that the
;; process has been disabled and is waiting out the performance of its
;; final actions before finally being removed.
(struct disabled (behavior) #:transparent)
;; A Behavior is a ((Option Event) Any -> Transition): a function
;; mapping an Event (or, in the #f case, a poll signal) and a
;; Process's current state to a Transition.
@ -98,12 +92,11 @@
;; A Label is a PID or 'meta.
;; VM private states
(struct world (next-pid ;; PID
(struct world (mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
routing-table ;; (Matcherof (Setof Label))
process-table ;; (HashTable PID Process)
environment-interests ;; (Matcherof (set 'meta))
behaviors ;; (HashTable PID Behavior)
states ;; (HashTable PID Any)
) #:transparent)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -111,8 +104,6 @@
(define (event? x) (or (patch? x) (message? x)))
(define (action? x) (or (event? x) (spawn? x)))
(define (meta-label? x) (eq? x 'meta))
(define (prepend-at-meta pattern level)
(if (zero? level)
pattern
@ -156,27 +147,29 @@
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
(define (send-event e pid w)
(match (hash-ref (world-process-table w) pid #f)
[#f w]
[(process _ (? disabled?) _) w] ;; disabled due to earlier error or quit
[(and p (process _ behavior old-state))
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (quit final-actions))
(trace-process-step e pid p #f q)
(enqueue-actions (disable-process pid #f w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step e pid p #f t)
(update-process pid
(struct-copy process p [state new-state])
new-actions
w)])
(lambda (exn)
(trace-process-step e pid p exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit))))]))
(define behavior (hash-ref (world-behaviors w) pid #f))
(define old-state (hash-ref (world-states w) pid #f))
(if (not behavior)
w
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (quit final-actions))
(trace-process-step e pid behavior old-state #f q)
(enqueue-actions (disable-process pid #f w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step e pid behavior old-state #f t)
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
pid
new-actions)])
(lambda (exn)
(trace-process-step e pid behavior old-state exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit))))))
(define (update-state w pid s)
(struct-copy world w [states (hash-set (world-states w) pid s)]))
(define (send-event/guard delta pid w)
(if (patch-empty? delta)
@ -188,16 +181,9 @@
(log-error "Process ~a died with exception:\n~a"
(cons pid (trace-pid-stack))
(exn->string exn)))
(match (hash-ref (world-process-table w) pid #f)
[#f w]
[old-p
(define new-p (struct-copy process old-p [behavior (disabled (process-behavior old-p))]))
(struct-copy world w [process-table (hash-set (world-process-table w) pid new-p)])]))
(define (update-process pid p actions w)
(let* ((w (struct-copy world w [process-table (hash-set (world-process-table w) pid p)]))
(w (mark-pid-runnable w pid)))
(enqueue-actions w pid actions)))
(struct-copy world w
[behaviors (hash-remove (world-behaviors w) pid)]
[states (hash-remove (world-states w) pid)]))
(define (invoke-process pid thunk k-ok k-exn)
(define-values (ok? result)
@ -223,11 +209,10 @@
(quit actions))
(define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-patch-exp ...)
(spawn (lambda (pid)
(process (apply-patch (matcher-empty)
(label-patch (patch-seq initial-patch-exp ...) (set pid)))
behavior-exp
initial-state-exp))))
(spawn (lambda ()
(list (patch-seq initial-patch-exp ...)
behavior-exp
initial-state-exp))))
(define-syntax-rule (spawn/stateless behavior-exp initial-patch-exp ...)
(spawn-process (stateless-behavior-wrap behavior-exp)
@ -244,18 +229,17 @@
(make-spawn-world (lambda () (list boot-action ...))))
(define (make-world boot-actions)
(world 0
(world (mux)
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
(set)
(matcher-empty)
(hash)
(matcher-empty)))
(hash)))
(define (make-spawn-world boot-actions-thunk)
(spawn (lambda (pid)
(process (matcher-empty)
world-handle-event
(make-world (boot-actions-thunk))))))
(spawn (lambda ()
(list empty-patch
world-handle-event
(make-world (boot-actions-thunk))))))
(define (transition-bind k t0)
(match t0
@ -300,112 +284,56 @@
(define ((perform-action label a) w)
(match a
[(spawn boot)
(define new-pid (world-next-pid w))
(invoke-process 'booting
(lambda ()
(match (boot new-pid)
[(? process? p) p]
[other (error 'spawn
"Spawn boot procedure must yield process; received ~v"
other)]))
(lambda (new-p)
(define new-interests (process-interests new-p))
(define new-w
(update-process new-pid
new-p
'()
(struct-copy world w [next-pid (+ new-pid 1)])))
(apply-patch-in-world new-pid (patch new-interests (matcher-empty)) new-w))
(match (boot)
[(and results (list (? patch?) (? procedure?) _))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list initial-patch behavior initial-state) results)
(define-values (new-mux new-pid patches meta-action)
(mux-add-stream (world-mux w) initial-patch))
(let* ((w (update-state w new-pid initial-state))
(w (mark-pid-runnable w new-pid))
(w (struct-copy world w
[mux new-mux]
[behaviors (hash-set (world-behaviors w)
new-pid
behavior)]))
(w (deliver-patches w patches meta-action)))
w))
(lambda (exn)
(log-error "Spawned process in world ~a died with exception:\n~a"
(trace-pid-stack)
(exn->string exn))
(transition w '())))]
['quit
(define pt (world-process-table w))
(match (hash-ref pt label #f)
[#f (transition w '())]
[(process interests _ _)
(define delta (patch (matcher-empty) interests))
(define-values (discarded-actions retained-actions)
(queue-partition (lambda (e) (equal? (car e) label)) (world-pending-action-queue w)))
(when (not (queue-empty? discarded-actions))
(log-error "Process ~a had ~a queued actions at exit"
label
(queue-length discarded-actions)))
(define new-w (struct-copy world w
[process-table (hash-remove pt label)]
[pending-action-queue retained-actions]))
(apply-patch-in-world label delta new-w)])]
(define-values (new-mux _label patches meta-action) (mux-remove-stream (world-mux w) label))
(deliver-patches (struct-copy world w [mux new-mux])
;; ^ behavior & state already removed by disable-process
patches
meta-action)]
[(? patch? delta-orig)
(define p (hash-ref (world-process-table w) label #f))
(cond
[(or p (meta-label? label))
(define old-interests (if (meta-label? label)
(world-environment-interests w)
(process-interests p)))
(define delta (limit-patch (label-patch delta-orig (set label)) old-interests))
(define new-interests (apply-patch old-interests delta))
(define new-w
(if (meta-label? label)
(struct-copy world w [environment-interests new-interests])
(let* ((p (struct-copy process p [interests new-interests])))
(struct-copy world w [process-table (hash-set (world-process-table w) label p)]))))
(apply-patch-in-world label delta new-w)]
[else ;; we can still apply actions for nonexistent processes,
;; but we have to limit the patches by consulting the
;; whole routing table, making their zombie patch actions
;; potentially less efficient.
(define delta (limit-patch/routing-table (label-patch delta-orig (set label))
(world-routing-table w)))
(apply-patch-in-world label delta w)])]
(define-values (new-mux _label patches meta-action)
(mux-update-stream (world-mux w) label delta-orig))
(deliver-patches (struct-copy world w [mux new-mux])
patches
meta-action)]
[(and m (message body))
(when (observe? body)
(log-warning "Process ~a sent message containing query ~v"
(cons label (trace-pid-stack))
body))
(cond
[(matcher-match-value (world-routing-table w) body #f) ;; some other process has declared m
(transition w '())]
[else
(define local-to-meta? (and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body))) ;; it relates to envt, not local
(define affected-pids (if local-to-meta?
(set)
(matcher-match-value (world-routing-table w) (observe body))))
(transition (for/fold [(w w)] [(pid (in-set affected-pids))]
(send-event m pid w))
(and local-to-meta?
(message (at-meta-claim body))))])]))
(define-values (send-to-meta? affected-pids) (mux-route-message (world-mux w) label body))
(transition (for/fold [(w w)] [(pid (in-list affected-pids))]
(send-event m pid w))
(and send-to-meta? (message (at-meta-claim body))))]))
;; PRECONDITION: delta has been limited to be minimal with respect to
;; existing interests of its label in w's routing table.
(define (apply-patch-in-world label delta w)
(define old-routing-table (world-routing-table w))
(define new-routing-table (apply-patch old-routing-table delta))
(define delta-aggregate (compute-aggregate-patch delta label old-routing-table))
(define affected-pids (let ((pids (compute-affected-pids old-routing-table delta)))
(if (meta-label? label) pids (set-add pids label))))
(transition (for/fold [(w (struct-copy world w [routing-table new-routing-table]))]
[(pid affected-pids)]
(cond [(equal? pid label)
(define feedback
(patch (biased-intersection new-routing-table (patch-added delta))
(biased-intersection old-routing-table (patch-removed delta))))
(send-event/guard feedback label w)]
[else
(define p (hash-ref (world-process-table w) pid))
(define event (view-patch delta-aggregate (process-interests p)))
(send-event/guard event pid w)]))
(and (not (meta-label? label))
(drop-patch delta-aggregate))))
(define (compute-affected-pids routing-table delta)
(define cover (matcher-union (patch-added delta) (patch-removed delta)))
(matcher-match-matcher (pattern->matcher #t (observe (embedded-matcher cover)))
routing-table
#:seed (set)
#:combiner (lambda (v1 v2 acc) (set-union v2 acc))))
(define (deliver-patches w patches meta-action)
(transition (for/fold [(w w)] [(entry (in-list patches))]
(match-define (cons label event) entry)
(send-event/guard event label w))
meta-action))
(define (step-children w)
(define runnable-pids (world-runnable-pids w))
@ -416,6 +344,33 @@
(send-event #f pid w))
'())))
(define (pretty-print-world w [p (current-output-port)])
(local-require racket/pretty)
(match-define (world mux qs runnable behaviors states) w)
(fprintf p "WORLD:\n")
(fprintf p " - ~a queued actions\n" (queue-length qs))
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
(fprintf p " - ~a live processes (~a with claims)\n"
(hash-count states)
(hash-count (mux-interest-table mux)))
(fprintf p " - next pid: ~a\n" (mux-next-pid mux))
(fprintf p " - routing table:\n")
(pretty-print-matcher (mux-routing-table mux) p)
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
(fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f))
(define state (hash-ref states pid #f))
(display (indented-port-output 6 (lambda (p)
(if (world? state)
(pretty-print-world state p)
(pretty-write state p))))
p)
(newline p)
(fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f))
(display (indented-port-output 6 (lambda (p)
(pretty-print-matcher (mux-interests-of mux pid) p)))
p)
(newline p)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(module+ test

94
prospect/mux.rkt Normal file
View File

@ -0,0 +1,94 @@
#lang racket/base
;; General multiplexer.
(provide meta-label?
(except-out (struct-out mux) mux)
(rename-out [mux <mux>] [make-mux mux])
mux-add-stream
mux-remove-stream
mux-update-stream
mux-route-message
mux-interests-of)
(require racket/set)
(require racket/match)
(require "route.rkt")
(require "patch.rkt")
(require "trace.rkt")
;; A PID is a Nat.
;; A Label is a PID or 'meta.
;; Multiplexer private states
(struct mux (next-pid ;; PID
routing-table ;; (Matcherof (Setof Label))
interest-table ;; (HashTable Label Matcher)
) #:transparent)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (meta-label? x) (eq? x 'meta))
(define (make-mux)
(mux 0 (matcher-empty) (hash)))
(define (mux-add-stream m initial-patch)
(define new-pid (mux-next-pid m))
(mux-update-stream (struct-copy mux m [next-pid (+ new-pid 1)])
new-pid
initial-patch))
(define (mux-remove-stream m label)
(mux-update-stream m label (patch (matcher-empty) (pattern->matcher #t ?))))
(define (mux-update-stream m label delta-orig)
(define old-interests (mux-interests-of m label))
(define delta (limit-patch (label-patch delta-orig (set label)) old-interests))
(define new-interests (apply-patch old-interests delta))
(let* ((m (struct-copy mux m
[interest-table
(if (matcher-empty? new-interests)
(hash-remove (mux-interest-table m) label)
(hash-set (mux-interest-table m) label new-interests))])))
;; CONDITION at this point: delta has been labelled and limited to
;; be minimal with respect to existing interests of its label.
(define old-routing-table (mux-routing-table m))
(define new-routing-table (apply-patch old-routing-table delta))
(define delta-aggregate (compute-aggregate-patch delta label old-routing-table))
(define affected-pids (let ((pids (compute-affected-pids old-routing-table delta)))
(set-remove (set-add pids label) 'meta))) ;; TODO: removing meta is weird
(values (struct-copy mux m [routing-table new-routing-table])
label
(for/list [(pid affected-pids)]
(cond [(equal? pid label)
(define feedback
(patch (biased-intersection new-routing-table (patch-added delta))
(biased-intersection old-routing-table (patch-removed delta))))
(cons label feedback)]
[else
(cons pid (view-patch delta-aggregate (mux-interests-of m pid)))]))
(and (not (meta-label? label))
(drop-patch delta-aggregate)))))
(define (compute-affected-pids routing-table delta)
(define cover (matcher-union (patch-added delta) (patch-removed delta)))
(matcher-match-matcher (pattern->matcher #t (observe (embedded-matcher cover)))
routing-table
#:seed (set)
#:combiner (lambda (v1 v2 acc) (set-union v2 acc))))
(define (mux-route-message m label body)
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(cons label (trace-pid-stack))
body))
(cond
[(matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body
(values #f '())]
[(and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body)) ;; it relates to envt, not local
(values #t '())]
[else
(values #f (set->list (matcher-match-value (mux-routing-table m) (observe body))))]))
(define (mux-interests-of m label)
(hash-ref (mux-interest-table m) label (matcher-empty)))

View File

@ -8,9 +8,12 @@
trace-process-step
trace-internal-step
exn->string) ;; required from web-server/private/util
exn->string ;; required from web-server/private/util
string-indent
indented-port-output)
(require (only-in web-server/private/util exn->string))
(require (only-in racket/string string-join string-split))
(define trace-logger (make-logger 'minimart-trace))
@ -30,13 +33,22 @@
(log-message trace-logger 'info name "" r #f)))
;; Event PID Process (Option Exception) (Option Transition) -> Void
(define (trace-process-step e pid p exn t)
(define (trace-process-step e pid beh st exn t)
(when exn
(log-error "Process ~a died with exception:\n~a"
(cons pid (trace-pid-stack))
(exn->string exn)))
(record-trace-event 'process-step (list (cons pid (trace-pid-stack)) e p exn t)))
(record-trace-event 'process-step (list (cons pid (trace-pid-stack)) e beh st exn t)))
;; PID Action World Transition -> Void
(define (trace-internal-step pid a w t)
(record-trace-event 'internal-step (list (cons pid (trace-pid-stack)) a w t)))
(define (string-indent amount s)
(define pad (make-string amount #\space))
(string-join (for/list [(line (string-split s "\n"))] (string-append pad line)) "\n"))
(define (indented-port-output amount f)
(define p (open-output-string))
(f p)
(string-indent amount (get-output-string p)))

View File

@ -9,6 +9,7 @@
(require (only-in web-server/private/util exn->string))
(require "../core.rkt")
(require "../trace.rkt")
(require "../mux.rkt")
(define (env-aref varname default alist)
(define key (or (getenv varname) default))
@ -95,7 +96,7 @@
(let loop ()
(match-define (vector level message-string data event-name) (sync receiver))
(match* (event-name data)
[('process-step (list pids e p exn t))
[('process-step (list pids e beh st exn t))
(define pidstr (format-pids pids))
(define relevant-exn? (and show-exceptions? exn))
(match e
@ -113,22 +114,23 @@
(output "~a received a message:\n" pidstr)
(pretty-write body (current-error-port))))])
(when (or relevant-exn? show-process-states-pre?)
(when (or relevant-exn? (not (boring-state? (process-state p))))
(when (or relevant-exn? (not (boring-state? st)))
(with-color YELLOW
(output "~a's state just before the event:\n" pidstr)
(output-state (process-state p)))))
(output-state st))))
(when relevant-exn?
(with-color WHITE-ON-RED
(output "Process ~a died with exception:\n~a\n"
(output "Process ~a ~v died with exception:\n~a\n"
pidstr
beh
(exn->string exn))))
(when (quit? t)
(with-color BRIGHT-RED
(output "Process ~a exited normally.\n" pidstr)))
(output "Process ~a ~v exited normally.\n" pidstr beh)))
(when (or relevant-exn? show-process-states-post?)
(when (transition? t)
(unless (boring-state? (transition-state t))
(when (not (equal? (process-state p) (transition-state t)))
(when (not (equal? st (transition-state t)))
(with-color YELLOW
(output "~a's state just after the event:\n" pidstr)
(output-state (transition-state t)))))))]
@ -136,16 +138,16 @@
(when t ;; inert worlds don't change interestingly
(define pidstr (format-pids pids))
(define new-w (if (transition? t) (transition-state t) old-w))
(define old-processes (world-process-table old-w))
(define new-processes (world-process-table new-w))
(define newcount (hash-count new-processes))
(define newcount (hash-count (world-behaviors new-w)))
(match a
[(? spawn?)
(when (or show-process-lifecycle? show-actions?)
(define newpid (set-first (set-subtract (hash-keys new-processes)
(hash-keys old-processes))))
(define newpid (set-first (set-subtract (hash-keys (world-behaviors new-w))
(hash-keys (world-behaviors old-w)))))
(define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid
(match-define (process interests behavior state) (hash-ref new-processes newpid))
(define interests (mux-interests-of (world-mux new-w) newpid))
(define behavior (hash-ref (world-behaviors new-w) newpid))
(define state (hash-ref (world-states new-w) newpid))
(with-color BRIGHT-GREEN
(output "~a ~v spawned from ~a (~a total processes now)\n"
newpidstr
@ -160,20 +162,14 @@
(pretty-print-matcher interests (current-error-port))))]
['quit
(when (or show-process-lifecycle? show-actions?)
(match (hash-ref old-processes (car pids) (lambda () #f))
[#f (void)]
[(process interests behavior state)
(with-color BRIGHT-RED
(output "~a ~v exited (~a total processes now)\n"
pidstr
behavior
newcount))
(unless (boring-state? state)
(output "~a's final state:\n" pidstr)
(output-state state))
(unless (matcher-empty? interests)
(output "~a's final interests:\n" pidstr)
(pretty-print-matcher interests (current-error-port)))]))]
(define interests (mux-interests-of (world-mux old-w) (car pids)))
(with-color BRIGHT-RED
(output "~a exited (~a total processes now)\n"
pidstr
newcount))
(unless (matcher-empty? interests)
(output "~a's final interests:\n" pidstr)
(pretty-print-matcher interests (current-error-port))))]
[(? patch? p)
(when (or show-actions? show-patch-actions?)
(output "~a performed a patch:\n" pidstr)
@ -183,11 +179,12 @@
(output "~a sent a message:\n" pidstr)
(pretty-write body (current-error-port)))])
(when show-routing-table?
(when (not (equal? (world-routing-table old-w) (world-routing-table new-w)))
(define old-table (mux-routing-table (world-mux old-w)))
(define new-table (mux-routing-table (world-mux new-w)))
(when (not (equal? old-table new-table))
(with-color BRIGHT-BLUE
(output "~a's routing table:\n" (format-pids (cdr pids)))
(pretty-print-matcher (world-routing-table new-w)
(current-error-port))))))])
(pretty-print-matcher new-table (current-error-port))))))])
(loop))))
(void (when (not (set-empty? flags))