diff --git a/prospect/core.rkt b/prospect/core.rkt index 49ef0b2..dbb537b 100644 --- a/prospect/core.rkt +++ b/prospect/core.rkt @@ -11,6 +11,7 @@ (struct-out network) (struct-out seal) + clean-actions (all-from-out "patch.rkt") @@ -45,7 +46,6 @@ unpub (rename-out [make-quit quit]) - make-network spawn-network (rename-out [spawn-process spawn]) spawn/stateless @@ -57,9 +57,9 @@ sequence-transitions0 sequence-transitions0* - network-handle-event clean-transition + fork-network pretty-print-network) (require racket/set) @@ -71,6 +71,7 @@ (require "trace.rkt") (require "mux.rkt") (require "pretty.rkt") +(require racket/async-channel) (module+ test (require rackunit)) ;; Events = Patches ∪ Messages @@ -104,16 +105,19 @@ ;; VM private states (struct network (mux ;; Multiplexer - pending-action-queue ;; (Queueof (Cons Label (U Action 'quit))) - runnable-pids ;; (Setof PID) - behaviors ;; (HashTable PID Behavior) - states ;; (HashTable PID Any) + event-channel ;; incoming events + action-channel ;; outgoing actions + event-channels ;; (HashTable PID Channel) send events to children + action-channels ;; (HashTable PID Channel) actions coming from children ) #:transparent #:methods gen:prospect-pretty-printable [(define (prospect-pretty-print w [p (current-output-port)]) (pretty-print-network w p))]) +(struct network-boot-spec (actions) #:transparent) + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Seals are used by protocols to prevent the routing tries from ;; examining internal structure of values. @@ -124,6 +128,7 @@ (define (event? x) (or (patch? x) (message? x))) (define (action? x) (or (event? x) (spawn? x) (quit-network? x))) +(define (internal-action? x) (or (equal? x 'quit) (action? x))) (define (prepend-at-meta pattern level) (if (zero? level) @@ -170,46 +175,221 @@ (define (clean-actions actions) (filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions))) +;; Behavior Any (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread +;; Creates a thread running the behavior of a leaf actor. +;; Process incoming events and send any resulting actions. +(define (fork-leaf behavior state pid event-channel action-channel) + (define (send-action a) (async-channel-put action-channel (cons pid a))) + (define (send-actions as) (for ([a (in-list (flatten as))]) (send-action a))) + (thread + (lambda () + (let loop ([old-state state]) + (define event (async-channel-get event-channel)) + (begin + (trace-process-step event pid behavior old-state) + (invoke-process pid + (lambda () (clean-transition (ensure-transition (behavior event old-state)))) + (match-lambda + [#f (loop old-state)] + [(and q (quit exn final-actions)) + (trace-process-step-result event pid behavior old-state exn q) + (send-actions final-actions) + (send-action 'quit)] + [(and t (transition new-state new-actions)) + (trace-process-step-result event pid behavior old-state #f t) + (send-actions new-actions) + (loop new-state)]) + (lambda (exn) + (trace-process-step-result event pid behavior old-state exn #f) + (send-action 'quit)))))))) + +;; PID (Listof Action) (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread +(define (fork-network pid boot-actions event-channel action-channel) + ;; Network PID Action (Network -> X) -> (U X #f) + (define (action-step w src a k) + (trace-internal-action src a w) + (define wt1 (perform-action src a w)) + (trace-internal-action-result pid a w wt1) + (match wt1 + [(? quit?) #f] + [(transition new-network actions) + (for ([a (in-list (flatten actions))]) + (async-channel-put action-channel (cons pid a))) + (k new-network)])) + (thread + (lambda () + ;; run boot actions + (define w0 + (for/fold ([w (network (mux) + event-channel + action-channel + (hash) + (hash))]) + ([a (in-list boot-actions)]) + (let ([wn (action-step w 'meta a (lambda (x) x))]) + (or wn (kill-thread (current-thread)))))) + (let loop ([w w0]) + (define event-or-action + (apply sync (cons (network-event-channel w) + (hash-values (network-action-channels w))))) + (match event-or-action + [(cons pid (? internal-action? action)) + (action-step w pid action loop)] + [(? patch? delta) + (action-step w 'meta (lift-patch delta) loop)] + [(message body) + (action-step w 'meta (message (at-meta body)) loop)]))))) + +;; Label -> Action -> Network -> (Transition Network) +(define (perform-action label a w) + (match a + [(spawn boot) + ;; boot : -> (List Behavior Transition) + (invoke-process 'booting + (lambda () + (match (boot) + [(and results (? network-boot-spec?)) + results] + [(and results (list (? procedure?) (? general-transition?))) + results] + [other + (error 'spawn + "Spawn boot procedure must yield boot spec; received ~v" + other)])) + (match-lambda + [(network-boot-spec boot-actions) + (create-network w boot-actions)] + [(list behavior initial-transition) + (create-process w behavior initial-transition)]) + (lambda (exn) + (log-error "Spawned process in network ~a died with exception:\n~a" + (trace-pid-stack) + (exn->string exn)) + (transition w '())))] + ['quit + (define-values (new-mux _label delta delta-aggregate) (mux-remove-stream (network-mux w) label)) + (let ([w (disable-process w label #f)]) + ;; behavior & state in w already removed by disable-process + (deliver-patches w new-mux label delta delta-aggregate))] + [(? patch? delta-orig) + (define-values (new-mux _label delta delta-aggregate) + (mux-update-stream (network-mux w) label delta-orig)) + (deliver-patches w new-mux label delta delta-aggregate)] + [(and m (message body)) + (when (observe? body) + (log-warning "Stream ~a sent message containing query ~v" + (cons label (trace-pid-stack)) + body)) + (if (and (not (meta-label? label)) ;; it's from a local process, not envt + (at-meta? body)) ;; it relates to envt, not local + (transition w (message (at-meta-claim body))) + (transition (for/fold [(w w)] + [(pid (in-list (mux-route-message (network-mux w) body)))] + (send-event m pid w)) + '()))])) + +(define (empty-network) + (network (mux) + (make-async-channel) + (make-async-channel) + (hash) + (hash))) + +(module+ test + (let* ([network (empty-network)] + [t (perform-action 'meta (spawn-process (lambda (e s) #f) #f '()) network)]) + (check-true (transition? t)) + (match t + [(transition new-network actions) + (check-equal? (hash-count (network-event-channels new-network)) 1) + (check-equal? (hash-count (network-action-channels new-network)) 1) + (check-equal? actions (patch #f #f))]))) + +;; Network PID Behavior Any (Listof Action) -> Network +(define (boot-leaf w pid behavior initial-state boot-actions) + (define event-chan (make-async-channel)) + (define action-chan (make-async-channel)) + (for ([a (in-list boot-actions)]) + (async-channel-put action-chan (cons pid a))) + (fork-leaf behavior initial-state pid event-chan action-chan) + (struct-copy network w + [event-channels (hash-set (network-event-channels w) pid event-chan)] + [action-channels (hash-set (network-action-channels w) pid action-chan)])) + +;; Network PID (Listof Action) -> Network +(define (create-network w pid boot-actions) + (define event-chan (make-async-channel)) + (define action-chan (make-async-channel)) + (fork-network pid boot-actions event-chan action-chan) + (struct-copy network w + [event-channels (hash-set (network-event-channels w) pid event-chan)] + [action-channels (hash-set (network-action-channels w) pid action-chan)])) + +;; Network Behavior (Transition Any) -> (Transition Network) +(define (create-process w behavior initial-transition) + (if (not initial-transition) + (transition w '()) ;; Uh, ok + (let () + ;; postprocess : Network PID -> Network + (define-values (postprocess initial-actions) + (match (clean-transition initial-transition) + [(and q (quit exn initial-actions0)) + (values (lambda (w pid remaining-actions) + (trace-process-step-result 'boot pid behavior (void) exn q) + (define chan (make-async-channel)) + (for ([a (in-list remaining-actions)]) + (async-channel-put chan (cons pid a))) + (struct-copy network w + [action-channels (hash-set (network-action-channels w) pid chan)])) + (append initial-actions0 (list 'quit)))] + [(and t (transition initial-state initial-actions0)) + (values (lambda (w pid remaining-actions) + (trace-process-step-result 'boot pid behavior (void) #f t) + (boot-leaf w pid behavior initial-state remaining-actions)) + initial-actions0)])) + ;; put the initial patch into affect to allow for a form of continuity + ;; between spawned actors and their actions + (define-values (initial-patch remaining-initial-actions) + (match initial-actions + [(cons (? patch? p) rest) (values p rest)] + [other (values empty-patch other)])) + (define-values (new-mux new-pid delta delta-aggregate) + (mux-add-stream (network-mux w) initial-patch)) + (let ([w (postprocess w new-pid remaining-initial-actions)]) + (deliver-patches w new-mux new-pid delta delta-aggregate))))) + +;; Event PID Network -> Network (define (send-event e pid w) - (define behavior (hash-ref (network-behaviors w) pid #f)) - (define old-state (hash-ref (network-states w) pid #f)) - (if (not behavior) - w - (begin - (trace-process-step e pid behavior old-state) - (invoke-process pid - (lambda () (clean-transition (ensure-transition (behavior e old-state)))) - (match-lambda - [#f w] - [(and q (quit exn final-actions)) - (trace-process-step-result e pid behavior old-state exn q) - (enqueue-actions (disable-process pid exn w) pid (append final-actions - (list 'quit)))] - [(and t (transition new-state new-actions)) - (trace-process-step-result e pid behavior old-state #f t) - (enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid) - pid - new-actions)]) - (lambda (exn) - (trace-process-step-result e pid behavior old-state exn #f) - (enqueue-actions (disable-process pid exn w) pid (list 'quit))))))) - -(define (update-state w pid s) - (struct-copy network w [states (hash-set (network-states w) pid s)])) + (define chan (hash-ref (network-event-channels w) pid #f)) + (when chan + (async-channel-put chan e)) + w) +;; Patch PID Network -> Network (define (send-event/guard delta pid w) (if (patch-empty? delta) w (send-event delta pid w))) -(define (disable-process pid exn w) +;; Network Mux Label Patch Patch -> (Transition Network) +(define (deliver-patches w new-mux acting-label delta delta-aggregate) + (define-values (patches meta-action) + (compute-patches (network-mux w) new-mux acting-label delta delta-aggregate)) + (transition (for/fold [(w (struct-copy network w [mux new-mux]))] + [(entry (in-list patches))] + (match-define (cons label event) entry) + (send-event/guard event label w)) + meta-action)) + +;; PID Exception Network -> Network +(define (disable-process cw pid exn) (when exn (log-error "Process ~a died with exception:\n~a" (cons pid (trace-pid-stack)) (exn->string exn))) - (struct-copy network w - [behaviors (hash-remove (network-behaviors w) pid)] - [states (hash-remove (network-states w) pid)])) + (struct-copy network cw + [event-channels (hash-remove (network-event-channels cw) pid)] + [action-channels (hash-remove (network-action-channels cw) pid)])) (define (invoke-process pid thunk k-ok k-exn) (define-values (ok? result) @@ -222,15 +402,6 @@ (k-ok result) (k-exn result))) -(define (mark-pid-runnable w pid) - (struct-copy network w [runnable-pids (set-add (network-runnable-pids w) pid)])) - -(define (enqueue-actions w label actions) - (struct-copy network w - [pending-action-queue - (queue-append-list (network-pending-action-queue w) - (for/list [(a actions)] (cons label a)))])) - (define (make-quit #:exception [exn #f] . actions) (quit exn actions)) @@ -261,17 +432,8 @@ (define-syntax-rule (spawn-network boot-action ...) (make-spawn-network (lambda () (list boot-action ...)))) -(define (make-network boot-actions) - (network (mux) - (list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a))) - (set) - (hash) - (hash))) - (define (make-spawn-network boot-actions-thunk) - (spawn (lambda () - (list network-handle-event - (transition (make-network (boot-actions-thunk)) '()))))) + (network-boot-spec (clean-actions (boot-actions-thunk)))) (define (transition-bind k t0) (match t0 @@ -301,158 +463,16 @@ [(? quit? q) q] [(? transition? t) (sequence-transitions* t rest)])])) -(define (inert? w) - (and (queue-empty? (network-pending-action-queue w)) - (set-empty? (network-runnable-pids w)))) - -(define (network-handle-event e w) - (if (or e (not (inert? w))) - (sequence-transitions (transition w '()) - (inject-event e) - perform-actions - (lambda (w) (or (step-children w) (transition w '())))) - (step-children w))) - -(define ((inject-event e) w) - (transition (match e - [#f w] - [(? patch? delta) (enqueue-actions w 'meta (list (lift-patch delta)))] - [(message body) (enqueue-actions w 'meta (list (message (at-meta body))))]) - '())) - -(define (perform-actions w) - (for/fold ([wt (transition (struct-copy network w [pending-action-queue (make-queue)]) '())]) - ((entry (in-list (queue->list (network-pending-action-queue w))))) - #:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn? - (match-define [cons label a] entry) - (trace-internal-action label a (transition-state wt)) - (define wt1 (transition-bind (perform-action label a) wt)) - (trace-internal-action-result label a (transition-state wt) wt1) - wt1)) - -(define ((perform-action label a) w) - (match a - [(spawn boot) - (invoke-process 'booting - (lambda () - (match (boot) - [(and results (list (? procedure?) (? general-transition?))) - results] - [other - (error 'spawn - "Spawn boot procedure must yield boot spec; received ~v" - other)])) - (lambda (results) - (match-define (list behavior initial-transition) results) - (create-process w behavior initial-transition)) - (lambda (exn) - (log-error "Spawned process in network ~a died with exception:\n~a" - (trace-pid-stack) - (exn->string exn)) - (transition w '())))] - ['quit - (define-values (new-mux _label delta delta-aggregate) - (mux-remove-stream (network-mux w) label)) - ;; behavior & state in w already removed by disable-process - (deliver-patches w new-mux label delta delta-aggregate)] - [(quit-network) - (make-quit)] - [(? patch? delta-orig) - (define-values (new-mux _label delta delta-aggregate) - (mux-update-stream (network-mux w) label delta-orig)) - (deliver-patches w new-mux label delta delta-aggregate)] - [(and m (message body)) - (when (observe? body) - (log-warning "Stream ~a sent message containing query ~v" - (cons label (trace-pid-stack)) - body)) - (if (and (not (meta-label? label)) ;; it's from a local process, not envt - (at-meta? body)) ;; it relates to envt, not local - (transition w (message (at-meta-claim body))) - (transition (for/fold [(w w)] - [(pid (in-list (mux-route-message (network-mux w) body)))] - (send-event m pid w)) - '()))])) - -(define (create-process w behavior initial-transition) - (if (not initial-transition) - (transition w '()) ;; Uh, ok - (let () - (define-values (postprocess initial-actions) - (match (clean-transition initial-transition) - [(and q (quit exn initial-actions0)) - (values (lambda (w pid) - (trace-process-step-result 'boot pid behavior (void) exn q) - (disable-process pid exn w)) - (append initial-actions0 (list 'quit)))] - [(and t (transition initial-state initial-actions0)) - (values (lambda (w pid) - (trace-process-step-result 'boot pid behavior (void) #f t) - (mark-pid-runnable (update-state w pid initial-state) pid)) - initial-actions0)])) - (define-values (initial-patch remaining-initial-actions) - (match initial-actions - [(cons (? patch? p) rest) (values p rest)] - [other (values empty-patch other)])) - (define-values (new-mux new-pid delta delta-aggregate) - (mux-add-stream (network-mux w) initial-patch)) - (let* ((w (struct-copy network w - [behaviors (hash-set (network-behaviors w) - new-pid - behavior)])) - (w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))) - (deliver-patches w new-mux new-pid delta delta-aggregate))))) - -(define (deliver-patches w new-mux acting-label delta delta-aggregate) - (define-values (patches meta-action) - (compute-patches (network-mux w) new-mux acting-label delta delta-aggregate)) - (transition (for/fold [(w (struct-copy network w [mux new-mux]))] - [(entry (in-list patches))] - (match-define (cons label event) entry) - (send-event/guard event label w)) - meta-action)) - -(define (step-children w) - (define runnable-pids (network-runnable-pids w)) - (if (set-empty? runnable-pids) - #f ;; network is inert. - (transition (for/fold [(w (struct-copy network w [runnable-pids (set)]))] - [(pid (in-set runnable-pids))] - (send-event #f pid w)) - '()))) - (define (pretty-print-network w [p (current-output-port)]) - (match-define (network mux qs runnable behaviors states) w) + (match-define (network mux events-in actions-out event-chans action-chans) w) (fprintf p "NETWORK:\n") - (fprintf p " - ~a queued actions\n" (queue-length qs)) - (fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable)) - (fprintf p " - ~a live processes\n" (hash-count states)) + (fprintf p " - ~a live processes\n" (hash-count event-chans)) (fprintf p " - ") (display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p) - (newline p) - (for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))]) - (fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f)) - (define state (hash-ref states pid #f)) - (display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p) - (newline p) - (fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f)) + (for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys event-chans))]) + (fprintf p " process ~a, CLAIMS:\n" pid ) (display (indented-port-output 6 (lambda (p) (pretty-print-trie (mux-interests-of mux pid) p))) p) (newline p))) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(module+ test - (require racket/pretty) - - (define (step* w) - (let loop ((w w) (actions '())) - (pretty-print w) - (match (network-handle-event #f w) - [#f (values w #f (flatten actions))] - [(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))] - [(transition new-w new-actions) (loop new-w (cons actions new-actions))]))) - - (step* (make-network '())) - ) diff --git a/prospect/ground.rkt b/prospect/ground.rkt index ade1926..33c4d2f 100644 --- a/prospect/ground.rkt +++ b/prospect/ground.rkt @@ -76,31 +76,32 @@ ;; Action* -> Void ;; Runs a ground VM, booting the outermost Network with the given Actions. (define (run-ground . boot-actions) + (define event-chan (make-async-channel)) + (define action-chan (make-async-channel)) + (fork-network 0 (clean-actions boot-actions) event-chan action-chan) (let await-interrupt ((inert? #f) - (w (make-network boot-actions)) (interests (trie-empty))) ;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests)) (if (and inert? (trie-empty? interests)) (begin (log-info "run-ground: Terminating because inert") (void)) - (let ((e (apply sync + (let ((e/a (apply sync (current-ground-event-async-channel) + action-chan (if inert? never-evt idle-handler) (extract-active-events interests)))) - (trace-process-step e #f network-handle-event w) - (define resulting-transition (clean-transition (network-handle-event e w))) - (trace-process-step-result e #f network-handle-event w #f resulting-transition) - (match resulting-transition - [#f ;; inert - (await-interrupt #t w interests)] - [(transition w actions) - (let process-actions ((actions actions) (interests interests)) - (match actions - ['() (await-interrupt #f w interests)] - [(cons a actions) - (match a - [(? patch? p) - (process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))] - [_ - (log-warning "run-ground: ignoring useless meta-action ~v" a) - (process-actions actions interests)])]))]))))) + (match e/a + [#f + ;; system is idle + (await-interrupt #t interests)] + [(? event? e) + (async-channel-put event-chan e) + (await-interrupt #f interests)] + [(cons pid a) + (match a + [(? patch? p) + (define new-interests (apply-patch interests (label-patch p (datum-tset 'root)))) + (await-interrupt #f new-interests)] + [_ + (log-warning "run-ground: ignoring useless meta-action ~v" a) + (await-interrupt #f interests)])]))))) diff --git a/prospect/trace/stderr.rkt b/prospect/trace/stderr.rkt index ad26bf6..2a26d50 100644 --- a/prospect/trace/stderr.rkt +++ b/prospect/trace/stderr.rkt @@ -163,7 +163,7 @@ (prospect-pretty-print (transition-state t) (current-error-port)))))))] [('internal-action (list pids a old-w)) (define pidstr (format-pids pids)) - (define oldcount (hash-count (network-behaviors old-w))) + (define oldcount (hash-count (network-event-channels old-w))) (match a [(? spawn?) ;; Handle this in internal-action-result @@ -193,15 +193,15 @@ (when (transition? t) (define new-w (transition-state t)) (define pidstr (format-pids pids)) - (define newcount (hash-count (network-behaviors new-w))) + (define newcount (hash-count (network-event-channels new-w))) (match a [(? spawn?) (when (or show-process-lifecycle? show-actions?) (define newpid (mux-next-pid (network-mux old-w))) (define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid (define interests (mux-interests-of (network-mux new-w) newpid)) - (define behavior (hash-ref (network-behaviors new-w) newpid '#:missing-behavior)) - (define state (hash-ref (network-states new-w) newpid '#:missing-state)) + (define behavior '#:missing-behavior) + (define state '#:missing-state) (with-color BRIGHT-GREEN (output "~a ~v spawned from ~a (~a total processes now)\n" newpidstr