Compare commits

...

3 Commits

Author SHA1 Message Date
Sam Caldwell 3ec3721266 make sure meta-action is an action 2016-02-23 16:08:49 -05:00
Sam Caldwell 5c6f8aed31 fix network spawning issues 2016-02-23 16:05:51 -05:00
Sam Caldwell 79cd6eac1a Modify core behavior to run each actor concurrently
Each actor (including network actors) runs in its own thread and
communicates with its containing network via an (incoming) event
channel and an (outgoing) action channel.

Each network actor polls (syncs) on the incoming event channel from its
parent as well as the outgoing action channel from each of its child
actors. The network processes the effects of each event/action and
notifies its children by sending out the corresponding events.
2016-02-23 13:42:00 -05:00
3 changed files with 249 additions and 223 deletions

View File

@ -11,6 +11,7 @@
(struct-out network)
(struct-out seal)
clean-actions
(all-from-out "patch.rkt")
@ -45,7 +46,6 @@
unpub
(rename-out [make-quit quit])
make-network
spawn-network
(rename-out [spawn-process spawn])
spawn/stateless
@ -57,9 +57,9 @@
sequence-transitions0
sequence-transitions0*
network-handle-event
clean-transition
fork-network
pretty-print-network)
(require racket/set)
@ -71,6 +71,7 @@
(require "trace.rkt")
(require "mux.rkt")
(require "pretty.rkt")
(require racket/async-channel)
(module+ test (require rackunit))
;; Events = Patches Messages
@ -104,16 +105,19 @@
;; VM private states
(struct network (mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
behaviors ;; (HashTable PID Behavior)
states ;; (HashTable PID Any)
event-channel ;; incoming events
action-channel ;; outgoing actions
event-channels ;; (HashTable PID Channel) send events to children
action-channels ;; (HashTable PID Channel) actions coming from children
)
#:transparent
#:methods gen:prospect-pretty-printable
[(define (prospect-pretty-print w [p (current-output-port)])
(pretty-print-network w p))])
(struct network-boot-spec (actions) #:transparent)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Seals are used by protocols to prevent the routing tries from
;; examining internal structure of values.
@ -124,6 +128,7 @@
(define (event? x) (or (patch? x) (message? x)))
(define (action? x) (or (event? x) (spawn? x) (quit-network? x)))
(define (internal-action? x) (or (equal? x 'quit) (action? x)))
(define (prepend-at-meta pattern level)
(if (zero? level)
@ -170,46 +175,226 @@
(define (clean-actions actions)
(filter (lambda (x) (and (action? x) (not (patch-empty? x)))) (flatten actions)))
;; Behavior Any (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
;; Creates a thread running the behavior of a leaf actor.
;; Process incoming events and send any resulting actions.
(define (fork-leaf behavior state pid event-channel action-channel)
(define (send-action a) (async-channel-put action-channel (cons pid a)))
(define (send-actions as) (for ([a (in-list (flatten as))]) (send-action a)))
(thread
(lambda ()
(let loop ([old-state state])
(define event (async-channel-get event-channel))
(begin
(trace-process-step event pid behavior old-state)
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior event old-state))))
(match-lambda
[#f (loop old-state)]
[(and q (quit exn final-actions))
(trace-process-step-result event pid behavior old-state exn q)
(send-actions final-actions)
(send-action 'quit)]
[(and t (transition new-state new-actions))
(trace-process-step-result event pid behavior old-state #f t)
(send-actions new-actions)
(loop new-state)])
(lambda (exn)
(trace-process-step-result event pid behavior old-state exn #f)
(send-action 'quit))))))))
;; PID (Listof Action) (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
(define (fork-network pid boot-actions event-channel action-channel)
;; Network PID Action (Network -> X) -> (U X #f)
(define (action-step w src a k)
(trace-internal-action src a w)
(define wt1 (perform-action src a w))
(trace-internal-action-result pid a w wt1)
(match wt1
[(? quit?) #f]
[(transition new-network actions)
(for ([a (in-list (flatten actions))])
(async-channel-put action-channel (cons pid a)))
(k new-network)]))
(thread
(lambda ()
;; run boot actions
(define w0
(for/fold ([w (network (mux)
event-channel
action-channel
(hash)
(hash))])
([a (in-list boot-actions)])
(let ([wn (action-step w 'meta a (lambda (x) x))])
(or wn (kill-thread (current-thread))))))
(let loop ([w w0])
(define event-or-action
(apply sync (cons (network-event-channel w)
(hash-values (network-action-channels w)))))
(match event-or-action
[(cons pid (? internal-action? action))
(action-step w pid action loop)]
[(? patch? delta)
(action-step w 'meta (lift-patch delta) loop)]
[(message body)
(action-step w 'meta (message (at-meta body)) loop)])))))
;; Label -> Action -> Network -> (Transition Network)
(define (perform-action label a w)
(match a
[(spawn boot)
;; boot : -> (List Behavior Transition)
(invoke-process 'booting
(lambda ()
(match (boot)
[(and results (? network-boot-spec?))
results]
[(and results (list (? procedure?) (? general-transition?)))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(match-lambda
[(network-boot-spec boot-actions)
(transition (create-network w boot-actions) '())]
[(list behavior initial-transition)
(create-process w behavior initial-transition)])
(lambda (exn)
(log-error "Spawned process in network ~a died with exception:\n~a"
(trace-pid-stack)
(exn->string exn))
(transition w '())))]
['quit
(define-values (new-mux _label delta delta-aggregate) (mux-remove-stream (network-mux w) label))
(let ([w (disable-process w label #f)])
;; behavior & state in w already removed by disable-process
(deliver-patches w new-mux label delta delta-aggregate))]
[(? patch? delta-orig)
(define-values (new-mux _label delta delta-aggregate)
(mux-update-stream (network-mux w) label delta-orig))
(deliver-patches w new-mux label delta delta-aggregate)]
[(and m (message body))
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(cons label (trace-pid-stack))
body))
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body)) ;; it relates to envt, not local
(transition w (message (at-meta-claim body)))
(transition (for/fold [(w w)]
[(pid (in-list (mux-route-message (network-mux w) body)))]
(send-event m pid w))
'()))]))
(define (empty-network)
(network (mux)
(make-async-channel)
(make-async-channel)
(hash)
(hash)))
(module+ test
(let* ([network (empty-network)]
[t (perform-action 'meta (spawn-process (lambda (e s) #f) #f '()) network)])
(check-true (transition? t))
(match t
[(transition new-network actions)
(check-equal? (hash-count (network-event-channels new-network)) 1)
(check-equal? (hash-count (network-action-channels new-network)) 1)
(check-equal? actions (patch #f #f))])))
;; Network PID Behavior Any (Listof Action) -> Network
(define (boot-leaf w pid behavior initial-state boot-actions)
(define event-chan (make-async-channel))
(define action-chan (make-async-channel))
(for ([a (in-list boot-actions)])
(async-channel-put action-chan (cons pid a)))
(fork-leaf behavior initial-state pid event-chan action-chan)
(struct-copy network w
[event-channels (hash-set (network-event-channels w) pid event-chan)]
[action-channels (hash-set (network-action-channels w) pid action-chan)]))
;; Network (Listof Action) -> Network
(define (create-network w boot-actions)
(define-values (new-mux pid delta delta-aggregate)
(mux-add-stream (network-mux w) empty-patch))
(define event-chan (make-async-channel))
(define action-chan (make-async-channel))
(fork-network pid boot-actions event-chan action-chan)
(struct-copy network w
[mux new-mux]
[event-channels (hash-set (network-event-channels w) pid event-chan)]
[action-channels (hash-set (network-action-channels w) pid action-chan)]))
;; Network Behavior (Transition Any) -> (Transition Network)
(define (create-process w behavior initial-transition)
(if (not initial-transition)
(transition w '()) ;; Uh, ok
(let ()
;; postprocess : Network PID -> Network
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (quit exn initial-actions0))
(values (lambda (w pid remaining-actions)
(trace-process-step-result 'boot pid behavior (void) exn q)
(define chan (make-async-channel))
(for ([a (in-list remaining-actions)])
(async-channel-put chan (cons pid a)))
(struct-copy network w
[action-channels (hash-set (network-action-channels w) pid chan)]))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid remaining-actions)
(trace-process-step-result 'boot pid behavior (void) #f t)
(boot-leaf w pid behavior initial-state remaining-actions))
initial-actions0)]))
;; put the initial patch into affect to allow for a form of continuity
;; between spawned actors and their actions
(define-values (initial-patch remaining-initial-actions)
(match initial-actions
[(cons (? patch? p) rest) (values p rest)]
[other (values empty-patch other)]))
(define-values (new-mux new-pid delta delta-aggregate)
(mux-add-stream (network-mux w) initial-patch))
(let ([w (postprocess w new-pid remaining-initial-actions)])
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
;; Event PID Network -> Network
(define (send-event e pid w)
(define behavior (hash-ref (network-behaviors w) pid #f))
(define old-state (hash-ref (network-states w) pid #f))
(if (not behavior)
w
(begin
(trace-process-step e pid behavior old-state)
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (quit exn final-actions))
(trace-process-step-result e pid behavior old-state exn q)
(enqueue-actions (disable-process pid exn w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step-result e pid behavior old-state #f t)
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
pid
new-actions)])
(lambda (exn)
(trace-process-step-result e pid behavior old-state exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit)))))))
(define (update-state w pid s)
(struct-copy network w [states (hash-set (network-states w) pid s)]))
(define chan (hash-ref (network-event-channels w) pid #f))
(when chan
(async-channel-put chan e))
w)
;; Patch PID Network -> Network
(define (send-event/guard delta pid w)
(if (patch-empty? delta)
w
(send-event delta pid w)))
(define (disable-process pid exn w)
;; Network Mux Label Patch Patch -> (Transition Network)
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
(define-values (patches meta-action)
(compute-patches (network-mux w) new-mux acting-label delta delta-aggregate))
(transition (for/fold [(w (struct-copy network w [mux new-mux]))]
[(entry (in-list patches))]
(match-define (cons label event) entry)
(send-event/guard event label w))
(if (action? meta-action)
meta-action
'())))
;; PID Exception Network -> Network
(define (disable-process cw pid exn)
(when exn
(log-error "Process ~a died with exception:\n~a"
(cons pid (trace-pid-stack))
(exn->string exn)))
(struct-copy network w
[behaviors (hash-remove (network-behaviors w) pid)]
[states (hash-remove (network-states w) pid)]))
(struct-copy network cw
[event-channels (hash-remove (network-event-channels cw) pid)]
[action-channels (hash-remove (network-action-channels cw) pid)]))
(define (invoke-process pid thunk k-ok k-exn)
(define-values (ok? result)
@ -222,15 +407,6 @@
(k-ok result)
(k-exn result)))
(define (mark-pid-runnable w pid)
(struct-copy network w [runnable-pids (set-add (network-runnable-pids w) pid)]))
(define (enqueue-actions w label actions)
(struct-copy network w
[pending-action-queue
(queue-append-list (network-pending-action-queue w)
(for/list [(a actions)] (cons label a)))]))
(define (make-quit #:exception [exn #f] . actions)
(quit exn actions))
@ -261,17 +437,8 @@
(define-syntax-rule (spawn-network boot-action ...)
(make-spawn-network (lambda () (list boot-action ...))))
(define (make-network boot-actions)
(network (mux)
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
(set)
(hash)
(hash)))
(define (make-spawn-network boot-actions-thunk)
(spawn (lambda ()
(list network-handle-event
(transition (make-network (boot-actions-thunk)) '())))))
(spawn (lambda () (network-boot-spec (clean-actions (boot-actions-thunk))))))
(define (transition-bind k t0)
(match t0
@ -301,158 +468,16 @@
[(? quit? q) q]
[(? transition? t) (sequence-transitions* t rest)])]))
(define (inert? w)
(and (queue-empty? (network-pending-action-queue w))
(set-empty? (network-runnable-pids w))))
(define (network-handle-event e w)
(if (or e (not (inert? w)))
(sequence-transitions (transition w '())
(inject-event e)
perform-actions
(lambda (w) (or (step-children w) (transition w '()))))
(step-children w)))
(define ((inject-event e) w)
(transition (match e
[#f w]
[(? patch? delta) (enqueue-actions w 'meta (list (lift-patch delta)))]
[(message body) (enqueue-actions w 'meta (list (message (at-meta body))))])
'()))
(define (perform-actions w)
(for/fold ([wt (transition (struct-copy network w [pending-action-queue (make-queue)]) '())])
((entry (in-list (queue->list (network-pending-action-queue w)))))
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
(match-define [cons label a] entry)
(trace-internal-action label a (transition-state wt))
(define wt1 (transition-bind (perform-action label a) wt))
(trace-internal-action-result label a (transition-state wt) wt1)
wt1))
(define ((perform-action label a) w)
(match a
[(spawn boot)
(invoke-process 'booting
(lambda ()
(match (boot)
[(and results (list (? procedure?) (? general-transition?)))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list behavior initial-transition) results)
(create-process w behavior initial-transition))
(lambda (exn)
(log-error "Spawned process in network ~a died with exception:\n~a"
(trace-pid-stack)
(exn->string exn))
(transition w '())))]
['quit
(define-values (new-mux _label delta delta-aggregate)
(mux-remove-stream (network-mux w) label))
;; behavior & state in w already removed by disable-process
(deliver-patches w new-mux label delta delta-aggregate)]
[(quit-network)
(make-quit)]
[(? patch? delta-orig)
(define-values (new-mux _label delta delta-aggregate)
(mux-update-stream (network-mux w) label delta-orig))
(deliver-patches w new-mux label delta delta-aggregate)]
[(and m (message body))
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(cons label (trace-pid-stack))
body))
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body)) ;; it relates to envt, not local
(transition w (message (at-meta-claim body)))
(transition (for/fold [(w w)]
[(pid (in-list (mux-route-message (network-mux w) body)))]
(send-event m pid w))
'()))]))
(define (create-process w behavior initial-transition)
(if (not initial-transition)
(transition w '()) ;; Uh, ok
(let ()
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (quit exn initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) exn q)
(disable-process pid exn w))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) #f t)
(mark-pid-runnable (update-state w pid initial-state) pid))
initial-actions0)]))
(define-values (initial-patch remaining-initial-actions)
(match initial-actions
[(cons (? patch? p) rest) (values p rest)]
[other (values empty-patch other)]))
(define-values (new-mux new-pid delta delta-aggregate)
(mux-add-stream (network-mux w) initial-patch))
(let* ((w (struct-copy network w
[behaviors (hash-set (network-behaviors w)
new-pid
behavior)]))
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
(deliver-patches w new-mux new-pid delta delta-aggregate)))))
(define (deliver-patches w new-mux acting-label delta delta-aggregate)
(define-values (patches meta-action)
(compute-patches (network-mux w) new-mux acting-label delta delta-aggregate))
(transition (for/fold [(w (struct-copy network w [mux new-mux]))]
[(entry (in-list patches))]
(match-define (cons label event) entry)
(send-event/guard event label w))
meta-action))
(define (step-children w)
(define runnable-pids (network-runnable-pids w))
(if (set-empty? runnable-pids)
#f ;; network is inert.
(transition (for/fold [(w (struct-copy network w [runnable-pids (set)]))]
[(pid (in-set runnable-pids))]
(send-event #f pid w))
'())))
(define (pretty-print-network w [p (current-output-port)])
(match-define (network mux qs runnable behaviors states) w)
(match-define (network mux events-in actions-out event-chans action-chans) w)
(fprintf p "NETWORK:\n")
(fprintf p " - ~a queued actions\n" (queue-length qs))
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
(fprintf p " - ~a live processes\n" (hash-count states))
(fprintf p " - ~a live processes\n" (hash-count event-chans))
(fprintf p " - ")
(display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p)
(newline p)
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
(fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f))
(define state (hash-ref states pid #f))
(display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p)
(newline p)
(fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f))
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys event-chans))])
(fprintf p " process ~a, CLAIMS:\n" pid )
(display (indented-port-output 6 (lambda (p)
(pretty-print-trie (mux-interests-of mux pid) p)))
p)
(newline p)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(module+ test
(require racket/pretty)
(define (step* w)
(let loop ((w w) (actions '()))
(pretty-print w)
(match (network-handle-event #f w)
[#f (values w #f (flatten actions))]
[(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))]
[(transition new-w new-actions) (loop new-w (cons actions new-actions))])))
(step* (make-network '()))
)

View File

@ -76,31 +76,32 @@
;; Action* -> Void
;; Runs a ground VM, booting the outermost Network with the given Actions.
(define (run-ground . boot-actions)
(define event-chan (make-async-channel))
(define action-chan (make-async-channel))
(fork-network 0 (clean-actions boot-actions) event-chan action-chan)
(let await-interrupt ((inert? #f)
(w (make-network boot-actions))
(interests (trie-empty)))
;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests))
(if (and inert? (trie-empty? interests))
(begin (log-info "run-ground: Terminating because inert")
(void))
(let ((e (apply sync
(let ((e/a (apply sync
(current-ground-event-async-channel)
action-chan
(if inert? never-evt idle-handler)
(extract-active-events interests))))
(trace-process-step e #f network-handle-event w)
(define resulting-transition (clean-transition (network-handle-event e w)))
(trace-process-step-result e #f network-handle-event w #f resulting-transition)
(match resulting-transition
[#f ;; inert
(await-interrupt #t w interests)]
[(transition w actions)
(let process-actions ((actions actions) (interests interests))
(match actions
['() (await-interrupt #f w interests)]
[(cons a actions)
(match a
[(? patch? p)
(process-actions actions (apply-patch interests (label-patch p (datum-tset 'root))))]
[_
(log-warning "run-ground: ignoring useless meta-action ~v" a)
(process-actions actions interests)])]))])))))
(match e/a
[#f
;; system is idle
(await-interrupt #t interests)]
[(? event? e)
(async-channel-put event-chan e)
(await-interrupt #f interests)]
[(cons pid a)
(match a
[(? patch? p)
(define new-interests (apply-patch interests (label-patch p (datum-tset 'root))))
(await-interrupt #f new-interests)]
[_
(log-warning "run-ground: ignoring useless meta-action ~v" a)
(await-interrupt #f interests)])])))))

View File

@ -163,7 +163,7 @@
(prospect-pretty-print (transition-state t) (current-error-port)))))))]
[('internal-action (list pids a old-w))
(define pidstr (format-pids pids))
(define oldcount (hash-count (network-behaviors old-w)))
(define oldcount (hash-count (network-event-channels old-w)))
(match a
[(? spawn?)
;; Handle this in internal-action-result
@ -193,15 +193,15 @@
(when (transition? t)
(define new-w (transition-state t))
(define pidstr (format-pids pids))
(define newcount (hash-count (network-behaviors new-w)))
(define newcount (hash-count (network-event-channels new-w)))
(match a
[(? spawn?)
(when (or show-process-lifecycle? show-actions?)
(define newpid (mux-next-pid (network-mux old-w)))
(define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid
(define interests (mux-interests-of (network-mux new-w) newpid))
(define behavior (hash-ref (network-behaviors new-w) newpid '#:missing-behavior))
(define state (hash-ref (network-states new-w) newpid '#:missing-state))
(define behavior '#:missing-behavior)
(define state '#:missing-state)
(with-color BRIGHT-GREEN
(output "~a ~v spawned from ~a (~a total processes now)\n"
newpidstr