diff --git a/prospect/endpoint.rkt b/prospect/endpoint.rkt new file mode 100644 index 0000000..8bcc265 --- /dev/null +++ b/prospect/endpoint.rkt @@ -0,0 +1,203 @@ +#lang racket/base +;; Marketplace-style endpoints (analogous to threads) + +(provide (struct-out endpoint-group) + (struct-out add-endpoint) + (struct-out delete-endpoint) + make-endpoint-group + spawn-endpoint-group + endpoint-action? + (struct-out endpoint) + endpoint-group-handle-event + pretty-print-endpoint-group) + +(require racket/set) +(require racket/match) +(require (only-in racket/list flatten)) +(require "route.rkt") +(require "patch.rkt") +(require "core.rkt") +(require "mux.rkt") +(require "pretty.rkt") + +;; An EID is a Nat. + +;; Endpoint-group private states +(struct endpoint-group (next-eid ;; EID + routing-table ;; (Matcherof (Setof EID)) + interests ;; (HashTable EID Matcher) + endpoints ;; (HashTable EID Endpoint) + state ;; Any + ) + #:transparent + #:methods gen:prospect-pretty-printable + [(define (prospect-pretty-print g [p (current-output-port)]) + (pretty-print-endpoint-group g p))]) + +;; A Handler is a (Event State -> Transition) +;; A Transition reuses the struct from core, but with EndpointActions instead of plain Actions. +;; An EndpointAction is either an Action, or a +;; (add-endpoint (EID State -> (Values Endpoint Transition))), or a +;; (delete-endpoint EID) +(struct add-endpoint (function) #:prefab) +(struct delete-endpoint (eid) #:prefab) + +(define (make-endpoint-group initial-state) + (endpoint-group 0 + (matcher-empty) + (hash) + (hash) + initial-state)) + +(define-syntax-rule (spawn-endpoint-group initial-state add-endpoint-instruction ...) + ( (lambda () + (define-values (final-cumulative-patch final-actions final-g) + (interpret-endpoint-actions empty-patch + '() + (make-endpoint-group initial-state) + -1 + (list add-endpoint-instruction ...))) + (when (not (null? final-actions)) + (error 'spawn-endpoint-group + "Unexpected initial actions: ~v" final-actions)) + (list final-cumulative-patch + endpoint-group-handle-event + final-g)))) + +(define (endpoint-action? a) + (or (action? a) + (add-endpoint? a) + (delete-endpoint? a))) + +;; An Endpoint represents the behaviour of an endpoint. +(struct endpoint (pre-handler ;; Handler + peri-handler ;; Handler + post-handler ;; Handler + ) #:transparent) + +(define (inert-handler e state) #f) + +(define inert-endpoint + (endpoint inert-handler + inert-handler + inert-handler)) + +(define (endpoint-group-handle-event e g) + (match-define (endpoint-group _ routing-table interests endpoints state) g) + (define affected-eids + (match e + [#f (hash-keys endpoints)] + [(? patch?) (compute-affected-pids routing-table e)] + [(message body) (matcher-match-value routing-table (observe body))])) + (define tasks (for/list [(eid affected-eids)] + (list (if (patch? e) + (view-patch e (hash-ref interests eid matcher-empty)) + e) + eid + (hash-ref endpoints eid inert-endpoint)))) + (define t0 (transition g '())) + (define t1 (sequence-transitions t0 + (sequence-handlers tasks endpoint-pre-handler) + (sequence-handlers tasks endpoint-peri-handler) + (sequence-handlers tasks endpoint-post-handler))) + (if (eq? t1 t0) #f t1)) + +(define ((sequence-handlers tasks handler-getter) g) + (let/ec return + (define-values (final-cumulative-patch final-actions final-g idle?) + (for/fold ([cumulative-patch empty-patch] + [actions '()] + [g g] + [idle? #t]) + ([task tasks]) + (match-define (list e eid ep) task) + (match ((handler-getter ep) e (endpoint-group-state g)) + [#f (values cumulative-patch actions g idle?)] + [( ep-acs) (return (quit (filter action? (flatten ep-acs))))] + [(transition new-state ep-acs) + (define-values (cp acs next-g) + (interpret-endpoint-actions cumulative-patch + actions + (struct-copy endpoint-group g [state new-state]) + eid + ep-acs)) + (values cp acs next-g #f)]))) + (if idle? + #f + (transition final-g (incorporate-cumulative-patch final-actions final-cumulative-patch))))) + +(define (incorporate-cumulative-patch actions cumulative-patch) + (if (patch-empty? cumulative-patch) + actions + (cons actions cumulative-patch))) + +(define (interpret-endpoint-patch cumulative-patch actions g eid p0) + (define old-interests (hash-ref (endpoint-group-interests g) eid matcher-empty)) + (define old-routing-table (endpoint-group-routing-table g)) + (define p (limit-patch (label-patch p0 (set eid)) old-interests)) + (define p-aggregate (compute-aggregate-patch p eid old-routing-table)) + (define new-interests (apply-patch old-interests p)) + (define new-routing-table (apply-patch old-routing-table p)) + (values (patch-seq cumulative-patch p-aggregate) + actions + (struct-copy endpoint-group g + [routing-table new-routing-table] + [interests (if (matcher-empty? new-interests) + (hash-remove (endpoint-group-interests g) eid) + (hash-set (endpoint-group-interests g) + eid + new-interests))]))) + +(define (interpret-endpoint-action cumulative-patch actions g eid endpoint-action) + (match endpoint-action + [(or (? message?) + (? spawn?)) + (values empty-patch + (cons (incorporate-cumulative-patch actions cumulative-patch) endpoint-action) + g)] + [(? patch? p0) + (interpret-endpoint-patch cumulative-patch actions g eid p0)] + [(add-endpoint function) + (define new-eid (endpoint-group-next-eid g)) + (define-values (new-ep initial-transition) (function new-eid (endpoint-group-state g))) + (interpret-endpoint-actions cumulative-patch + actions + (struct-copy endpoint-group g + [next-eid (+ new-eid 1)] + [endpoints + (hash-set (endpoint-group-endpoints g) + new-eid + new-ep)] + [state (transition-state initial-transition)]) + new-eid + (transition-actions initial-transition))] + [(delete-endpoint eid) + (interpret-endpoint-patch cumulative-patch + actions + (struct-copy endpoint-group g + [endpoints + (hash-remove (endpoint-group-endpoints g) eid)]) + eid + (patch (matcher-empty) (pattern->matcher #t ?)))])) + +(define (interpret-endpoint-actions cumulative-patch actions g eid unflattened-endpoint-actions) + (define endpoint-actions (filter endpoint-action? (flatten unflattened-endpoint-actions))) + (for/fold ([cumulative-patch cumulative-patch] + [actions actions] + [g g]) + ([endpoint-action endpoint-actions]) + (interpret-endpoint-action cumulative-patch + actions + g + eid + endpoint-action))) + +(define (pretty-print-endpoint-group g [p (current-output-port)]) + (match-define (endpoint-group _ routing-table interests endpoints state) g) + (fprintf p "ENDPOINT GROUP:\n") + (fprintf p " ---- STATE:\n") + (display (indented-port-output 6 (lambda (p) (prospect-pretty-print state p))) p) + (newline p) + (fprintf p " - ~a endpoints\n" (hash-count endpoints)) + (fprintf p " - routing table:\n") + (pretty-print-matcher routing-table p)) diff --git a/prospect/examples/endpoint-example.rkt b/prospect/examples/endpoint-example.rkt new file mode 100644 index 0000000..ed9e1ed --- /dev/null +++ b/prospect/examples/endpoint-example.rkt @@ -0,0 +1,73 @@ +#lang prospect + +(require "../endpoint.rkt") +(require "../drivers/timer.rkt") + +(spawn-timer-driver) + +(define ((log-it eid stage) e u) + (log-info "endpoint ~a stage ~a state ~a: ~v" eid stage u e) + (and e (transition (+ u 1) + (if (equal? e (message 2)) + (if (equal? eid 0) + (list (unsub 2) (sub 5) (delete-endpoint 1)) + (list (unsub 2) (sub 5))) + '())))) + +(spawn (lambda (e u) + (when (message? e) (log-info "general: ~v" e)) + #f) + (void) + (sub ?) + (unsub (observe ?)) + (unsub (at-meta ?))) + +(spawn-endpoint-group 0 + (add-endpoint + (lambda (eid state) + (values (endpoint (log-it eid "pre") + (log-it eid "peri") + (log-it eid "post")) + (transition state + (list (sub 1) + (sub 2)))))) + (add-endpoint + (lambda (eid state) + (values (endpoint (log-it eid "pre") + (log-it eid "peri") + (log-it eid "post")) + (transition state + (list (sub 3) + (sub 2))))))) + +(define (after msec thunk) + (define id (gensym 'after)) + (if (zero? msec) + (thunk) + (list + (spawn (lambda (e s) (and (message? e) (quit (thunk)))) + (void) + (sub (timer-expired id ?))) + (message (set-timer id msec 'relative))))) + +(after 100 + (lambda () + (list + (message 0) + (message 1) + (message 2) + (message 3) + (message 4) + (message 5) + (message 6)))) + +(after 100 + (lambda () + (list + (message 0) + (message 1) + (message 2) + (message 3) + (message 4) + (message 5) + (message 6)))) diff --git a/prospect/mux.rkt b/prospect/mux.rkt index ceba083..8d896c3 100644 --- a/prospect/mux.rkt +++ b/prospect/mux.rkt @@ -8,7 +8,8 @@ mux-remove-stream mux-update-stream mux-route-message - mux-interests-of) + mux-interests-of + compute-affected-pids) (require racket/set) (require racket/match)