#lang racket/base ;; Core implementation of network actors and Network Calculus (NC) communication API. (require racket/set) (require racket/match) (require racket/list) (require "route.rkt") (require "gestalt.rkt") (require "functional-queue.rkt") (require "trace.rkt") (require "tset.rkt") (provide (struct-out routing-update) (struct-out message) (struct-out quit) (except-out (struct-out spawn) spawn) (rename-out [make-spawn spawn] [spawn ]) (struct-out process) (struct-out transition) (struct-out trigger-guard) (except-out (struct-out world) world) ;; imported from route.rkt: ? wildcard? ?! (struct-out capture) pretty-print-matcher matcher->pretty-string matcher-empty? (rename-out [projection->pattern matcher-projection->pattern] [compile-projection compile-matcher-projection]) sub pub gestalt-accepts? filter-event send feedback spawn-world deliver-event transition-bind sequence-transitions clean-actions routing-implementation) ;; A PID is a number uniquely identifying a Process within a World. ;; Note that PIDs are only meaningful within the context of their ;; World: they are not global Process identifiers. ;; TODO: support +Inf.0 as a level number ;; An Event is a communication from a World to a Process contained ;; within it. One of ;; - (routing-update Gestalt), description of change in the sender's interests/subscriptions ;; - (message Any Nat Boolean), a (multicast, in general) message sent by an actor ;; A message's (feedback?) field is #f when it is a message ;; originating from an advertiser/publisher and terminating with a ;; subscriber, and #t in the opposite case. (struct routing-update (gestalt) #:prefab) (struct message (body meta-level feedback?) #:prefab) ;; An Action is a communication from a Process to its containing ;; World, instructing the World to take some action on the Process's ;; behalf. One of ;; - an Event: change in the Process's interests, or message from the Process ;; - (spawn (Constreeof Action) Process): instruction to spawn a new process as described ;; - (quit): instruction to terminate the sending process (struct quit () #:prefab) (struct spawn (boot-proc process) #:prefab) ;; A PendingEvent is a description of a set of Events to be ;; communicated to a World's Processes. In naïve implementations of ;; NC, there is no distinction between Events and PendingEvents; here, ;; we must ensure that the buffering delay doesn't affect the Gestalts ;; communicated in routing-update Events, so a special record is used ;; to capture the appropriate Gestalt environment. ;; - (pending-routing-update Gestalt Gestalt (Option PID)) (struct pending-routing-update (aggregate affected-subgestalt known-target) #:prefab) ;; A Process (a.k.a. Actor) describes a single actor in a World. ;; - (process Gestalt Behavior Any) ;; The Gestalt describes the current interests of the Process: either ;; those it was spawned with, or the most recent interests from a ;; routing-update Action. (struct process (gestalt behavior state) #:transparent) ;; A World (a.k.a Configuration) is the state of an actor representing ;; a group of communicating Processes. The term is also used from time ;; to time to denote the actor having a World as its state and ;; world-handle-event as its Behavior. (struct world (next-pid ;; PID, for next-spawned process pending-event-queue ;; (Queueof PendingEvent) runnable-pids ;; (Setof PID), non-inert processes partial-gestalt ;; Gestalt, from local processes only; maps to PID full-gestalt ;; Gestalt, union of partial- and downward-gestalts process-table ;; (HashTable PID Process) downward-gestalt ;; Gestalt, representing interests of outside world process-actions ;; (Queueof (Pairof PID Action)) ) #:transparent) ;; A Behavior is a ((Option Event) Any -> Transition): a function ;; mapping an Event (or, in the #f case, a poll signal) and a ;; Process's current state to a Transition. ;; ;; A Transition is either ;; - #f, a signal from a Process that it is inert and need not be ;; scheduled until some Event relevant to it arrives; or, ;; - a (transition Any (Constreeof Action)), a new Process state to ;; be held by its World and a sequence of Actions for the World ;; to take on the transitioning Process's behalf. (struct transition (state actions) #:transparent) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Protocol and utilities ;; sub : Pattern [#:meta-level Nat] [#:level Nat] -> Gestalt ;; pub : Pattern [#:meta-level Nat] [#:level Nat] -> Gestalt ;; ;; Construct atomic Gestalts representing subscriptions/advertisements ;; matching the given pattern, at the given meta-level and level. ;; These are frequently used in combination with gestalt-union when ;; building spawn and routing-update Actions. (define (sub p #:meta-level [ml 0] #:level [l 0]) (simple-gestalt #f p l ml)) (define (pub p #:meta-level [ml 0] #:level [l 0]) (simple-gestalt #t p l ml)) ;; Gestalt Any -> Boolean ;; True iff m falls within the set of messages represented by the Gestalt. (define (gestalt-accepts? g m) (match-define (message b ml f?) m) (not (set-empty? (gestalt-match-value g b ml f?)))) ;; (Option Event) Gestalt -> (Option Event) ;; Returns a filtered version of e, narrowed to the perspective of g-filter. (define (filter-event e g-filter) (match e [#f #f] [(routing-update g) (routing-update (gestalt-filter g g-filter))] [(? message? m) (and (gestalt-accepts? g-filter m) m)])) ;; Behavior Any [Gestalt] -> Action ;; Constructs a spawn Action for a new process with the given behavior ;; and state. If a Gestalt is supplied, the new process will begin its ;; existence with the corresponding subscriptions/advertisements/ ;; conversational-responsibilities. (define (make-spawn #:boot [boot-proc (lambda (state) (transition state '()))] behavior state [gestalt (gestalt-empty)]) (spawn boot-proc (process gestalt behavior state))) ;; send : Any [#:meta-level Nat] -> Action ;; feedback : Any [#:meta-level Nat] -> Action ;; ;; Each constructs an Action that will deliver a body to peers at the ;; given meta-level. (send) constructs messages that will be delivered ;; to subscribers; (feedback), to advertisers. (define (send body #:meta-level [ml 0]) (message body ml #f)) (define (feedback body #:meta-level [ml 0]) (message body ml #t)) ;; Action* -> Action ;; Constructs an action which causes the creation of a new World ;; Process. The given actions will be taken by a primordial process ;; running in the context of the new World. (define (spawn-world . boot-actions) (make-spawn world-handle-event (enqueue-actions (world 0 (make-queue) (set) (gestalt-empty) (gestalt-empty) (hash) (gestalt-empty) (make-queue)) -1 (clean-actions boot-actions)))) ;; Any -> Boolean; type predicates for Event and Action respectively. (define (event? x) (or (routing-update? x) (message? x))) (define (action? x) (or (event? x) (spawn? x) (quit? x))) ;; (Any -> Transition) Transition -> Transition ;; A kind of monad-ish bind operator: threads the state in t0 through ;; k, appending the action sequence from t0 with that from the result ;; of calling k. ;; TODO: sort out exactly how #f should propagate here (define (transition-bind k t0) (match-define (transition state0 actions0) t0) (match (k state0) [#f t0] [(transition state1 actions1) (transition state1 (cons actions0 actions1))])) ;; Transition (Any -> Transition)* -> Transition ;; Each step is a function from state to Transition. The state in t0 ;; is threaded through the steps; the action sequences are appended. (define (sequence-transitions t0 . steps) (foldl transition-bind t0 steps)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Trigger guards ;; TriggerGuards wrap process Behavior and state, only passing through ;; routing-update Events to their contained process behavior/state if ;; there has been a change. All other Events go straight through. ;; ;; - (trigger-guard Gestalt Behavior Any) ;; ;; The structural similarity to Processes is meaningful: a Process ;; describes the current interests of the Process, as well as its ;; behavior. A TriggerGuard describes the current interests of the ;; Process's *environment*, and doesn't bother passing on a ;; routing-update unless the change is non-zero. (struct trigger-guard (gestalt handler state) #:transparent) ;; Behavior :> (Option Event) TriggerGuard -> Transition ;; Inspects the given event: if it is a routing update, the contained ;; Gestalt is compared to the TriggerGuard's record of the previous ;; Gestalt from the environment, and only if it is different is it ;; passed on. (define (trigger-guard-handle e s0) (match-define (trigger-guard old-gestalt handler old-state) s0) (define (deliver s) (match (ensure-transition (handler e old-state)) [#f (if (eq? s s0) #f (transition s '()))] [(transition new-state actions) (transition (struct-copy trigger-guard s [state new-state]) actions)])) (match e [(routing-update new-gestalt) (if (equal? new-gestalt old-gestalt) #f (deliver (struct-copy trigger-guard s0 [gestalt new-gestalt])))] [_ (deliver s0)])) ;; Process -> Process ;; Wraps a Process in a TriggerGuard. (define (trigger-guard-process p) (match-define (process _ b s) p) (struct-copy process p [behavior trigger-guard-handle] [state (trigger-guard #f b s)])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; World implementation ;; Each time a world is handed an event from its environment, it: ;; 1. dispatches PendingEvents ;; a. removing them one-at-a-time from the queue ;; b. converting them to Events and dispatching them to processes ;; c. updating process states and accumulating Actions in the queue ;; d. any process that returned non-#f is considered "non-idle" for step 3. ;; 2. performs Actions ;; a. removing them one-at-a-time from the queue ;; b. interpreting them ;; c. updating World state and accumulating PendingEvents in the queue ;; 3. steps non-idle processes ;; a. runs through the runnable-pids set of processes accumulated in 1d. above ;; b. any process that returned non-#f is put in the "non-idle" set for next time ;; 4. yields updated World state and world Actions to the environment. ;; ;; Note that routing-update Actions are queued as ;; pending-routing-update structures in order to preserve and ;; communicate transient Gestalt states to Processes. In addition, the ;; known-target field of a pending-routing-update structure is used to ;; provide NC's initial Gestalt signal to a newly-spawned process. ;; ;; TODO: should step 3 occur before step 1? ;; World PID (Listof Action) -> World ;; Stores actions taken by PID for later interpretation. (define (enqueue-actions w pid actions) (struct-copy world w [process-actions (queue-append-list (world-process-actions w) (for/list [(a actions)] (cons pid a)))])) ;; World -> Boolean ;; True if the World has no further reductions it can take. ;; ;; The code is written to maintain the runnable-pids set carefully, to ;; ensure we can locally decide whether we're inert or not without ;; having to search the whole deep process tree. (define (inert? w) (and (queue-empty? (world-pending-event-queue w)) (queue-empty? (world-process-actions w)) (set-empty? (world-runnable-pids w)))) ;; Event PID Process World -> World ;; Delivers the event to the process, then applies the resulting ;; transition, updating the world. (define (step-process e pid p w) (apply-transition pid (deliver-event e pid p) w)) ;; Event PID Process -> Transition ;; Delivers the event to the process. (define (deliver-event e pid p) (invoke-process (process-behavior p) e pid p)) ;; (Any -> (Option Transition)) PID Process -> (Option Transition) ;; Calls f in the context of the given process, catching exceptions. (define (invoke-process f e pid p) (define-values (maybe-exn t) (call-in-trace-context pid (lambda () (with-handlers ([(lambda (exn) #t) (lambda (exn) (values exn (transition (process-state p) (list (quit)))))]) (values #f (clean-transition (ensure-transition (with-continuation-mark 'minimart-process pid (f e (process-state p)))))))))) (trace-process-step e pid p maybe-exn t) t) ;; Any -> (Option Transition) ;; If its argument is non-#f, non-transition, raises an exception. (define (ensure-transition v) (if (or (not v) (transition? v)) v (raise (exn:fail:contract (format "Expected transition (or #f); got ~v" v) (current-continuation-marks))))) ;; (Option Transition) -> (Option Transition) ;; Filters and flattens action constree in argument. (define (clean-transition t) (and t (transition (transition-state t) (clean-actions (transition-actions t))))) ;; (Constreeof Any) -> (Listof Action) ;; Filters and flattens its argument to a list of actions. (define (clean-actions actions) (filter action? (flatten actions))) ;; World PID -> World ;; Marks the given PID as not-provably-inert. (define (mark-pid-runnable w pid) (struct-copy world w [runnable-pids (set-add (world-runnable-pids w) pid)])) ;; PID Transition World -> World ;; Examines the given Transition, updating PID's Process's state and ;; enqueueing Actions for later interpretation. When the Transition is ;; non-#f, PID's Process may wish to take further internal reductions, ;; so we mark it as runnable. (define (apply-transition pid t w) (match t [#f w] [(transition new-state new-actions) (let* ((w (transform-process pid w (lambda (p) (struct-copy process p [state new-state]))))) (enqueue-actions (mark-pid-runnable w pid) pid new-actions))])) ;; PendingEvent World -> World ;; Enqueue a PendingEvent for later interpretation and dispatch. (define (enqueue-pending-event e w) (struct-copy world w [pending-event-queue (enqueue (world-pending-event-queue w) e)])) ;; World -> Transition ;; Examines all queued actions, interpreting them, updating World ;; state, and possibly causing the World to send Actions for ;; interpretation to its own containing World in turn. (define (perform-actions w) (for/fold ([t (transition (struct-copy world w [process-actions (make-queue)]) '())]) ((entry (in-list (queue->list (world-process-actions w))))) (match-define (cons pid a) entry) (define t1 (transition-bind (perform-action pid a) t)) (trace-internal-step pid a (transition-state t) t1) t1)) ;; World -> Transition ;; Interprets queued PendingEvents, delivering resulting Events to Processes. (define (dispatch-pending-events w) (transition (for/fold ([w (struct-copy world w [pending-event-queue (make-queue)])]) ((e (in-list (queue->list (world-pending-event-queue w))))) (dispatch-pending-event e w)) '())) ;; PID World (Process -> Process) -> World ;; Extracts a Process by PID, maps fp over it, and stores the result back into the table. (define (transform-process pid w fp) (define pt (world-process-table w)) (match (hash-ref pt pid) [#f w] [p (struct-copy world w [process-table (hash-set pt pid (fp p))])])) ;; World -> World ;; Updates the World's cached copy of the union of its partial- and downward-gestalts. (define (update-full-gestalt w) (define new-full-gestalt (gestalt-union (world-partial-gestalt w) (world-downward-gestalt w))) (struct-copy world w [full-gestalt new-full-gestalt])) ;; World Gestalt (Option PID) -> World ;; Constructs and enqueues a PendingEvent describing a change to the ;; World's gestalt falling within the relevant-gestalt *subset* of it. (define (issue-local-routing-update w relevant-gestalt known-target) (enqueue-pending-event (pending-routing-update (world-full-gestalt w) relevant-gestalt known-target) w)) ;; World Gestalt (Option PID) -> Transition ;; Communicates a change in World's gestalt falling within the ;; relevant-gestalt *subset* of it both to local Processes and to the ;; World's own containing World. (define (issue-routing-update w relevant-gestalt known-target) (transition (issue-local-routing-update w relevant-gestalt known-target) (routing-update (drop-gestalt (world-partial-gestalt w))))) ;; World Gestalt Gestalt (Option PID) -> Transition ;; Communicates a change in the World gestalt corresponding to a ;; change in a single Process's gestalt. The old-gestalt is what the ;; Process used to be interested in; new-gestalt is its new interests. (define (apply-and-issue-routing-update w old-gestalt new-gestalt known-target) (define new-partial (gestalt-union (gestalt-subtract (world-partial-gestalt w) old-gestalt) new-gestalt)) (issue-routing-update (update-full-gestalt (struct-copy world w [partial-gestalt new-partial])) (gestalt-union old-gestalt new-gestalt) known-target)) ;; PID Action -> World -> Transition ;; Interprets a single Action performed by PID, updating World state ;; and possibly causing the World to take externally-visible Actions ;; as a result. (define ((perform-action pid a) w) (match a [(spawn boot-proc new-p) (let* ((new-pid (world-next-pid w)) (initial-t (invoke-process (lambda (e s) (boot-proc s)) '#:boot new-pid new-p)) (initial-actions (if initial-t (transition-actions initial-t) '())) (new-p (if initial-t (struct-copy process new-p [state (transition-state initial-t)]) new-p)) (new-p (trigger-guard-process new-p)) (new-gestalt (label-gestalt (process-gestalt new-p) new-pid)) (new-p (struct-copy process new-p [gestalt new-gestalt])) (w (struct-copy world w [next-pid (+ new-pid 1)] [process-table (hash-set (world-process-table w) new-pid new-p)])) (w (enqueue-actions w new-pid initial-actions))) (apply-and-issue-routing-update w (gestalt-empty) new-gestalt new-pid))] [(quit) (define pt (world-process-table w)) (define p (hash-ref pt pid (lambda () #f))) (if p (let* ((w (struct-copy world w [process-table (hash-remove pt pid)]))) (apply-and-issue-routing-update w (process-gestalt p) (gestalt-empty) #f)) (transition w '()))] [(routing-update gestalt) (define pt (world-process-table w)) (define p (hash-ref pt pid (lambda () #f))) (if p (let* ((old-gestalt (process-gestalt p)) (new-gestalt (label-gestalt gestalt pid)) (new-p (struct-copy process p [gestalt new-gestalt])) (w (struct-copy world w [process-table (hash-set pt pid new-p)]))) (apply-and-issue-routing-update w old-gestalt new-gestalt #f)) (transition w '()))] [(message body meta-level feedback?) (if (zero? meta-level) (transition (enqueue-pending-event a w) '()) (transition w (message body (- meta-level 1) feedback?)))])) ;; PendingEvent World -> World ;; Interprets a PendingEvent, delivering the resulting Event(s) to Processes. (define (dispatch-pending-event e w) (match e [(message body meta-level feedback?) (define pids (gestalt-match-value (world-partial-gestalt w) body meta-level feedback?)) (define pt (world-process-table w)) (for/fold ([w w]) [(pid (in-list (tset->list pids)))] (step-process e pid (hash-ref pt pid) w))] [(pending-routing-update g affected-subgestalt known-target) (define affected-pids (gestalt-match affected-subgestalt g)) (define pt (world-process-table w)) (for/fold ([w w]) [(pid (in-list (tset->list (if known-target (tset-add affected-pids known-target) affected-pids))))] (match (hash-ref pt pid (lambda () #f)) [#f w] [p (step-process (routing-update (gestalt-filter g (process-gestalt p))) pid p w)]))])) ;; World -> Transition ;; Polls the non-provably-inert processes identified by the ;; runnable-pids set (by sending them #f instead of an Event). ;; ;; N.B.: We also effectively compute whether this entire World is ;; inert here. ;; ;; This is roughly the "schedule" rule of the Network Calculus. (define (step-children w) (define runnable-pids (world-runnable-pids w)) (if (set-empty? runnable-pids) #f ;; world is inert. (transition (for/fold ([w (struct-copy world w [runnable-pids (set)])]) [(pid (in-set runnable-pids))] (define p (hash-ref (world-process-table w) pid (lambda () #f))) (if (not p) w (step-process #f pid p w))) '()))) ;; world needs another check to see if more can happen. ;; Behavior :> (Option Event) World -> Transition ;; World's behavior function. Lifts and dispatches an incoming event ;; to contained Processes. (define (world-handle-event e w) (if (or e (not (inert? w))) (sequence-transitions (transition (inject-event e w) '()) dispatch-pending-events perform-actions (lambda (w) (or (step-children w) (transition w '())))) (step-children w))) ;; Event World -> World ;; Translates an event from the World's container into PendingEvents ;; suitable for its own contained Processes. (define (inject-event e w) (match e [#f w] [(routing-update g) (define old-downward (world-downward-gestalt w)) (define new-downward (lift-gestalt (label-gestalt g 'out))) (issue-local-routing-update (update-full-gestalt (struct-copy world w [downward-gestalt new-downward])) (gestalt-union old-downward new-downward) #f)] [(message body meta-level feedback?) (enqueue-pending-event (message body (+ meta-level 1) feedback?) w)])) ;; Symbol ;; Describes the routing implementation, for use in profiling, debugging etc. (define routing-implementation 'fastrouting)