From cf0049633811a2518a70894bdaf3fd1d213dc5e0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 23 Jan 2016 18:24:07 -0500 Subject: [PATCH] Monolithic semantics. --- Makefile | 2 +- prospect-monolithic/README.md | 4 + prospect-monolithic/core.rkt | 460 ++++++++++++++++++ prospect-monolithic/demand-matcher.rkt | 127 +++++ prospect-monolithic/drivers/tcp.rkt | 183 +++++++ prospect-monolithic/drivers/timer.rkt | 92 ++++ prospect-monolithic/drivers/udp.rkt | 90 ++++ prospect-monolithic/drivers/websocket.rkt | 183 +++++++ prospect-monolithic/examples/.gitignore | 2 + prospect-monolithic/examples/Makefile | 14 + prospect-monolithic/examples/bank-account.rkt | 28 ++ .../examples/box-and-client.rkt | 26 + prospect-monolithic/examples/chat-client.rkt | 27 + .../chat-no-quit-world-no-nesting.rkt | 47 ++ prospect-monolithic/examples/echo.rkt | 24 + prospect-monolithic/examples/example-lang.rkt | 75 +++ .../examples/example-plain.rkt | 72 +++ .../examples/example-quit-world.rkt | 35 ++ prospect-monolithic/examples/mini-echo.rkt | 25 + prospect-monolithic/examples/tcp-hello.rkt | 36 ++ .../examples/udp-hello-plain.rkt | 18 + prospect-monolithic/examples/ws-hello-ssl.rkt | 33 ++ prospect-monolithic/examples/ws-hello.rkt | 32 ++ prospect-monolithic/ground.rkt | 106 ++++ prospect-monolithic/lang.rkt | 51 ++ prospect-monolithic/lang/reader.rkt | 2 + prospect-monolithic/main.rkt | 7 + prospect-monolithic/mux.rkt | 104 ++++ prospect-monolithic/scn.rkt | 67 +++ prospect-monolithic/trace/stderr.rkt | 229 +++++++++ 30 files changed, 2200 insertions(+), 1 deletion(-) create mode 100644 prospect-monolithic/README.md create mode 100644 prospect-monolithic/core.rkt create mode 100644 prospect-monolithic/demand-matcher.rkt create mode 100644 prospect-monolithic/drivers/tcp.rkt create mode 100644 prospect-monolithic/drivers/timer.rkt create mode 100644 prospect-monolithic/drivers/udp.rkt create mode 100644 prospect-monolithic/drivers/websocket.rkt create mode 100644 prospect-monolithic/examples/.gitignore create mode 100644 prospect-monolithic/examples/Makefile create mode 100644 prospect-monolithic/examples/bank-account.rkt create mode 100644 prospect-monolithic/examples/box-and-client.rkt create mode 100644 prospect-monolithic/examples/chat-client.rkt create mode 100644 prospect-monolithic/examples/chat-no-quit-world-no-nesting.rkt create mode 100644 prospect-monolithic/examples/echo.rkt create mode 100644 prospect-monolithic/examples/example-lang.rkt create mode 100644 prospect-monolithic/examples/example-plain.rkt create mode 100644 prospect-monolithic/examples/example-quit-world.rkt create mode 100644 prospect-monolithic/examples/mini-echo.rkt create mode 100644 prospect-monolithic/examples/tcp-hello.rkt create mode 100644 prospect-monolithic/examples/udp-hello-plain.rkt create mode 100644 prospect-monolithic/examples/ws-hello-ssl.rkt create mode 100644 prospect-monolithic/examples/ws-hello.rkt create mode 100644 prospect-monolithic/ground.rkt create mode 100644 prospect-monolithic/lang.rkt create mode 100644 prospect-monolithic/lang/reader.rkt create mode 100644 prospect-monolithic/main.rkt create mode 100644 prospect-monolithic/mux.rkt create mode 100644 prospect-monolithic/scn.rkt create mode 100644 prospect-monolithic/trace/stderr.rkt diff --git a/Makefile b/Makefile index 1dab3d8..e1ec982 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ PACKAGENAME=prospect -COLLECTS=prospect +COLLECTS=prospect prospect-monolithic all: setup diff --git a/prospect-monolithic/README.md b/prospect-monolithic/README.md new file mode 100644 index 0000000..861ef84 --- /dev/null +++ b/prospect-monolithic/README.md @@ -0,0 +1,4 @@ +# prospect-monolithic + +This is an implementation of the monolithic semantics, without any use +of patches. diff --git a/prospect-monolithic/core.rkt b/prospect-monolithic/core.rkt new file mode 100644 index 0000000..863ff72 --- /dev/null +++ b/prospect-monolithic/core.rkt @@ -0,0 +1,460 @@ +#lang racket/base +;; Core implementation of Incremental Network Calculus. + +(provide (struct-out message) + (except-out (struct-out quit) quit) + (struct-out quit-network) + (rename-out [quit ]) + (except-out (struct-out spawn) spawn) + (rename-out [spawn ]) + (struct-out transition) + (struct-out network) + + (struct-out seal) + + (all-from-out "scn.rkt") + + ;; imported from route.rkt: + ? + wildcard? + ?! + (struct-out capture) + pretty-print-trie + trie->pretty-string + trie-non-empty? + trie-empty? + trie-empty + projection->pattern + compile-projection + trie-project + trie-project/set + trie-project/set/single + project-assertions + + event? + action? + + meta-label? + + prepend-at-meta + assertion + subscription + advertisement + + assertion-set-union + assertion-set-union* + scn/union + + (rename-out [make-quit quit]) + make-network + spawn-network + (rename-out [spawn-process spawn]) + spawn/stateless + make-spawn-network + + transition-bind + sequence-transitions + sequence-transitions* + sequence-transitions0 + sequence-transitions0* + + network-handle-event + clean-transition + + pretty-print-network) + +(require racket/set) +(require racket/match) +(require (only-in racket/list flatten)) +(require "../prospect/functional-queue.rkt") +(require "../prospect/route.rkt") +(require "scn.rkt") +(require "../prospect/trace.rkt") +(require "mux.rkt") +(require "../prospect/pretty.rkt") +(module+ test (require rackunit)) + +;; Events = SCNs ∪ Messages +(struct message (body) #:prefab) + +;; Actions ⊃ Events +(struct spawn (boot) #:prefab) +(struct quit-network () #:prefab) ;; NB. An action. Compare (quit), a Transition. + +;; A Behavior is a ((Option Event) Any -> Transition): a function +;; mapping an Event (or, in the #f case, a poll signal) and a +;; Process's current state to a Transition. +;; +;; A Transition is either +;; - #f, a signal from a Process that it is inert and need not be +;; scheduled until some Event relevant to it arrives; or, +;; - a (transition Any (Constreeof Action)), a new Process state to +;; be held by its Network and a sequence of Actions for the Network +;; to take on the transitioning Process's behalf. +;; - a (quit (Option Exn) (Constreeof Action)), signalling that the +;; Process should never again be handed an event, and that any +;; queued actions should be performed, followed by the sequence +;; of Actions given, and then the process should be +;; garbage-collected. The optional Exn is only used for +;; debugging purposes; #f means normal termination. +(struct transition (state actions) #:transparent) +(struct quit (exn actions) #:prefab) + +;; A PID is a Nat. +;; A Label is a PID or 'meta. + +;; VM private states +(struct network (mux ;; Multiplexer + pending-action-queue ;; (Queueof (Cons Label (U Action 'quit))) + runnable-pids ;; (Setof PID) + behaviors ;; (HashTable PID Behavior) + states ;; (HashTable PID Any) + ) + #:transparent + #:methods gen:prospect-pretty-printable + [(define (prospect-pretty-print w [p (current-output-port)]) + (pretty-print-network w p))]) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Seals are used by protocols to prevent the routing tries from +;; examining internal structure of values. + +(struct seal (contents)) ;; NB. Neither transparent nor prefab + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (event? x) (or (scn? x) (message? x))) +(define (action? x) (or (event? x) (spawn? x) (quit-network? x))) + +(define (prepend-at-meta pattern level) + (if (zero? level) + pattern + (at-meta (prepend-at-meta pattern (- level 1))))) + +(define (observe-at-meta pattern level) + (if (zero? level) + (pattern->trie #t (observe pattern)) + (trie-union + (pattern->trie #t (observe (prepend-at-meta pattern level))) + (pattern->trie #t (at-meta (embedded-trie (observe-at-meta pattern (- level 1)))))))) + +(define (assertion pattern #:meta-level [level 0]) + (pattern->trie #t (prepend-at-meta pattern level))) + +(define (subscription pattern #:meta-level [level 0]) + (observe-at-meta pattern level)) + +(define (advertisement pattern #:meta-level [level 0]) + (assertion (advertise pattern) #:meta-level level)) + +(define (assertion-set-union . tries) + (assertion-set-union* tries)) + +(define (assertion-set-union* tries) + (match tries + ['() (trie-empty)] + [(cons t1 rest) + (for/fold [(t1 t1)] [(t2 (in-list rest))] + (trie-union t1 t2 #:combiner (lambda (a b) #t)))])) + +(define (scn/union . tries) + (scn (assertion-set-union* tries))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (general-transition? v) + (or (not v) (transition? v) (quit? v))) + +(define (ensure-transition v) + (if (general-transition? v) + v + (raise (exn:fail:contract (format "Expected transition, quit or #f; got ~v" v) + (current-continuation-marks))))) + +(define (clean-transition t) + (match t + [#f #f] + [(quit exn actions) (quit exn (clean-actions actions))] + [(transition state actions) (transition state (clean-actions actions))])) + +(define (clean-actions actions) + (filter action? (flatten actions))) + +(define (send-event e pid w) + (define behavior (hash-ref (network-behaviors w) pid #f)) + (define old-state (hash-ref (network-states w) pid #f)) + (if (not behavior) + w + (begin + (trace-process-step e pid behavior old-state) + (invoke-process pid + (lambda () (clean-transition (ensure-transition (behavior e old-state)))) + (match-lambda + [#f w] + [(and q (quit exn final-actions)) + (trace-process-step-result e pid behavior old-state exn q) + (enqueue-actions (disable-process pid exn w) pid (append final-actions + (list 'quit)))] + [(and t (transition new-state new-actions)) + (trace-process-step-result e pid behavior old-state #f t) + (enqueue-actions (mark-pid-runnable (update-state w pid new-state) pid) + pid + new-actions)]) + (lambda (exn) + (trace-process-step-result e pid behavior old-state exn #f) + (enqueue-actions (disable-process pid exn w) pid (list 'quit))))))) + +(define (update-state w pid s) + (struct-copy network w [states (hash-set (network-states w) pid s)])) + +(define (disable-process pid exn w) + (when exn + (log-error "Process ~a died with exception:\n~a" + (cons pid (trace-pid-stack)) + (exn->string exn))) + (struct-copy network w + [behaviors (hash-remove (network-behaviors w) pid)] + [states (hash-remove (network-states w) pid)])) + +(define (invoke-process pid thunk k-ok k-exn) + (define-values (ok? result) + (call-in-trace-context + pid + (lambda () + (with-handlers ([(lambda (exn) #t) (lambda (exn) (values #f exn))]) + (values #t (with-continuation-mark 'minimart-process pid (thunk))))))) + (if ok? + (k-ok result) + (k-exn result))) + +(define (mark-pid-runnable w pid) + (struct-copy network w [runnable-pids (set-add (network-runnable-pids w) pid)])) + +(define (enqueue-actions w label actions) + (struct-copy network w + [pending-action-queue + (queue-append-list (network-pending-action-queue w) + (for/list [(a actions)] (cons label a)))])) + +(define (make-quit #:exception [exn #f] . actions) + (quit exn actions)) + +(define-syntax-rule (spawn-process behavior-exp initial-state-exp initial-action-tree-exp) + (spawn (lambda () + (local-require racket/contract) + (list behavior-exp + (transition initial-state-exp initial-action-tree-exp))))) + +(define-syntax-rule (spawn/stateless behavior-exp initial-action-tree-exp) + (spawn-process (stateless-behavior-wrap behavior-exp) + (void) + initial-action-tree-exp)) + +(define ((stateless-behavior-wrap b) e state) + (match (b e) + [#f #f] + [(? quit? q) q] + [actions (transition state actions)])) + +(define-syntax-rule (spawn-network boot-action ...) + (make-spawn-network (lambda () (list boot-action ...)))) + +(define (make-network boot-actions) + (network (mux) + (list->queue (for/list ((a (in-list (clean-actions boot-actions)))) (cons 'meta a))) + (set) + (hash) + (hash))) + +(define (make-spawn-network boot-actions-thunk) + (spawn (lambda () + (list network-handle-event + (transition (make-network (boot-actions-thunk)) '()))))) + +(define (transition-bind k t0) + (match t0 + [#f (error 'transition-bind "Cannot bind from transition #f with continuation ~v" k)] + [(quit _ _) t0] + [(transition state0 actions0) + (match (k state0) + [#f t0] + [(quit exn actions1) (quit exn (cons actions0 actions1))] + [(transition state1 actions1) (transition state1 (cons actions0 actions1))])])) + +(define (sequence-transitions t0 . steps) + (sequence-transitions* t0 steps)) + +(define (sequence-transitions* t0 steps) + (foldl transition-bind t0 steps)) + +(define (sequence-transitions0 state0 . steps) + (sequence-transitions0* state0 steps)) + +(define (sequence-transitions0* state0 steps) + (match steps + ['() #f] + [(cons step rest) + (match (step state0) + [#f (sequence-transitions0* state0 rest)] + [(? quit? q) q] + [(? transition? t) (sequence-transitions* t rest)])])) + +(define (inert? w) + (and (queue-empty? (network-pending-action-queue w)) + (set-empty? (network-runnable-pids w)))) + +(define (network-handle-event e w) + (if (or e (not (inert? w))) + (sequence-transitions (transition w '()) + (inject-event e) + perform-actions + (lambda (w) (or (step-children w) (transition w '())))) + (step-children w))) + +(define ((inject-event e) w) + (transition (match e + [#f w] + [(? scn? s) (enqueue-actions w 'meta (list (lift-scn s)))] + [(message body) (enqueue-actions w 'meta (list (message (at-meta body))))]) + '())) + +(define (perform-actions w) + (for/fold ([wt (transition (struct-copy network w [pending-action-queue (make-queue)]) '())]) + ((entry (in-list (queue->list (network-pending-action-queue w))))) + #:break (quit? wt) ;; TODO: should a quit action be delayed until the end of the turn? + (match-define [cons label a] entry) + (trace-internal-action label a (transition-state wt)) + (define wt1 (transition-bind (perform-action label a) wt)) + (trace-internal-action-result label a (transition-state wt) wt1) + wt1)) + +(define ((perform-action label a) w) + (match a + [(spawn boot) + (invoke-process 'booting + (lambda () + (match (boot) + [(and results (list (? procedure?) (? general-transition?))) + results] + [other + (error 'spawn + "Spawn boot procedure must yield boot spec; received ~v" + other)])) + (lambda (results) + (match-define (list behavior initial-transition) results) + (create-process w behavior initial-transition)) + (lambda (exn) + (log-error "Spawned process in network ~a died with exception:\n~a" + (trace-pid-stack) + (exn->string exn)) + (transition w '())))] + ['quit + (define-values (new-mux _label s aggregate-assertions) + (mux-remove-stream (network-mux w) label)) + ;; behavior & state in w already removed by disable-process + (deliver-scns w new-mux label s aggregate-assertions)] + [(quit-network) + (make-quit)] + [(? scn? s-orig) + (define-values (new-mux _label s aggregate-assertions) + (mux-update-stream (network-mux w) label s-orig)) + (deliver-scns w new-mux label s aggregate-assertions)] + [(and m (message body)) + (when (observe? body) + (log-warning "Stream ~a sent message containing query ~v" + (cons label (trace-pid-stack)) + body)) + (if (and (not (meta-label? label)) ;; it's from a local process, not envt + (at-meta? body)) ;; it relates to envt, not local + (transition w (message (at-meta-claim body))) + (transition (for/fold [(w w)] + [(pid (in-list (mux-route-message (network-mux w) body)))] + (send-event m pid w)) + '()))])) + +(define (create-process w behavior initial-transition) + (if (not initial-transition) + (transition w '()) ;; Uh, ok + (let () + (define-values (postprocess initial-actions) + (match (clean-transition initial-transition) + [(and q (quit exn initial-actions0)) + (values (lambda (w pid) + (trace-process-step-result 'boot pid behavior (void) exn q) + (disable-process pid exn w)) + (append initial-actions0 (list 'quit)))] + [(and t (transition initial-state initial-actions0)) + (values (lambda (w pid) + (trace-process-step-result 'boot pid behavior (void) #f t) + (mark-pid-runnable (update-state w pid initial-state) pid)) + initial-actions0)])) + (define-values (initial-scn remaining-initial-actions) + (match initial-actions + [(cons (? scn? s) rest) (values s rest)] + [other (values (scn (trie-empty)) other)])) + (define-values (new-mux new-pid s aggregate-assertions) + (mux-add-stream (network-mux w) initial-scn)) + (let* ((w (struct-copy network w + [behaviors (hash-set (network-behaviors w) + new-pid + behavior)])) + (w (enqueue-actions (postprocess w new-pid) new-pid remaining-initial-actions))) + (deliver-scns w new-mux new-pid s aggregate-assertions))))) + +(define (deliver-scns w new-mux acting-label s aggregate-assertions) + (define old-mux (network-mux w)) + (define-values (scns meta-action) + (compute-scns old-mux new-mux acting-label s aggregate-assertions)) + (transition (for/fold [(w (struct-copy network w [mux new-mux]))] + [(entry (in-list scns))] + (match-define (cons label (and event (scn new-assertions))) entry) + (if (equal? (biased-intersection (mux-routing-table old-mux) + (mux-interests-of old-mux label)) + new-assertions) + w + (send-event event label w))) + meta-action)) + +(define (step-children w) + (define runnable-pids (network-runnable-pids w)) + (if (set-empty? runnable-pids) + #f ;; network is inert. + (transition (for/fold [(w (struct-copy network w [runnable-pids (set)]))] + [(pid (in-set runnable-pids))] + (send-event #f pid w)) + '()))) + +(define (pretty-print-network w [p (current-output-port)]) + (match-define (network mux qs runnable behaviors states) w) + (fprintf p "NETWORK:\n") + (fprintf p " - ~a queued actions\n" (queue-length qs)) + (fprintf p " - ~a runnable pids ~a\n" (set-count runnable) (set->list runnable)) + (fprintf p " - ~a live processes\n" (hash-count states)) + (fprintf p " - ") + (display (indented-port-output 3 (lambda (p) (prospect-pretty-print mux p)) #:first-line? #f) p) + (for ([pid (set-union (hash-keys (mux-interest-table mux)) (hash-keys states))]) + (fprintf p " ---- process ~a, behavior ~v, STATE:\n" pid (hash-ref behaviors pid #f)) + (define state (hash-ref states pid #f)) + (display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p) + (newline p) + (fprintf p " process ~a, behavior ~v, CLAIMS:\n" pid (hash-ref behaviors pid #f)) + (display (indented-port-output 6 (lambda (p) + (pretty-print-trie (mux-interests-of mux pid) p))) + p) + (newline p))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(module+ test + (require racket/pretty) + + (define (step* w) + (let loop ((w w) (actions '())) + (pretty-print w) + (match (network-handle-event #f w) + [#f (values w #f (flatten actions))] + [(quit exn new-actions) (values w exn (flatten (cons actions new-actions)))] + [(transition new-w new-actions) (loop new-w (cons actions new-actions))]))) + + (step* (make-network '())) + ) diff --git a/prospect-monolithic/demand-matcher.rkt b/prospect-monolithic/demand-matcher.rkt new file mode 100644 index 0000000..c6835bb --- /dev/null +++ b/prospect-monolithic/demand-matcher.rkt @@ -0,0 +1,127 @@ +#lang racket/base +;; A structure (and process!) for matching supply to demand via observation of interests. + +(require racket/set) +(require racket/match) +(require "core.rkt") +(require "drivers/timer.rkt") +(require "../prospect/pretty.rkt") + +(provide (except-out (struct-out demand-matcher) demand-matcher) + (rename-out [make-demand-matcher demand-matcher]) + demand-matcher-update + spawn-demand-matcher) + +;; A DemandMatcher keeps track of demand for services based on some +;; Projection over a Trie, as well as a collection of functions +;; that can be used to increase supply in response to increased +;; demand, or handle a sudden drop in supply for which demand still +;; exists. +(struct demand-matcher (demand-spec ;; CompiledProjection + supply-spec ;; CompiledProjection + increase-handler ;; ChangeHandler + decrease-handler ;; ChangeHandler + current-demand ;; (Setof (Listof Any)) + current-supply) ;; (Setof (Listof Any)) + #:transparent + #:methods gen:prospect-pretty-printable + [(define (prospect-pretty-print s [p (current-output-port)]) + (pretty-print-demand-matcher s p))]) + +;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)). +;; It is called with an accumulator of actions so-far-computed as its +;; first argument, and with a value for each capture in the +;; DemandMatcher's projection as the remaining arguments. + +;; ChangeHandler +;; Default handler of unexpected supply decrease. +(define (default-decrease-handler state . removed-captures) + state) + +(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler) + (demand-matcher demand-spec + supply-spec + increase-handler + decrease-handler + (set) + (set))) + +;; DemandMatcher (Constreeof Action) SCN -> (Transition DemandMatcher) +;; Given a SCN from the environment, projects it into supply and +;; demand increase and decrease sets. Calls ChangeHandlers in response +;; to increased unsatisfied demand and decreased demanded supply. +(define (demand-matcher-update d s new-scn) + (match-define (demand-matcher demand-spec supply-spec inc-h dec-h demand supply) d) + (define new-demand (trie-project/set (scn-trie new-scn) demand-spec)) + (define new-supply (trie-project/set (scn-trie new-scn) supply-spec)) + (define added-demand (set-subtract new-demand demand)) + (define removed-demand (set-subtract demand new-demand)) + (define added-supply (set-subtract new-supply supply)) + (define removed-supply (set-subtract supply new-supply)) + + (when (not added-demand) (error 'demand-matcher "Wildcard demand of ~v:\n~a" + demand-spec + (trie->pretty-string (scn-trie new-scn)))) + (when (not added-supply) (error 'demand-matcher "Wildcard supply of ~v:\n~a" + supply-spec + (trie->pretty-string (scn-trie new-scn)))) + + (set! supply (set-union supply added-supply)) + (set! demand (set-subtract demand removed-demand)) + + (for [(captures (in-set removed-supply))] + (when (set-member? demand captures) (set! s (apply dec-h s captures)))) + (for [(captures (in-set added-demand))] + (when (not (set-member? supply captures)) (set! s (apply inc-h s captures)))) + + (set! supply (set-subtract supply removed-supply)) + (set! demand (set-union demand added-demand)) + + (transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s)) + +;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher) +;; Handles events from the environment. Only cares about routing-updates. +(define (demand-matcher-handle-event e d) + (match e + [(? scn? s) + (demand-matcher-update d '() s)] + [_ #f])) + +;; Any* -> (Constreeof Action) +;; Default handler of unexpected supply decrease. +;; Ignores the situation. +(define (unexpected-supply-decrease . removed-captures) + '()) + +;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action +;; Spawns a demand matcher actor. +(define (spawn-demand-matcher demand-spec + supply-spec + increase-handler + [decrease-handler unexpected-supply-decrease] + #:meta-level [meta-level 0]) + (define d (make-demand-matcher (compile-projection (prepend-at-meta demand-spec meta-level)) + (compile-projection (prepend-at-meta supply-spec meta-level)) + (lambda (acs . rs) (cons (apply increase-handler rs) acs)) + (lambda (acs . rs) (cons (apply decrease-handler rs) acs)))) + (spawn demand-matcher-handle-event + d + (scn/union (subscription (projection->pattern demand-spec) #:meta-level meta-level) + (subscription (projection->pattern supply-spec) #:meta-level meta-level) + (advertisement (projection->pattern supply-spec) #:meta-level meta-level)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (pretty-print-demand-matcher s [p (current-output-port)]) + (match-define (demand-matcher demand-spec + supply-spec + increase-handler + decrease-handler + current-demand + current-supply) + s) + (fprintf p "DEMAND MATCHER:\n") + (fprintf p " - demand-spec: ~v\n" demand-spec) + (fprintf p " - supply-spec: ~v\n" supply-spec) + (fprintf p " - demand: ~v\n" current-demand) + (fprintf p " - supply: ~v\n" current-supply)) diff --git a/prospect-monolithic/drivers/tcp.rkt b/prospect-monolithic/drivers/tcp.rkt new file mode 100644 index 0000000..a253a64 --- /dev/null +++ b/prospect-monolithic/drivers/tcp.rkt @@ -0,0 +1,183 @@ +#lang racket/base + +(require racket/match) +(require (prefix-in tcp: racket/tcp)) +(require (only-in racket/port read-bytes-avail!-evt)) +(require "../../prospect/exn-util.rkt") +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(require racket/unit) +(require net/tcp-sig) +(require net/tcp-unit) + +(provide (struct-out tcp-address) + (struct-out tcp-handle) + (struct-out tcp-listener) + (struct-out tcp-channel) + spawn-tcp-driver) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct tcp-address (host port) #:prefab) +(struct tcp-handle (id) #:prefab) +(struct tcp-listener (port) #:prefab) + +(struct tcp-channel (source destination subpacket) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(struct tcp-accepted (remote-addr local-addr cin cout) #:prefab) +;; tcp-channel does double-duty as a ground-level message as well + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver + +(define (spawn-tcp-driver) + (list (spawn-demand-matcher (advertise (observe (tcp-channel ? (?! (tcp-listener ?)) ?))) + (advertise (advertise (tcp-channel ? (?! (tcp-listener ?)) ?))) + spawn-tcp-listener) + (spawn-demand-matcher (advertise (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + (observe (tcp-channel (?! (tcp-handle ?)) (?! (tcp-address ? ?)) ?)) + spawn-tcp-connection))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Listener + +(struct listener-state (control-ch server-addr) #:transparent) + +(define (tcp-listener-thread control-ch listener server-addr) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (tcp:tcp-accept-evt listener) + (lambda (cin+cout) + (match-define (list cin cout) cin+cout) + (define-values (local-hostname local-port remote-hostname remote-port) + (tcp:tcp-addresses cin #t)) + (send-ground-message + (tcp-accepted (tcp-address remote-hostname remote-port) + server-addr + cin + cout)) + (loop blocked?)))))) + (tcp:tcp-close listener)) + +(define (tcp-listener-behavior e state) + (match e + [(scn r) + (define ch (listener-state-control-ch state)) + (cond [(trie-empty? r) (channel-put ch 'quit) (quit)] + [else (channel-put ch 'unblock) #f])] + [(message (at-meta (tcp-accepted remote-addr _ cin cout))) + (transition state (spawn-connection (listener-state-server-addr state) + remote-addr + cin + cout))] + [_ #f])) + +(define (spawn-tcp-listener server-addr) + (match-define (tcp-listener port) server-addr) + (define listener (tcp:tcp-listen port 128 #t)) + (define control-ch (make-channel)) + (thread (lambda () (tcp-listener-thread control-ch listener server-addr))) + (spawn tcp-listener-behavior + (listener-state control-ch server-addr) + (scn/union + (subscription (advertise (observe (tcp-channel ? server-addr ?)))) ;; monitor peer + (advertisement (advertise (tcp-channel ? server-addr ?))) ;; declare we might make connections + (subscription (tcp-accepted ? server-addr ? ?) #:meta-level 1) ;; events from driver thread + ))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Outbound Connection + +(define (spawn-tcp-connection local-addr remote-addr) + (match-define (tcp-address remote-hostname remote-port) remote-addr) + (define-values (cin cout) + (with-handlers ([exn:fail:network? (lambda (e) + ;; TODO: it'd be nice to + ;; somehow communicate the + ;; actual error to the local + ;; peer. + (log-error "~a" (exn->string e)) + (define o (open-output-string)) + (close-output-port o) + (values (open-input-string "") + o))]) + (tcp:tcp-connect remote-hostname remote-port))) + (spawn-connection local-addr remote-addr cin cout)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connection + +(struct connection-state (control-ch cout) #:transparent) + +(define (read-bytes-avail-evt len input-port) + (guard-evt + (lambda () + (let ([bstr (make-bytes len)]) + (handle-evt + (read-bytes-avail!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) bstr (subbytes bstr 0 v)) + v))))))) + +(define (tcp-connection-thread remote-addr local-addr control-ch cin) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt (read-bytes-avail-evt 32768 cin) + (lambda (eof-or-bs) + (send-ground-message (tcp-channel remote-addr local-addr eof-or-bs)) + (loop (or blocked? (eof-object? eof-or-bs)))))))) + (close-input-port cin)) + +(define (shutdown-connection! state) + (match-define (connection-state control-ch cout) state) + (when control-ch (channel-put control-ch 'quit)) + (when cout (close-output-port cout))) + +(define (tcp-connection e state) + (with-handlers [((lambda (exn) #t) + (lambda (exn) + (shutdown-connection! state) + (raise exn)))] + (match e + [(message (at-meta (tcp-channel remote-addr local-addr (? eof-object?)))) + (shutdown-connection! state) + (quit)] + [(message (at-meta (tcp-channel remote-addr local-addr (? bytes? bs)))) + (transition state (message (tcp-channel remote-addr local-addr bs)))] + [(message (tcp-channel _ _ bs)) + (write-bytes bs (connection-state-cout state)) + (flush-output (connection-state-cout state)) + #f] + [(scn r) + (define ch (connection-state-control-ch state)) + (cond [(trie-empty? r) (shutdown-connection! state) (quit)] + [else (channel-put ch 'unblock) #f])] + [#f #f]))) + +(define (spawn-connection local-addr remote-addr cin cout) + (define control-ch (make-channel)) + (thread (lambda () (tcp-connection-thread remote-addr local-addr control-ch cin))) + (spawn tcp-connection + (connection-state control-ch cout) + (scn/union + (subscription (observe (tcp-channel remote-addr local-addr ?))) ;; monitor peer + (advertisement (tcp-channel remote-addr local-addr ?)) ;; may send segments to peer + (subscription (tcp-channel local-addr remote-addr ?)) ;; want segments from peer + (subscription (tcp-channel remote-addr local-addr ?) #:meta-level 1) ;; segments from driver thread + ))) diff --git a/prospect-monolithic/drivers/timer.rkt b/prospect-monolithic/drivers/timer.rkt new file mode 100644 index 0000000..2be29f2 --- /dev/null +++ b/prospect-monolithic/drivers/timer.rkt @@ -0,0 +1,92 @@ +#lang racket/base +;; Timer driver. + +;; Uses mutable state internally, but because the scope of the +;; mutation is limited to each timer process alone, it's easy to show +;; correct linear use of the various pointers. + +(require racket/set) +(require racket/match) +(require data/heap) +(require "../main.rkt") + +(struct pending-timer (deadline label) #:transparent) + +(provide (struct-out set-timer) + (struct-out timer-expired) + spawn-timer-driver) + +(struct set-timer (label msecs kind) #:prefab) +(struct timer-expired (label msecs) #:prefab) + +(define (spawn-timer-driver) + (define control-ch (make-channel)) + (thread (lambda () (timer-driver-thread-main control-ch))) + (define (timer-driver e count) + (match e + [(message (at-meta (and expiry (timer-expired _ _)))) + (transition (- count 1) + (list (message expiry) + (when (= count 1) + (scn/union (subscription (set-timer ? ? ?)) + (advertisement (timer-expired ? ?))))))] + [(message (and instruction (set-timer _ _ _))) + (channel-put control-ch instruction) + (transition (+ count 1) + (when (= count 0) + (scn/union (subscription (set-timer ? ? ?)) + (advertisement (timer-expired ? ?)) + (subscription (timer-expired ? ?) #:meta-level 1))))] + [_ #f])) + (spawn timer-driver + 0 ;; initial count + (scn/union (subscription (set-timer ? ? ?)) + (advertisement (timer-expired ? ?))))) + +(define (timer-driver-thread-main control-ch) + (define heap (make-timer-heap)) + (let loop () + (sync (match (next-timer heap) + [#f never-evt] + [t (handle-evt (timer-evt (pending-timer-deadline t)) + (lambda (now) + (for-each send-ground-message (fire-timers! heap now)) + (loop)))]) + (handle-evt control-ch + (match-lambda + [(set-timer label msecs 'relative) + (install-timer! heap label (+ (current-inexact-milliseconds) msecs)) + (loop)] + [(set-timer label msecs 'absolute) + (install-timer! heap label msecs) + (loop)] + ['quit (void)]))))) + +(define (make-timer-heap) + (make-heap (lambda (t1 t2) (<= (pending-timer-deadline t1) (pending-timer-deadline t2))))) + +(define (next-timer heap) + (and (positive? (heap-count heap)) + (heap-min heap))) + +(define (fire-timers! heap now) + (if (zero? (heap-count heap)) + '() + (let ((m (heap-min heap))) + (if (<= (pending-timer-deadline m) now) + (begin (heap-remove-min! heap) + (cons (timer-expired (pending-timer-label m) now) + (fire-timers! heap now))) + '())))) + +(define (install-timer! heap label deadline) + (define now (current-inexact-milliseconds)) + (heap-add! heap (pending-timer deadline label))) + +;; Racket's alarm-evt is almost the right design for timeouts: its +;; synchronisation value should be the (or some) value of the clock +;; after the asked-for time. That way it serves as timeout and +;; clock-reader in one. +(define (timer-evt msecs) + (handle-evt (alarm-evt msecs) + (lambda (_) (current-inexact-milliseconds)))) diff --git a/prospect-monolithic/drivers/udp.rkt b/prospect-monolithic/drivers/udp.rkt new file mode 100644 index 0000000..b7633ea --- /dev/null +++ b/prospect-monolithic/drivers/udp.rkt @@ -0,0 +1,90 @@ +#lang racket/base + +(require racket/match) +(require (prefix-in udp: racket/udp)) +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(provide (struct-out udp-remote-address) + (struct-out udp-handle) + (struct-out udp-listener) + udp-address? + udp-local-address? + (struct-out udp-packet) + spawn-udp-driver) + +;; A UdpAddress is one of +;; -- a (udp-address String Uint16), representing a remote socket +;; -- a (udp-handle Any), representing a local socket on a kernel-assigned port +;; -- a (udp-listener Uint16), representing a local socket on a user-assigned port +;; Note that udp-handle-ids must be chosen carefully: they are scoped +;; to the local VM, i.e. shared between processes in that VM, so +;; processes must make sure not to accidentally clash in handle ID +;; selection. +(struct udp-remote-address (host port) #:prefab) +(struct udp-handle (id) #:prefab) +(struct udp-listener (port) #:prefab) + +(define (udp-address? x) + (or (udp-remote-address? x) + (udp-local-address? x))) + +(define (udp-local-address? x) + (or (udp-handle? x) + (udp-listener? x))) + +;; A UdpPacket is a (udp-packet UdpAddress UdpAddress Bytes), and +;; represents a packet appearing on our local "subnet" of the full UDP +;; network, complete with source, destination and contents. +(struct udp-packet (source destination body) #:prefab) + +;; -> Action +;; Spawns a process acting as a UDP socket factory. +(define (spawn-udp-driver) + (spawn-demand-matcher (observe (udp-packet ? (?! (udp-listener ?)) ?)) + (advertise (udp-packet ? (?! (udp-listener ?)) ?)) + spawn-udp-socket)) + +;; UdpLocalAddress -> Action +(define (spawn-udp-socket local-addr) + (define socket (udp:udp-open-socket #f #f)) + + (match local-addr + [(udp-listener port) (udp:udp-bind! socket #f port)] + [(udp-handle _) (udp:udp-bind! socket #f 0)]) ;; kernel-allocated port number + + (define control-ch (make-channel)) + (thread (lambda () (udp-receiver-thread local-addr socket control-ch))) + + (spawn (lambda (e s) + (match e + [(scn r) + (cond [(trie-non-empty? r) #f] ;; peer hasn't quit yet: do nothing. + [else (channel-put control-ch 'quit) + (quit)])] + [(message (at-meta (? udp-packet? p))) + (transition s (message p))] + [(message (udp-packet _ (udp-remote-address host port) body)) + (udp:udp-send-to socket host port body) + #f] + [_ #f])) + (void) + (scn/union (subscription (udp-packet ? local-addr ?) #:meta-level 1) + (subscription (udp-packet local-addr (udp-remote-address ? ?) ?)) + (advertisement (udp-packet (udp-remote-address ? ?) local-addr ?)) + (subscription (observe (udp-packet (udp-remote-address ? ?) local-addr ?)))))) + +;; UdpLocalAddress UdpSocket Channel -> Void +(define (udp-receiver-thread local-addr socket control-ch) + (define buffer (make-bytes 65536)) + (let loop () + (sync (handle-evt control-ch (match-lambda ['quit (void)])) + (handle-evt (udp:udp-receive!-evt socket buffer) + (lambda (receive-results) + (match-define (list len source-hostname source-port) receive-results) + (send-ground-message + (udp-packet (udp-remote-address source-hostname source-port) + local-addr + (subbytes buffer 0 len))) + (loop))))) + (udp:udp-close socket)) diff --git a/prospect-monolithic/drivers/websocket.rkt b/prospect-monolithic/drivers/websocket.rkt new file mode 100644 index 0000000..63538c4 --- /dev/null +++ b/prospect-monolithic/drivers/websocket.rkt @@ -0,0 +1,183 @@ +#lang racket/base + +(require racket/match) +(require net/rfc6455) +(require (only-in net/rfc6455/conn-api ws-conn-base-ip)) +(require "../main.rkt") +(require "../demand-matcher.rkt") + +(require racket/unit) +(require net/tcp-sig) +(require net/tcp-unit) +(require net/ssl-tcp-unit) +(require net/url) + +(provide (struct-out websocket-remote-client) + (struct-out websocket-local-server) + (struct-out websocket-local-client) + (struct-out websocket-remote-server) + (struct-out websocket-ssl-options) + (struct-out websocket-message) + spawn-websocket-driver) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Protocol messages + +(struct websocket-remote-client (id) #:prefab) +(struct websocket-local-server (port ssl-options) #:prefab) +(struct websocket-local-client (id) #:prefab) +(struct websocket-remote-server (url) #:prefab) +(struct websocket-ssl-options (cert-file key-file) #:prefab) +(struct websocket-message (from to body) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Ground-level communication messages + +(struct websocket-connection (id local-addr remote-addr connection control-ch) #:prefab) +(struct websocket-incoming-message (id message) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Driver + +(define (spawn-websocket-driver) + (define inbound-listener-message-pat (websocket-message ? (?! (websocket-local-server ? ?)) ?)) + (define outbound-conn-message-pat (websocket-message (?! (websocket-local-client ?)) + (?! (websocket-remote-server ?)) + ?)) + (list (spawn-demand-matcher (advertise (observe inbound-listener-message-pat)) + (advertise (advertise inbound-listener-message-pat)) + spawn-websocket-listener) + (spawn-demand-matcher (advertise outbound-conn-message-pat) + (observe outbound-conn-message-pat) + spawn-websocket-connection))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Listener + +(struct listener-state (shutdown-procedure server-addr) #:transparent) + +(define (websocket-listener e state) + (match e + [(scn (? trie-empty?)) + ((listener-state-shutdown-procedure state)) + (quit)] + [(message (at-meta (websocket-connection id local-addr remote-addr c control-ch))) + (transition state (spawn-connection local-addr remote-addr id c control-ch))] + [_ #f])) + +(define ((connection-handler server-addr) c dummy-state) + (define control-ch (make-channel)) + (define id (gensym 'ws)) + (send-ground-message + (websocket-connection id server-addr (websocket-remote-client id) c control-ch)) + (connection-thread-loop control-ch c id)) + +(define (connection-thread-loop control-ch c id) + (define c-input-port (ws-conn-base-ip c)) + (let loop ((blocked? #t)) + (sync (handle-evt control-ch + (match-lambda + ['unblock (loop #f)] + ['quit (void)])) + (if blocked? + never-evt + (handle-evt c-input-port + (lambda (dummy) + (define msg + (with-handlers ([exn:fail:network? (lambda (e) eof)]) + (ws-recv c #:payload-type 'text))) + (send-ground-message (websocket-incoming-message id msg)) + (loop (or blocked? (eof-object? msg)))))))) + (ws-close! c)) + +(define (ssl-options->ssl-tcp@ ssl-options) + (match-define (websocket-ssl-options cert-file key-file) ssl-options) + (define-unit-binding ssl-tcp@ + (make-ssl-tcp@ cert-file key-file #f #f #f #f #f) + (import) + (export tcp^)) + ssl-tcp@) + +(define (spawn-websocket-listener server-addr) + (match-define (websocket-local-server port ssl-options) server-addr) + (define shutdown-procedure (ws-serve #:port port + #:tcp@ (if ssl-options + (ssl-options->ssl-tcp@ ssl-options) + tcp@) + (connection-handler server-addr))) + (spawn websocket-listener + (listener-state shutdown-procedure server-addr) + (scn/union + (subscription (advertise (observe (websocket-message ? server-addr ?)))) ;; monitor peer + (advertisement (advertise (websocket-message ? server-addr ?))) ;; declare we might make connections + (subscription (websocket-connection ? server-addr ? ? ?) #:meta-level 1) ;; events from driver thd + ))) + +(define (spawn-websocket-connection local-addr remote-addr) + (match-define (websocket-remote-server url) remote-addr) + (define id (gensym 'ws)) + (define control-ch (make-channel)) + (thread + (lambda () + (log-info "Connecting to ~a ~a" url (current-inexact-milliseconds)) + (define c (with-handlers [(exn? values)] (ws-connect (string->url url)))) + (log-info "Connected to ~a ~a" url (current-inexact-milliseconds)) + (send-ground-message + (websocket-connection id local-addr remote-addr c control-ch)) + (when (not (exn? c)) + (connection-thread-loop control-ch c id)))) + (spawn (lambda (e buffered-messages-rev) + (match e + [(message (at-meta (websocket-connection _ _ _ c _))) + (when (not (exn? c)) + (for [(m (reverse buffered-messages-rev))] (ws-send! c m)) + (spawn-connection local-addr remote-addr id c control-ch)) + (quit)] + [(message (websocket-message _ _ m)) + (transition (cons m buffered-messages-rev) '())] + [_ #f])) + '() + (scn/union + (subscription (websocket-connection id local-addr remote-addr ? control-ch) #:meta-level 1) + (subscription (websocket-message local-addr remote-addr ?))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Connection + +(struct connection-state (local-addr remote-addr c control-ch) #:transparent) + +(define (shutdown-connection! state) + (channel-put (connection-state-control-ch state) 'quit) + (quit)) + +(define (websocket-connection-behaviour e state) + (with-handlers [((lambda (exn) #t) + (lambda (exn) + (shutdown-connection! state) + (raise exn)))] + (match e + [(message (at-meta (websocket-incoming-message _ (? eof-object?)))) + (shutdown-connection! state)] + [(message (at-meta (websocket-incoming-message _ (? bytes? bs)))) + (transition state (message (websocket-message (connection-state-remote-addr state) + (connection-state-local-addr state) + bs)))] + [(message (websocket-message _ _ m)) + (ws-send! (connection-state-c state) m) + #f] + [(scn (? trie-empty?)) + (shutdown-connection! state)] + [(scn _) + (channel-put (connection-state-control-ch state) 'unblock) + #f] + [_ #f]))) + +(define (spawn-connection local-addr remote-addr id c control-ch) + (spawn websocket-connection-behaviour + (connection-state local-addr remote-addr c control-ch) + (scn/union + (subscription (observe (websocket-message remote-addr local-addr ?))) ;; monitor peer + (advertisement (websocket-message remote-addr local-addr ?)) ;; may send messages to peer + (subscription (websocket-message local-addr remote-addr ?)) ;; want segments from peer + (subscription (websocket-incoming-message id ?) #:meta-level 1) ;; segments from driver thd + ))) diff --git a/prospect-monolithic/examples/.gitignore b/prospect-monolithic/examples/.gitignore new file mode 100644 index 0000000..07f54a0 --- /dev/null +++ b/prospect-monolithic/examples/.gitignore @@ -0,0 +1,2 @@ +private-key.pem +server-cert.pem diff --git a/prospect-monolithic/examples/Makefile b/prospect-monolithic/examples/Makefile new file mode 100644 index 0000000..8721ad7 --- /dev/null +++ b/prospect-monolithic/examples/Makefile @@ -0,0 +1,14 @@ +keys: private-key.pem server-cert.pem + +private-key.pem: + openssl genrsa -des3 -passout pass:a -out $@ 1024 + openssl rsa -passin pass:a -in $@ -out $@ + +server-cert.pem: private-key.pem + openssl req -new -x509 -nodes -sha1 -days 365 \ + -subj /CN=example.racket-rfc6455.leastfixedpoint.com \ + -passin pass:a \ + -key private-key.pem > $@ + +clean-keys: + rm -f private-key.pem server-cert.pem diff --git a/prospect-monolithic/examples/bank-account.rkt b/prospect-monolithic/examples/bank-account.rkt new file mode 100644 index 0000000..339274f --- /dev/null +++ b/prospect-monolithic/examples/bank-account.rkt @@ -0,0 +1,28 @@ +#lang prospect-monolithic +;; Hello-worldish "bank account" example. + +(struct account (balance) #:prefab) +(struct deposit (amount) #:prefab) + +(define (manager e balance) + (match e + [(message (deposit amount)) + (transition (+ balance amount) + (scn (assertion (account (+ balance amount)))))] + [_ #f])) + +(define (observer e _) + (when (scn? e) + (for [(balance (project-assertions (scn-trie e) (account (?!))))] + (printf "Balance changed to ~a\n" balance))) + #f) + +(define (updater e _) + (if (and (scn? e) (trie-non-empty? (scn-trie e))) + (quit (list (message (deposit +100)) + (message (deposit -30)))) + #f)) + +(spawn manager 0 (scn/union (assertion (observe (deposit ?))) (assertion (account 0)))) +(spawn observer (void) (scn (assertion (observe (account ?))))) +(spawn updater (void) (scn (assertion (observe (observe (deposit ?)))))) diff --git a/prospect-monolithic/examples/box-and-client.rkt b/prospect-monolithic/examples/box-and-client.rkt new file mode 100644 index 0000000..1c08fe4 --- /dev/null +++ b/prospect-monolithic/examples/box-and-client.rkt @@ -0,0 +1,26 @@ +#lang prospect-monolithic +;; Simple mutable box and count-to-infinity box client. + +(struct set-box (new-value) #:transparent) +(struct box-state (value) #:transparent) + +(spawn (lambda (e current-value) + (match e + [(message (set-box new-value)) + (log-info "box: taking on new-value ~v" new-value) + (transition new-value (scn/union (subscription (set-box ?)) + (assertion (box-state new-value))))] + [_ #f])) + 0 + (scn/union (subscription (set-box ?)) + (assertion (box-state 0)))) + +(spawn (lambda (e s) + (match e + [(scn assertions) + (transition s (for/list [(v (project-assertions assertions (box-state (?!))))] + (log-info "client: learned that box's value is now ~v" v) + (message (set-box (+ v 1)))))] + [_ #f])) + (void) + (scn (subscription (box-state ?)))) diff --git a/prospect-monolithic/examples/chat-client.rkt b/prospect-monolithic/examples/chat-client.rkt new file mode 100644 index 0000000..160dfeb --- /dev/null +++ b/prospect-monolithic/examples/chat-client.rkt @@ -0,0 +1,27 @@ +#lang prospect-monolithic + +(require (only-in racket/port read-bytes-line-evt)) +(require "../drivers/tcp.rkt") + +(define local-handle (tcp-handle 'chat)) +(define remote-handle (tcp-address "localhost" 5999)) + +(spawn-tcp-driver) +(spawn/stateless (lambda (e) + (match e + [(scn (? trie-empty?)) (quit)] + [(message (at-meta (external-event _ (list (? eof-object?))))) + (quit)] + [(message (at-meta (external-event _ (list line)))) + (message (tcp-channel local-handle remote-handle line))] + [(message (tcp-channel _ _ bs)) + (write-bytes bs) + (flush-output) + #f] + [_ #f])) + (scn/union + (subscription (external-event (read-bytes-line-evt (current-input-port) 'any) ?) + #:meta-level 1) + (subscription (tcp-channel remote-handle local-handle ?)) + (subscription (advertise (tcp-channel remote-handle local-handle ?))) + (advertisement (tcp-channel local-handle remote-handle ?)))) diff --git a/prospect-monolithic/examples/chat-no-quit-world-no-nesting.rkt b/prospect-monolithic/examples/chat-no-quit-world-no-nesting.rkt new file mode 100644 index 0000000..fc99390 --- /dev/null +++ b/prospect-monolithic/examples/chat-no-quit-world-no-nesting.rkt @@ -0,0 +1,47 @@ +#lang prospect-monolithic + +(require racket/set) +(require (only-in racket/string string-trim)) +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(define (spawn-session them us) + (define user (gensym 'user)) + (define remote-detector (compile-projection (advertise (?! (tcp-channel ? ? ?))))) + (define peer-detector (compile-projection (advertise `(,(?!) says ,?)))) + (define (send-to-remote fmt . vs) + (message (tcp-channel us them (string->bytes/utf-8 (apply format fmt vs))))) + (define (say who fmt . vs) + (unless (equal? who user) (send-to-remote "~a ~a\n" who (apply format fmt vs)))) + (list (send-to-remote "Welcome, ~a.\n" user) + (spawn + (lambda (e peers) + (match e + [(message (tcp-channel _ _ bs)) + (transition peers (message `(,user says ,(string-trim (bytes->string/utf-8 bs)))))] + [(message `(,who says ,what)) + (transition peers (say who "says: ~a" what))] + [(scn assertions) + (if (trie-empty? (trie-project assertions remote-detector)) + (quit (send-to-remote "Goodbye!\n")) + (let ((new-peers (trie-project/set/single assertions peer-detector))) + (define arrived (set-subtract new-peers peers)) + (define departed (set-subtract peers new-peers)) + (transition new-peers + (list (for/list [(who arrived)] (say who "arrived.")) + (for/list [(who departed)] (say who "departed."))))))] + [#f #f])) + (set) + (scn/union + (subscription `(,? says ,?)) ;; read actual chat messages + (subscription (advertise `(,? says ,?))) ;; observe peer presence + (advertisement `(,user says ,?)) ;; advertise our presence + (subscription (tcp-channel them us ?)) ;; read from remote client + (subscription (advertise (tcp-channel them us ?))) ;; monitor remote client + (advertisement (tcp-channel us them ?)) ;; we will write to remote client + )))) + +(spawn-tcp-driver) +(spawn-demand-matcher (advertise (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + (observe (tcp-channel (?!) (?! (tcp-listener 5999)) ?)) + spawn-session) diff --git a/prospect-monolithic/examples/echo.rkt b/prospect-monolithic/examples/echo.rkt new file mode 100644 index 0000000..32b464a --- /dev/null +++ b/prospect-monolithic/examples/echo.rkt @@ -0,0 +1,24 @@ +#lang prospect-monolithic + +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(define server-id (tcp-listener 5999)) + +(spawn-tcp-driver) +(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?)) + (observe (tcp-channel (?!) server-id ?)) + (lambda (c) + (printf "Accepted connection from ~v\n" c) + (spawn (lambda (e state) + (match e + [(scn (? trie-empty?)) + (printf "Closed connection ~v\n" c) + (quit)] + [(message (tcp-channel src dst bs)) + (transition state (message (tcp-channel dst src bs)))] + [_ #f])) + (void) + (scn/union (subscription (advertise (tcp-channel c server-id ?))) + (subscription (tcp-channel c server-id ?)) + (advertisement (tcp-channel server-id c ?)))))) diff --git a/prospect-monolithic/examples/example-lang.rkt b/prospect-monolithic/examples/example-lang.rkt new file mode 100644 index 0000000..8c4c4e8 --- /dev/null +++ b/prospect-monolithic/examples/example-lang.rkt @@ -0,0 +1,75 @@ +#lang prospect-monolithic + +(require (only-in racket/port read-line-evt)) +(require "../drivers/timer.rkt") + +(define (quasi-spy e s) + (printf "----------------------------------------\n") + (printf "QUASI-SPY:\n") + (match e + [(scn r) (pretty-print-trie r)] + [other + (write other) + (newline)]) + (printf "========================================\n") + #f) +(spawn quasi-spy (void) (scn (subscription ?))) + +(define (r e s) + (match e + [(message body) (transition s (message (at-meta `(print (got ,body)))))] + [_ #f])) + +(define (b e n) + (match e + [#f (if (< n 10) + (transition (+ n 1) (message `(hello ,n))) + #f)] + [_ #f])) + +(spawn-network (spawn r (void) (scn (subscription ?))) + (spawn b 0 '())) + +(define (echoer e s) + (match e + [(message (at-meta (external-event _ (list (? eof-object?))))) + (quit)] + [(message (at-meta (external-event _ (list line)))) + (transition s (message `(print (got-line ,line))))] + [_ #f])) + +(spawn echoer + (void) + (scn (subscription (external-event (read-line-evt (current-input-port) 'any) ?) + #:meta-level 1))) + +(define (ticker e s) + (match e + [(scn r) + (printf "TICKER SCN RECEIVED:\n") + (pretty-print-trie r) + #f] + [(message (timer-expired 'tick now)) + (printf "TICK ~v\n" now) + (if (< s 3) + (transition (+ s 1) (message (set-timer 'tick 1000 'relative))) + (quit))] + [_ #f])) + +(spawn-timer-driver) +(message (set-timer 'tick 1000 'relative)) +(spawn ticker + 1 + (scn/union (subscription (observe (set-timer ? ? ?))) + (subscription (timer-expired 'tick ?)))) + +(define (printer e s) + (match e + [(message (list 'print v)) + (log-info "PRINTER: ~a" v) + #f] + [_ #f])) + +(spawn printer + (void) + (scn (subscription `(print ,?)))) diff --git a/prospect-monolithic/examples/example-plain.rkt b/prospect-monolithic/examples/example-plain.rkt new file mode 100644 index 0000000..ceb04cf --- /dev/null +++ b/prospect-monolithic/examples/example-plain.rkt @@ -0,0 +1,72 @@ +#lang racket/base + +(require racket/match) +(require (only-in racket/port read-line-evt)) +(require "../main.rkt") +(require "../drivers/timer.rkt") + +(define (quasi-spy e s) + (printf "----------------------------------------\n") + (printf "QUASI-SPY:\n") + (match e + [(scn r) (pretty-print-trie r)] + [other + (write other) + (newline)]) + (printf "========================================\n") + #f) + +(define (r e s) + (match e + [(message body) (transition s (message (at-meta `(print (got ,body)))))] + [_ #f])) + +(define (b e n) + (match e + [#f (if (< n 10) + (transition (+ n 1) (message `(hello ,n))) + #f)] + [_ #f])) + +(define (echoer e s) + (match e + [(message (at-meta (external-event _ (list (? eof-object?))))) + (quit)] + [(message (at-meta (external-event _ (list line)))) + (transition s (message `(print (got-line ,line))))] + [_ #f])) + +(define (ticker e s) + (match e + [(scn r) + (printf "TICKER SCN RECEIVED:\n") + (pretty-print-trie r) + #f] + [(message (timer-expired 'tick now)) + (printf "TICK ~v\n" now) + (if (< s 3) + (transition (+ s 1) (message (set-timer 'tick 1000 'relative))) + (quit))] + [_ #f])) + +(define (printer e s) + (match e + [(message (list 'print v)) + (log-info "PRINTER: ~a" v) + #f] + [_ #f])) + +(run-ground (spawn quasi-spy (void) (scn (subscription ?))) + (spawn-timer-driver) + (message (set-timer 'tick 1000 'relative)) + (spawn ticker + 1 + (scn/union (subscription (observe (set-timer ? ? ?))) + (subscription (timer-expired 'tick ?)))) + (spawn-network (spawn r (void) (scn (subscription ?))) + (spawn b 0 '())) + (spawn echoer + (void) + (scn (subscription (external-event (read-line-evt (current-input-port) 'any) ?) + #:meta-level 1))) + (spawn printer (void) (scn (subscription `(print ,?))))) diff --git a/prospect-monolithic/examples/example-quit-world.rkt b/prospect-monolithic/examples/example-quit-world.rkt new file mode 100644 index 0000000..e216916 --- /dev/null +++ b/prospect-monolithic/examples/example-quit-world.rkt @@ -0,0 +1,35 @@ +#lang prospect-monolithic +;; Demonstrates quit-network. + +(require (only-in racket/port read-bytes-line-evt)) + +(define (spawn-command-listener) + (spawn (lambda (e s) + (match e + [(message (at-meta (at-meta (external-event _ (list #"quit"))))) + (printf "Quitting just the leaf actor.\n") + (quit)] + [(message (at-meta (at-meta (external-event _ (list #"quit-network"))))) + (printf "Terminating the whole network.\n") + (transition s (quit-network))] + [_ #f])) + (void) + (scn (subscription (external-event (read-bytes-line-evt (current-input-port) 'any) ?) + #:meta-level 2)))) + +(define (spawn-ticker) + (define (sub-to-alarm) + (scn (subscription (external-event (alarm-evt (+ (current-inexact-milliseconds) 1000)) ?) + #:meta-level 2))) + (spawn (lambda (e s) + (match e + [(message (at-meta (at-meta (external-event _ _)))) + (printf "Tick!\n") + (transition s (sub-to-alarm))] + [_ #f])) + (void) + (sub-to-alarm))) + +(printf "Type 'quit' or 'quit-network'.\n") +(spawn-network (spawn-command-listener) + (spawn-ticker)) diff --git a/prospect-monolithic/examples/mini-echo.rkt b/prospect-monolithic/examples/mini-echo.rkt new file mode 100644 index 0000000..30f9010 --- /dev/null +++ b/prospect-monolithic/examples/mini-echo.rkt @@ -0,0 +1,25 @@ +#lang prospect-monolithic + +(struct echo-req (body) #:prefab) +(struct echo-resp (body) #:prefab) + +(spawn (lambda (e count) + (match e + [(message (echo-req body)) + (transition (+ count 1) + (message (echo-resp body)))] + [_ #f])) + 0 + (scn (subscription (echo-req ?)))) + +(spawn (lambda (e s) + (match e + [(message (echo-resp body)) + (printf "Received: ~v\n" body) + #f] + [_ #f])) + (void) + (list (scn (subscription (echo-resp ?))) + (message (echo-req 0)) + (message (echo-req 1)) + (message (echo-req 2)))) diff --git a/prospect-monolithic/examples/tcp-hello.rkt b/prospect-monolithic/examples/tcp-hello.rkt new file mode 100644 index 0000000..9634887 --- /dev/null +++ b/prospect-monolithic/examples/tcp-hello.rkt @@ -0,0 +1,36 @@ +#lang prospect-monolithic + +(require "../drivers/tcp.rkt") +(require "../demand-matcher.rkt") + +(spawn-tcp-driver) + +(define server-id (tcp-listener 5999)) + +(define (spawn-connection-handler c) + (log-info "spawn-connection-handler ~v" c) + (define (connection-handler e n) + (when e (log-info "connection-handler ~v: ~v /// ~v" c e n)) + (match e + [(scn (? trie-empty?)) (quit)] + [(message (tcp-channel src dst #"quit\n")) + (quit (message (tcp-channel dst src #"OK, then.\n")))] + [(message (tcp-channel src dst bs)) + (transition n (message (tcp-channel dst src (string->bytes/utf-8 + (format "You said: ~a" bs)))))] + [_ + (and (< n 5) + (transition (+ n 1) (message (tcp-channel server-id c (string->bytes/utf-8 + (format "msg ~v\n" n))))))])) + (spawn connection-handler + 0 + (scn/union (subscription (advertise (tcp-channel c server-id ?))) + (subscription (tcp-channel c server-id ?)) + (advertisement (tcp-channel server-id c ?))))) + +(spawn-demand-matcher (advertise (tcp-channel (?!) server-id ?)) + (observe (tcp-channel (?!) server-id ?)) + spawn-connection-handler + (lambda (c) + (log-info "Connection handler ~v decided to exit" c) + '())) diff --git a/prospect-monolithic/examples/udp-hello-plain.rkt b/prospect-monolithic/examples/udp-hello-plain.rkt new file mode 100644 index 0000000..a7fe3e6 --- /dev/null +++ b/prospect-monolithic/examples/udp-hello-plain.rkt @@ -0,0 +1,18 @@ +#lang prospect-monolithic + +(require "../drivers/udp.rkt") + +(spawn-udp-driver) + +(spawn (lambda (e s) + (match e + [(message (udp-packet src dst #"quit\n")) + (log-info "Got quit request") + (quit (message (udp-packet dst src #"Goodbye!\n")))] + [(message (udp-packet src dst body)) + (log-info "Got packet from ~v: ~v" src body) + (define reply (string->bytes/utf-8 (format "You said: ~a" body))) + (transition s (message (udp-packet dst src reply)))] + [_ #f])) + (void) + (scn (subscription (udp-packet ? (udp-listener 5999) ?)))) diff --git a/prospect-monolithic/examples/ws-hello-ssl.rkt b/prospect-monolithic/examples/ws-hello-ssl.rkt new file mode 100644 index 0000000..6161d86 --- /dev/null +++ b/prospect-monolithic/examples/ws-hello-ssl.rkt @@ -0,0 +1,33 @@ +#lang prospect-monolithic + +(require "../drivers/websocket.rkt") +(require "../demand-matcher.rkt") + +(spawn-websocket-driver) + +(define any-client (websocket-remote-client ?)) +(define server-id (websocket-local-server 8081 (websocket-ssl-options "server-cert.pem" + "private-key.pem"))) + +(define (spawn-connection-handler c) + (log-info "spawn-connection-handler ~v" c) + (define (connection-handler e n) + (when e (log-info "connection-handler ~v: ~v /// ~v" c e n)) + (match e + [(scn (? trie-empty?)) (quit)] + [_ + (if (< n 20) + (transition (+ n 1) (message (websocket-message server-id c (format "msg ~v" n)))) + #f)])) + (spawn connection-handler + 0 + (scn/union (subscription (advertise (websocket-message c server-id ?))) + (subscription (websocket-message c server-id ?)) + (advertisement (websocket-message server-id c ?))))) + +(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?)) + (observe (websocket-message (?! any-client) server-id ?)) + spawn-connection-handler + (lambda (c) + (log-info "Connection handler ~v decided to exit" c) + '())) diff --git a/prospect-monolithic/examples/ws-hello.rkt b/prospect-monolithic/examples/ws-hello.rkt new file mode 100644 index 0000000..3bc8946 --- /dev/null +++ b/prospect-monolithic/examples/ws-hello.rkt @@ -0,0 +1,32 @@ +#lang prospect-monolithic + +(require "../drivers/websocket.rkt") +(require "../demand-matcher.rkt") + +(spawn-websocket-driver) + +(define any-client (websocket-remote-client ?)) +(define server-id (websocket-local-server 8081 #f)) + +(define (spawn-connection-handler c) + (log-info "spawn-connection-handler ~v" c) + (define (connection-handler e n) + (when e (log-info "connection-handler ~v: ~v /// ~v" c e n)) + (match e + [(scn (? trie-empty?)) (quit)] + [_ + (if (< n 20) + (transition (+ n 1) (message (websocket-message server-id c (format "msg ~v" n)))) + #f)])) + (spawn connection-handler + 0 + (scn/union (subscription (advertise (websocket-message c server-id ?))) + (subscription (websocket-message c server-id ?)) + (advertisement (websocket-message server-id c ?))))) + +(spawn-demand-matcher (advertise (websocket-message (?! any-client) server-id ?)) + (observe (websocket-message (?! any-client) server-id ?)) + spawn-connection-handler + (lambda (c) + (log-info "Connection handler ~v decided to exit" c) + '())) diff --git a/prospect-monolithic/ground.rkt b/prospect-monolithic/ground.rkt new file mode 100644 index 0000000..0efeff9 --- /dev/null +++ b/prospect-monolithic/ground.rkt @@ -0,0 +1,106 @@ +#lang racket/base +;; Breaking the infinite tower of nested Networks, connecting to the "real world" at the fracture line. + +(require racket/async-channel) +(require racket/set) +(require racket/match) +(require racket/list) +(require "core.rkt") +(require "../prospect/trace.rkt") +(require "trace/stderr.rkt") +(require "../prospect/tset.rkt") + +(provide (struct-out external-event) + send-ground-message + send-ground-scn + run-ground) + +;;--------------------------------------------------------------------------- +;; Communication via regular subscription and messages from other threads + +;; (Parameterof (Option AsyncChannel)) +;; Communication channel from auxiliary (usually driver) threads to +;; the currently-active ground VM. +(define current-ground-event-async-channel (make-parameter (make-async-channel))) + +;; Any -> Void +;; Sends a message at the ground-VM metalevel. +(define (send-ground-message body) + (async-channel-put (current-ground-event-async-channel) (message body))) + +;; SCN -> Void +;; Injects a SCN into the ground-VM metalevel. It will appear to be +;; asserted by the environment in general. The obligation is on the caller +;; to ensure that assertions do not interfere between drivers. +(define (send-ground-scn s) + (async-channel-put (current-ground-event-async-channel) s)) + +;;--------------------------------------------------------------------------- +;; Communication via RacketEvents + +;; A GroundEvent is a pair of a Racket (evt?) event and its yielded +;; results. +;; - (external-event RacketEvent (Listof Any)) +(struct external-event (descriptor values) #:prefab) + +;; RacketEvent -> RacketEvent +;; Wraps a CML-style Racket event with a handler that sends the event +;; results via the ground VM. +(define (event-handler descriptor) + (handle-evt descriptor (lambda vs (message (external-event descriptor vs))))) + +;; Projection +;; Used to extract event descriptors and results from subscriptions +;; from the ground VM's contained Network. +(define event-projection (compile-projection (observe (external-event (?!) ?)))) + +;; Interests -> (Listof RacketEvent) +;; Projects out the active event subscriptions from the given interests. +(define (extract-active-events interests) + (define es (trie-project/set/single interests event-projection)) + ;; TODO: how should the following error be handled, ideally? + ;; In principle, security restrictions should make it impossible. + ;; But absent those, what should be done? Should an offending + ;; process be identified and terminated? + (unless es (error 'extract-active-events "User program subscribed to wildcard event")) + (for/list [(e (in-set es))] (event-handler e))) + +;;--------------------------------------------------------------------------- + +;; RacketEvent +;; Used only when the system is not provably inert, in order to let it +;; take further internal reductions. +(define idle-handler + (handle-evt (system-idle-evt) (lambda _ #f))) + +;; Action* -> Void +;; Runs a ground VM, booting the outermost Network with the given Actions. +(define (run-ground . boot-actions) + (let await-interrupt ((inert? #f) + (w (make-network boot-actions)) + (interests (trie-empty))) + ;; (log-info "GROUND INTERESTS:\n~a" (trie->pretty-string interests)) + (if (and inert? (trie-empty? interests)) + (begin (log-info "run-ground: Terminating because inert") + (void)) + (let ((e (apply sync + (current-ground-event-async-channel) + (if inert? never-evt idle-handler) + (extract-active-events interests)))) + (trace-process-step e #f network-handle-event w) + (define resulting-transition (clean-transition (network-handle-event e w))) + (trace-process-step-result e #f network-handle-event w #f resulting-transition) + (match resulting-transition + [#f ;; inert + (await-interrupt #t w interests)] + [(transition w actions) + (let process-actions ((actions actions) (interests interests)) + (match actions + ['() (await-interrupt #f w interests)] + [(cons a actions) + (match a + [(scn r) + (process-actions actions r)] + [_ + (log-warning "run-ground: ignoring useless meta-action ~v" a) + (process-actions actions interests)])]))]))))) diff --git a/prospect-monolithic/lang.rkt b/prospect-monolithic/lang.rkt new file mode 100644 index 0000000..1339535 --- /dev/null +++ b/prospect-monolithic/lang.rkt @@ -0,0 +1,51 @@ +#lang racket/base + +(require (for-syntax racket/base syntax/kerncase)) + +(require racket/match) +(require "main.rkt") + +(provide (rename-out [module-begin #%module-begin]) + (except-out (all-from-out racket/base) #%module-begin) + (all-from-out racket/match) + (all-from-out "main.rkt") + (for-syntax (all-from-out racket/base))) + +(define-syntax (module-begin stx) + (unless (eq? (syntax-local-context) 'module-begin) + (raise-syntax-error #f "allowed only around a module body" stx)) + (syntax-case stx () + [(_ forms ...) + (let () + (define (accumulate-actions action-ids final-forms forms) + (if (null? forms) + (let ((final-stx + #`(#%module-begin #,@(reverse final-forms) + (run-ground #,@(reverse action-ids))))) + ;;(pretty-print (syntax->datum final-stx)) + final-stx) + (syntax-case (local-expand (car forms) + 'module + (kernel-form-identifier-list)) () + [(head rest ...) + (if (free-identifier=? #'head #'begin) + (accumulate-actions action-ids + final-forms + (append (syntax->list #'(rest ...)) (cdr forms))) + (if (ormap (lambda (i) (free-identifier=? #'head i)) + (syntax->list #'(define-values define-syntaxes begin-for-syntax + module module* + #%module-begin + #%require #%provide))) + (accumulate-actions action-ids + (cons (car forms) final-forms) + (cdr forms)) + (accumulate-action (car forms) action-ids final-forms (cdr forms))))] + [non-pair-syntax + (accumulate-action (car forms) action-ids final-forms (cdr forms))]))) + (define (accumulate-action action action-ids final-forms remaining-forms) + (define temp (car (generate-temporaries (list action)))) + (accumulate-actions (cons temp action-ids) + (cons #`(define #,temp #,action) final-forms) + remaining-forms)) + (accumulate-actions '() '() (syntax->list #'(forms ...))))])) diff --git a/prospect-monolithic/lang/reader.rkt b/prospect-monolithic/lang/reader.rkt new file mode 100644 index 0000000..590baac --- /dev/null +++ b/prospect-monolithic/lang/reader.rkt @@ -0,0 +1,2 @@ +#lang s-exp syntax/module-reader +prospect-monolithic/lang diff --git a/prospect-monolithic/main.rkt b/prospect-monolithic/main.rkt new file mode 100644 index 0000000..263af57 --- /dev/null +++ b/prospect-monolithic/main.rkt @@ -0,0 +1,7 @@ +#lang racket/base + +(require "core.rkt") +(require "ground.rkt") + +(provide (all-from-out "core.rkt") + (all-from-out "ground.rkt")) diff --git a/prospect-monolithic/mux.rkt b/prospect-monolithic/mux.rkt new file mode 100644 index 0000000..cc05ffb --- /dev/null +++ b/prospect-monolithic/mux.rkt @@ -0,0 +1,104 @@ +#lang racket/base +;; General multiplexer. + +(provide meta-label? + (except-out (struct-out mux) mux) + (rename-out [mux ] [make-mux mux]) + mux-add-stream + mux-remove-stream + mux-update-stream + mux-route-message + mux-interests-of + compute-scns + compute-affected-pids + pretty-print-mux) + +(require racket/set) +(require racket/match) +(require "../prospect/route.rkt") +(require "scn.rkt") +(require "../prospect/trace.rkt") +(require "../prospect/tset.rkt") +(require "../prospect/pretty.rkt") + +;; A PID is a Nat. +;; A Label is a PID or 'meta. +;; Multiplexer private states +(struct mux (next-pid ;; PID + routing-table ;; (Matcherof (Setof Label)) + interest-table ;; (HashTable Label Matcher) + ) + #:transparent + #:methods gen:prospect-pretty-printable + [(define (prospect-pretty-print m [p (current-output-port)]) + (pretty-print-mux m p))]) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (meta-label? x) (eq? x 'meta)) + +(define (make-mux) + (mux 0 (trie-empty) (hash))) + +(define (mux-add-stream m initial-scn) + (define new-pid (mux-next-pid m)) + (mux-update-stream (struct-copy mux m [next-pid (+ new-pid 1)]) + new-pid + initial-scn)) + +(define (mux-remove-stream m label) + (mux-update-stream m label (scn (trie-empty)))) + +(define (mux-update-stream m label new-scn) + (define old-interests (mux-interests-of m label)) + (define old-routing-table (mux-routing-table m)) + (define new-interests (label-interests (scn-trie new-scn) (datum-tset label))) + (define new-routing-table (trie-union (trie-subtract old-routing-table old-interests) + new-interests)) + (define aggregate-assertions (trie-union old-interests new-interests)) + (values (struct-copy mux m + [routing-table new-routing-table] + [interest-table (if (trie-empty? new-interests) + (hash-remove (mux-interest-table m) label) + (hash-set (mux-interest-table m) label new-interests))]) + label + new-scn ;; unnecessary? + aggregate-assertions)) + +(define (compute-scns old-m new-m label s aggregate-assertions) + (define old-routing-table (mux-routing-table old-m)) + (define new-routing-table (mux-routing-table new-m)) + (define affected-pids + (let ((pids (compute-affected-pids old-routing-table aggregate-assertions))) ;; hmm + (tset-remove (tset-add pids label) 'meta))) ;; TODO: removing meta is weird + (values (for/list [(pid (tset->list affected-pids))] + (cons pid (scn (biased-intersection new-routing-table (mux-interests-of new-m pid))))) + (and (not (meta-label? label)) + (drop-scn (scn (strip-interests new-routing-table)))))) + +(define (compute-affected-pids routing-table cover) + (trie-match-trie cover + (trie-step routing-table struct:observe) + #:seed (datum-tset) + #:combiner (lambda (v1 v2 acc) (tset-union v2 acc)) + #:left-short (lambda (v r acc) + (tset-union acc (success-value (trie-step r EOS)))))) + +(define (mux-route-message m body) + (if (trie-lookup (mux-routing-table m) body #f) ;; some other stream has declared body + '() + (tset->list (trie-lookup (mux-routing-table m) (observe body) (datum-tset))))) + +(define (mux-interests-of m label) + (hash-ref (mux-interest-table m) label (trie-empty))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (pretty-print-mux m [p (current-output-port)]) + (match-define (mux next-pid routing-table interest-table) m) + (fprintf p "MUX:\n") + (fprintf p " - ~a labelled entities with claims\n" (hash-count interest-table)) + (fprintf p " - next label: ~a\n" next-pid) + (fprintf p " - routing-table:\n") + (display (indented-port-output 3 (lambda (p) (pretty-print-trie routing-table p))) p) + (newline p)) diff --git a/prospect-monolithic/scn.rkt b/prospect-monolithic/scn.rkt new file mode 100644 index 0000000..bd219d1 --- /dev/null +++ b/prospect-monolithic/scn.rkt @@ -0,0 +1,67 @@ +#lang racket/base +;; State Change Notifications, and assorted protocol constructors + +(provide (struct-out scn) + (struct-out observe) + (struct-out at-meta) + (struct-out advertise) + lift-scn + drop-scn + strip-interests + label-interests + strip-scn + label-scn + biased-intersection) + +(require racket/set) +(require racket/match) +(require "../prospect/route.rkt") +(require "../prospect/tset.rkt") +(require "../prospect/pretty.rkt") +(module+ test (require rackunit)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; State Change Notifications +(struct scn (trie) #:transparent + #:methods gen:prospect-pretty-printable + [(define (prospect-pretty-print d [p (current-output-port)]) + (pretty-print-trie (scn-trie d)))]) + +;; Claims, Interests, Locations, and Advertisements +(struct observe (claim) #:prefab) +(struct at-meta (claim) #:prefab) +(struct advertise (claim) #:prefab) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define at-meta-proj (compile-projection (at-meta (?!)))) + +(define (lift-scn s) + (scn (pattern->trie #t (at-meta (embedded-trie (scn-trie s)))))) + +(define (drop-interests pi) + (trie-project pi at-meta-proj + #:project-success (lambda (v) #t) + #:combiner (lambda (v1 v2) #t))) + +(define (drop-scn s) + (scn (drop-interests (scn-trie s)))) + +(define (strip-interests g) + (trie-relabel g (lambda (v) #t))) + +(define (label-interests g label) + (trie-relabel g (lambda (v) label))) + +(define (strip-scn s) + (scn (strip-interests (scn-trie s)))) + +(define (label-scn s label) + (scn (label-interests (scn-trie s) label))) + +(define (biased-intersection object subject) + (trie-intersect object + (trie-step subject struct:observe) + #:combiner (lambda (v1 v2) #t) + #:left-short (lambda (v r) (trie-step r EOS)))) diff --git a/prospect-monolithic/trace/stderr.rkt b/prospect-monolithic/trace/stderr.rkt new file mode 100644 index 0000000..2a01097 --- /dev/null +++ b/prospect-monolithic/trace/stderr.rkt @@ -0,0 +1,229 @@ +#lang racket/base + +(provide set-stderr-trace-flags!) + +(require racket/set) +(require racket/match) +(require racket/pretty) +(require (only-in racket/string string-join)) +(require "../../prospect/exn-util.rkt") +(require "../core.rkt") +(require "../../prospect/trace.rkt") +(require "../mux.rkt") +(require "../../prospect/pretty.rkt") + +(define (env-aref varname default alist) + (define key (or (getenv varname) default)) + (cond [(assoc key alist) => cadr] + [else (error 'env-aref + "Expected environment variable ~a to contain one of ~v; got ~v" + (map car alist) + key)])) + +(define colored-output? (env-aref "MINIMART_COLOR" "true" '(("true" #t) ("false" #f)))) + +(define flags (set)) +(define show-exceptions? #f) +(define show-scn-events? #f) +(define show-message-events? #f) +(define show-boot-events? #f) +(define show-events? #f) +(define show-process-states-pre? #f) +(define show-process-states-post? #f) +(define show-process-lifecycle? #f) +(define show-scn-actions? #f) +(define show-message-actions? #f) +(define show-actions? #f) +(define show-routing-table? #f) +(define network-is-boring? #t) + +(define (set-stderr-trace-flags! flags-string) + (set! flags (for/set [(c flags-string)] (string->symbol (string c)))) + (define-syntax-rule (set-flag! symbol variable) + (set! variable (set-member? flags 'symbol))) + (set-flag! x show-exceptions?) + (set-flag! r show-scn-events?) + (set-flag! m show-message-events?) + (set-flag! b show-boot-events?) + (set-flag! e show-events?) + (set-flag! s show-process-states-pre?) + (set-flag! t show-process-states-post?) + (set-flag! p show-process-lifecycle?) + (set-flag! R show-scn-actions?) + (set-flag! M show-message-actions?) + (set-flag! a show-actions?) + (set-flag! g show-routing-table?) + (set! network-is-boring? (not (set-member? flags 'N)))) + +(set-stderr-trace-flags! (or (getenv "MINIMART_TRACE") "")) + +(define YELLOW-ON-RED ";1;33;41") +(define WHITE-ON-RED ";1;37;41") +(define WHITE-ON-GREEN ";1;37;42") +(define GREY-ON-RED ";37;41") +(define GREY-ON-GREEN ";37;42") +(define RED ";31") +(define BRIGHT-RED ";1;31") +(define GREEN ";32") +(define BRIGHT-GREEN ";1;32") +(define YELLOW ";33") +(define BLUE ";34") +(define BRIGHT-BLUE ";1;34") +(define NORMAL "") + +(define (format-pids pids) + (match pids + ['() "ground"] + [(cons 'meta rest) (format "context of ~a" (format-pids rest))] + [_ (string-join (map number->string (reverse pids)) ":")])) + +(define (output fmt . args) + (apply fprintf (current-error-port) fmt args)) + +(define (boring-state? state) + (or (and (network? state) network-is-boring?) + (void? state))) + +(define (set-color! c) (when colored-output? (output "\e[0~am" c))) +(define (reset-color!) (when colored-output? (output "\e[0m"))) + +(define-syntax-rule (with-color c expr ...) + (begin (set-color! c) + (begin0 (begin expr ...) + (reset-color!)))) + +(define (display-trace) + (define receiver (make-log-receiver trace-logger 'info)) + (parameterize ((pretty-print-columns 100)) + (let loop () + (match-define (vector level message-string data event-name) (sync receiver)) + (match* (event-name data) + [('process-step (list pids e beh st)) + (define pidstr (format-pids pids)) + (match e + [#f + (when show-events? + (with-color YELLOW (output "~a is being polled for changes.\n" pidstr)))] + [(scn r) + (when (or show-events? show-scn-events?) + (with-color YELLOW + (output "~a is receiving SCN:\n" pidstr) + (pretty-print-trie r (current-error-port))))] + [(message body) + (when (or show-events? show-message-events?) + (with-color YELLOW + (output "~a is receiving a message:\n" pidstr) + (pretty-write body (current-error-port))))]) + (when show-process-states-pre? + (when (not (boring-state? st)) + (with-color YELLOW + (output "~a's state just before the event:\n" pidstr) + (prospect-pretty-print st (current-error-port)))))] + [('process-step-result (list pids e beh st exn t)) + (define pidstr (format-pids pids)) + (define relevant-exn? (and show-exceptions? exn)) + (define (exn-and-not b) (and relevant-exn? (not b))) + (match e + [#f + (when (exn-and-not show-events?) + (with-color YELLOW (output "~a was polled for changes.\n" pidstr)))] + ['boot + (when (or show-events? show-boot-events?) + (with-color YELLOW (output "~a was booted.\n" pidstr)))] + [(scn r) + (when (exn-and-not (or show-events? show-scn-events?)) + (with-color YELLOW + (output "~a received a SCN:\n" pidstr) + (pretty-print-trie r (current-error-port))))] + [(message body) + (when (exn-and-not (or show-events? show-message-events?)) + (with-color YELLOW + (output "~a received a message:\n" pidstr) + (pretty-write body (current-error-port))))]) + (when (exn-and-not (and show-process-states-pre? (not (boring-state? st)))) + (with-color YELLOW + (output "~a's state just before the event:\n" pidstr) + (prospect-pretty-print st (current-error-port)))) + (when relevant-exn? + (with-color WHITE-ON-RED + (output "Process ~a ~v died with exception:\n~a\n" + pidstr + beh + (exn->string exn)))) + (when (quit? t) + (with-color BRIGHT-RED + (output "Process ~a ~v exited normally.\n" pidstr beh))) + (when (or relevant-exn? show-process-states-post?) + (when (transition? t) + (unless (boring-state? (transition-state t)) + (when (not (equal? st (transition-state t))) + (with-color YELLOW + (output "~a's state just after the event:\n" pidstr) + (prospect-pretty-print (transition-state t) (current-error-port)))))))] + [('internal-action (list pids a old-w)) + (define pidstr (format-pids pids)) + (define oldcount (hash-count (network-behaviors old-w))) + (match a + [(? spawn?) + ;; Handle this in internal-action-result + (void)] + ['quit + (when (or show-process-lifecycle? show-actions?) + (define interests (mux-interests-of (network-mux old-w) (car pids))) + (with-color BRIGHT-RED + (output "~a exiting (~a total processes remain)\n" + pidstr + (- oldcount 1))) + (unless (trie-empty? interests) + (output "~a's final interests:\n" pidstr) + (pretty-print-trie interests (current-error-port))))] + [(quit-network) + (with-color BRIGHT-RED + (output "Process ~a performed a quit-network.\n" pidstr))] + [(scn r) + (when (or show-actions? show-scn-actions?) + (output "~a performing a SCN:\n" pidstr) + (pretty-print-trie r (current-error-port)))] + [(message body) + (when (or show-actions? show-message-actions?) + (output "~a sending a message:\n" pidstr) + (pretty-write body (current-error-port)))])] + [('internal-action-result (list pids a old-w t)) + (when (transition? t) + (define new-w (transition-state t)) + (define pidstr (format-pids pids)) + (define newcount (hash-count (network-behaviors new-w))) + (match a + [(? spawn?) + (when (or show-process-lifecycle? show-actions?) + (define newpid (mux-next-pid (network-mux old-w))) + (define newpidstr (format-pids (cons newpid (cdr pids)))) ;; replace parent pid + (define interests (mux-interests-of (network-mux new-w) newpid)) + (define behavior (hash-ref (network-behaviors new-w) newpid '#:missing-behavior)) + (define state (hash-ref (network-states new-w) newpid '#:missing-state)) + (with-color BRIGHT-GREEN + (output "~a ~v spawned from ~a (~a total processes now)\n" + newpidstr + behavior + pidstr + newcount)) + (unless (boring-state? state) + (output "~a's initial state:\n" newpidstr) + (prospect-pretty-print state (current-error-port))) + (unless (trie-empty? interests) + (output "~a's initial interests:\n" newpidstr) + (pretty-print-trie interests (current-error-port))))] + [_ + ;; other cases handled in internal-action + (void)]) + (when show-routing-table? + (define old-table (mux-routing-table (network-mux old-w))) + (define new-table (mux-routing-table (network-mux new-w))) + (when (not (equal? old-table new-table)) + (with-color BRIGHT-BLUE + (output "~a's routing table:\n" (format-pids (cdr pids))) + (pretty-print-trie new-table (current-error-port))))))]) + (loop)))) + +(void (when (not (set-empty? flags)) + (thread display-trace)))