Compare commits
3 Commits
main
...
concurrent
Author | SHA1 | Date |
---|---|---|
Sam Caldwell | 3ec3721266 | |
Sam Caldwell | 5c6f8aed31 | |
Sam Caldwell | 79cd6eac1a |
|
@ -11,6 +11,7 @@
|
||||||
(struct-out network)
|
(struct-out network)
|
||||||
|
|
||||||
(struct-out seal)
|
(struct-out seal)
|
||||||
|
clean-actions
|
||||||
|
|
||||||
(all-from-out "patch.rkt")
|
(all-from-out "patch.rkt")
|
||||||
|
|
||||||
|
@ -45,7 +46,6 @@
|
||||||
unpub
|
unpub
|
||||||
|
|
||||||
(rename-out [make-quit quit])
|
(rename-out [make-quit quit])
|
||||||
make-network
|
|
||||||
spawn-network
|
spawn-network
|
||||||
(rename-out [spawn-process spawn])
|
(rename-out [spawn-process spawn])
|
||||||
spawn/stateless
|
spawn/stateless
|
||||||
|
@ -57,9 +57,9 @@
|
||||||
sequence-transitions0
|
sequence-transitions0
|
||||||
sequence-transitions0*
|
sequence-transitions0*
|
||||||
|
|
||||||
network-handle-event
|
|
||||||
clean-transition
|
clean-transition
|
||||||
|
|
||||||
|
fork-network
|
||||||
pretty-print-network)
|
pretty-print-network)
|
||||||
|
|
||||||
(require racket/set)
|
(require racket/set)
|
||||||
|
@ -71,6 +71,7 @@
|
||||||
(require "trace.rkt")
|
(require "trace.rkt")
|
||||||
(require "mux.rkt")
|
(require "mux.rkt")
|
||||||
(require "pretty.rkt")
|
(require "pretty.rkt")
|
||||||
|
(require racket/async-channel)
|
||||||
(module+ test (require rackunit))
|
(module+ test (require rackunit))
|
||||||
|
|
||||||
;; Events = Patches ∪ Messages
|
;; Events = Patches ∪ Messages
|
||||||
|
@ -104,16 +105,19 @@
|
||||||
|
|
||||||
;; VM private states
|
;; VM private states
|
||||||
(struct network (mux ;; Multiplexer
|
(struct network (mux ;; Multiplexer
|
||||||
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
|
event-channel ;; incoming events
|
||||||
runnable-pids ;; (Setof PID)
|
action-channel ;; outgoing actions
|
||||||
behaviors ;; (HashTable PID Behavior)
|
event-channels ;; (HashTable PID Channel) send events to children
|
||||||
states ;; (HashTable PID Any)
|
action-channels ;; (HashTable PID Channel) actions coming from children
|
||||||
)
|
)
|
||||||
#:transparent
|
#:transparent
|
||||||
#:methods gen:prospect-pretty-printable
|
#:methods gen:prospect-pretty-printable
|
||||||
[(define (prospect-pretty-print w [p (current-output-port)])
|
[(define (prospect-pretty-print w [p (current-output-port)])
|
||||||
(pretty-print-network w p))])
|
(pretty-print-network w p))])
|
||||||
|
|
||||||
|
(struct network-boot-spec (actions) #:transparent)
|
||||||
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; Seals are used by protocols to prevent the routing tries from
|
;; Seals are used by protocols to prevent the routing tries from
|
||||||
;; examining internal structure of values.
|
;; examining internal structure of values.
|
||||||
|
@ -124,6 +128,7 @@
|
||||||
|
|
||||||
(define (event? x) (or (patch? x) (message? x)))
|
(define (event? x) (or (patch? x) (message? x)))
|
||||||
(define (action? x) (or (event? x) (spawn? x) (quit-network? x)))
|
(define (action? x) (or (event? x) (spawn? x) (quit-network? x)))
|
||||||
|
(define (internal-action? x) (or (equal? x 'quit) (action? x)))
|
||||||
|
|
||||||
(define (prepend-at-meta pattern level)
|
(define (prepend-at-meta pattern level)
|
||||||
(if (zero? level)
|
(if (zero? level)
|
||||||
|
@ -170,46 +175,226 @@
|
||||||
(define (clean-actions actions)
|
(define (clean-actions actions)
|
||||||
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
|
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
|
||||||
|
|
||||||
|
;; Behavior Any (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
|
||||||
|
;; Creates a thread running the behavior of a leaf actor.
|
||||||
|
;; Process incoming events and send any resulting actions.
|
||||||
|
(define (fork-leaf behavior state pid event-channel action-channel)
|
||||||
|
(define (send-action a) (async-channel-put action-channel (cons pid a)))
|
||||||
|
(define (send-actions as) (for ([a (in-list (flatten as))]) (send-action a)))
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ([old-state state])
|
||||||
|
(define event (async-channel-get event-channel))
|
||||||
|
(begin
|
||||||
|
(trace-process-step event pid behavior old-state)
|
||||||
|
(invoke-process pid
|
||||||
|
(lambda () (clean-transition (ensure-transition (behavior event old-state))))
|
||||||
|
(match-lambda
|
||||||
|
[#f (loop old-state)]
|
||||||
|
[(and q (quit exn final-actions))
|
||||||
|
(trace-process-step-result event pid behavior old-state exn q)
|
||||||
|
(send-actions final-actions)
|
||||||
|
(send-action 'quit)]
|
||||||
|
[(and t (transition new-state new-actions))
|
||||||
|
(trace-process-step-result event pid behavior old-state #f t)
|
||||||
|
(send-actions new-actions)
|
||||||
|
(loop new-state)])
|
||||||
|
(lambda (exn)
|
||||||
|
(trace-process-step-result event pid behavior old-state exn #f)
|
||||||
|
(send-action 'quit))))))))
|
||||||
|
|
||||||
|
;; PID (Listof Action) (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
|
||||||
|
(define (fork-network pid boot-actions event-channel action-channel)
|
||||||
|
;; Network PID Action (Network -> X) -> (U X #f)
|
||||||
|
(define (action-step w src a k)
|
||||||
|
(trace-internal-action src a w)
|
||||||
|
(define wt1 (perform-action src a w))
|
||||||
|
(trace-internal-action-result pid a w wt1)
|
||||||
|
(match wt1
|
||||||
|
[(? quit?) #f]
|
||||||
|
[(transition new-network actions)
|
||||||
|
(for ([a (in-list (flatten actions))])
|
||||||
|
(async-channel-put action-channel (cons pid a)))
|
||||||
|
(k new-network)]))
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
;; run boot actions
|
||||||
|
(define w0
|
||||||
|
(for/fold ([w (network (mux)
|
||||||
|
event-channel
|
||||||
|
action-channel
|
||||||
|
(hash)
|
||||||
|
(hash))])
|
||||||
|
([a (in-list boot-actions)])
|
||||||
|
(let ([wn (action-step w 'meta a (lambda (x) x))])
|
||||||
|
(or wn (kill-thread (current-thread))))))
|
||||||
|
(let loop ([w w0])
|
||||||
|
(define event-or-action
|
||||||
|
(apply sync (cons (network-event-channel w)
|
||||||
|
(hash-values (network-action-channels w)))))
|
||||||
|
(match event-or-action
|
||||||
|
[(cons pid (? internal-action? action))
|
||||||
|
(action-step w pid action loop)]
|
||||||
|
[(? patch? delta)
|
||||||
|
(action-step w 'meta (lift-patch delta) loop)]
|
||||||
|
[(message body)
|
||||||
|
(action-step w 'meta (message (at-meta body)) loop)])))))
|
||||||
|
|
||||||
|
;; Label -> Action -> Network -> (Transition Network)
|
||||||
|
(define (perform-action label a w)
|
||||||
|
(match a
|
||||||
|
[(spawn boot)
|
||||||
|
;; boot : -> (List Behavior Transition)
|
||||||
|
(invoke-process 'booting
|
||||||
|
(lambda ()
|
||||||
|
(match (boot)
|
||||||
|
[(and results (? network-boot-spec?))
|
||||||
|
results]
|
||||||
|
[(and results (list (? procedure?) (? general-transition?)))
|
||||||
|
results]
|
||||||
|
[other
|
||||||
|
(error 'spawn
|
||||||
|
"Spawn boot procedure must yield boot spec; received ~v"
|
||||||
|
other)]))
|
||||||
|
(match-lambda
|
||||||
|
[(network-boot-spec boot-actions)
|
||||||
|
(transition (create-network w boot-actions) '())]
|
||||||
|
[(list behavior initial-transition)
|
||||||
|
(create-process w behavior initial-transition)])
|
||||||
|
(lambda (exn)
|
||||||
|
(log-error "Spawned process in network ~a died with exception:\n~a"
|
||||||
|
(trace-pid-stack)
|
||||||
|
(exn->string exn))
|
||||||
|
(transition w '())))]
|
||||||
|
['quit
|
||||||
|
(define-values (new-mux _label delta delta-aggregate) (mux-remove-stream (network-mux w) label))
|
||||||
|
(let ([w (disable-process w label #f)])
|
||||||
|
;; behavior & state in w already removed by disable-process
|
||||||
|
(deliver-patches w new-mux label delta delta-aggregate))]
|
||||||
|
[(? patch? delta-orig)
|
||||||
|
(define-values (new-mux _label delta delta-aggregate)
|
||||||
|
(mux-update-stream (network-mux w) label delta-orig))
|
||||||
|
(deliver-patches w new-mux label delta delta-aggregate)]
|
||||||
|
[(and m (message body))
|
||||||
|
(when (observe? body)
|
||||||
|
(log-warning "Stream ~a sent message containing query ~v"
|
||||||
|
(cons label (trace-pid-stack))
|
||||||
|
body))
|
||||||
|
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
|
||||||
|
(at-meta? body)) ;; it relates to envt, not local
|
||||||
|
(transition w (message (at-meta-claim body)))
|
||||||
|
(transition (for/fold [(w w)]
|
||||||
|
[(pid (in-list (mux-route-message (network-mux w) body)))]
|
||||||
|
(send-event m pid w))
|
||||||
|
'()))]))
|
||||||
|
|
||||||
|
(define (empty-network)
|
||||||
|
(network (mux)
|
||||||
|
(make-async-channel)
|
||||||
|
(make-async-channel)
|
||||||
|
(hash)
|
||||||
|
(hash)))
|
||||||
|
|
||||||
|
(module+ test
|
||||||
|
(let* ([network (empty-network)]
|
||||||
|
[t (perform-action 'meta (spawn-process (lambda (e s) #f) #f '()) network)])
|
||||||
|
(check-true (transition? t))
|
||||||
|
(match t
|
||||||
|
[(transition new-network actions)
|
||||||
|
(check-equal? (hash-count (network-event-channels new-network)) 1)
|
||||||
|
(check-equal? (hash-count (network-action-channels new-network)) 1)
|
||||||
|
(check-equal? actions (patch #f #f))])))
|
||||||
|
|
||||||
|
;; Network PID Behavior Any (Listof Action) -> Network
|
||||||
|
(define (boot-leaf w pid behavior initial-state boot-actions)
|
||||||
|
(define event-chan (make-async-channel))
|
||||||
|
(define action-chan (make-async-channel))
|
||||||
|
(for ([a (in-list boot-actions)])
|
||||||
|
(async-channel-put action-chan (cons pid a)))
|
||||||
|
(fork-leaf behavior initial-state pid event-chan action-chan)
|
||||||
|
(struct-copy network w
|
||||||
|
[event-channels (hash-set (network-event-channels w) pid event-chan)]
|
||||||
|
[action-channels (hash-set (network-action-channels w) pid action-chan)]))
|
||||||
|
|
||||||
|
;; Network (Listof Action) -> Network
|
||||||
|
(define (create-network w boot-actions)
|
||||||
|
(define-values (new-mux pid delta delta-aggregate)
|
||||||
|
(mux-add-stream (network-mux w) empty-patch))
|
||||||
|
(define event-chan (make-async-channel))
|
||||||
|
(define action-chan (make-async-channel))
|
||||||
|
(fork-network pid boot-actions event-chan action-chan)
|
||||||
|
(struct-copy network w
|
||||||
|
[mux new-mux]
|
||||||
|
[event-channels (hash-set (network-event-channels w) pid event-chan)]
|
||||||
|
[action-channels (hash-set (network-action-channels w) pid action-chan)]))
|
||||||
|
|
||||||
|
;; Network Behavior (Transition Any) -> (Transition Network)
|
||||||
|
(define (create-process w behavior initial-transition)
|
||||||
|
(if (not initial-transition)
|
||||||
|
(transition w '()) ;; Uh, ok
|
||||||
|
(let ()
|
||||||
|
;; postprocess : Network PID -> Network
|
||||||
|
(define-values (postprocess initial-actions)
|
||||||
|
(match (clean-transition initial-transition)
|
||||||
|
[(and q (quit exn initial-actions0))
|
||||||
|
(values (lambda (w pid remaining-actions)
|
||||||
|
(trace-process-step-result 'boot pid behavior (void) exn q)
|
||||||
|
(define chan (make-async-channel))
|
||||||
|
(for ([a (in-list remaining-actions)])
|
||||||
|
(async-channel-put chan (cons pid a)))
|
||||||
|
(struct-copy network w
|
||||||
|
[action-channels (hash-set (network-action-channels w) pid chan)]))
|
||||||
|
(append initial-actions0 (list 'quit)))]
|
||||||
|
[(and t (transition initial-state initial-actions0))
|
||||||
|
(values (lambda (w pid remaining-actions)
|
||||||
|
(trace-process-step-result 'boot pid behavior (void) #f t)
|
||||||
|
(boot-leaf w pid behavior initial-state remaining-actions))
|
||||||
|
initial-actions0)]))
|
||||||
|
;; put the initial patch into affect to allow for a form of continuity
|
||||||
|
;; between spawned actors and their actions
|
||||||
|
(define-values (initial-patch remaining-initial-actions)
|
||||||
|
(match initial-actions
|
||||||
|
[(cons (? patch? p) rest) (values p rest)]
|
||||||
|
[other (values empty-patch other)]))
|
||||||
|
(define-values (new-mux new-pid delta delta-aggregate)
|
||||||
|
(mux-add-stream (network-mux w) initial-patch))
|
||||||
|
(let ([w (postprocess w new-pid remaining-initial-actions)])
|
||||||
|
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
|
||||||
|
|
||||||
|
;; Event PID Network -> Network
|
||||||
(define (send-event e pid w)
|
(define (send-event e pid w)
|
||||||
(define behavior (hash-ref (network-behaviors w) pid #f))
|
(define chan (hash-ref (network-event-channels w) pid #f))
|
||||||
(define old-state (hash-ref (network-states w) pid #f))
|
(when chan
|
||||||
(if (not behavior)
|
(async-channel-put chan e))
|
||||||
w
|
w)
|
||||||
(begin
|
|
||||||
(trace-process-step e pid behavior old-state)
|
|
||||||
(invoke-process pid
|
|
||||||
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
|
|
||||||
(match-lambda
|
|
||||||
[#f w]
|
|
||||||
[(and q (quit exn final-actions))
|
|
||||||
(trace-process-step-result e pid behavior old-state exn q)
|
|
||||||
(enqueue-actions (disable-process pid exn w) pid (append final-actions
|
|
||||||
(list 'quit)))]
|
|
||||||
[(and t (transition new-state new-actions))
|
|
||||||
(trace-process-step-result e pid behavior old-state #f t)
|
|
||||||
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
|
|
||||||
pid
|
|
||||||
new-actions)])
|
|
||||||
(lambda (exn)
|
|
||||||
(trace-process-step-result e pid behavior old-state exn #f)
|
|
||||||
(enqueue-actions (disable-process pid exn w) pid (list 'quit)))))))
|
|
||||||
|
|
||||||
(define (update-state w pid s)
|
|
||||||
(struct-copy network w [states (hash-set (network-states w) pid s)]))
|
|
||||||
|
|
||||||
|
;; Patch PID Network -> Network
|
||||||
(define (send-event/guard delta pid w)
|
(define (send-event/guard delta pid w)
|
||||||
(if (patch-empty? delta)
|
(if (patch-empty? delta)
|
||||||
w
|
w
|
||||||
(send-event delta pid w)))
|
(send-event delta pid w)))
|
||||||
|
|
||||||
(define (disable-process pid exn w)
|
;; Network Mux Label Patch Patch -> (Transition Network)
|
||||||
|
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
|
||||||
|
(define-values (patches meta-action)
|
||||||
|
(compute-patches (network-mux w) new-mux acting-label delta delta-aggregate))
|
||||||
|
(transition (for/fold [(w (struct-copy network w [mux new-mux]))]
|
||||||
|
[(entry (in-list patches))]
|
||||||
|
(match-define (cons label event) entry)
|
||||||
|
(send-event/guard event label w))
|
||||||
|
(if (action? meta-action)
|
||||||
|
meta-action
|
||||||
|
'())))
|
||||||
|
|
||||||
|
;; PID Exception Network -> Network
|
||||||
|
(define (disable-process cw pid exn)
|
||||||
(when exn
|
(when exn
|
||||||
(log-error "Process ~a died with exception:\n~a"
|
(log-error "Process ~a died with exception:\n~a"
|
||||||
(cons pid (trace-pid-stack))
|
(cons pid (trace-pid-stack))
|
||||||
(exn->string exn)))
|
(exn->string exn)))
|
||||||
(struct-copy network w
|
(struct-copy network cw
|
||||||
[behaviors (hash-remove (network-behaviors w) pid)]
|
[event-channels (hash-remove (network-event-channels cw) pid)]
|
||||||
[states (hash-remove (network-states w) pid)]))
|
[action-channels (hash-remove (network-action-channels cw) pid)]))
|
||||||
|
|
||||||
(define (invoke-process pid thunk k-ok k-exn)
|
(define (invoke-process pid thunk k-ok k-exn)
|
||||||
(define-values (ok? result)
|
(define-values (ok? result)
|
||||||
|
@ -222,15 +407,6 @@
|
||||||
(k-ok result)
|
(k-ok result)
|
||||||
(k-exn result)))
|
(k-exn result)))
|
||||||
|
|
||||||
(define (mark-pid-runnable w pid)
|
|
||||||
(struct-copy network w [runnable-pids (set-add (network-runnable-pids w) pid)]))
|
|
||||||
|
|
||||||
(define (enqueue-actions w label actions)
|
|
||||||
(struct-copy network w
|
|
||||||
[pending-action-queue
|
|
||||||
(queue-append-list (network-pending-action-queue w)
|
|
||||||
(for/list [(a actions)] (cons label a)))]))
|
|
||||||
|
|
||||||
(define (make-quit #:exception [exn #f] . actions)
|
(define (make-quit #:exception [exn #f] . actions)
|
||||||
(quit exn actions))
|
(quit exn actions))
|
||||||
|
|
||||||
|
@ -261,17 +437,8 @@
|
||||||
(define-syntax-rule (spawn-network boot-action ...)
|
(define-syntax-rule (spawn-network boot-action ...)
|
||||||
(make-spawn-network (lambda () (list boot-action ...))))
|
(make-spawn-network (lambda () (list boot-action ...))))
|
||||||
|
|
||||||
(define (make-network boot-actions)
|
|
||||||
(network (mux)
|
|
||||||
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
|
|
||||||
(set)
|
|
||||||
(hash)
|
|
||||||
(hash)))
|
|
||||||
|
|
||||||
(define (make-spawn-network boot-actions-thunk)
|
(define (make-spawn-network boot-actions-thunk)
|
||||||
(spawn (lambda ()
|
(spawn (lambda () (network-boot-spec (clean-actions (boot-actions-thunk))))))
|
||||||
(list network-handle-event
|
|
||||||
(transition (make-network (boot-actions-thunk)) '())))))
|
|
||||||
|
|
||||||
(define (transition-bind k t0)
|
(define (transition-bind k t0)
|
||||||
(match t0
|
(match t0
|
||||||
|
@ -301,158 +468,16 @@
|
||||||
[(? quit? q) q]
|
[(? quit? q) q]
|
||||||
[(? transition? t) (sequence-transitions* t rest)])]))
|
[(? transition? t) (sequence-transitions* t rest)])]))
|
||||||
|
|
||||||
(define (inert? w)
|
|
||||||
(and (queue-empty? (network-pending-action-queue w))
|
|
||||||
(set-empty? (network-runnable-pids w))))
|
|
||||||
|
|
||||||
(define (network-handle-event e w)
|
|
||||||
(if (or e (not (inert? w)))
|
|
||||||
(sequence-transitions (transition w '())
|
|
||||||
(inject-event e)
|
|
||||||
perform-actions
|
|
||||||
(lambda (w) (or (step-children w) (transition w '()))))
|
|
||||||
(step-children w)))
|
|
||||||
|
|
||||||
(define ((inject-event e) w)
|
|
||||||
(transition (match e
|
|
||||||
[#f w]
|
|
||||||
[(? patch? delta) (enqueue-actions w 'meta (list (lift-patch delta)))]
|
|
||||||
[(message body) (enqueue-actions w 'meta (list (message (at-meta body))))])
|
|
||||||
'()))
|
|
||||||
|
|
||||||
(define (perform-actions w)
|
|
||||||
(for/fold ([wt (transition (struct-copy network w [pending-action-queue (make-queue)]) '())])
|
|
||||||
((entry (in-list (queue->list (network-pending-action-queue w)))))
|
|
||||||
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
|
|
||||||
(match-define [cons label a] entry)
|
|
||||||
(trace-internal-action label a (transition-state wt))
|
|
||||||
(define wt1 (transition-bind (perform-action label a) wt))
|
|
||||||
(trace-internal-action-result label a (transition-state wt) wt1)
|
|
||||||
wt1))
|
|
||||||
|
|
||||||
(define ((perform-action label a) w)
|
|
||||||
(match a
|
|
||||||
[(spawn boot)
|
|
||||||
(invoke-process 'booting
|
|
||||||
(lambda ()
|
|
||||||
(match (boot)
|
|
||||||
[(and results (list (? procedure?) (? general-transition?)))
|
|
||||||
results]
|
|
||||||
[other
|
|
||||||
(error 'spawn
|
|
||||||
"Spawn boot procedure must yield boot spec; received ~v"
|
|
||||||
other)]))
|
|
||||||
(lambda (results)
|
|
||||||
(match-define (list behavior initial-transition) results)
|
|
||||||
(create-process w behavior initial-transition))
|
|
||||||
(lambda (exn)
|
|
||||||
(log-error "Spawned process in network ~a died with exception:\n~a"
|
|
||||||
(trace-pid-stack)
|
|
||||||
(exn->string exn))
|
|
||||||
(transition w '())))]
|
|
||||||
['quit
|
|
||||||
(define-values (new-mux _label delta delta-aggregate)
|
|
||||||
(mux-remove-stream (network-mux w) label))
|
|
||||||
;; behavior & state in w already removed by disable-process
|
|
||||||
(deliver-patches w new-mux label delta delta-aggregate)]
|
|
||||||
[(quit-network)
|
|
||||||
(make-quit)]
|
|
||||||
[(? patch? delta-orig)
|
|
||||||
(define-values (new-mux _label delta delta-aggregate)
|
|
||||||
(mux-update-stream (network-mux w) label delta-orig))
|
|
||||||
(deliver-patches w new-mux label delta delta-aggregate)]
|
|
||||||
[(and m (message body))
|
|
||||||
(when (observe? body)
|
|
||||||
(log-warning "Stream ~a sent message containing query ~v"
|
|
||||||
(cons label (trace-pid-stack))
|
|
||||||
body))
|
|
||||||
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
|
|
||||||
(at-meta? body)) ;; it relates to envt, not local
|
|
||||||
(transition w (message (at-meta-claim body)))
|
|
||||||
(transition (for/fold [(w w)]
|
|
||||||
[(pid (in-list (mux-route-message (network-mux w) body)))]
|
|
||||||
(send-event m pid w))
|
|
||||||
'()))]))
|
|
||||||
|
|
||||||
(define (create-process w behavior initial-transition)
|
|
||||||
(if (not initial-transition)
|
|
||||||
(transition w '()) ;; Uh, ok
|
|
||||||
(let ()
|
|
||||||
(define-values (postprocess initial-actions)
|
|
||||||
(match (clean-transition initial-transition)
|
|
||||||
[(and q (quit exn initial-actions0))
|
|
||||||
(values (lambda (w pid)
|
|
||||||
(trace-process-step-result 'boot pid behavior (void) exn q)
|
|
||||||
(disable-process pid exn w))
|
|
||||||
(append initial-actions0 (list 'quit)))]
|
|
||||||
[(and t (transition initial-state initial-actions0))
|
|
||||||
(values (lambda (w pid)
|
|
||||||
(trace-process-step-result 'boot pid behavior (void) #f t)
|
|
||||||
(mark-pid-runnable (update-state w pid initial-state) pid))
|
|
||||||
initial-actions0)]))
|
|
||||||
(define-values (initial-patch remaining-initial-actions)
|
|
||||||
(match initial-actions
|
|
||||||
[(cons (? patch? p) rest) (values p rest)]
|
|
||||||
[other (values empty-patch other)]))
|
|
||||||
(define-values (new-mux new-pid delta delta-aggregate)
|
|
||||||
(mux-add-stream (network-mux w) initial-patch))
|
|
||||||
(let* ((w (struct-copy network w
|
|
||||||
[behaviors (hash-set (network-behaviors w)
|
|
||||||
new-pid
|
|
||||||
behavior)]))
|
|
||||||
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
|
|
||||||
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
|
|
||||||
|
|
||||||
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
|
|
||||||
(define-values (patches meta-action)
|
|
||||||
(compute-patches (network-mux w) new-mux acting-label delta delta-aggregate))
|
|
||||||
(transition (for/fold [(w (struct-copy network w [mux new-mux]))]
|
|
||||||
[(entry (in-list patches))]
|
|
||||||
(match-define (cons label event) entry)
|
|
||||||
(send-event/guard event label w))
|
|
||||||
meta-action))
|
|
||||||
|
|
||||||
(define (step-children w)
|
|
||||||
(define runnable-pids (network-runnable-pids w))
|
|
||||||
(if (set-empty? runnable-pids)
|
|
||||||
#f ;; network is inert.
|
|
||||||
(transition (for/fold [(w (struct-copy network w [runnable-pids (set)]))]
|
|
||||||
[(pid (in-set runnable-pids))]
|
|
||||||
(send-event #f pid w))
|
|
||||||
'())))
|
|
||||||
|
|
||||||
(define (pretty-print-network w [p (current-output-port)])
|
(define (pretty-print-network w [p (current-output-port)])
|
||||||
(match-define (network mux qs runnable behaviors states) w)
|
(match-define (network mux events-in actions-out event-chans action-chans) w)
|
||||||
(fprintf p "NETWORK:\n")
|
(fprintf p "NETWORK:\n")
|
||||||
(fprintf p " - ~a queued actions\n" (queue-length qs))
|
(fprintf p " - ~a live processes\n" (hash-count event-chans))
|
||||||
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
|
|
||||||
(fprintf p " - ~a live processes\n" (hash-count states))
|
|
||||||
(fprintf p " - ")
|
(fprintf p " - ")
|
||||||
(display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p)
|
(display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p)
|
||||||
(newline p)
|
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys event-chans))])
|
||||||
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
|
(fprintf p " process ~a, CLAIMS:\n" pid )
|
||||||
(fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f))
|
|
||||||
(define state (hash-ref states pid #f))
|
|
||||||
(display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p)
|
|
||||||
(newline p)
|
|
||||||
(fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f))
|
|
||||||
(display (indented-port-output 6 (lambda (p)
|
(display (indented-port-output 6 (lambda (p)
|
||||||
(pretty-print-trie (mux-interests-of mux pid) p)))
|
(pretty-print-trie (mux-interests-of mux pid) p)))
|
||||||
p)
|
p)
|
||||||
(newline p)))
|
(newline p)))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(module+ test
|
|
||||||
(require racket/pretty)
|
|
||||||
|
|
||||||
(define (step* w)
|
|
||||||
(let loop ((w w) (actions '()))
|
|
||||||
(pretty-print w)
|
|
||||||
(match (network-handle-event #f w)
|
|
||||||
[#f (values w #f (flatten actions))]
|
|
||||||
[(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))]
|
|
||||||
[(transition new-w new-actions) (loop new-w (cons actions new-actions))])))
|
|
||||||
|
|
||||||
(step* (make-network '()))
|
|
||||||
)
|
|
||||||
|
|
|
@ -76,31 +76,32 @@
|
||||||
;; Action* -> Void
|
;; Action* -> Void
|
||||||
;; Runs a ground VM, booting the outermost Network with the given Actions.
|
;; Runs a ground VM, booting the outermost Network with the given Actions.
|
||||||
(define (run-ground . boot-actions)
|
(define (run-ground . boot-actions)
|
||||||
|
(define event-chan (make-async-channel))
|
||||||
|
(define action-chan (make-async-channel))
|
||||||
|
(fork-network 0 (clean-actions boot-actions) event-chan action-chan)
|
||||||
(let await-interrupt ((inert? #f)
|
(let await-interrupt ((inert? #f)
|
||||||
(w (make-network boot-actions))
|
|
||||||
(interests (trie-empty)))
|
(interests (trie-empty)))
|
||||||
;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests))
|
;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests))
|
||||||
(if (and inert? (trie-empty? interests))
|
(if (and inert? (trie-empty? interests))
|
||||||
(begin (log-info "run-ground: Terminating because inert")
|
(begin (log-info "run-ground: Terminating because inert")
|
||||||
(void))
|
(void))
|
||||||
(let ((e (apply sync
|
(let ((e/a (apply sync
|
||||||
(current-ground-event-async-channel)
|
(current-ground-event-async-channel)
|
||||||
|
action-chan
|
||||||
(if inert? never-evt idle-handler)
|
(if inert? never-evt idle-handler)
|
||||||
(extract-active-events interests))))
|
(extract-active-events interests))))
|
||||||
(trace-process-step e #f network-handle-event w)
|
(match e/a
|
||||||
(define resulting-transition (clean-transition (network-handle-event e w)))
|
[#f
|
||||||
(trace-process-step-result e #f network-handle-event w #f resulting-transition)
|
;; system is idle
|
||||||
(match resulting-transition
|
(await-interrupt #t interests)]
|
||||||
[#f ;; inert
|
[(? event? e)
|
||||||
(await-interrupt #t w interests)]
|
(async-channel-put event-chan e)
|
||||||
[(transition w actions)
|
(await-interrupt #f interests)]
|
||||||
(let process-actions ((actions actions) (interests interests))
|
[(cons pid a)
|
||||||
(match actions
|
(match a
|
||||||
['() (await-interrupt #f w interests)]
|
[(? patch? p)
|
||||||
[(cons a actions)
|
(define new-interests (apply-patch interests (label-patch p (datum-tset 'root))))
|
||||||
(match a
|
(await-interrupt #f new-interests)]
|
||||||
[(? patch? p)
|
[_
|
||||||
(process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))]
|
(log-warning "run-ground: ignoring useless meta-action ~v" a)
|
||||||
[_
|
(await-interrupt #f interests)])])))))
|
||||||
(log-warning "run-ground: ignoring useless meta-action ~v" a)
|
|
||||||
(process-actions actions interests)])]))])))))
|
|
||||||
|
|
|
@ -163,7 +163,7 @@
|
||||||
(prospect-pretty-print (transition-state t) (current-error-port)))))))]
|
(prospect-pretty-print (transition-state t) (current-error-port)))))))]
|
||||||
[('internal-action (list pids a old-w))
|
[('internal-action (list pids a old-w))
|
||||||
(define pidstr (format-pids pids))
|
(define pidstr (format-pids pids))
|
||||||
(define oldcount (hash-count (network-behaviors old-w)))
|
(define oldcount (hash-count (network-event-channels old-w)))
|
||||||
(match a
|
(match a
|
||||||
[(? spawn?)
|
[(? spawn?)
|
||||||
;; Handle this in internal-action-result
|
;; Handle this in internal-action-result
|
||||||
|
@ -193,15 +193,15 @@
|
||||||
(when (transition? t)
|
(when (transition? t)
|
||||||
(define new-w (transition-state t))
|
(define new-w (transition-state t))
|
||||||
(define pidstr (format-pids pids))
|
(define pidstr (format-pids pids))
|
||||||
(define newcount (hash-count (network-behaviors new-w)))
|
(define newcount (hash-count (network-event-channels new-w)))
|
||||||
(match a
|
(match a
|
||||||
[(? spawn?)
|
[(? spawn?)
|
||||||
(when (or show-process-lifecycle? show-actions?)
|
(when (or show-process-lifecycle? show-actions?)
|
||||||
(define newpid (mux-next-pid (network-mux old-w)))
|
(define newpid (mux-next-pid (network-mux old-w)))
|
||||||
(define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid
|
(define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid
|
||||||
(define interests (mux-interests-of (network-mux new-w) newpid))
|
(define interests (mux-interests-of (network-mux new-w) newpid))
|
||||||
(define behavior (hash-ref (network-behaviors new-w) newpid '#:missing-behavior))
|
(define behavior '#:missing-behavior)
|
||||||
(define state (hash-ref (network-states new-w) newpid '#:missing-state))
|
(define state '#:missing-state)
|
||||||
(with-color BRIGHT-GREEN
|
(with-color BRIGHT-GREEN
|
||||||
(output "~a ~v spawned from ~a (~a total processes now)\n"
|
(output "~a ~v spawned from ~a (~a total processes now)\n"
|
||||||
newpidstr
|
newpidstr
|
||||||
|
|
Loading…
Reference in New Issue