Monolithic semantics.

This commit is contained in:
Tony Garnock-Jones 2016-01-23 18:24:07 -05:00
parent 3c5a6f00ed
commit cf00496338
30 changed files with 2200 additions and 1 deletions

View File

@ -1,5 +1,5 @@
PACKAGENAME=prospect
COLLECTS=prospect
COLLECTS=prospect prospect-monolithic
all: setup

View File

@ -0,0 +1,4 @@
# prospect-monolithic
This is an implementation of the monolithic semantics, without any use
of patches.

View File

@ -0,0 +1,460 @@
#lang racket/base
;; Core implementation of Incremental Network Calculus.
(provide (struct-out message)
(except-out (struct-out quit) quit)
(struct-out quit-network)
(rename-out [quit <quit>])
(except-out (struct-out spawn) spawn)
(rename-out [spawn <spawn>])
(struct-out transition)
(struct-out network)
(struct-out seal)
(all-from-out "scn.rkt")
;; imported from route.rkt:
?
wildcard?
?!
(struct-out capture)
pretty-print-trie
trie->pretty-string
trie-non-empty?
trie-empty?
trie-empty
projection->pattern
compile-projection
trie-project
trie-project/set
trie-project/set/single
project-assertions
event?
action?
meta-label?
prepend-at-meta
assertion
subscription
advertisement
assertion-set-union
assertion-set-union*
scn/union
(rename-out [make-quit quit])
make-network
spawn-network
(rename-out [spawn-process spawn])
spawn/stateless
make-spawn-network
transition-bind
sequence-transitions
sequence-transitions*
sequence-transitions0
sequence-transitions0*
network-handle-event
clean-transition
pretty-print-network)
(require racket/set)
(require racket/match)
(require (only-in racket/list flatten))
(require "../prospect/functional-queue.rkt")
(require "../prospect/route.rkt")
(require "scn.rkt")
(require "../prospect/trace.rkt")
(require "mux.rkt")
(require "../prospect/pretty.rkt")
(module+ test (require rackunit))
;; Events = SCNs Messages
(struct message (body) #:prefab)
;; Actions ⊃ Events
(struct spawn (boot) #:prefab)
(struct quit-network () #:prefab) ;; NB. An action. Compare (quit), a Transition.
;; A Behavior is a ((Option Event) Any -> Transition): a function
;; mapping an Event (or, in the #f case, a poll signal) and a
;; Process's current state to a Transition.
;;
;; A Transition is either
;; - #f, a signal from a Process that it is inert and need not be
;; scheduled until some Event relevant to it arrives; or,
;; - a (transition Any (Constreeof Action)), a new Process state to
;; be held by its Network and a sequence of Actions for the Network
;; to take on the transitioning Process's behalf.
;; - a (quit (Option Exn) (Constreeof Action)), signalling that the
;; Process should never again be handed an event, and that any
;; queued actions should be performed, followed by the sequence
;; of Actions given, and then the process should be
;; garbage-collected. The optional Exn is only used for
;; debugging purposes; #f means normal termination.
(struct transition (state actions) #:transparent)
(struct quit (exn actions) #:prefab)
;; A PID is a Nat.
;; A Label is a PID or 'meta.
;; VM private states
(struct network (mux ;; Multiplexer
pending-action-queue ;; (Queueof (Cons Label (U Action 'quit)))
runnable-pids ;; (Setof PID)
behaviors ;; (HashTable PID Behavior)
states ;; (HashTable PID Any)
)
#:transparent
#:methods gen:prospect-pretty-printable
[(define (prospect-pretty-print w [p (current-output-port)])
(pretty-print-network w p))])
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Seals are used by protocols to prevent the routing tries from
;; examining internal structure of values.
(struct seal (contents)) ;; NB. Neither transparent nor prefab
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (event? x) (or (scn? x) (message? x)))
(define (action? x) (or (event? x) (spawn? x) (quit-network? x)))
(define (prepend-at-meta pattern level)
(if (zero? level)
pattern
(at-meta (prepend-at-meta pattern (- level 1)))))
(define (observe-at-meta pattern level)
(if (zero? level)
(pattern->trie #t (observe pattern))
(trie-union
(pattern->trie #t (observe (prepend-at-meta pattern level)))
(pattern->trie #t (at-meta (embedded-trie (observe-at-meta pattern (- level 1))))))))
(define (assertion pattern #:meta-level [level 0])
(pattern->trie #t (prepend-at-meta pattern level)))
(define (subscription pattern #:meta-level [level 0])
(observe-at-meta pattern level))
(define (advertisement pattern #:meta-level [level 0])
(assertion (advertise pattern) #:meta-level level))
(define (assertion-set-union . tries)
(assertion-set-union* tries))
(define (assertion-set-union* tries)
(match tries
['() (trie-empty)]
[(cons t1 rest)
(for/fold [(t1 t1)] [(t2 (in-list rest))]
(trie-union t1 t2 #:combiner (lambda (a b) #t)))]))
(define (scn/union . tries)
(scn (assertion-set-union* tries)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (general-transition? v)
(or (not v) (transition? v) (quit? v)))
(define (ensure-transition v)
(if (general-transition? v)
v
(raise (exn:fail:contract (format "Expected transition, quit or #f; got ~v" v)
(current-continuation-marks)))))
(define (clean-transition t)
(match t
[#f #f]
[(quit exn actions) (quit exn (clean-actions actions))]
[(transition state actions) (transition state (clean-actions actions))]))
(define (clean-actions actions)
(filter action? (flatten actions)))
(define (send-event e pid w)
(define behavior (hash-ref (network-behaviors w) pid #f))
(define old-state (hash-ref (network-states w) pid #f))
(if (not behavior)
w
(begin
(trace-process-step e pid behavior old-state)
(invoke-process pid
(lambda () (clean-transition (ensure-transition (behavior e old-state))))
(match-lambda
[#f w]
[(and q (quit exn final-actions))
(trace-process-step-result e pid behavior old-state exn q)
(enqueue-actions (disable-process pid exn w) pid (append final-actions
(list 'quit)))]
[(and t (transition new-state new-actions))
(trace-process-step-result e pid behavior old-state #f t)
(enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid)
pid
new-actions)])
(lambda (exn)
(trace-process-step-result e pid behavior old-state exn #f)
(enqueue-actions (disable-process pid exn w) pid (list 'quit)))))))
(define (update-state w pid s)
(struct-copy network w [states (hash-set (network-states w) pid s)]))
(define (disable-process pid exn w)
(when exn
(log-error "Process ~a died with exception:\n~a"
(cons pid (trace-pid-stack))
(exn->string exn)))
(struct-copy network w
[behaviors (hash-remove (network-behaviors w) pid)]
[states (hash-remove (network-states w) pid)]))
(define (invoke-process pid thunk k-ok k-exn)
(define-values (ok? result)
(call-in-trace-context
pid
(lambda ()
(with-handlers ([(lambda (exn) #t) (lambda (exn) (values #f exn))])
(values #t (with-continuation-mark 'minimart-process pid (thunk)))))))
(if ok?
(k-ok result)
(k-exn result)))
(define (mark-pid-runnable w pid)
(struct-copy network w [runnable-pids (set-add (network-runnable-pids w) pid)]))
(define (enqueue-actions w label actions)
(struct-copy network w
[pending-action-queue
(queue-append-list (network-pending-action-queue w)
(for/list [(a actions)] (cons label a)))]))
(define (make-quit #:exception [exn #f] . actions)
(quit exn actions))
(define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-action-tree-exp)
(spawn (lambda ()
(local-require racket/contract)
(list behavior-exp
(transition initial-state-exp initial-action-tree-exp)))))
(define-syntax-rule (spawn/stateless behavior-exp initial-action-tree-exp)
(spawn-process (stateless-behavior-wrap behavior-exp)
(void)
initial-action-tree-exp))
(define ((stateless-behavior-wrap b) e state)
(match (b e)
[#f #f]
[(? quit? q) q]
[actions (transition state actions)]))
(define-syntax-rule (spawn-network boot-action ...)
(make-spawn-network (lambda () (list boot-action ...))))
(define (make-network boot-actions)
(network (mux)
(list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a)))
(set)
(hash)
(hash)))
(define (make-spawn-network boot-actions-thunk)
(spawn (lambda ()
(list network-handle-event
(transition (make-network (boot-actions-thunk)) '())))))
(define (transition-bind k t0)
(match t0
[#f (error 'transition-bind "Cannot bind from transition #f with continuation ~v" k)]
[(quit _ _) t0]
[(transition state0 actions0)
(match (k state0)
[#f t0]
[(quit exn actions1) (quit exn (cons actions0 actions1))]
[(transition state1 actions1) (transition state1 (cons actions0 actions1))])]))
(define (sequence-transitions t0 . steps)
(sequence-transitions* t0 steps))
(define (sequence-transitions* t0 steps)
(foldl transition-bind t0 steps))
(define (sequence-transitions0 state0 . steps)
(sequence-transitions0* state0 steps))
(define (sequence-transitions0* state0 steps)
(match steps
['() #f]
[(cons step rest)
(match (step state0)
[#f (sequence-transitions0* state0 rest)]
[(? quit? q) q]
[(? transition? t) (sequence-transitions* t rest)])]))
(define (inert? w)
(and (queue-empty? (network-pending-action-queue w))
(set-empty? (network-runnable-pids w))))
(define (network-handle-event e w)
(if (or e (not (inert? w)))
(sequence-transitions (transition w '())
(inject-event e)
perform-actions
(lambda (w) (or (step-children w) (transition w '()))))
(step-children w)))
(define ((inject-event e) w)
(transition (match e
[#f w]
[(? scn? s) (enqueue-actions w 'meta (list (lift-scn s)))]
[(message body) (enqueue-actions w 'meta (list (message (at-meta body))))])
'()))
(define (perform-actions w)
(for/fold ([wt (transition (struct-copy network w [pending-action-queue (make-queue)]) '())])
((entry (in-list (queue->list (network-pending-action-queue w)))))
#:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn?
(match-define [cons label a] entry)
(trace-internal-action label a (transition-state wt))
(define wt1 (transition-bind (perform-action label a) wt))
(trace-internal-action-result label a (transition-state wt) wt1)
wt1))
(define ((perform-action label a) w)
(match a
[(spawn boot)
(invoke-process 'booting
(lambda ()
(match (boot)
[(and results (list (? procedure?) (? general-transition?)))
results]
[other
(error 'spawn
"Spawn boot procedure must yield boot spec; received ~v"
other)]))
(lambda (results)
(match-define (list behavior initial-transition) results)
(create-process w behavior initial-transition))
(lambda (exn)
(log-error "Spawned process in network ~a died with exception:\n~a"
(trace-pid-stack)
(exn->string exn))
(transition w '())))]
['quit
(define-values (new-mux _label s aggregate-assertions)
(mux-remove-stream (network-mux w) label))
;; behavior & state in w already removed by disable-process
(deliver-scns w new-mux label s aggregate-assertions)]
[(quit-network)
(make-quit)]
[(? scn? s-orig)
(define-values (new-mux _label s aggregate-assertions)
(mux-update-stream (network-mux w) label s-orig))
(deliver-scns w new-mux label s aggregate-assertions)]
[(and m (message body))
(when (observe? body)
(log-warning "Stream ~a sent message containing query ~v"
(cons label (trace-pid-stack))
body))
(if (and (not (meta-label? label)) ;; it's from a local process, not envt
(at-meta? body)) ;; it relates to envt, not local
(transition w (message (at-meta-claim body)))
(transition (for/fold [(w w)]
[(pid (in-list (mux-route-message (network-mux w) body)))]
(send-event m pid w))
'()))]))
(define (create-process w behavior initial-transition)
(if (not initial-transition)
(transition w '()) ;; Uh, ok
(let ()
(define-values (postprocess initial-actions)
(match (clean-transition initial-transition)
[(and q (quit exn initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) exn q)
(disable-process pid exn w))
(append initial-actions0 (list 'quit)))]
[(and t (transition initial-state initial-actions0))
(values (lambda (w pid)
(trace-process-step-result 'boot pid behavior (void) #f t)
(mark-pid-runnable (update-state w pid initial-state) pid))
initial-actions0)]))
(define-values (initial-scn remaining-initial-actions)
(match initial-actions
[(cons (? scn? s) rest) (values s rest)]
[other (values (scn (trie-empty)) other)]))
(define-values (new-mux new-pid s aggregate-assertions)
(mux-add-stream (network-mux w) initial-scn))
(let* ((w (struct-copy network w
[behaviors (hash-set (network-behaviors w)
new-pid
behavior)]))
(w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions)))
(deliver-scns w new-mux new-pid s aggregate-assertions)))))
(define (deliver-scns w new-mux acting-label s aggregate-assertions)
(define old-mux (network-mux w))
(define-values (scns meta-action)
(compute-scns old-mux new-mux acting-label s aggregate-assertions))
(transition (for/fold [(w (struct-copy network w [mux new-mux]))]
[(entry (in-list scns))]
(match-define (cons label (and event (scn new-assertions))) entry)
(if (equal? (biased-intersection (mux-routing-table old-mux)
(mux-interests-of old-mux label))
new-assertions)
w
(send-event event label w)))
meta-action))
(define (step-children w)
(define runnable-pids (network-runnable-pids w))
(if (set-empty? runnable-pids)
#f ;; network is inert.
(transition (for/fold [(w (struct-copy network w [runnable-pids (set)]))]
[(pid (in-set runnable-pids))]
(send-event #f pid w))
'())))
(define (pretty-print-network w [p (current-output-port)])
(match-define (network mux qs runnable behaviors states) w)
(fprintf p "NETWORK:\n")
(fprintf p " - ~a queued actions\n" (queue-length qs))
(fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable))
(fprintf p " - ~a live processes\n" (hash-count states))
(fprintf p " - ")
(display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p)
(for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))])
(fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f))
(define state (hash-ref states pid #f))
(display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p)
(newline p)
(fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f))
(display (indented-port-output 6 (lambda (p)
(pretty-print-trie (mux-interests-of mux pid) p)))
p)
(newline p)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(module+ test
(require racket/pretty)
(define (step* w)
(let loop ((w w) (actions '()))
(pretty-print w)
(match (network-handle-event #f w)
[#f (values w #f (flatten actions))]
[(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))]
[(transition new-w new-actions) (loop new-w (cons actions new-actions))])))
(step* (make-network '()))
)

View File

@ -0,0 +1,127 @@
#lang racket/base
;; A structure (and process!) for matching supply to demand via observation of interests.
(require racket/set)
(require racket/match)
(require "core.rkt")
(require "drivers/timer.rkt")
(require "../prospect/pretty.rkt")
(provide (except-out (struct-out demand-matcher) demand-matcher)
(rename-out [make-demand-matcher demand-matcher])
demand-matcher-update
spawn-demand-matcher)
;; A DemandMatcher keeps track of demand for services based on some
;; Projection over a Trie, as well as a collection of functions
;; that can be used to increase supply in response to increased
;; demand, or handle a sudden drop in supply for which demand still
;; exists.
(struct demand-matcher (demand-spec ;; CompiledProjection
supply-spec ;; CompiledProjection
increase-handler ;; ChangeHandler
decrease-handler ;; ChangeHandler
current-demand ;; (Setof (Listof Any))
current-supply) ;; (Setof (Listof Any))
#:transparent
#:methods gen:prospect-pretty-printable
[(define (prospect-pretty-print s [p (current-output-port)])
(pretty-print-demand-matcher s p))])
;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)).
;; It is called with an accumulator of actions so-far-computed as its
;; first argument, and with a value for each capture in the
;; DemandMatcher's projection as the remaining arguments.
;; ChangeHandler
;; Default handler of unexpected supply decrease.
(define (default-decrease-handler state . removed-captures)
state)
(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler)
(demand-matcher demand-spec
supply-spec
increase-handler
decrease-handler
(set)
(set)))
;; DemandMatcher (Constreeof Action) SCN -> (Transition DemandMatcher)
;; Given a SCN from the environment, projects it into supply and
;; demand increase and decrease sets. Calls ChangeHandlers in response
;; to increased unsatisfied demand and decreased demanded supply.
(define (demand-matcher-update d s new-scn)
(match-define (demand-matcher demand-spec supply-spec inc-h dec-h demand supply) d)
(define new-demand (trie-project/set (scn-trie new-scn) demand-spec))
(define new-supply (trie-project/set (scn-trie new-scn) supply-spec))
(define added-demand (set-subtract new-demand demand))
(define removed-demand (set-subtract demand new-demand))
(define added-supply (set-subtract new-supply supply))
(define removed-supply (set-subtract supply new-supply))
(when (not added-demand) (error 'demand-matcher "Wildcard demand of ~v:\n~a"
demand-spec
(trie->pretty-string (scn-trie new-scn))))
(when (not added-supply) (error 'demand-matcher "Wildcard supply of ~v:\n~a"
supply-spec
(trie->pretty-string (scn-trie new-scn))))
(set! supply (set-union supply added-supply))
(set! demand (set-subtract demand removed-demand))
(for [(captures (in-set removed-supply))]
(when (set-member? demand captures) (set! s (apply dec-h s captures))))
(for [(captures (in-set added-demand))]
(when (not (set-member? supply captures)) (set! s (apply inc-h s captures))))
(set! supply (set-subtract supply removed-supply))
(set! demand (set-union demand added-demand))
(transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s))
;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher)
;; Handles events from the environment. Only cares about routing-updates.
(define (demand-matcher-handle-event e d)
(match e
[(? scn? s)
(demand-matcher-update d '() s)]
[_ #f]))
;; Any* -> (Constreeof Action)
;; Default handler of unexpected supply decrease.
;; Ignores the situation.
(define (unexpected-supply-decrease . removed-captures)
'())
;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action
;; Spawns a demand matcher actor.
(define (spawn-demand-matcher demand-spec
supply-spec
increase-handler
[decrease-handler unexpected-supply-decrease]
#:meta-level [meta-level 0])
(define d (make-demand-matcher (compile-projection (prepend-at-meta demand-spec meta-level))
(compile-projection (prepend-at-meta supply-spec meta-level))
(lambda (acs . rs) (cons (apply increase-handler rs) acs))
(lambda (acs . rs) (cons (apply decrease-handler rs) acs))))
(spawn demand-matcher-handle-event
d
(scn/union (subscription (projection->pattern demand-spec) #:meta-level meta-level)
(subscription (projection->pattern supply-spec) #:meta-level meta-level)
(advertisement (projection->pattern supply-spec) #:meta-level meta-level))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (pretty-print-demand-matcher s [p (current-output-port)])
(match-define (demand-matcher demand-spec
supply-spec
increase-handler
decrease-handler
current-demand
current-supply)
s)
(fprintf p "DEMAND MATCHER:\n")
(fprintf p " - demand-spec: ~v\n" demand-spec)
(fprintf p " - supply-spec: ~v\n" supply-spec)
(fprintf p " - demand: ~v\n" current-demand)
(fprintf p " - supply: ~v\n" current-supply))

View File

@ -0,0 +1,183 @@
#lang racket/base
(require racket/match)
(require (prefix-in tcp: racket/tcp))
(require (only-in racket/port read-bytes-avail!-evt))
(require "../../prospect/exn-util.rkt")
(require "../main.rkt")
(require "../demand-matcher.rkt")
(require racket/unit)
(require net/tcp-sig)
(require net/tcp-unit)
(provide (struct-out tcp-address)
(struct-out tcp-handle)
(struct-out tcp-listener)
(struct-out tcp-channel)
spawn-tcp-driver)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(struct tcp-address (host port) #:prefab)
(struct tcp-handle (id) #:prefab)
(struct tcp-listener (port) #:prefab)
(struct tcp-channel (source destination subpacket) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground-level communication messages
(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab)
;; tcp-channel does double-duty as a ground-level message as well
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Driver
(define (spawn-tcp-driver)
(list (spawn-demand-matcher (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?)))
(advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?)))
spawn-tcp-listener)
(spawn-demand-matcher (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
(observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?))
spawn-tcp-connection)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener
(struct listener-state (control-ch server-addr) #:transparent)
(define (tcp-listener-thread control-ch listener server-addr)
(let loop ((blocked? #t))
(sync (handle-evt control-ch
(match-lambda
['unblock (loop #f)]
['quit (void)]))
(if blocked?
never-evt
(handle-evt (tcp:tcp-accept-evt listener)
(lambda (cin+cout)
(match-define (list cin cout) cin+cout)
(define-values (local-hostname local-port remote-hostname remote-port)
(tcp:tcp-addresses cin #t))
(send-ground-message
(tcp-accepted (tcp-address remote-hostname remote-port)
server-addr
cin
cout))
(loop blocked?))))))
(tcp:tcp-close listener))
(define (tcp-listener-behavior e state)
(match e
[(scn r)
(define ch (listener-state-control-ch state))
(cond [(trie-empty? r) (channel-put ch 'quit) (quit)]
[else (channel-put ch 'unblock) #f])]
[(message (at-meta (tcp-accepted remote-addr _ cin cout)))
(transition state (spawn-connection (listener-state-server-addr state)
remote-addr
cin
cout))]
[_ #f]))
(define (spawn-tcp-listener server-addr)
(match-define (tcp-listener port) server-addr)
(define listener (tcp:tcp-listen port 128 #t))
(define control-ch (make-channel))
(thread (lambda () (tcp-listener-thread control-ch listener server-addr)))
(spawn tcp-listener-behavior
(listener-state control-ch server-addr)
(scn/union
(subscription (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer
(advertisement (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections
(subscription (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Outbound Connection
(define (spawn-tcp-connection local-addr remote-addr)
(match-define (tcp-address remote-hostname remote-port) remote-addr)
(define-values (cin cout)
(with-handlers ([exn:fail:network? (lambda (e)
;; TODO: it'd be nice to
;; somehow communicate the
;; actual error to the local
;; peer.
(log-error "~a" (exn->string e))
(define o (open-output-string))
(close-output-port o)
(values (open-input-string "")
o))])
(tcp:tcp-connect remote-hostname remote-port)))
(spawn-connection local-addr remote-addr cin cout))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
(struct connection-state (control-ch cout) #:transparent)
(define (read-bytes-avail-evt len input-port)
(guard-evt
(lambda ()
(let ([bstr (make-bytes len)])
(handle-evt
(read-bytes-avail!-evt bstr input-port)
(lambda (v)
(if (number? v)
(if (= v len) bstr (subbytes bstr 0 v))
v)))))))
(define (tcp-connection-thread remote-addr local-addr control-ch cin)
(let loop ((blocked? #t))
(sync (handle-evt control-ch
(match-lambda
['unblock (loop #f)]
['quit (void)]))
(if blocked?
never-evt
(handle-evt (read-bytes-avail-evt 32768 cin)
(lambda (eof-or-bs)
(send-ground-message (tcp-channel remote-addr local-addr eof-or-bs))
(loop (or blocked? (eof-object? eof-or-bs))))))))
(close-input-port cin))
(define (shutdown-connection! state)
(match-define (connection-state control-ch cout) state)
(when control-ch (channel-put control-ch 'quit))
(when cout (close-output-port cout)))
(define (tcp-connection e state)
(with-handlers [((lambda (exn) #t)
(lambda (exn)
(shutdown-connection! state)
(raise exn)))]
(match e
[(message (at-meta (tcp-channel remote-addr local-addr (? eof-object?))))
(shutdown-connection! state)
(quit)]
[(message (at-meta (tcp-channel remote-addr local-addr (? bytes? bs))))
(transition state (message (tcp-channel remote-addr local-addr bs)))]
[(message (tcp-channel _ _ bs))
(write-bytes bs (connection-state-cout state))
(flush-output (connection-state-cout state))
#f]
[(scn r)
(define ch (connection-state-control-ch state))
(cond [(trie-empty? r) (shutdown-connection! state) (quit)]
[else (channel-put ch 'unblock) #f])]
[#f #f])))
(define (spawn-connection local-addr remote-addr cin cout)
(define control-ch (make-channel))
(thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin)))
(spawn tcp-connection
(connection-state control-ch cout)
(scn/union
(subscription (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer
(advertisement (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer
(subscription (tcp-channel local-addr remote-addr ?)) ;; want segments from peer
(subscription (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread
)))

View File

@ -0,0 +1,92 @@
#lang racket/base
;; Timer driver.
;; Uses mutable state internally, but because the scope of the
;; mutation is limited to each timer process alone, it's easy to show
;; correct linear use of the various pointers.
(require racket/set)
(require racket/match)
(require data/heap)
(require "../main.rkt")
(struct pending-timer (deadline label) #:transparent)
(provide (struct-out set-timer)
(struct-out timer-expired)
spawn-timer-driver)
(struct set-timer (label msecs kind) #:prefab)
(struct timer-expired (label msecs) #:prefab)
(define (spawn-timer-driver)
(define control-ch (make-channel))
(thread (lambda () (timer-driver-thread-main control-ch)))
(define (timer-driver e count)
(match e
[(message (at-meta (and expiry (timer-expired _ _))))
(transition (- count 1)
(list (message expiry)
(when (= count 1)
(scn/union (subscription (set-timer ? ? ?))
(advertisement (timer-expired ? ?))))))]
[(message (and instruction (set-timer _ _ _)))
(channel-put control-ch instruction)
(transition (+ count 1)
(when (= count 0)
(scn/union (subscription (set-timer ? ? ?))
(advertisement (timer-expired ? ?))
(subscription (timer-expired ? ?) #:meta-level 1))))]
[_ #f]))
(spawn timer-driver
0 ;; initial count
(scn/union (subscription (set-timer ? ? ?))
(advertisement (timer-expired ? ?)))))
(define (timer-driver-thread-main control-ch)
(define heap (make-timer-heap))
(let loop ()
(sync (match (next-timer heap)
[#f never-evt]
[t (handle-evt (timer-evt (pending-timer-deadline t))
(lambda (now)
(for-each send-ground-message (fire-timers! heap now))
(loop)))])
(handle-evt control-ch
(match-lambda
[(set-timer label msecs 'relative)
(install-timer! heap label (+ (current-inexact-milliseconds) msecs))
(loop)]
[(set-timer label msecs 'absolute)
(install-timer! heap label msecs)
(loop)]
['quit (void)])))))
(define (make-timer-heap)
(make-heap (lambda (t1 t2) (<= (pending-timer-deadline t1) (pending-timer-deadline t2)))))
(define (next-timer heap)
(and (positive? (heap-count heap))
(heap-min heap)))
(define (fire-timers! heap now)
(if (zero? (heap-count heap))
'()
(let ((m (heap-min heap)))
(if (<= (pending-timer-deadline m) now)
(begin (heap-remove-min! heap)
(cons (timer-expired (pending-timer-label m) now)
(fire-timers! heap now)))
'()))))
(define (install-timer! heap label deadline)
(define now (current-inexact-milliseconds))
(heap-add! heap (pending-timer deadline label)))
;; Racket's alarm-evt is almost the right design for timeouts: its
;; synchronisation value should be the (or some) value of the clock
;; after the asked-for time. That way it serves as timeout and
;; clock-reader in one.
(define (timer-evt msecs)
(handle-evt (alarm-evt msecs)
(lambda (_) (current-inexact-milliseconds))))

View File

@ -0,0 +1,90 @@
#lang racket/base
(require racket/match)
(require (prefix-in udp: racket/udp))
(require "../main.rkt")
(require "../demand-matcher.rkt")
(provide (struct-out udp-remote-address)
(struct-out udp-handle)
(struct-out udp-listener)
udp-address?
udp-local-address?
(struct-out udp-packet)
spawn-udp-driver)
;; A UdpAddress is one of
;; -- a (udp-address String Uint16), representing a remote socket
;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port
;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port
;; Note that udp-handle-ids must be chosen carefully: they are scoped
;; to the local VM, i.e. shared between processes in that VM, so
;; processes must make sure not to accidentally clash in handle ID
;; selection.
(struct udp-remote-address (host port) #:prefab)
(struct udp-handle (id) #:prefab)
(struct udp-listener (port) #:prefab)
(define (udp-address? x)
(or (udp-remote-address? x)
(udp-local-address? x)))
(define (udp-local-address? x)
(or (udp-handle? x)
(udp-listener? x)))
;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes), and
;; represents a packet appearing on our local "subnet" of the full UDP
;; network, complete with source, destination and contents.
(struct udp-packet (source destination body) #:prefab)
;; -> Action
;; Spawns a process acting as a UDP socket factory.
(define (spawn-udp-driver)
(spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?))
(advertise (udp-packet ? (?! (udp-listener ?)) ?))
spawn-udp-socket))
;; UdpLocalAddress -> Action
(define (spawn-udp-socket local-addr)
(define socket (udp:udp-open-socket #f #f))
(match local-addr
[(udp-listener port) (udp:udp-bind! socket #f port)]
[(udp-handle _) (udp:udp-bind! socket #f 0)]) ;; kernel-allocated port number
(define control-ch (make-channel))
(thread (lambda () (udp-receiver-thread local-addr socket control-ch)))
(spawn (lambda (e s)
(match e
[(scn r)
(cond [(trie-non-empty? r) #f] ;; peer hasn't quit yet: do nothing.
[else (channel-put control-ch 'quit)
(quit)])]
[(message (at-meta (? udp-packet? p)))
(transition s (message p))]
[(message (udp-packet _ (udp-remote-address host port) body))
(udp:udp-send-to socket host port body)
#f]
[_ #f]))
(void)
(scn/union (subscription (udp-packet ? local-addr ?) #:meta-level 1)
(subscription (udp-packet local-addr (udp-remote-address ? ?) ?))
(advertisement (udp-packet (udp-remote-address ? ?) local-addr ?))
(subscription (observe (udp-packet (udp-remote-address ? ?) local-addr ?))))))
;; UdpLocalAddress UdpSocket Channel -> Void
(define (udp-receiver-thread local-addr socket control-ch)
(define buffer (make-bytes 65536))
(let loop ()
(sync (handle-evt control-ch (match-lambda ['quit (void)]))
(handle-evt (udp:udp-receive!-evt socket buffer)
(lambda (receive-results)
(match-define (list len source-hostname source-port) receive-results)
(send-ground-message
(udp-packet (udp-remote-address source-hostname source-port)
local-addr
(subbytes buffer 0 len)))
(loop)))))
(udp:udp-close socket))

View File

@ -0,0 +1,183 @@
#lang racket/base
(require racket/match)
(require net/rfc6455)
(require (only-in net/rfc6455/conn-api ws-conn-base-ip))
(require "../main.rkt")
(require "../demand-matcher.rkt")
(require racket/unit)
(require net/tcp-sig)
(require net/tcp-unit)
(require net/ssl-tcp-unit)
(require net/url)
(provide (struct-out websocket-remote-client)
(struct-out websocket-local-server)
(struct-out websocket-local-client)
(struct-out websocket-remote-server)
(struct-out websocket-ssl-options)
(struct-out websocket-message)
spawn-websocket-driver)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Protocol messages
(struct websocket-remote-client (id) #:prefab)
(struct websocket-local-server (port ssl-options) #:prefab)
(struct websocket-local-client (id) #:prefab)
(struct websocket-remote-server (url) #:prefab)
(struct websocket-ssl-options (cert-file key-file) #:prefab)
(struct websocket-message (from to body) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Ground-level communication messages
(struct websocket-connection (id local-addr remote-addr connection control-ch) #:prefab)
(struct websocket-incoming-message (id message) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Driver
(define (spawn-websocket-driver)
(define inbound-listener-message-pat (websocket-message ? (?! (websocket-local-server ? ?)) ?))
(define outbound-conn-message-pat (websocket-message (?! (websocket-local-client ?))
(?! (websocket-remote-server ?))
?))
(list (spawn-demand-matcher (advertise (observe inbound-listener-message-pat))
(advertise (advertise inbound-listener-message-pat))
spawn-websocket-listener)
(spawn-demand-matcher (advertise outbound-conn-message-pat)
(observe outbound-conn-message-pat)
spawn-websocket-connection)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Listener
(struct listener-state (shutdown-procedure server-addr) #:transparent)
(define (websocket-listener e state)
(match e
[(scn (? trie-empty?))
((listener-state-shutdown-procedure state))
(quit)]
[(message (at-meta (websocket-connection id local-addr remote-addr c control-ch)))
(transition state (spawn-connection local-addr remote-addr id c control-ch))]
[_ #f]))
(define ((connection-handler server-addr) c dummy-state)
(define control-ch (make-channel))
(define id (gensym 'ws))
(send-ground-message
(websocket-connection id server-addr (websocket-remote-client id) c control-ch))
(connection-thread-loop control-ch c id))
(define (connection-thread-loop control-ch c id)
(define c-input-port (ws-conn-base-ip c))
(let loop ((blocked? #t))
(sync (handle-evt control-ch
(match-lambda
['unblock (loop #f)]
['quit (void)]))
(if blocked?
never-evt
(handle-evt c-input-port
(lambda (dummy)
(define msg
(with-handlers ([exn:fail:network? (lambda (e) eof)])
(ws-recv c #:payload-type 'text)))
(send-ground-message (websocket-incoming-message id msg))
(loop (or blocked? (eof-object? msg))))))))
(ws-close! c))
(define (ssl-options->ssl-tcp@ ssl-options)
(match-define (websocket-ssl-options cert-file key-file) ssl-options)
(define-unit-binding ssl-tcp@
(make-ssl-tcp@ cert-file key-file #f #f #f #f #f)
(import)
(export tcp^))
ssl-tcp@)
(define (spawn-websocket-listener server-addr)
(match-define (websocket-local-server port ssl-options) server-addr)
(define shutdown-procedure (ws-serve #:port port
#:tcp@ (if ssl-options
(ssl-options->ssl-tcp@ ssl-options)
tcp@)
(connection-handler server-addr)))
(spawn websocket-listener
(listener-state shutdown-procedure server-addr)
(scn/union
(subscription (advertise (observe (websocket-message ? server-addr ?)))) ;; monitor peer
(advertisement (advertise (websocket-message ? server-addr ?))) ;; declare we might make connections
(subscription (websocket-connection ? server-addr ? ? ?) #:meta-level 1) ;; events from driver thd
)))
(define (spawn-websocket-connection local-addr remote-addr)
(match-define (websocket-remote-server url) remote-addr)
(define id (gensym 'ws))
(define control-ch (make-channel))
(thread
(lambda ()
(log-info "Connecting to ~a ~a" url (current-inexact-milliseconds))
(define c (with-handlers [(exn? values)] (ws-connect (string->url url))))
(log-info "Connected to ~a ~a" url (current-inexact-milliseconds))
(send-ground-message
(websocket-connection id local-addr remote-addr c control-ch))
(when (not (exn? c))
(connection-thread-loop control-ch c id))))
(spawn (lambda (e buffered-messages-rev)
(match e
[(message (at-meta (websocket-connection _ _ _ c _)))
(when (not (exn? c))
(for [(m (reverse buffered-messages-rev))] (ws-send! c m))
(spawn-connection local-addr remote-addr id c control-ch))
(quit)]
[(message (websocket-message _ _ m))
(transition (cons m buffered-messages-rev) '())]
[_ #f]))
'()
(scn/union
(subscription (websocket-connection id local-addr remote-addr ? control-ch) #:meta-level 1)
(subscription (websocket-message local-addr remote-addr ?)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Connection
(struct connection-state (local-addr remote-addr c control-ch) #:transparent)
(define (shutdown-connection! state)
(channel-put (connection-state-control-ch state) 'quit)
(quit))
(define (websocket-connection-behaviour e state)
(with-handlers [((lambda (exn) #t)
(lambda (exn)
(shutdown-connection! state)
(raise exn)))]
(match e
[(message (at-meta (websocket-incoming-message _ (? eof-object?))))
(shutdown-connection! state)]
[(message (at-meta (websocket-incoming-message _ (? bytes? bs))))
(transition state (message (websocket-message (connection-state-remote-addr state)
(connection-state-local-addr state)
bs)))]
[(message (websocket-message _ _ m))
(ws-send! (connection-state-c state) m)
#f]
[(scn (? trie-empty?))
(shutdown-connection! state)]
[(scn _)
(channel-put (connection-state-control-ch state) 'unblock)
#f]
[_ #f])))
(define (spawn-connection local-addr remote-addr id c control-ch)
(spawn websocket-connection-behaviour
(connection-state local-addr remote-addr c control-ch)
(scn/union
(subscription (observe (websocket-message remote-addr local-addr ?))) ;; monitor peer
(advertisement (websocket-message remote-addr local-addr ?)) ;; may send messages to peer
(subscription (websocket-message local-addr remote-addr ?)) ;; want segments from peer
(subscription (websocket-incoming-message id ?) #:meta-level 1) ;; segments from driver thd
)))

View File

@ -0,0 +1,2 @@
private-key.pem
server-cert.pem

View File

@ -0,0 +1,14 @@
keys: private-key.pem server-cert.pem
private-key.pem:
openssl genrsa -des3 -passout pass:a -out $@ 1024
openssl rsa -passin pass:a -in $@ -out $@
server-cert.pem: private-key.pem
openssl req -new -x509 -nodes -sha1 -days 365 \
-subj /CN=example.racket-rfc6455.leastfixedpoint.com \
-passin pass:a \
-key private-key.pem > $@
clean-keys:
rm -f private-key.pem server-cert.pem

View File

@ -0,0 +1,28 @@
#lang prospect-monolithic
;; Hello-worldish "bank account" example.
(struct account (balance) #:prefab)
(struct deposit (amount) #:prefab)
(define (manager e balance)
(match e
[(message (deposit amount))
(transition (+ balance amount)
(scn (assertion (account (+ balance amount)))))]
[_ #f]))
(define (observer e _)
(when (scn? e)
(for [(balance (project-assertions (scn-trie e) (account (?!))))]
(printf "Balance changed to ~a\n" balance)))
#f)
(define (updater e _)
(if (and (scn? e) (trie-non-empty? (scn-trie e)))
(quit (list (message (deposit +100))
(message (deposit -30))))
#f))
(spawn manager 0 (scn/union (assertion (observe (deposit ?))) (assertion (account 0))))
(spawn observer (void) (scn (assertion (observe (account ?)))))
(spawn updater (void) (scn (assertion (observe (observe (deposit ?))))))

View File

@ -0,0 +1,26 @@
#lang prospect-monolithic
;; Simple mutable box and count-to-infinity box client.
(struct set-box (new-value) #:transparent)
(struct box-state (value) #:transparent)
(spawn (lambda (e current-value)
(match e
[(message (set-box new-value))
(log-info "box: taking on new-value ~v" new-value)
(transition new-value (scn/union (subscription (set-box ?))
(assertion (box-state new-value))))]
[_ #f]))
0
(scn/union (subscription (set-box ?))
(assertion (box-state 0))))
(spawn (lambda (e s)
(match e
[(scn assertions)
(transition s (for/list [(v (project-assertions assertions (box-state (?!))))]
(log-info "client: learned that box's value is now ~v" v)
(message (set-box (+ v 1)))))]
[_ #f]))
(void)
(scn (subscription (box-state ?))))

View File

@ -0,0 +1,27 @@
#lang prospect-monolithic
(require (only-in racket/port read-bytes-line-evt))
(require "../drivers/tcp.rkt")
(define local-handle (tcp-handle 'chat))
(define remote-handle (tcp-address "localhost" 5999))
(spawn-tcp-driver)
(spawn/stateless (lambda (e)
(match e
[(scn (? trie-empty?)) (quit)]
[(message (at-meta (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
(message (tcp-channel local-handle remote-handle line))]
[(message (tcp-channel _ _ bs))
(write-bytes bs)
(flush-output)
#f]
[_ #f]))
(scn/union
(subscription (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 1)
(subscription (tcp-channel remote-handle local-handle ?))
(subscription (advertise (tcp-channel remote-handle local-handle ?)))
(advertisement (tcp-channel local-handle remote-handle ?))))

View File

@ -0,0 +1,47 @@
#lang prospect-monolithic
(require racket/set)
(require (only-in racket/string string-trim))
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(define (spawn-session them us)
(define user (gensym 'user))
(define remote-detector (compile-projection (advertise (?! (tcp-channel ? ? ?)))))
(define peer-detector (compile-projection (advertise `(,(?!) says ,?))))
(define (send-to-remote fmt . vs)
(message (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs)))))
(define (say who fmt . vs)
(unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs))))
(list (send-to-remote "Welcome, ~a.\n" user)
(spawn
(lambda (e peers)
(match e
[(message (tcp-channel _ _ bs))
(transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))]
[(message `(,who says ,what))
(transition peers (say who "says: ~a" what))]
[(scn assertions)
(if (trie-empty? (trie-project assertions remote-detector))
(quit (send-to-remote "Goodbye!\n"))
(let ((new-peers (trie-project/set/single assertions peer-detector)))
(define arrived (set-subtract new-peers peers))
(define departed (set-subtract peers new-peers))
(transition new-peers
(list (for/list [(who arrived)] (say who "arrived."))
(for/list [(who departed)] (say who "departed."))))))]
[#f #f]))
(set)
(scn/union
(subscription `(,? says ,?)) ;; read actual chat messages
(subscription (advertise `(,? says ,?))) ;; observe peer presence
(advertisement `(,user says ,?)) ;; advertise our presence
(subscription (tcp-channel them us ?)) ;; read from remote client
(subscription (advertise (tcp-channel them us ?))) ;; monitor remote client
(advertisement (tcp-channel us them ?)) ;; we will write to remote client
))))
(spawn-tcp-driver)
(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
(observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?))
spawn-session)

View File

@ -0,0 +1,24 @@
#lang prospect-monolithic
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(define server-id (tcp-listener 5999))
(spawn-tcp-driver)
(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?))
(observe (tcp-channel (?!) server-id ?))
(lambda (c)
(printf "Accepted connection from ~v\n" c)
(spawn (lambda (e state)
(match e
[(scn (? trie-empty?))
(printf "Closed connection ~v\n" c)
(quit)]
[(message (tcp-channel src dst bs))
(transition state (message (tcp-channel dst src bs)))]
[_ #f]))
(void)
(scn/union (subscription (advertise (tcp-channel c server-id ?)))
(subscription (tcp-channel c server-id ?))
(advertisement (tcp-channel server-id c ?))))))

View File

@ -0,0 +1,75 @@
#lang prospect-monolithic
(require (only-in racket/port read-line-evt))
(require "../drivers/timer.rkt")
(define (quasi-spy e s)
(printf "----------------------------------------\n")
(printf "QUASI-SPY:\n")
(match e
[(scn r) (pretty-print-trie r)]
[other
(write other)
(newline)])
(printf "========================================\n")
#f)
(spawn quasi-spy (void) (scn (subscription ?)))
(define (r e s)
(match e
[(message body) (transition s (message (at-meta `(print (got ,body)))))]
[_ #f]))
(define (b e n)
(match e
[#f (if (< n 10)
(transition (+ n 1) (message `(hello ,n)))
#f)]
[_ #f]))
(spawn-network (spawn r (void) (scn (subscription ?)))
(spawn b 0 '()))
(define (echoer e s)
(match e
[(message (at-meta (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
(transition s (message `(print (got-line ,line))))]
[_ #f]))
(spawn echoer
(void)
(scn (subscription (external-event (read-line-evt (current-input-port) 'any) ?)
#:meta-level 1)))
(define (ticker e s)
(match e
[(scn r)
(printf "TICKER SCN RECEIVED:\n")
(pretty-print-trie r)
#f]
[(message (timer-expired 'tick now))
(printf "TICK ~v\n" now)
(if (< s 3)
(transition (+ s 1) (message (set-timer 'tick 1000 'relative)))
(quit))]
[_ #f]))
(spawn-timer-driver)
(message (set-timer 'tick 1000 'relative))
(spawn ticker
1
(scn/union (subscription (observe (set-timer ? ? ?)))
(subscription (timer-expired 'tick ?))))
(define (printer e s)
(match e
[(message (list 'print v))
(log-info "PRINTER: ~a" v)
#f]
[_ #f]))
(spawn printer
(void)
(scn (subscription `(print ,?))))

View File

@ -0,0 +1,72 @@
#lang racket/base
(require racket/match)
(require (only-in racket/port read-line-evt))
(require "../main.rkt")
(require "../drivers/timer.rkt")
(define (quasi-spy e s)
(printf "----------------------------------------\n")
(printf "QUASI-SPY:\n")
(match e
[(scn r) (pretty-print-trie r)]
[other
(write other)
(newline)])
(printf "========================================\n")
#f)
(define (r e s)
(match e
[(message body) (transition s (message (at-meta `(print (got ,body)))))]
[_ #f]))
(define (b e n)
(match e
[#f (if (< n 10)
(transition (+ n 1) (message `(hello ,n)))
#f)]
[_ #f]))
(define (echoer e s)
(match e
[(message (at-meta (external-event _ (list (? eof-object?)))))
(quit)]
[(message (at-meta (external-event _ (list line))))
(transition s (message `(print (got-line ,line))))]
[_ #f]))
(define (ticker e s)
(match e
[(scn r)
(printf "TICKER SCN RECEIVED:\n")
(pretty-print-trie r)
#f]
[(message (timer-expired 'tick now))
(printf "TICK ~v\n" now)
(if (< s 3)
(transition (+ s 1) (message (set-timer 'tick 1000 'relative)))
(quit))]
[_ #f]))
(define (printer e s)
(match e
[(message (list 'print v))
(log-info "PRINTER: ~a" v)
#f]
[_ #f]))
(run-ground (spawn quasi-spy (void) (scn (subscription ?)))
(spawn-timer-driver)
(message (set-timer 'tick 1000 'relative))
(spawn ticker
1
(scn/union (subscription (observe (set-timer ? ? ?)))
(subscription (timer-expired 'tick ?))))
(spawn-network (spawn r (void) (scn (subscription ?)))
(spawn b 0 '()))
(spawn echoer
(void)
(scn (subscription (external-event (read-line-evt (current-input-port) 'any) ?)
#:meta-level 1)))
(spawn printer (void) (scn (subscription `(print ,?)))))

View File

@ -0,0 +1,35 @@
#lang prospect-monolithic
;; Demonstrates quit-network.
(require (only-in racket/port read-bytes-line-evt))
(define (spawn-command-listener)
(spawn (lambda (e s)
(match e
[(message (at-meta (at-meta (external-event _ (list #"quit")))))
(printf "Quitting just the leaf actor.\n")
(quit)]
[(message (at-meta (at-meta (external-event _ (list #"quit-network")))))
(printf "Terminating the whole network.\n")
(transition s (quit-network))]
[_ #f]))
(void)
(scn (subscription (external-event (read-bytes-line-evt (current-input-port) 'any) ?)
#:meta-level 2))))
(define (spawn-ticker)
(define (sub-to-alarm)
(scn (subscription (external-event (alarm-evt (+ (current-inexact-milliseconds) 1000)) ?)
#:meta-level 2)))
(spawn (lambda (e s)
(match e
[(message (at-meta (at-meta (external-event _ _))))
(printf "Tick!\n")
(transition s (sub-to-alarm))]
[_ #f]))
(void)
(sub-to-alarm)))
(printf "Type 'quit' or 'quit-network'.\n")
(spawn-network (spawn-command-listener)
(spawn-ticker))

View File

@ -0,0 +1,25 @@
#lang prospect-monolithic
(struct echo-req (body) #:prefab)
(struct echo-resp (body) #:prefab)
(spawn (lambda (e count)
(match e
[(message (echo-req body))
(transition (+ count 1)
(message (echo-resp body)))]
[_ #f]))
0
(scn (subscription (echo-req ?))))
(spawn (lambda (e s)
(match e
[(message (echo-resp body))
(printf "Received: ~v\n" body)
#f]
[_ #f]))
(void)
(list (scn (subscription (echo-resp ?)))
(message (echo-req 0))
(message (echo-req 1))
(message (echo-req 2))))

View File

@ -0,0 +1,36 @@
#lang prospect-monolithic
(require "../drivers/tcp.rkt")
(require "../demand-matcher.rkt")
(spawn-tcp-driver)
(define server-id (tcp-listener 5999))
(define (spawn-connection-handler c)
(log-info "spawn-connection-handler ~v" c)
(define (connection-handler e n)
(when e (log-info "connection-handler ~v: ~v /// ~v" c e n))
(match e
[(scn (? trie-empty?)) (quit)]
[(message (tcp-channel src dst #"quit\n"))
(quit (message (tcp-channel dst src #"OK, then.\n")))]
[(message (tcp-channel src dst bs))
(transition n (message (tcp-channel dst src (string->bytes/utf-8
(format "You said: ~a" bs)))))]
[_
(and (< n 5)
(transition (+ n 1) (message (tcp-channel server-id c (string->bytes/utf-8
(format "msg ~v\n" n))))))]))
(spawn connection-handler
0
(scn/union (subscription (advertise (tcp-channel c server-id ?)))
(subscription (tcp-channel c server-id ?))
(advertisement (tcp-channel server-id c ?)))))
(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?))
(observe (tcp-channel (?!) server-id ?))
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)
'()))

View File

@ -0,0 +1,18 @@
#lang prospect-monolithic
(require "../drivers/udp.rkt")
(spawn-udp-driver)
(spawn (lambda (e s)
(match e
[(message (udp-packet src dst #"quit\n"))
(log-info "Got quit request")
(quit (message (udp-packet dst src #"Goodbye!\n")))]
[(message (udp-packet src dst body))
(log-info "Got packet from ~v: ~v" src body)
(define reply (string->bytes/utf-8 (format "You said: ~a" body)))
(transition s (message (udp-packet dst src reply)))]
[_ #f]))
(void)
(scn (subscription (udp-packet ? (udp-listener 5999) ?))))

View File

@ -0,0 +1,33 @@
#lang prospect-monolithic
(require "../drivers/websocket.rkt")
(require "../demand-matcher.rkt")
(spawn-websocket-driver)
(define any-client (websocket-remote-client ?))
(define server-id (websocket-local-server 8081 (websocket-ssl-options "server-cert.pem"
"private-key.pem")))
(define (spawn-connection-handler c)
(log-info "spawn-connection-handler ~v" c)
(define (connection-handler e n)
(when e (log-info "connection-handler ~v: ~v /// ~v" c e n))
(match e
[(scn (? trie-empty?)) (quit)]
[_
(if (< n 20)
(transition (+ n 1) (message (websocket-message server-id c (format "msg ~v" n))))
#f)]))
(spawn connection-handler
0
(scn/union (subscription (advertise (websocket-message c server-id ?)))
(subscription (websocket-message c server-id ?))
(advertisement (websocket-message server-id c ?)))))
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
(observe (websocket-message (?! any-client) server-id ?))
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)
'()))

View File

@ -0,0 +1,32 @@
#lang prospect-monolithic
(require "../drivers/websocket.rkt")
(require "../demand-matcher.rkt")
(spawn-websocket-driver)
(define any-client (websocket-remote-client ?))
(define server-id (websocket-local-server 8081 #f))
(define (spawn-connection-handler c)
(log-info "spawn-connection-handler ~v" c)
(define (connection-handler e n)
(when e (log-info "connection-handler ~v: ~v /// ~v" c e n))
(match e
[(scn (? trie-empty?)) (quit)]
[_
(if (< n 20)
(transition (+ n 1) (message (websocket-message server-id c (format "msg ~v" n))))
#f)]))
(spawn connection-handler
0
(scn/union (subscription (advertise (websocket-message c server-id ?)))
(subscription (websocket-message c server-id ?))
(advertisement (websocket-message server-id c ?)))))
(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?))
(observe (websocket-message (?! any-client) server-id ?))
spawn-connection-handler
(lambda (c)
(log-info "Connection handler ~v decided to exit" c)
'()))

View File

@ -0,0 +1,106 @@
#lang racket/base
;; Breaking the infinite tower of nested Networks, connecting to the "real world" at the fracture line.
(require racket/async-channel)
(require racket/set)
(require racket/match)
(require racket/list)
(require "core.rkt")
(require "../prospect/trace.rkt")
(require "trace/stderr.rkt")
(require "../prospect/tset.rkt")
(provide (struct-out external-event)
send-ground-message
send-ground-scn
run-ground)
;;---------------------------------------------------------------------------
;; Communication via regular subscription and messages from other threads
;; (Parameterof (Option AsyncChannel))
;; Communication channel from auxiliary (usually driver) threads to
;; the currently-active ground VM.
(define current-ground-event-async-channel (make-parameter (make-async-channel)))
;; Any -> Void
;; Sends a message at the ground-VM metalevel.
(define (send-ground-message body)
(async-channel-put (current-ground-event-async-channel) (message body)))
;; SCN -> Void
;; Injects a SCN into the ground-VM metalevel. It will appear to be
;; asserted by the environment in general. The obligation is on the caller
;; to ensure that assertions do not interfere between drivers.
(define (send-ground-scn s)
(async-channel-put (current-ground-event-async-channel) s))
;;---------------------------------------------------------------------------
;; Communication via RacketEvents
;; A GroundEvent is a pair of a Racket (evt?) event and its yielded
;; results.
;; - (external-event RacketEvent (Listof Any))
(struct external-event (descriptor values) #:prefab)
;; RacketEvent -> RacketEvent
;; Wraps a CML-style Racket event with a handler that sends the event
;; results via the ground VM.
(define (event-handler descriptor)
(handle-evt descriptor (lambda vs (message (external-event descriptor vs)))))
;; Projection
;; Used to extract event descriptors and results from subscriptions
;; from the ground VM's contained Network.
(define event-projection (compile-projection (observe (external-event (?!) ?))))
;; Interests -> (Listof RacketEvent)
;; Projects out the active event subscriptions from the given interests.
(define (extract-active-events interests)
(define es (trie-project/set/single interests event-projection))
;; TODO: how should the following error be handled, ideally?
;; In principle, security restrictions should make it impossible.
;; But absent those, what should be done? Should an offending
;; process be identified and terminated?
(unless es (error 'extract-active-events "User program subscribed to wildcard event"))
(for/list [(e (in-set es))] (event-handler e)))
;;---------------------------------------------------------------------------
;; RacketEvent
;; Used only when the system is not provably inert, in order to let it
;; take further internal reductions.
(define idle-handler
(handle-evt (system-idle-evt) (lambda _ #f)))
;; Action* -> Void
;; Runs a ground VM, booting the outermost Network with the given Actions.
(define (run-ground . boot-actions)
(let await-interrupt ((inert? #f)
(w (make-network boot-actions))
(interests (trie-empty)))
;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests))
(if (and inert? (trie-empty? interests))
(begin (log-info "run-ground: Terminating because inert")
(void))
(let ((e (apply sync
(current-ground-event-async-channel)
(if inert? never-evt idle-handler)
(extract-active-events interests))))
(trace-process-step e #f network-handle-event w)
(define resulting-transition (clean-transition (network-handle-event e w)))
(trace-process-step-result e #f network-handle-event w #f resulting-transition)
(match resulting-transition
[#f ;; inert
(await-interrupt #t w interests)]
[(transition w actions)
(let process-actions ((actions actions) (interests interests))
(match actions
['() (await-interrupt #f w interests)]
[(cons a actions)
(match a
[(scn r)
(process-actions actions r)]
[_
(log-warning "run-ground: ignoring useless meta-action ~v" a)
(process-actions actions interests)])]))])))))

View File

@ -0,0 +1,51 @@
#lang racket/base
(require (for-syntax racket/base syntax/kerncase))
(require racket/match)
(require "main.rkt")
(provide (rename-out [module-begin #%module-begin])
(except-out (all-from-out racket/base) #%module-begin)
(all-from-out racket/match)
(all-from-out "main.rkt")
(for-syntax (all-from-out racket/base)))
(define-syntax (module-begin stx)
(unless (eq? (syntax-local-context) 'module-begin)
(raise-syntax-error #f "allowed only around a module body" stx))
(syntax-case stx ()
[(_ forms ...)
(let ()
(define (accumulate-actions action-ids final-forms forms)
(if (null? forms)
(let ((final-stx
#`(#%module-begin #,@(reverse final-forms)
(run-ground #,@(reverse action-ids)))))
;;(pretty-print (syntax->datum final-stx))
final-stx)
(syntax-case (local-expand (car forms)
'module
(kernel-form-identifier-list)) ()
[(head rest ...)
(if (free-identifier=? #'head #'begin)
(accumulate-actions action-ids
final-forms
(append (syntax->list #'(rest ...)) (cdr forms)))
(if (ormap (lambda (i) (free-identifier=? #'head i))
(syntax->list #'(define-values define-syntaxes begin-for-syntax
module module*
#%module-begin
#%require #%provide)))
(accumulate-actions action-ids
(cons (car forms) final-forms)
(cdr forms))
(accumulate-action (car forms) action-ids final-forms (cdr forms))))]
[non-pair-syntax
(accumulate-action (car forms) action-ids final-forms (cdr forms))])))
(define (accumulate-action action action-ids final-forms remaining-forms)
(define temp (car (generate-temporaries (list action))))
(accumulate-actions (cons temp action-ids)
(cons #`(define #,temp #,action) final-forms)
remaining-forms))
(accumulate-actions '() '() (syntax->list #'(forms ...))))]))

View File

@ -0,0 +1,2 @@
#lang s-exp syntax/module-reader
prospect-monolithic/lang

View File

@ -0,0 +1,7 @@
#lang racket/base
(require "core.rkt")
(require "ground.rkt")
(provide (all-from-out "core.rkt")
(all-from-out "ground.rkt"))

104
prospect-monolithic/mux.rkt Normal file
View File

@ -0,0 +1,104 @@
#lang racket/base
;; General multiplexer.
(provide meta-label?
(except-out (struct-out mux) mux)
(rename-out [mux <mux>] [make-mux mux])
mux-add-stream
mux-remove-stream
mux-update-stream
mux-route-message
mux-interests-of
compute-scns
compute-affected-pids
pretty-print-mux)
(require racket/set)
(require racket/match)
(require "../prospect/route.rkt")
(require "scn.rkt")
(require "../prospect/trace.rkt")
(require "../prospect/tset.rkt")
(require "../prospect/pretty.rkt")
;; A PID is a Nat.
;; A Label is a PID or 'meta.
;; Multiplexer private states
(struct mux (next-pid ;; PID
routing-table ;; (Matcherof (Setof Label))
interest-table ;; (HashTable Label Matcher)
)
#:transparent
#:methods gen:prospect-pretty-printable
[(define (prospect-pretty-print m [p (current-output-port)])
(pretty-print-mux m p))])
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (meta-label? x) (eq? x 'meta))
(define (make-mux)
(mux 0 (trie-empty) (hash)))
(define (mux-add-stream m initial-scn)
(define new-pid (mux-next-pid m))
(mux-update-stream (struct-copy mux m [next-pid (+ new-pid 1)])
new-pid
initial-scn))
(define (mux-remove-stream m label)
(mux-update-stream m label (scn (trie-empty))))
(define (mux-update-stream m label new-scn)
(define old-interests (mux-interests-of m label))
(define old-routing-table (mux-routing-table m))
(define new-interests (label-interests (scn-trie new-scn) (datum-tset label)))
(define new-routing-table (trie-union (trie-subtract old-routing-table old-interests)
new-interests))
(define aggregate-assertions (trie-union old-interests new-interests))
(values (struct-copy mux m
[routing-table new-routing-table]
[interest-table (if (trie-empty? new-interests)
(hash-remove (mux-interest-table m) label)
(hash-set (mux-interest-table m) label new-interests))])
label
new-scn ;; unnecessary?
aggregate-assertions))
(define (compute-scns old-m new-m label s aggregate-assertions)
(define old-routing-table (mux-routing-table old-m))
(define new-routing-table (mux-routing-table new-m))
(define affected-pids
(let ((pids (compute-affected-pids old-routing-table aggregate-assertions))) ;; hmm
(tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird
(values (for/list [(pid (tset->list affected-pids))]
(cons pid (scn (biased-intersection new-routing-table (mux-interests-of new-m pid)))))
(and (not (meta-label? label))
(drop-scn (scn (strip-interests new-routing-table))))))
(define (compute-affected-pids routing-table cover)
(trie-match-trie cover
(trie-step routing-table struct:observe)
#:seed (datum-tset)
#:combiner (lambda (v1 v2 acc) (tset-union v2 acc))
#:left-short (lambda (v r acc)
(tset-union acc (success-value (trie-step r EOS))))))
(define (mux-route-message m body)
(if (trie-lookup (mux-routing-table m) body #f) ;; some other stream has declared body
'()
(tset->list (trie-lookup (mux-routing-table m) (observe body) (datum-tset)))))
(define (mux-interests-of m label)
(hash-ref (mux-interest-table m) label (trie-empty)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (pretty-print-mux m [p (current-output-port)])
(match-define (mux next-pid routing-table interest-table) m)
(fprintf p "MUX:\n")
(fprintf p " - ~a labelled entities with claims\n" (hash-count interest-table))
(fprintf p " - next label: ~a\n" next-pid)
(fprintf p " - routing-table:\n")
(display (indented-port-output 3 (lambda (p) (pretty-print-trie routing-table p))) p)
(newline p))

View File

@ -0,0 +1,67 @@
#lang racket/base
;; State Change Notifications, and assorted protocol constructors
(provide (struct-out scn)
(struct-out observe)
(struct-out at-meta)
(struct-out advertise)
lift-scn
drop-scn
strip-interests
label-interests
strip-scn
label-scn
biased-intersection)
(require racket/set)
(require racket/match)
(require "../prospect/route.rkt")
(require "../prospect/tset.rkt")
(require "../prospect/pretty.rkt")
(module+ test (require rackunit))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; State Change Notifications
(struct scn (trie) #:transparent
#:methods gen:prospect-pretty-printable
[(define (prospect-pretty-print d [p (current-output-port)])
(pretty-print-trie (scn-trie d)))])
;; Claims, Interests, Locations, and Advertisements
(struct observe (claim) #:prefab)
(struct at-meta (claim) #:prefab)
(struct advertise (claim) #:prefab)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define at-meta-proj (compile-projection (at-meta (?!))))
(define (lift-scn s)
(scn (pattern->trie #t (at-meta (embedded-trie (scn-trie s))))))
(define (drop-interests pi)
(trie-project pi at-meta-proj
#:project-success (lambda (v) #t)
#:combiner (lambda (v1 v2) #t)))
(define (drop-scn s)
(scn (drop-interests (scn-trie s))))
(define (strip-interests g)
(trie-relabel g (lambda (v) #t)))
(define (label-interests g label)
(trie-relabel g (lambda (v) label)))
(define (strip-scn s)
(scn (strip-interests (scn-trie s))))
(define (label-scn s label)
(scn (label-interests (scn-trie s) label)))
(define (biased-intersection object subject)
(trie-intersect object
(trie-step subject struct:observe)
#:combiner (lambda (v1 v2) #t)
#:left-short (lambda (v r) (trie-step r EOS))))

View File

@ -0,0 +1,229 @@
#lang racket/base
(provide set-stderr-trace-flags!)
(require racket/set)
(require racket/match)
(require racket/pretty)
(require (only-in racket/string string-join))
(require "../../prospect/exn-util.rkt")
(require "../core.rkt")
(require "../../prospect/trace.rkt")
(require "../mux.rkt")
(require "../../prospect/pretty.rkt")
(define (env-aref varname default alist)
(define key (or (getenv varname) default))
(cond [(assoc key alist) => cadr]
[else (error 'env-aref
"Expected environment variable ~a to contain one of ~v; got ~v"
(map car alist)
key)]))
(define colored-output? (env-aref "MINIMART_COLOR" "true" '(("true" #t) ("false" #f))))
(define flags (set))
(define show-exceptions? #f)
(define show-scn-events? #f)
(define show-message-events? #f)
(define show-boot-events? #f)
(define show-events? #f)
(define show-process-states-pre? #f)
(define show-process-states-post? #f)
(define show-process-lifecycle? #f)
(define show-scn-actions? #f)
(define show-message-actions? #f)
(define show-actions? #f)
(define show-routing-table? #f)
(define network-is-boring? #t)
(define (set-stderr-trace-flags! flags-string)
(set! flags (for/set [(c flags-string)] (string->symbol (string c))))
(define-syntax-rule (set-flag! symbol variable)
(set! variable (set-member? flags 'symbol)))
(set-flag! x show-exceptions?)
(set-flag! r show-scn-events?)
(set-flag! m show-message-events?)
(set-flag! b show-boot-events?)
(set-flag! e show-events?)
(set-flag! s show-process-states-pre?)
(set-flag! t show-process-states-post?)
(set-flag! p show-process-lifecycle?)
(set-flag! R show-scn-actions?)
(set-flag! M show-message-actions?)
(set-flag! a show-actions?)
(set-flag! g show-routing-table?)
(set! network-is-boring? (not (set-member? flags 'N))))
(set-stderr-trace-flags! (or (getenv "MINIMART_TRACE") ""))
(define YELLOW-ON-RED ";1;33;41")
(define WHITE-ON-RED ";1;37;41")
(define WHITE-ON-GREEN ";1;37;42")
(define GREY-ON-RED ";37;41")
(define GREY-ON-GREEN ";37;42")
(define RED ";31")
(define BRIGHT-RED ";1;31")
(define GREEN ";32")
(define BRIGHT-GREEN ";1;32")
(define YELLOW ";33")
(define BLUE ";34")
(define BRIGHT-BLUE ";1;34")
(define NORMAL "")
(define (format-pids pids)
(match pids
['() "ground"]
[(cons 'meta rest) (format "context of ~a" (format-pids rest))]
[_ (string-join (map number->string (reverse pids)) ":")]))
(define (output fmt . args)
(apply fprintf (current-error-port) fmt args))
(define (boring-state? state)
(or (and (network? state) network-is-boring?)
(void? state)))
(define (set-color! c) (when colored-output? (output "\e[0~am" c)))
(define (reset-color!) (when colored-output? (output "\e[0m")))
(define-syntax-rule (with-color c expr ...)
(begin (set-color! c)
(begin0 (begin expr ...)
(reset-color!))))
(define (display-trace)
(define receiver (make-log-receiver trace-logger 'info))
(parameterize ((pretty-print-columns 100))
(let loop ()
(match-define (vector level message-string data event-name) (sync receiver))
(match* (event-name data)
[('process-step (list pids e beh st))
(define pidstr (format-pids pids))
(match e
[#f
(when show-events?
(with-color YELLOW (output "~a is being polled for changes.\n" pidstr)))]
[(scn r)
(when (or show-events? show-scn-events?)
(with-color YELLOW
(output "~a is receiving SCN:\n" pidstr)
(pretty-print-trie r (current-error-port))))]
[(message body)
(when (or show-events? show-message-events?)
(with-color YELLOW
(output "~a is receiving a message:\n" pidstr)
(pretty-write body (current-error-port))))])
(when show-process-states-pre?
(when (not (boring-state? st))
(with-color YELLOW
(output "~a's state just before the event:\n" pidstr)
(prospect-pretty-print st (current-error-port)))))]
[('process-step-result (list pids e beh st exn t))
(define pidstr (format-pids pids))
(define relevant-exn? (and show-exceptions? exn))
(define (exn-and-not b) (and relevant-exn? (not b)))
(match e
[#f
(when (exn-and-not show-events?)
(with-color YELLOW (output "~a was polled for changes.\n" pidstr)))]
['boot
(when (or show-events? show-boot-events?)
(with-color YELLOW (output "~a was booted.\n" pidstr)))]
[(scn r)
(when (exn-and-not (or show-events? show-scn-events?))
(with-color YELLOW
(output "~a received a SCN:\n" pidstr)
(pretty-print-trie r (current-error-port))))]
[(message body)
(when (exn-and-not (or show-events? show-message-events?))
(with-color YELLOW
(output "~a received a message:\n" pidstr)
(pretty-write body (current-error-port))))])
(when (exn-and-not (and show-process-states-pre? (not (boring-state? st))))
(with-color YELLOW
(output "~a's state just before the event:\n" pidstr)
(prospect-pretty-print st (current-error-port))))
(when relevant-exn?
(with-color WHITE-ON-RED
(output "Process ~a ~v died with exception:\n~a\n"
pidstr
beh
(exn->string exn))))
(when (quit? t)
(with-color BRIGHT-RED
(output "Process ~a ~v exited normally.\n" pidstr beh)))
(when (or relevant-exn? show-process-states-post?)
(when (transition? t)
(unless (boring-state? (transition-state t))
(when (not (equal? st (transition-state t)))
(with-color YELLOW
(output "~a's state just after the event:\n" pidstr)
(prospect-pretty-print (transition-state t) (current-error-port)))))))]
[('internal-action (list pids a old-w))
(define pidstr (format-pids pids))
(define oldcount (hash-count (network-behaviors old-w)))
(match a
[(? spawn?)
;; Handle this in internal-action-result
(void)]
['quit
(when (or show-process-lifecycle? show-actions?)
(define interests (mux-interests-of (network-mux old-w) (car pids)))
(with-color BRIGHT-RED
(output "~a exiting (~a total processes remain)\n"
pidstr
(- oldcount 1)))
(unless (trie-empty? interests)
(output "~a's final interests:\n" pidstr)
(pretty-print-trie interests (current-error-port))))]
[(quit-network)
(with-color BRIGHT-RED
(output "Process ~a performed a quit-network.\n" pidstr))]
[(scn r)
(when (or show-actions? show-scn-actions?)
(output "~a performing a SCN:\n" pidstr)
(pretty-print-trie r (current-error-port)))]
[(message body)
(when (or show-actions? show-message-actions?)
(output "~a sending a message:\n" pidstr)
(pretty-write body (current-error-port)))])]
[('internal-action-result (list pids a old-w t))
(when (transition? t)
(define new-w (transition-state t))
(define pidstr (format-pids pids))
(define newcount (hash-count (network-behaviors new-w)))
(match a
[(? spawn?)
(when (or show-process-lifecycle? show-actions?)
(define newpid (mux-next-pid (network-mux old-w)))
(define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid
(define interests (mux-interests-of (network-mux new-w) newpid))
(define behavior (hash-ref (network-behaviors new-w) newpid '#:missing-behavior))
(define state (hash-ref (network-states new-w) newpid '#:missing-state))
(with-color BRIGHT-GREEN
(output "~a ~v spawned from ~a (~a total processes now)\n"
newpidstr
behavior
pidstr
newcount))
(unless (boring-state? state)
(output "~a's initial state:\n" newpidstr)
(prospect-pretty-print state (current-error-port)))
(unless (trie-empty? interests)
(output "~a's initial interests:\n" newpidstr)
(pretty-print-trie interests (current-error-port))))]
[_
;; other cases handled in internal-action
(void)])
(when show-routing-table?
(define old-table (mux-routing-table (network-mux old-w)))
(define new-table (mux-routing-table (network-mux new-w)))
(when (not (equal? old-table new-table))
(with-color BRIGHT-BLUE
(output "~a's routing table:\n" (format-pids (cdr pids)))
(pretty-print-trie new-table (current-error-port))))))])
(loop))))
(void (when (not (set-empty? flags))
(thread display-trace)))