diff --git a/prospect/core.rkt b/prospect/core.rkt index 6a87f0e..816fa7e 100644 --- a/prospect/core.rkt +++ b/prospect/core.rkt @@ -6,7 +6,6 @@ (rename-out [quit ]) (except-out (struct-out spawn) spawn) (rename-out [spawn ]) - (struct-out process) (struct-out transition) (struct-out world) @@ -52,7 +51,9 @@ sequence-transitions world-handle-event - clean-transition) + clean-transition + + pretty-print-world) (require racket/set) (require racket/match) @@ -61,6 +62,7 @@ (require "route.rkt") (require "patch.rkt") (require "trace.rkt") +(require "mux.rkt") (module+ test (require rackunit)) ;; Events = Patches ∪ Messages @@ -69,14 +71,6 @@ ;; Actions ⊃ Events (struct spawn (boot) #:prefab) -;; Processes (machine states): (process Matcher (U Behavior (disabled Behavior)) Any) -(struct process (interests behavior state) #:transparent) - -;; Disabled Behaviors, when found in a Process, indicate that the -;; process has been disabled and is waiting out the performance of its -;; final actions before finally being removed. -(struct disabled (behavior) #:transparent) - ;; A Behavior is a ((Option Event) Any -> Transition): a function ;; mapping an Event (or, in the #f case, a poll signal) and a ;; Process's current state to a Transition. @@ -98,12 +92,11 @@ ;; A Label is a PID or 'meta. ;; VM private states -(struct world (next-pid ;; PID +(struct world (mux ;; Multiplexer pending-action-queue ;; (Queueof (Cons Label (U Action 'quit))) runnable-pids ;; (Setof PID) - routing-table ;; (Matcherof (Setof Label)) - process-table ;; (HashTable PID Process) - environment-interests ;; (Matcherof (set 'meta)) + behaviors ;; (HashTable PID Behavior) + states ;; (HashTable PID Any) ) #:transparent) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -111,8 +104,6 @@ (define (event? x) (or (patch? x) (message? x))) (define (action? x) (or (event? x) (spawn? x))) -(define (meta-label? x) (eq? x 'meta)) - (define (prepend-at-meta pattern level) (if (zero? level) pattern @@ -156,27 +147,29 @@ (filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions))) (define (send-event e pid w) - (match (hash-ref (world-process-table w) pid #f) - [#f w] - [(process _ (? disabled?) _) w] ;; disabled due to earlier error or quit - [(and p (process _ behavior old-state)) - (invoke-process pid - (lambda () (clean-transition (ensure-transition (behavior e old-state)))) - (match-lambda - [#f w] - [(and q (quit final-actions)) - (trace-process-step e pid p #f q) - (enqueue-actions (disable-process pid #f w) pid (append final-actions - (list 'quit)))] - [(and t (transition new-state new-actions)) - (trace-process-step e pid p #f t) - (update-process pid - (struct-copy process p [state new-state]) - new-actions - w)]) - (lambda (exn) - (trace-process-step e pid p exn #f) - (enqueue-actions (disable-process pid exn w) pid (list 'quit))))])) + (define behavior (hash-ref (world-behaviors w) pid #f)) + (define old-state (hash-ref (world-states w) pid #f)) + (if (not behavior) + w + (invoke-process pid + (lambda () (clean-transition (ensure-transition (behavior e old-state)))) + (match-lambda + [#f w] + [(and q (quit final-actions)) + (trace-process-step e pid behavior old-state #f q) + (enqueue-actions (disable-process pid #f w) pid (append final-actions + (list 'quit)))] + [(and t (transition new-state new-actions)) + (trace-process-step e pid behavior old-state #f t) + (enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid) + pid + new-actions)]) + (lambda (exn) + (trace-process-step e pid behavior old-state exn #f) + (enqueue-actions (disable-process pid exn w) pid (list 'quit)))))) + +(define (update-state w pid s) + (struct-copy world w [states (hash-set (world-states w) pid s)])) (define (send-event/guard delta pid w) (if (patch-empty? delta) @@ -188,16 +181,9 @@ (log-error "Process ~a died with exception:\n~a" (cons pid (trace-pid-stack)) (exn->string exn))) - (match (hash-ref (world-process-table w) pid #f) - [#f w] - [old-p - (define new-p (struct-copy process old-p [behavior (disabled (process-behavior old-p))])) - (struct-copy world w [process-table (hash-set (world-process-table w) pid new-p)])])) - -(define (update-process pid p actions w) - (let* ((w (struct-copy world w [process-table (hash-set (world-process-table w) pid p)])) - (w (mark-pid-runnable w pid))) - (enqueue-actions w pid actions))) + (struct-copy world w + [behaviors (hash-remove (world-behaviors w) pid)] + [states (hash-remove (world-states w) pid)])) (define (invoke-process pid thunk k-ok k-exn) (define-values (ok? result) @@ -223,11 +209,10 @@ (quit actions)) (define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-patch-exp ...) - (spawn (lambda (pid) - (process (apply-patch (matcher-empty) - (label-patch (patch-seq initial-patch-exp ...) (set pid))) - behavior-exp - initial-state-exp)))) + (spawn (lambda () + (list (patch-seq initial-patch-exp ...) + behavior-exp + initial-state-exp)))) (define-syntax-rule (spawn/stateless behavior-exp initial-patch-exp ...) (spawn-process (stateless-behavior-wrap behavior-exp) @@ -244,18 +229,17 @@ (make-spawn-world (lambda () (list boot-action ...)))) (define (make-world boot-actions) - (world 0 + (world (mux) (list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a))) (set) - (matcher-empty) (hash) - (matcher-empty))) + (hash))) (define (make-spawn-world boot-actions-thunk) - (spawn (lambda (pid) - (process (matcher-empty) - world-handle-event - (make-world (boot-actions-thunk)))))) + (spawn (lambda () + (list empty-patch + world-handle-event + (make-world (boot-actions-thunk)))))) (define (transition-bind k t0) (match t0 @@ -300,112 +284,56 @@ (define ((perform-action label a) w) (match a [(spawn boot) - (define new-pid (world-next-pid w)) (invoke-process 'booting (lambda () - (match (boot new-pid) - [(? process? p) p] - [other (error 'spawn - "Spawn boot procedure must yield process; received ~v" - other)])) - (lambda (new-p) - (define new-interests (process-interests new-p)) - (define new-w - (update-process new-pid - new-p - '() - (struct-copy world w [next-pid (+ new-pid 1)]))) - (apply-patch-in-world new-pid (patch new-interests (matcher-empty)) new-w)) + (match (boot) + [(and results (list (? patch?) (? procedure?) _)) + results] + [other + (error 'spawn + "Spawn boot procedure must yield boot spec; received ~v" + other)])) + (lambda (results) + (match-define (list initial-patch behavior initial-state) results) + (define-values (new-mux new-pid patches meta-action) + (mux-add-stream (world-mux w) initial-patch)) + (let* ((w (update-state w new-pid initial-state)) + (w (mark-pid-runnable w new-pid)) + (w (struct-copy world w + [mux new-mux] + [behaviors (hash-set (world-behaviors w) + new-pid + behavior)])) + (w (deliver-patches w patches meta-action))) + w)) (lambda (exn) (log-error "Spawned process in world ~a died with exception:\n~a" (trace-pid-stack) (exn->string exn)) (transition w '())))] ['quit - (define pt (world-process-table w)) - (match (hash-ref pt label #f) - [#f (transition w '())] - [(process interests _ _) - (define delta (patch (matcher-empty) interests)) - (define-values (discarded-actions retained-actions) - (queue-partition (lambda (e) (equal? (car e) label)) (world-pending-action-queue w))) - (when (not (queue-empty? discarded-actions)) - (log-error "Process ~a had ~a queued actions at exit" - label - (queue-length discarded-actions))) - (define new-w (struct-copy world w - [process-table (hash-remove pt label)] - [pending-action-queue retained-actions])) - (apply-patch-in-world label delta new-w)])] + (define-values (new-mux _label patches meta-action) (mux-remove-stream (world-mux w) label)) + (deliver-patches (struct-copy world w [mux new-mux]) + ;; ^ behavior & state already removed by disable-process + patches + meta-action)] [(? patch? delta-orig) - (define p (hash-ref (world-process-table w) label #f)) - (cond - [(or p (meta-label? label)) - (define old-interests (if (meta-label? label) - (world-environment-interests w) - (process-interests p))) - (define delta (limit-patch (label-patch delta-orig (set label)) old-interests)) - (define new-interests (apply-patch old-interests delta)) - (define new-w - (if (meta-label? label) - (struct-copy world w [environment-interests new-interests]) - (let* ((p (struct-copy process p [interests new-interests]))) - (struct-copy world w [process-table (hash-set (world-process-table w) label p)])))) - (apply-patch-in-world label delta new-w)] - [else ;; we can still apply actions for nonexistent processes, - ;; but we have to limit the patches by consulting the - ;; whole routing table, making their zombie patch actions - ;; potentially less efficient. - (define delta (limit-patch/routing-table (label-patch delta-orig (set label)) - (world-routing-table w))) - (apply-patch-in-world label delta w)])] + (define-values (new-mux _label patches meta-action) + (mux-update-stream (world-mux w) label delta-orig)) + (deliver-patches (struct-copy world w [mux new-mux]) + patches + meta-action)] [(and m (message body)) - (when (observe? body) - (log-warning "Process ~a sent message containing query ~v" - (cons label (trace-pid-stack)) - body)) - (cond - [(matcher-match-value (world-routing-table w) body #f) ;; some other process has declared m - (transition w '())] - [else - (define local-to-meta? (and (not (meta-label? label)) ;; it's from a local process, not envt - (at-meta? body))) ;; it relates to envt, not local - (define affected-pids (if local-to-meta? - (set) - (matcher-match-value (world-routing-table w) (observe body)))) - (transition (for/fold [(w w)] [(pid (in-set affected-pids))] - (send-event m pid w)) - (and local-to-meta? - (message (at-meta-claim body))))])])) + (define-values (send-to-meta? affected-pids) (mux-route-message (world-mux w) label body)) + (transition (for/fold [(w w)] [(pid (in-list affected-pids))] + (send-event m pid w)) + (and send-to-meta? (message (at-meta-claim body))))])) -;; PRECONDITION: delta has been limited to be minimal with respect to -;; existing interests of its label in w's routing table. -(define (apply-patch-in-world label delta w) - (define old-routing-table (world-routing-table w)) - (define new-routing-table (apply-patch old-routing-table delta)) - (define delta-aggregate (compute-aggregate-patch delta label old-routing-table)) - (define affected-pids (let ((pids (compute-affected-pids old-routing-table delta))) - (if (meta-label? label) pids (set-add pids label)))) - (transition (for/fold [(w (struct-copy world w [routing-table new-routing-table]))] - [(pid affected-pids)] - (cond [(equal? pid label) - (define feedback - (patch (biased-intersection new-routing-table (patch-added delta)) - (biased-intersection old-routing-table (patch-removed delta)))) - (send-event/guard feedback label w)] - [else - (define p (hash-ref (world-process-table w) pid)) - (define event (view-patch delta-aggregate (process-interests p))) - (send-event/guard event pid w)])) - (and (not (meta-label? label)) - (drop-patch delta-aggregate)))) - -(define (compute-affected-pids routing-table delta) - (define cover (matcher-union (patch-added delta) (patch-removed delta))) - (matcher-match-matcher (pattern->matcher #t (observe (embedded-matcher cover))) - routing-table - #:seed (set) - #:combiner (lambda (v1 v2 acc) (set-union v2 acc)))) +(define (deliver-patches w patches meta-action) + (transition (for/fold [(w w)] [(entry (in-list patches))] + (match-define (cons label event) entry) + (send-event/guard event label w)) + meta-action)) (define (step-children w) (define runnable-pids (world-runnable-pids w)) @@ -416,6 +344,33 @@ (send-event #f pid w)) '()))) +(define (pretty-print-world w [p (current-output-port)]) + (local-require racket/pretty) + (match-define (world mux qs runnable behaviors states) w) + (fprintf p "WORLD:\n") + (fprintf p " - ~a queued actions\n" (queue-length qs)) + (fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable)) + (fprintf p " - ~a live processes (~a with claims)\n" + (hash-count states) + (hash-count (mux-interest-table mux))) + (fprintf p " - next pid: ~a\n" (mux-next-pid mux)) + (fprintf p " - routing table:\n") + (pretty-print-matcher (mux-routing-table mux) p) + (for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))]) + (fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f)) + (define state (hash-ref states pid #f)) + (display (indented-port-output 6 (lambda (p) + (if (world? state) + (pretty-print-world state p) + (pretty-write state p)))) + p) + (newline p) + (fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f)) + (display (indented-port-output 6 (lambda (p) + (pretty-print-matcher (mux-interests-of mux pid) p))) + p) + (newline p))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (module+ test diff --git a/prospect/mux.rkt b/prospect/mux.rkt new file mode 100644 index 0000000..cae8886 --- /dev/null +++ b/prospect/mux.rkt @@ -0,0 +1,94 @@ +#lang racket/base +;; General multiplexer. + +(provide meta-label? + (except-out (struct-out mux) mux) + (rename-out [mux ] [make-mux mux]) + mux-add-stream + mux-remove-stream + mux-update-stream + mux-route-message + mux-interests-of) + +(require racket/set) +(require racket/match) +(require "route.rkt") +(require "patch.rkt") +(require "trace.rkt") + +;; A PID is a Nat. +;; A Label is a PID or 'meta. +;; Multiplexer private states +(struct mux (next-pid ;; PID + routing-table ;; (Matcherof (Setof Label)) + interest-table ;; (HashTable Label Matcher) + ) #:transparent) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (meta-label? x) (eq? x 'meta)) + +(define (make-mux) + (mux 0 (matcher-empty) (hash))) + +(define (mux-add-stream m initial-patch) + (define new-pid (mux-next-pid m)) + (mux-update-stream (struct-copy mux m [next-pid (+ new-pid 1)]) + new-pid + initial-patch)) + +(define (mux-remove-stream m label) + (mux-update-stream m label (patch (matcher-empty) (pattern->matcher #t ?)))) + +(define (mux-update-stream m label delta-orig) + (define old-interests (mux-interests-of m label)) + (define delta (limit-patch (label-patch delta-orig (set label)) old-interests)) + (define new-interests (apply-patch old-interests delta)) + (let* ((m (struct-copy mux m + [interest-table + (if (matcher-empty? new-interests) + (hash-remove (mux-interest-table m) label) + (hash-set (mux-interest-table m) label new-interests))]))) + ;; CONDITION at this point: delta has been labelled and limited to + ;; be minimal with respect to existing interests of its label. + (define old-routing-table (mux-routing-table m)) + (define new-routing-table (apply-patch old-routing-table delta)) + (define delta-aggregate (compute-aggregate-patch delta label old-routing-table)) + (define affected-pids (let ((pids (compute-affected-pids old-routing-table delta))) + (set-remove (set-add pids label) 'meta))) ;; TODO: removing meta is weird + (values (struct-copy mux m [routing-table new-routing-table]) + label + (for/list [(pid affected-pids)] + (cond [(equal? pid label) + (define feedback + (patch (biased-intersection new-routing-table (patch-added delta)) + (biased-intersection old-routing-table (patch-removed delta)))) + (cons label feedback)] + [else + (cons pid (view-patch delta-aggregate (mux-interests-of m pid)))])) + (and (not (meta-label? label)) + (drop-patch delta-aggregate))))) + +(define (compute-affected-pids routing-table delta) + (define cover (matcher-union (patch-added delta) (patch-removed delta))) + (matcher-match-matcher (pattern->matcher #t (observe (embedded-matcher cover))) + routing-table + #:seed (set) + #:combiner (lambda (v1 v2 acc) (set-union v2 acc)))) + +(define (mux-route-message m label body) + (when (observe? body) + (log-warning "Stream ~a sent message containing query ~v" + (cons label (trace-pid-stack)) + body)) + (cond + [(matcher-match-value (mux-routing-table m) body #f) ;; some other stream has declared body + (values #f '())] + [(and (not (meta-label? label)) ;; it's from a local process, not envt + (at-meta? body)) ;; it relates to envt, not local + (values #t '())] + [else + (values #f (set->list (matcher-match-value (mux-routing-table m) (observe body))))])) + +(define (mux-interests-of m label) + (hash-ref (mux-interest-table m) label (matcher-empty))) diff --git a/prospect/trace.rkt b/prospect/trace.rkt index 6ddbba1..15b560a 100644 --- a/prospect/trace.rkt +++ b/prospect/trace.rkt @@ -8,9 +8,12 @@ trace-process-step trace-internal-step - exn->string) ;; required from web-server/private/util + exn->string ;; required from web-server/private/util + string-indent + indented-port-output) (require (only-in web-server/private/util exn->string)) +(require (only-in racket/string string-join string-split)) (define trace-logger (make-logger 'minimart-trace)) @@ -30,13 +33,22 @@ (log-message trace-logger 'info name "" r #f))) ;; Event PID Process (Option Exception) (Option Transition) -> Void -(define (trace-process-step e pid p exn t) +(define (trace-process-step e pid beh st exn t) (when exn (log-error "Process ~a died with exception:\n~a" (cons pid (trace-pid-stack)) (exn->string exn))) - (record-trace-event 'process-step (list (cons pid (trace-pid-stack)) e p exn t))) + (record-trace-event 'process-step (list (cons pid (trace-pid-stack)) e beh st exn t))) ;; PID Action World Transition -> Void (define (trace-internal-step pid a w t) (record-trace-event 'internal-step (list (cons pid (trace-pid-stack)) a w t))) + +(define (string-indent amount s) + (define pad (make-string amount #\space)) + (string-join (for/list [(line (string-split s "\n"))] (string-append pad line)) "\n")) + +(define (indented-port-output amount f) + (define p (open-output-string)) + (f p) + (string-indent amount (get-output-string p))) diff --git a/prospect/trace/stderr.rkt b/prospect/trace/stderr.rkt index 9d43ea1..64cb238 100644 --- a/prospect/trace/stderr.rkt +++ b/prospect/trace/stderr.rkt @@ -9,6 +9,7 @@ (require (only-in web-server/private/util exn->string)) (require "../core.rkt") (require "../trace.rkt") +(require "../mux.rkt") (define (env-aref varname default alist) (define key (or (getenv varname) default)) @@ -95,7 +96,7 @@ (let loop () (match-define (vector level message-string data event-name) (sync receiver)) (match* (event-name data) - [('process-step (list pids e p exn t)) + [('process-step (list pids e beh st exn t)) (define pidstr (format-pids pids)) (define relevant-exn? (and show-exceptions? exn)) (match e @@ -113,22 +114,23 @@ (output "~a received a message:\n" pidstr) (pretty-write body (current-error-port))))]) (when (or relevant-exn? show-process-states-pre?) - (when (or relevant-exn? (not (boring-state? (process-state p)))) + (when (or relevant-exn? (not (boring-state? st))) (with-color YELLOW (output "~a's state just before the event:\n" pidstr) - (output-state (process-state p))))) + (output-state st)))) (when relevant-exn? (with-color WHITE-ON-RED - (output "Process ~a died with exception:\n~a\n" + (output "Process ~a ~v died with exception:\n~a\n" pidstr + beh (exn->string exn)))) (when (quit? t) (with-color BRIGHT-RED - (output "Process ~a exited normally.\n" pidstr))) + (output "Process ~a ~v exited normally.\n" pidstr beh))) (when (or relevant-exn? show-process-states-post?) (when (transition? t) (unless (boring-state? (transition-state t)) - (when (not (equal? (process-state p) (transition-state t))) + (when (not (equal? st (transition-state t))) (with-color YELLOW (output "~a's state just after the event:\n" pidstr) (output-state (transition-state t)))))))] @@ -136,16 +138,16 @@ (when t ;; inert worlds don't change interestingly (define pidstr (format-pids pids)) (define new-w (if (transition? t) (transition-state t) old-w)) - (define old-processes (world-process-table old-w)) - (define new-processes (world-process-table new-w)) - (define newcount (hash-count new-processes)) + (define newcount (hash-count (world-behaviors new-w))) (match a [(? spawn?) (when (or show-process-lifecycle? show-actions?) - (define newpid (set-first (set-subtract (hash-keys new-processes) - (hash-keys old-processes)))) + (define newpid (set-first (set-subtract (hash-keys (world-behaviors new-w)) + (hash-keys (world-behaviors old-w))))) (define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid - (match-define (process interests behavior state) (hash-ref new-processes newpid)) + (define interests (mux-interests-of (world-mux new-w) newpid)) + (define behavior (hash-ref (world-behaviors new-w) newpid)) + (define state (hash-ref (world-states new-w) newpid)) (with-color BRIGHT-GREEN (output "~a ~v spawned from ~a (~a total processes now)\n" newpidstr @@ -160,20 +162,14 @@ (pretty-print-matcher interests (current-error-port))))] ['quit (when (or show-process-lifecycle? show-actions?) - (match (hash-ref old-processes (car pids) (lambda () #f)) - [#f (void)] - [(process interests behavior state) - (with-color BRIGHT-RED - (output "~a ~v exited (~a total processes now)\n" - pidstr - behavior - newcount)) - (unless (boring-state? state) - (output "~a's final state:\n" pidstr) - (output-state state)) - (unless (matcher-empty? interests) - (output "~a's final interests:\n" pidstr) - (pretty-print-matcher interests (current-error-port)))]))] + (define interests (mux-interests-of (world-mux old-w) (car pids))) + (with-color BRIGHT-RED + (output "~a exited (~a total processes now)\n" + pidstr + newcount)) + (unless (matcher-empty? interests) + (output "~a's final interests:\n" pidstr) + (pretty-print-matcher interests (current-error-port))))] [(? patch? p) (when (or show-actions? show-patch-actions?) (output "~a performed a patch:\n" pidstr) @@ -183,11 +179,12 @@ (output "~a sent a message:\n" pidstr) (pretty-write body (current-error-port)))]) (when show-routing-table? - (when (not (equal? (world-routing-table old-w) (world-routing-table new-w))) + (define old-table (mux-routing-table (world-mux old-w))) + (define new-table (mux-routing-table (world-mux new-w))) + (when (not (equal? old-table new-table)) (with-color BRIGHT-BLUE (output "~a's routing table:\n" (format-pids (cdr pids))) - (pretty-print-matcher (world-routing-table new-w) - (current-error-port))))))]) + (pretty-print-matcher new-table (current-error-port))))))]) (loop)))) (void (when (not (set-empty? flags))