Support other kinds of actions than patches when first spawning a process

This commit is contained in:
Tony Garnock-Jones 2015-12-03 12:53:07 -08:00
parent feb55c174c
commit 506d74ed42
22 changed files with 183 additions and 147 deletions

View File

@ -148,8 +148,11 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (general-transition? v)
(or (not v) (transition? v) (quit? v)))
(define (ensure-transition v)
(if (or (not v) (transition? v) (quit? v))
(if (general-transition? v)
v
(raise (exn:fail:contract (format "Expected transition, quit or #f; got ~v" v)
(current-continuation-marks)))))
@ -227,16 +230,15 @@
(define (make-quit #:exception [exn #f] . actions)
(quit exn actions))
(define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-patch-exp ...)
(define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-action-tree-exp)
(spawn (lambda ()
(list (patch-seq initial-patch-exp ...)
behavior-exp
initial-state-exp))))
(list behavior-exp
(transition initial-state-exp initial-action-tree-exp)))))
(define-syntax-rule (spawn/stateless behavior-exp initial-patch-exp ...)
(define-syntax-rule (spawn/stateless behavior-exp initial-action-tree-exp)
(spawn-process (stateless-behavior-wrap behavior-exp)
(void)
initial-patch-exp ...))
initial-action-tree-exp))
(define ((stateless-behavior-wrap b) e state)
(match (b e)
@ -256,9 +258,8 @@
(define (make-spawn-world boot-actions-thunk)
(spawn (lambda ()
(list empty-patch
world-handle-event
(make-world (boot-actions-thunk))))))
(list world-handle-event
(transition (make-world (boot-actions-thunk)) '())))))
(define (transition-bind k t0)
(match t0
@ -308,25 +309,15 @@
(invoke-process 'booting
(lambda ()
(match (boot)
[(and results (list (? patch?) (? procedure?) _))
[(and results (list (? procedure?) (? general-transition?)))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list initial-patch behavior initial-state) results)
(define-values (new-mux new-pid patches meta-action)
(mux-add-stream (world-mux w) initial-patch))
(let* ((w (update-state w new-pid initial-state))
(w (mark-pid-runnable w new-pid))
(w (struct-copy world w
[mux new-mux]
[behaviors (hash-set (world-behaviors w)
new-pid
behavior)]))
(w (deliver-patches w patches meta-action)))
w))
(match-define (list behavior initial-transition) results)
(create-process w behavior initial-transition))
(lambda (exn)
(log-error "Spawned process in world ~a died with exception:\n~a"
(trace-pid-stack)
@ -352,6 +343,37 @@
(send-event m pid w))
(and send-to-meta? (message (at-meta-claim body))))]))
(define (create-process w behavior initial-transition)
(if (not initial-transition)
w ;; Uh, ok
(let ()
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (quit exn initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) exn q)
(disable-process pid exn w))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) #f t)
(mark-pid-runnable (update-state w pid initial-state) pid))
initial-actions0)]))
(define-values (initial-patch remaining-initial-actions)
(match initial-actions
[(cons (? patch? p) rest) (values p rest)]
[other (values empty-patch other)]))
(define-values (new-mux new-pid patches meta-action)
(mux-add-stream (world-mux w) initial-patch))
(let* ((w (struct-copy world w
[mux new-mux]
[behaviors (hash-set (world-behaviors w)
new-pid
behavior)]))
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))
(w (deliver-patches w patches meta-action)))
w))))
(define (deliver-patches w patches meta-action)
(transition (for/fold [(w w)] [(entry (in-list patches))]
(match-define (cons label event) entry)

View File

@ -99,9 +99,9 @@
(lambda (acs . rs) (cons (apply decrease-handler rs) acs))))
(spawn demand-matcher-handle-event
d
(sub (projection->pattern demand-spec) #:meta-level meta-level)
(sub (projection->pattern supply-spec) #:meta-level meta-level)
(pub (projection->pattern supply-spec) #:meta-level meta-level)))
(patch-seq (sub (projection->pattern demand-spec) #:meta-level meta-level)
(sub (projection->pattern supply-spec) #:meta-level meta-level)
(pub (projection->pattern supply-spec) #:meta-level meta-level))))
;; (Matcher (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action)))
;; Matcher Projection ...
@ -137,6 +137,6 @@
(when timeout-msec (message (set-timer timer-id timeout-msec 'relative)))
(spawn on-claim-handler
(matcher-empty)
(patch base-interests (matcher-empty))
(patch-seq* (map projection->pattern projections))
(sub (timer-expired timer-id ?)))))
(patch-seq (patch base-interests (matcher-empty))
(patch-seq* (map projection->pattern projections))
(sub (timer-expired timer-id ?))))))

View File

@ -90,10 +90,11 @@
(thread (lambda () (tcp-listener-thread control-ch listener server-addr)))
(spawn tcp-listener-behavior
(listener-state control-ch server-addr)
(sub (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer
(pub (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections
(sub (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread
))
(patch-seq
(sub (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer
(pub (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections
(sub (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Outbound Connection
@ -176,8 +177,9 @@
(thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin)))
(spawn tcp-connection
(connection-state control-ch cout)
(sub (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer
(pub (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer
(sub (tcp-channel local-addr remote-addr ?)) ;; want segments from peer
(sub (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread
))
(patch-seq
(sub (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer
(pub (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer
(sub (tcp-channel local-addr remote-addr ?)) ;; want segments from peer
(sub (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread
)))

View File

@ -35,8 +35,8 @@
[_ #f]))
(spawn timer-driver
0 ;; initial count
(sub (set-timer ? ? ?))
(pub (timer-expired ? ?))))
(patch-seq (sub (set-timer ? ? ?))
(pub (timer-expired ? ?)))))
(define (timer-driver-thread-main control-ch)
(define heap (make-timer-heap))

View File

@ -69,10 +69,10 @@
#f]
[_ #f]))
(void)
(sub (udp-packet ? local-addr ?) #:meta-level 1)
(sub (udp-packet local-addr (udp-remote-address ? ?) ?))
(pub (udp-packet (udp-remote-address ? ?) local-addr ?))
(sub (observe (udp-packet (udp-remote-address ? ?) local-addr ?)))))
(patch-seq (sub (udp-packet ? local-addr ?) #:meta-level 1)
(sub (udp-packet local-addr (udp-remote-address ? ?) ?))
(pub (udp-packet (udp-remote-address ? ?) local-addr ?))
(sub (observe (udp-packet (udp-remote-address ? ?) local-addr ?))))))
;; UdpLocalAddress UdpSocket Channel -> Void
(define (udp-receiver-thread local-addr socket control-ch)

View File

@ -107,10 +107,11 @@
(connection-handler server-addr)))
(spawn websocket-listener
(listener-state shutdown-procedure server-addr)
(sub (advertise (observe (websocket-message ? server-addr ?)))) ;; monitor peer
(pub (advertise (websocket-message ? server-addr ?))) ;; declare we might make connections
(sub (websocket-connection ? server-addr ? ? ?) #:meta-level 1) ;; events from driver thd
))
(patch-seq
(sub (advertise (observe (websocket-message ? server-addr ?)))) ;; monitor peer
(pub (advertise (websocket-message ? server-addr ?))) ;; declare we might make connections
(sub (websocket-connection ? server-addr ? ? ?) #:meta-level 1) ;; events from driver thd
)))
(define (spawn-websocket-connection local-addr remote-addr)
(match-define (websocket-remote-server url) remote-addr)
@ -136,8 +137,9 @@
(transition (cons m buffered-messages-rev) '())]
[_ #f]))
'()
(sub (websocket-connection id local-addr remote-addr ? control-ch) #:meta-level 1)
(sub (websocket-message local-addr remote-addr ?))))
(patch-seq
(sub (websocket-connection id local-addr remote-addr ? control-ch) #:meta-level 1)
(sub (websocket-message local-addr remote-addr ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
@ -173,8 +175,9 @@
(define (spawn-connection local-addr remote-addr id c control-ch)
(spawn websocket-connection-behaviour
(connection-state local-addr remote-addr c control-ch)
(sub (observe (websocket-message remote-addr local-addr ?))) ;; monitor peer
(pub (websocket-message remote-addr local-addr ?)) ;; may send messages to peer
(sub (websocket-message local-addr remote-addr ?)) ;; want segments from peer
(sub (websocket-incoming-message id ?) #:meta-level 1) ;; segments from driver thd
))
(patch-seq
(sub (observe (websocket-message remote-addr local-addr ?))) ;; monitor peer
(pub (websocket-message remote-addr local-addr ?)) ;; may send messages to peer
(sub (websocket-message local-addr remote-addr ?)) ;; want segments from peer
(sub (websocket-incoming-message id ?) #:meta-level 1) ;; segments from driver thd
)))

View File

@ -6,6 +6,7 @@
(struct-out delete-endpoint)
make-endpoint-group
spawn-endpoint-group
boot-endpoint-group
endpoint-action?
(struct-out endpoint)
endpoint-group-handle-event
@ -50,20 +51,19 @@
(hash)
initial-state))
(define-syntax-rule (spawn-endpoint-group initial-state add-endpoint-instruction ...)
(<spawn> (lambda ()
(define-values (final-cumulative-patch final-actions final-g)
(interpret-endpoint-actions empty-patch
'()
(make-endpoint-group initial-state)
-1
(list add-endpoint-instruction ...)))
(when (not (null? final-actions))
(error 'spawn-endpoint-group
"Unexpected initial actions: ~v" final-actions))
(list final-cumulative-patch
endpoint-group-handle-event
final-g))))
(define-syntax-rule (spawn-endpoint-group initial-state action-constree ...)
(<spawn> (lambda () (boot-endpoint-group initial-state (list action-constree ...)))))
(define (boot-endpoint-group initial-state initial-actions)
(define-values (final-cumulative-patch final-actions final-g)
(interpret-endpoint-actions empty-patch
'()
(make-endpoint-group initial-state)
-1
initial-actions))
(list endpoint-group-handle-event
(transition final-g (incorporate-cumulative-patch final-actions
final-cumulative-patch))))
(define (endpoint-action? a)
(or (action? a)

View File

@ -12,14 +12,15 @@
[(message (at-meta (mouse-event _ _ _ "button-down"))) (transition s (callback))]
[_ #f]))
(void)
(let ((label-image (text label font-size foreground)))
(update-window name x y
(overlay label-image
(rectangle (+ (image-width label-image) 20)
(+ (image-height label-image) 20)
"solid"
background))))
(sub (mouse-event ? ? name ?) #:meta-level 1)))
(patch-seq
(let ((label-image (text label font-size foreground)))
(update-window name x y
(overlay label-image
(rectangle (+ (image-width label-image) 20)
(+ (image-height label-image) 20)
"solid"
background))))
(sub (mouse-event ? ? name ?) #:meta-level 1))))
(define (draggable-shape name orig-x orig-y image)
(struct idle (ticks x y) #:transparent)
@ -45,9 +46,9 @@
(mouse-sub name)))]
[(_ _) #f])
(idle 0 orig-x orig-y)
(sub (tick-event) #:meta-level 1)
(mouse-sub name)
(move-to orig-x orig-y)))
(patch-seq (sub (tick-event) #:meta-level 1)
(mouse-sub name)
(move-to orig-x orig-y))))
(big-bang-world #:width 640
#:height 480

View File

@ -11,8 +11,8 @@
(assert (box-state new-value))))]
[_ #f]))
0
(sub (set-box ?))
(assert (box-state 0)))
(patch-seq (sub (set-box ?))
(assert (box-state 0))))
(spawn (lambda (e s)
(match e
@ -23,4 +23,4 @@
(message (set-box (+ v 1)))))]
[_ #f]))
(void)
(sub (box-state ?)))
(patch-seq (sub (box-state ?))))

View File

@ -19,8 +19,9 @@
(flush-output)
#f]
[_ #f]))
(sub (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(sub (advertise (tcp-channel remote-handle local-handle ?)))
(pub (tcp-channel local-handle remote-handle ?)))
(patch-seq
(sub (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 1)
(sub (tcp-channel remote-handle local-handle ?))
(sub (advertise (tcp-channel remote-handle local-handle ?)))
(pub (tcp-channel local-handle remote-handle ?))))

View File

@ -30,13 +30,14 @@
(list (for/list [(who arrived)] (say who "arrived."))
(for/list [(who departed)] (say who "departed.")))))]
[#f #f]))
(sub `(,? says ,?)) ;; read actual chat messages
(sub (advertise `(,? says ,?))) ;; observe peer presence
(pub `(,user says ,?)) ;; advertise our presence
(sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
)))
(patch-seq
(sub `(,? says ,?)) ;; read actual chat messages
(sub (advertise `(,? says ,?))) ;; observe peer presence
(pub `(,user says ,?)) ;; advertise our presence
(sub (tcp-channel them us ?) #:meta-level 1) ;; read from remote client
(sub (advertise (tcp-channel them us ?)) #:meta-level 1) ;; monitor remote client
(pub (tcp-channel us them ?) #:meta-level 1) ;; we will write to remote client
))))
(spawn-tcp-driver)
(spawn-world

View File

@ -94,8 +94,8 @@
(transition-bind (process-suggestion suggestion) t)))]
[_ #f]))
(db-state (load-epoch directory) directory (set))
(sub (observe (binding ? ? ? ?)))
(sub (update ? ? ? ?))))
(patch-seq (sub (observe (binding ? ? ? ?)))
(sub (update ? ? ? ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -118,8 +118,8 @@
other-epoch other-version other-value))])]
[_ #f]))
(void)
(assert (update key epoch version value))
(sub (binding key ? ? ?))))
(patch-seq (assert (update key epoch version value))
(sub (binding key ? ? ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -19,6 +19,6 @@
(transition state (message (tcp-channel dst src bs)))]
[_ #f]))
(void)
(sub (advertise (tcp-channel c server-id ?)))
(sub (tcp-channel c server-id ?))
(pub (tcp-channel server-id c ?)))))
(patch-seq (sub (advertise (tcp-channel c server-id ?)))
(sub (tcp-channel c server-id ?))
(pub (tcp-channel server-id c ?))))))

View File

@ -18,9 +18,9 @@
(when (message? e) (log-info "general: ~v" e))
#f)
(void)
(sub ?)
(unsub (observe ?))
(unsub (at-meta ?)))
(patch-seq (sub ?)
(unsub (observe ?))
(unsub (at-meta ?))))
(spawn-endpoint-group 0
(add-endpoint

View File

@ -28,7 +28,7 @@
[_ #f]))
(spawn-world (spawn r (void) (sub ?))
(spawn b 0))
(spawn b 0 '()))
(define (echoer e s)
(match e
@ -38,7 +38,8 @@
(transition s (message `(print (got-line ,line))))]
[_ #f]))
(spawn echoer (void)
(spawn echoer
(void)
(sub (external-event (read-line-evt (current-input-port) 'any) ?) #:meta-level 1))
(define (ticker e s)
@ -56,9 +57,10 @@
(spawn-timer-driver)
(message (set-timer 'tick 1000 'relative))
(spawn ticker 1
(sub (observe (set-timer ? ? ?)))
(sub (timer-expired 'tick ?)))
(spawn ticker
1
(patch-seq (sub (observe (set-timer ? ? ?)))
(sub (timer-expired 'tick ?))))
(define (printer e s)
(match e
@ -67,4 +69,6 @@
#f]
[_ #f]))
(spawn printer (void) (sub `(print ,?)))
(spawn printer
(void)
(sub `(print ,?)))

View File

@ -9,5 +9,5 @@
[1 (transition 2 (retract 'a #:meta-level 1))]
[_ #f]))
0
(assert 'a #:meta-level 1)
(assert (observe 'a) #:meta-level 1)))
(patch-seq (assert 'a #:meta-level 1)
(assert (observe 'a) #:meta-level 1))))

View File

@ -59,12 +59,14 @@
(run-ground (spawn quasi-spy (void) (sub ?))
(spawn-timer-driver)
(message (set-timer 'tick 1000 'relative))
(spawn ticker 1
(sub (observe (set-timer ? ? ?)))
(sub (timer-expired 'tick ?)))
(spawn ticker
1
(patch-seq (sub (observe (set-timer ? ? ?)))
(sub (timer-expired 'tick ?))))
(spawn-world (spawn r (void) (sub ?))
(spawn b 0))
(spawn echoer (void)
(spawn b 0 '()))
(spawn echoer
(void)
(sub (external-event (read-line-evt (current-input-port) 'any) ?)
#:meta-level 1))
(spawn printer (void) (sub `(print ,?))))

View File

@ -19,8 +19,8 @@
(assert `(parent-count ,new-count))))]
[_ #f]))
0
(sub `(parent ,? ,?))
(assert `(parent-count 0)))
(patch-seq (sub `(parent ,? ,?))
(assert `(parent-count 0))))
(define (insert-record record . monitors)
(printf "Record ~v inserted, depending on ~v\n" record monitors)
@ -36,9 +36,9 @@
(quit)]
[_ #f]))
(void)
(assert record)
(sub `(retract ,record))
(patch-seq* (map sub monitors))))
(patch-seq (assert record)
(sub `(retract ,record))
(patch-seq* (map sub monitors)))))
(insert-record `(parent john douglas))
(insert-record `(parent bob john))
@ -91,8 +91,8 @@
`(parent ,A ,C)
`(ancestor ,C ,B)))))))
(void)
(sub `(parent ,A ,C))
(sub `(ancestor ,C ,?)))))]
(patch-seq (sub `(parent ,A ,C))
(sub `(ancestor ,C ,?))))))]
[_ #f]))
(void)
(sub `(parent ,? ,?)))
@ -128,10 +128,11 @@
;; (assert `(ancestor ,A ,B))))]
;; [_ #f]))
;; (matcher-empty)
;; (sub `(parent ,A ,B))
;; (sub `(parent ,A ,?))
;; (sub `(ancestor ,? ,B))
;; (pub `(ancestor ,A ,B)))))
;; (patch-seq
;; (sub `(parent ,A ,B))
;; (sub `(parent ,A ,?))
;; (sub `(ancestor ,? ,B))
;; (pub `(ancestor ,A ,B))))))
(spawn (lambda (e s)
(when (patch? e) (pretty-print-patch e))
@ -143,11 +144,10 @@
(define id (gensym 'after))
(if (zero? msec)
(thunk)
(list
(spawn (lambda (e s) (and (message? e) (quit (thunk))))
(void)
(sub (timer-expired id ?)))
(message (set-timer id msec 'relative)))))
(spawn (lambda (e s) (and (message? e) (quit (thunk))))
(void)
(list (message (set-timer id msec 'relative))
(sub (timer-expired id ?))))))
(define use-delays? #t)

View File

@ -72,8 +72,8 @@
(transition-bind (process-suggestion suggestion) t)))]
[_ #f]))
(db-state 0 (hash) (set))
(sub (observe (binding ? ? ? ?)))
(sub (update ? ? ? ?))))
(patch-seq (sub (observe (binding ? ? ? ?)))
(sub (update ? ? ? ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -96,8 +96,8 @@
other-epoch other-version other-value))])]
[_ #f]))
(void)
(assert (update key epoch version value))
(sub (binding key ? ? ?))))
(patch-seq (assert (update key epoch version value))
(sub (binding key ? ? ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -24,9 +24,9 @@
(format "msg ~v\n" n))))))]))
(spawn connection-handler
0
(sub (advertise (tcp-channel c server-id ?)))
(sub (tcp-channel c server-id ?))
(pub (tcp-channel server-id c ?))))
(patch-seq (sub (advertise (tcp-channel c server-id ?)))
(sub (tcp-channel c server-id ?))
(pub (tcp-channel server-id c ?)))))
(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?))
(observe (tcp-channel (?!) server-id ?))

View File

@ -21,9 +21,9 @@
#f)]))
(spawn connection-handler
0
(sub (advertise (websocket-message c server-id ?)))
(sub (websocket-message c server-id ?))
(pub (websocket-message server-id c ?))))
(patch-seq (sub (advertise (websocket-message c server-id ?)))
(sub (websocket-message c server-id ?))
(pub (websocket-message server-id c ?)))))
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
(observe (websocket-message (?! any-client) server-id ?))

View File

@ -20,9 +20,9 @@
#f)]))
(spawn connection-handler
0
(sub (advertise (websocket-message c server-id ?)))
(sub (websocket-message c server-id ?))
(pub (websocket-message server-id c ?))))
(patch-seq (sub (advertise (websocket-message c server-id ?)))
(sub (websocket-message c server-id ?))
(pub (websocket-message server-id c ?)))))
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
(observe (websocket-message (?! any-client) server-id ?))