@ -11,6 +11,7 @@
( struct-out network )
( struct-out seal )
clean-actions
( all-from-out " patch.rkt " )
@ -45,7 +46,6 @@
unpub
( rename-out [ make-quit quit ] )
make-network
spawn-network
( rename-out [ spawn-process spawn ] )
spawn/stateless
@ -57,9 +57,9 @@
sequence-transitions0
sequence-transitions0*
network-handle-event
clean-transition
fork-network
pretty-print-network )
( require racket/set )
@ -71,6 +71,7 @@
( require " trace.rkt " )
( require " mux.rkt " )
( require " pretty.rkt " )
( require racket/async-channel )
( module+ test ( require rackunit ) )
;; Events = Patches ∪ Messages
@ -104,16 +105,19 @@
;; VM private states
( struct network ( mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
behaviors ;; (HashTable PID Behavior)
states ;; (HashTable PID Any)
event-channel ;; incoming events
action-channel ;; outgoing actions
event-channels ;; (HashTable PID Channel) send events to children
action-channels ;; (HashTable PID Channel) actions coming from children
)
#:transparent
#:methods gen:prospect-pretty-printable
[ ( define ( prospect-pretty-print w [ p ( current-output-port ) ] )
( pretty-print-network w p ) ) ] )
( struct network-boot-spec ( actions ) #:transparent )
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Seals are used by protocols to prevent the routing tries from
;; examining internal structure of values.
@ -124,6 +128,7 @@
( define ( event? x ) ( or ( patch? x ) ( message? x ) ) )
( define ( action? x ) ( or ( event? x ) ( spawn? x ) ( quit-network? x ) ) )
( define ( internal-action? x ) ( or ( equal? x ' quit ) ( action? x ) ) )
( define ( prepend-at-meta pattern level )
( if ( zero? level )
@ -170,46 +175,226 @@
( define ( clean-actions actions )
( filter ( lambda ( x ) ( and ( action? x ) ( not ( patch-empty? x ) ) ) ) ( flatten actions ) ) )
;; Behavior Any (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
;; Creates a thread running the behavior of a leaf actor.
;; Process incoming events and send any resulting actions.
( define ( fork-leaf behavior state pid event-channel action-channel )
( define ( send-action a ) ( async-channel-put action-channel ( cons pid a ) ) )
( define ( send-actions as ) ( for ( [ a ( in-list ( flatten as ) ) ] ) ( send-action a ) ) )
( thread
( lambda ( )
( let loop ( [ old-state state ] )
( define event ( async-channel-get event-channel ) )
( begin
( trace-process-step event pid behavior old-state )
( invoke-process pid
( lambda ( ) ( clean-transition ( ensure-transition ( behavior event old-state ) ) ) )
( match-lambda
[ #f ( loop old-state ) ]
[ ( and q ( quit exn final-actions ) )
( trace-process-step-result event pid behavior old-state exn q )
( send-actions final-actions )
( send-action ' quit ) ]
[ ( and t ( transition new-state new-actions ) )
( trace-process-step-result event pid behavior old-state #f t )
( send-actions new-actions )
( loop new-state ) ] )
( lambda ( exn )
( trace-process-step-result event pid behavior old-state exn #f )
( send-action ' quit ) ) ) ) ) ) ) )
;; PID (Listof Action) (AsyncChannelOf Event) (AsyncChannelOf Action) -> Thread
( define ( fork-network pid boot-actions event-channel action-channel )
;; Network PID Action (Network -> X) -> (U X #f)
( define ( action-step w src a k )
( trace-internal-action src a w )
( define wt1 ( perform-action src a w ) )
( trace-internal-action-result pid a w wt1 )
( match wt1
[ ( ? quit? ) #f ]
[ ( transition new-network actions )
( for ( [ a ( in-list ( flatten actions ) ) ] )
( async-channel-put action-channel ( cons pid a ) ) )
( k new-network ) ] ) )
( thread
( lambda ( )
;; run boot actions
( define w0
( for/fold ( [ w ( network ( mux )
event-channel
action-channel
( hash )
( hash ) ) ] )
( [ a ( in-list boot-actions ) ] )
( let ( [ wn ( action-step w ' meta a ( lambda ( x ) x ) ) ] )
( or wn ( kill-thread ( current-thread ) ) ) ) ) )
( let loop ( [ w w0 ] )
( define event-or-action
( apply sync ( cons ( network-event-channel w )
( hash-values ( network-action-channels w ) ) ) ) )
( match event-or-action
[ ( cons pid ( ? internal-action? action ) )
( action-step w pid action loop ) ]
[ ( ? patch? delta )
( action-step w ' meta ( lift-patch delta ) loop ) ]
[ ( message body )
( action-step w ' meta ( message ( at-meta body ) ) loop ) ] ) ) ) ) )
;; Label -> Action -> Network -> (Transition Network)
( define ( perform-action label a w )
( match a
[ ( spawn boot )
;; boot : -> (List Behavior Transition)
( invoke-process ' booting
( lambda ( )
( match ( boot )
[ ( and results ( ? network-boot-spec? ) )
results ]
[ ( and results ( list ( ? procedure? ) ( ? general-transition? ) ) )
results ]
[ other
( error ' spawn
" Spawn boot procedure must yield boot spec; received ~v "
other ) ] ) )
( match-lambda
[ ( network-boot-spec boot-actions )
( transition ( create-network w boot-actions ) ' ( ) ) ]
[ ( list behavior initial-transition )
( create-process w behavior initial-transition ) ] )
( lambda ( exn )
( log-error " Spawned process in network ~a died with exception: \n ~a "
( trace-pid-stack )
( exn->string exn ) )
( transition w ' ( ) ) ) ) ]
[ ' quit
( define-values ( new-mux _label delta delta-aggregate ) ( mux-remove-stream ( network-mux w ) label ) )
( let ( [ w ( disable-process w label #f ) ] )
;; behavior & state in w already removed by disable-process
( deliver-patches w new-mux label delta delta-aggregate ) ) ]
[ ( ? patch? delta-orig )
( define-values ( new-mux _label delta delta-aggregate )
( mux-update-stream ( network-mux w ) label delta-orig ) )
( deliver-patches w new-mux label delta delta-aggregate ) ]
[ ( and m ( message body ) )
( when ( observe? body )
( log-warning " Stream ~a sent message containing query ~v "
( cons label ( trace-pid-stack ) )
body ) )
( if ( and ( not ( meta-label? label ) ) ;; it's from a local process, not envt
( at-meta? body ) ) ;; it relates to envt, not local
( transition w ( message ( at-meta-claim body ) ) )
( transition ( for/fold [ ( w w ) ]
[ ( pid ( in-list ( mux-route-message ( network-mux w ) body ) ) ) ]
( send-event m pid w ) )
' ( ) ) ) ] ) )
( define ( empty-network )
( network ( mux )
( make-async-channel )
( make-async-channel )
( hash )
( hash ) ) )
( module+ test
( let* ( [ network ( empty-network ) ]
[ t ( perform-action ' meta ( spawn-process ( lambda ( e s ) #f ) #f ' ( ) ) network ) ] )
( check-true ( transition? t ) )
( match t
[ ( transition new-network actions )
( check-equal? ( hash-count ( network-event-channels new-network ) ) 1 )
( check-equal? ( hash-count ( network-action-channels new-network ) ) 1 )
( check-equal? actions ( patch #f #f ) ) ] ) ) )
;; Network PID Behavior Any (Listof Action) -> Network
( define ( boot-leaf w pid behavior initial-state boot-actions )
( define event-chan ( make-async-channel ) )
( define action-chan ( make-async-channel ) )
( for ( [ a ( in-list boot-actions ) ] )
( async-channel-put action-chan ( cons pid a ) ) )
( fork-leaf behavior initial-state pid event-chan action-chan )
( struct-copy network w
[ event-channels ( hash-set ( network-event-channels w ) pid event-chan ) ]
[ action-channels ( hash-set ( network-action-channels w ) pid action-chan ) ] ) )
;; Network (Listof Action) -> Network
( define ( create-network w boot-actions )
( define-values ( new-mux pid delta delta-aggregate )
( mux-add-stream ( network-mux w ) empty-patch ) )
( define event-chan ( make-async-channel ) )
( define action-chan ( make-async-channel ) )
( fork-network pid boot-actions event-chan action-chan )
( struct-copy network w
[ mux new-mux ]
[ event-channels ( hash-set ( network-event-channels w ) pid event-chan ) ]
[ action-channels ( hash-set ( network-action-channels w ) pid action-chan ) ] ) )
;; Network Behavior (Transition Any) -> (Transition Network)
( define ( create-process w behavior initial-transition )
( if ( not initial-transition )
( transition w ' ( ) ) ;; Uh, ok
( let ( )
;; postprocess : Network PID -> Network
( define-values ( postprocess initial-actions )
( match ( clean-transition initial-transition )
[ ( and q ( quit exn initial-actions0 ) )
( values ( lambda ( w pid remaining-actions )
( trace-process-step-result ' boot pid behavior ( void ) exn q )
( define chan ( make-async-channel ) )
( for ( [ a ( in-list remaining-actions ) ] )
( async-channel-put chan ( cons pid a ) ) )
( struct-copy network w
[ action-channels ( hash-set ( network-action-channels w ) pid chan ) ] ) )
( append initial-actions0 ( list ' quit ) ) ) ]
[ ( and t ( transition initial-state initial-actions0 ) )
( values ( lambda ( w pid remaining-actions )
( trace-process-step-result ' boot pid behavior ( void ) #f t )
( boot-leaf w pid behavior initial-state remaining-actions ) )
initial-actions0 ) ] ) )
;; put the initial patch into affect to allow for a form of continuity
;; between spawned actors and their actions
( define-values ( initial-patch remaining-initial-actions )
( match initial-actions
[ ( cons ( ? patch? p ) rest ) ( values p rest ) ]
[ other ( values empty-patch other ) ] ) )
( define-values ( new-mux new-pid delta delta-aggregate )
( mux-add-stream ( network-mux w ) initial-patch ) )
( let ( [ w ( postprocess w new-pid remaining-initial-actions ) ] )
( deliver-patches w new-mux new-pid delta delta-aggregate ) ) ) ) )
;; Event PID Network -> Network
( define ( send-event e pid w )
( define behavior ( hash-ref ( network-behaviors w ) pid #f ) )
( define old-state ( hash-ref ( network-states w ) pid #f ) )
( if ( not behavior )
w
( begin
( trace-process-step e pid behavior old-state )
( invoke-process pid
( lambda ( ) ( clean-transition ( ensure-transition ( behavior e old-state ) ) ) )
( match-lambda
[ #f w ]
[ ( and q ( quit exn final-actions ) )
( trace-process-step-result e pid behavior old-state exn q )
( enqueue-actions ( disable-process pid exn w ) pid ( append final-actions
( list ' quit ) ) ) ]
[ ( and t ( transition new-state new-actions ) )
( trace-process-step-result e pid behavior old-state #f t )
( enqueue-actions ( mark-pid-runnable ( update-state w pid new-state ) pid )
pid
new-actions ) ] )
( lambda ( exn )
( trace-process-step-result e pid behavior old-state exn #f )
( enqueue-actions ( disable-process pid exn w ) pid ( list ' quit ) ) ) ) ) ) )
( define ( update-state w pid s )
( struct-copy network w [ states ( hash-set ( network-states w ) pid s ) ] ) )
( define chan ( hash-ref ( network-event-channels w ) pid #f ) )
( when chan
( async-channel-put chan e ) )
w )
;; Patch PID Network -> Network
( define ( send-event/guard delta pid w )
( if ( patch-empty? delta )
w
( send-event delta pid w ) ) )
( define ( disable-process pid exn w )
;; Network Mux Label Patch Patch -> (Transition Network)
( define ( deliver-patches w new-mux acting-label delta delta-aggregate )
( define-values ( patches meta-action )
( compute-patches ( network-mux w ) new-mux acting-label delta delta-aggregate ) )
( transition ( for/fold [ ( w ( struct-copy network w [ mux new-mux ] ) ) ]
[ ( entry ( in-list patches ) ) ]
( match-define ( cons label event ) entry )
( send-event/guard event label w ) )
( if ( action? meta-action )
meta-action
' ( ) ) ) )
;; PID Exception Network -> Network
( define ( disable-process cw pid exn )
( when exn
( log-error " Process ~a died with exception: \n ~a "
( cons pid ( trace-pid-stack ) )
( exn->string exn ) ) )
( struct-copy network w
[ behaviors ( hash-remove ( network-behaviors w ) pid ) ]
[ states ( hash-remove ( network-states w ) pid ) ] ) )
( struct-copy network c w
[ event-channels ( hash-remove ( network-event-channels c w) pid ) ]
[ action-channels ( hash-remove ( network-action-channels c w) pid ) ] ) )
( define ( invoke-process pid thunk k-ok k-exn )
( define-values ( ok? result )
@ -222,15 +407,6 @@
( k-ok result )
( k-exn result ) ) )
( define ( mark-pid-runnable w pid )
( struct-copy network w [ runnable-pids ( set-add ( network-runnable-pids w ) pid ) ] ) )
( define ( enqueue-actions w label actions )
( struct-copy network w
[ pending-action-queue
( queue-append-list ( network-pending-action-queue w )
( for/list [ ( a actions ) ] ( cons label a ) ) ) ] ) )
( define ( make-quit #:exception [ exn #f ] . actions )
( quit exn actions ) )
@ -261,17 +437,8 @@
( define-syntax-rule ( spawn-network boot-action ... )
( make-spawn-network ( lambda ( ) ( list boot-action ... ) ) ) )
( define ( make-network boot-actions )
( network ( mux )
( list->queue ( for/list ( ( a ( in-list ( clean-actions boot-actions ) ) ) ) ( cons ' meta a ) ) )
( set )
( hash )
( hash ) ) )
( define ( make-spawn-network boot-actions-thunk )
( spawn ( lambda ( )
( list network-handle-event
( transition ( make-network ( boot-actions-thunk ) ) ' ( ) ) ) ) ) )
( spawn ( lambda ( ) ( network-boot-spec ( clean-actions ( boot-actions-thunk ) ) ) ) ) )
( define ( transition-bind k t0 )
( match t0
@ -301,158 +468,16 @@
[ ( ? quit? q ) q ]
[ ( ? transition? t ) ( sequence-transitions* t rest ) ] ) ] ) )
( define ( inert? w )
( and ( queue-empty? ( network-pending-action-queue w ) )
( set-empty? ( network-runnable-pids w ) ) ) )
( define ( network-handle-event e w )
( if ( or e ( not ( inert? w ) ) )
( sequence-transitions ( transition w ' ( ) )
( inject-event e )
perform-actions
( lambda ( w ) ( or ( step-children w ) ( transition w ' ( ) ) ) ) )
( step-children w ) ) )
( define ( ( inject-event e ) w )
( transition ( match e
[ #f w ]
[ ( ? patch? delta ) ( enqueue-actions w ' meta ( list ( lift-patch delta ) ) ) ]
[ ( message body ) ( enqueue-actions w ' meta ( list ( message ( at-meta body ) ) ) ) ] )
' ( ) ) )
( define ( perform-actions w )
( for/fold ( [ wt ( transition ( struct-copy network w [ pending-action-queue ( make-queue ) ] ) ' ( ) ) ] )
( ( entry ( in-list ( queue->list ( network-pending-action-queue w ) ) ) ) )
#:break ( quit? wt ) ;; TODO: should a quit action be delayed until the end of the turn?
( match-define [ cons label a ] entry )
( trace-internal-action label a ( transition-state wt ) )
( define wt1 ( transition-bind ( perform-action label a ) wt ) )
( trace-internal-action-result label a ( transition-state wt ) wt1 )
wt1 ) )
( define ( ( perform-action label a ) w )
( match a
[ ( spawn boot )
( invoke-process ' booting
( lambda ( )
( match ( boot )
[ ( and results ( list ( ? procedure? ) ( ? general-transition? ) ) )
results ]
[ other
( error ' spawn
" Spawn boot procedure must yield boot spec; received ~v "
other ) ] ) )
( lambda ( results )
( match-define ( list behavior initial-transition ) results )
( create-process w behavior initial-transition ) )
( lambda ( exn )
( log-error " Spawned process in network ~a died with exception: \n ~a "
( trace-pid-stack )
( exn->string exn ) )
( transition w ' ( ) ) ) ) ]
[ ' quit
( define-values ( new-mux _label delta delta-aggregate )
( mux-remove-stream ( network-mux w ) label ) )
;; behavior & state in w already removed by disable-process
( deliver-patches w new-mux label delta delta-aggregate ) ]
[ ( quit-network )
( make-quit ) ]
[ ( ? patch? delta-orig )
( define-values ( new-mux _label delta delta-aggregate )
( mux-update-stream ( network-mux w ) label delta-orig ) )
( deliver-patches w new-mux label delta delta-aggregate ) ]
[ ( and m ( message body ) )
( when ( observe? body )
( log-warning " Stream ~a sent message containing query ~v "
( cons label ( trace-pid-stack ) )
body ) )
( if ( and ( not ( meta-label? label ) ) ;; it's from a local process, not envt
( at-meta? body ) ) ;; it relates to envt, not local
( transition w ( message ( at-meta-claim body ) ) )
( transition ( for/fold [ ( w w ) ]
[ ( pid ( in-list ( mux-route-message ( network-mux w ) body ) ) ) ]
( send-event m pid w ) )
' ( ) ) ) ] ) )
( define ( create-process w behavior initial-transition )
( if ( not initial-transition )
( transition w ' ( ) ) ;; Uh, ok
( let ( )
( define-values ( postprocess initial-actions )
( match ( clean-transition initial-transition )
[ ( and q ( quit exn initial-actions0 ) )
( values ( lambda ( w pid )
( trace-process-step-result ' boot pid behavior ( void ) exn q )
( disable-process pid exn w ) )
( append initial-actions0 ( list ' quit ) ) ) ]
[ ( and t ( transition initial-state initial-actions0 ) )
( values ( lambda ( w pid )
( trace-process-step-result ' boot pid behavior ( void ) #f t )
( mark-pid-runnable ( update-state w pid initial-state ) pid ) )
initial-actions0 ) ] ) )
( define-values ( initial-patch remaining-initial-actions )
( match initial-actions
[ ( cons ( ? patch? p ) rest ) ( values p rest ) ]
[ other ( values empty-patch other ) ] ) )
( define-values ( new-mux new-pid delta delta-aggregate )
( mux-add-stream ( network-mux w ) initial-patch ) )
( let* ( ( w ( struct-copy network w
[ behaviors ( hash-set ( network-behaviors w )
new-pid
behavior ) ] ) )
( w ( enqueue-actions ( postprocess w new-pid ) new-pid remaining-initial-actions ) ) )
( deliver-patches w new-mux new-pid delta delta-aggregate ) ) ) ) )
( define ( deliver-patches w new-mux acting-label delta delta-aggregate )
( define-values ( patches meta-action )
( compute-patches ( network-mux w ) new-mux acting-label delta delta-aggregate ) )
( transition ( for/fold [ ( w ( struct-copy network w [ mux new-mux ] ) ) ]
[ ( entry ( in-list patches ) ) ]
( match-define ( cons label event ) entry )
( send-event/guard event label w ) )
meta-action ) )
( define ( step-children w )
( define runnable-pids ( network-runnable-pids w ) )
( if ( set-empty? runnable-pids )
#f ;; network is inert.
( transition ( for/fold [ ( w ( struct-copy network w [ runnable-pids ( set ) ] ) ) ]
[ ( pid ( in-set runnable-pids ) ) ]
( send-event #f pid w ) )
' ( ) ) ) )
( define ( pretty-print-network w [ p ( current-output-port ) ] )
( match-define ( network mux qs runnable behaviors state s) w )
( match-define ( network mux events-in actions-out event-chans action-chans ) w )
( fprintf p " NETWORK: \n " )
( fprintf p " - ~a queued actions \n " ( queue-length qs ) )
( fprintf p " - ~a runnable pids ~a \n " ( set-count runnable ) ( set->list runnable ) )
( fprintf p " - ~a live processes \n " ( hash-count states ) )
( fprintf p " - ~a live processes \n " ( hash-count event-chans ) )
( fprintf p " - " )
( display ( indented-port-output 3 ( lambda ( p ) ( prospect-pretty-print mux p ) ) #:first-line? #f ) p )
( newline p )
( for ( [ pid ( set-union ( hash-keys ( mux-interest-table mux ) ) ( hash-keys states ) ) ] )
( fprintf p " ---- process ~a, behavior ~v, STATE: \n " pid ( hash-ref behaviors pid #f ) )
( define state ( hash-ref states pid #f ) )
( display ( indented-port-output 6 ( lambda ( p ) ( prospect-pretty-print state p ) ) ) p )
( newline p )
( fprintf p " process ~a, behavior ~v, CLAIMS: \n " pid ( hash-ref behaviors pid #f ) )
( for ( [ pid ( set-union ( hash-keys ( mux-interest-table mux ) ) ( hash-keys event-chans ) ) ] )
( fprintf p " process ~a, CLAIMS: \n " pid )
( display ( indented-port-output 6 ( lambda ( p )
( pretty-print-trie ( mux-interests-of mux pid ) p ) ) )
p )
( newline p ) ) )
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
( module+ test
( require racket/pretty )
( define ( step* w )
( let loop ( ( w w ) ( actions ' ( ) ) )
( pretty-print w )
( match ( network-handle-event #f w )
[ #f ( values w #f ( flatten actions ) ) ]
[ ( quit exn new-actions ) ( values w exn ( flatten ( cons actions new-actions ) ) ) ]
[ ( transition new-w new-actions ) ( loop new-w ( cons actions new-actions ) ) ] ) ) )
( step* ( make-network ' ( ) ) )
)