diff --git a/racket/syndicate/demand-matcher.rkt b/racket/syndicate/demand-matcher.rkt index 71dee26..f7a95d4 100644 --- a/racket/syndicate/demand-matcher.rkt +++ b/racket/syndicate/demand-matcher.rkt @@ -10,46 +10,105 @@ (provide (except-out (struct-out demand-matcher) demand-matcher) (rename-out [make-demand-matcher demand-matcher]) demand-matcher-update - spawn-demand-matcher - on-claim) + spawn-demand-matcher) ;; A DemandMatcher keeps track of demand for services based on some ;; Projection over a Trie, as well as a collection of functions ;; that can be used to increase supply in response to increased ;; demand, or handle a sudden drop in supply for which demand still ;; exists. -(struct demand-matcher (demand-spec ;; CompiledProjection - supply-spec ;; CompiledProjection - demand-spec-arity ;; Natural - supply-spec-arity ;; Natural - increase-handler ;; ChangeHandler - decrease-handler ;; ChangeHandler - current-demand ;; (Setof (Listof Any)) - current-supply) ;; (Setof (Listof Any)) +(struct demand-matcher (demand-spec ;; CompiledProjection + supply-spec ;; CompiledProjection + demand-spec-arity ;; Natural + supply-spec-arity ;; Natural + start-task ;; TaskCallback + on-task-exit ;; TaskCallback + current-demand ;; (Setof (Listof Any)) + current-supply ;; (Setof (Listof Any)) + task-supervisor ;; TaskSupervisor + supervision-states) ;; (Hashtable (Listof Any) Any) #:transparent #:methods gen:syndicate-pretty-printable [(define (syndicate-pretty-print s [p (current-output-port)]) (pretty-print-demand-matcher s p))]) -;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)). +;; A TaskCallback is a ((Constreeof Action) Any* -> (Constreeof Action)). ;; It is called with an accumulator of actions so-far-computed as its ;; first argument, and with a value for each capture in the ;; DemandMatcher's projection as the remaining arguments. -;; ChangeHandler -;; Default handler of unexpected supply decrease. -(define (default-decrease-handler state . removed-captures) - state) +;; A TaskRunState describes demand for or supply of some task at the +;; time of taking a transition where some aspect of the supervisor's +;; relationship to a particular task has changed, and is one of +;; - 'low, meaning no demand or supply exists +;; - 'rising, demand or supply has just appeared +;; - 'high, demand or supply exists +;; - 'falling, demand or supply has just disappeared -(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler) +;; A TaskSupervisor is a +;; ((Listof Any) TaskRunState TaskRunState Any TaskCallback TaskCallback (Constreeof Action) +;; -> (Transition Any)) + +(struct task-state (supply-increase-expected? supply-decrease-expected?) #:prefab) + +(define default-initial-supervision-state (task-state #f #f)) + +;; TaskSupervisor. See ../../doc/demand-matcher.md. +(define (default-task-supervisor captures + demand-state + supply-state + supervision-state + start-task + on-task-exit + actions0) + (match-define (task-state old-i old-d) (or supervision-state default-initial-supervision-state)) + (let*-values (((new-i) old-i) + ((new-d) old-d) + ((actions) actions0) + + ((new-i actions) (if (and (not old-i) + (or (eq? demand-state 'rising) + (and (eq? demand-state 'high) + (or (eq? supply-state 'low) + (eq? supply-state 'falling))))) + (if (and (eq? demand-state 'high) + (not old-d)) + (values new-i (apply on-task-exit actions captures)) + (values #t (apply start-task actions captures))) + (values new-i actions))) + ((new-i) (if (eq? supply-state 'rising) + #f + new-i)) + ((new-d) (if (and (eq? demand-state 'falling) + (or (eq? supply-state 'rising) + (eq? supply-state 'high) + old-i)) + #t + new-d)) + ((new-d) (if (eq? supply-state 'falling) + #f + new-d))) + (transition (and (or new-i new-d) + (task-state new-i new-d)) + actions))) + +(define (default-on-task-exit s . captures) + (log-error "demand-matcher: Unexpected drop in supply ~v" captures) + s) + +(define (make-demand-matcher demand-spec supply-spec start-task + [on-task-exit default-on-task-exit] + #:task-supervisor [task-supervisor default-task-supervisor]) (demand-matcher demand-spec supply-spec (projection-arity demand-spec) (projection-arity supply-spec) - increase-handler - decrease-handler + start-task + on-task-exit (set) - (set))) + (set) + task-supervisor + (hash))) (define (ensure-non-wild s kind spec direction t) (when (not s) @@ -60,19 +119,31 @@ direction (trie->pretty-string t)))) +(define (compute-state tasks added-tasks removed-tasks captures) + (define present? (set-member? tasks captures)) + (define changing? (or (set-member? added-tasks captures) + (set-member? removed-tasks captures))) + (match* (present? changing?) + [(#f #f) 'low] + [(#f #t) 'rising] + [(#t #f) 'high] + [(#t #t) 'falling])) + ;; DemandMatcher (Constreeof Action) Patch -> (Transition DemandMatcher) ;; Given a Patch from the environment, projects it into supply and ;; demand increase and decrease sets. Calls ChangeHandlers in response ;; to increased unsatisfied demand and decreased demanded supply. -(define (demand-matcher-update d s p) +(define (demand-matcher-update d actions0 p) (match-define (demand-matcher demand-spec supply-spec demand-arity supply-arity - inc-h - dec-h + start-task + on-task-exit demand - supply) d) + supply + task-supervisor + supervision-states) d) (define-values (added-demand removed-demand) (patch-project/set #:take demand-arity p demand-spec)) (define-values (added-supply removed-supply) @@ -92,18 +163,34 @@ (set! added-supply (set-subtract added-supply overlap)) (set! removed-supply (set-subtract removed-supply overlap))) - (set! supply (set-union supply added-supply)) - (set! demand (set-subtract demand removed-demand)) + (define all-changing-tasks + (set-union added-demand removed-demand added-supply removed-supply)) - (for [(captures (in-set removed-supply))] - (when (set-member? demand captures) (set! s (apply dec-h s captures)))) - (for [(captures (in-set added-demand))] - (when (not (set-member? supply captures)) (set! s (apply inc-h s captures)))) + (define-values (new-supervision-states actions) + (for/fold [(supervision-states supervision-states) (actions actions0)] + [(captures (in-set all-changing-tasks))] + (define demand-state (compute-state demand added-demand removed-demand captures)) + (define supply-state (compute-state supply added-supply removed-supply captures)) + (match-define (transition new-supervision-state new-actions) + (task-supervisor captures + demand-state + supply-state + (hash-ref supervision-states captures #f) + start-task + on-task-exit + actions)) + (values (if new-supervision-state + (hash-set supervision-states captures new-supervision-state) + (hash-remove supervision-states captures)) + (cons actions new-actions)))) - (set! supply (set-subtract supply removed-supply)) - (set! demand (set-union demand added-demand)) - - (transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s)) + (transition (struct-copy demand-matcher d + [current-demand + (set-subtract (set-union demand added-demand) removed-demand)] + [current-supply + (set-subtract (set-union supply added-supply) removed-supply)] + [supervision-states new-supervision-states]) + actions)) ;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher) ;; Handles events from the environment. Only cares about routing-updates. @@ -113,24 +200,22 @@ (demand-matcher-update d '() p)] [_ #f])) -;; Any* -> (Constreeof Action) -;; Default handler of unexpected supply decrease. -;; Ignores the situation. -(define (unexpected-supply-decrease . removed-captures) - '()) - ;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action ;; Spawns a demand matcher actor. (define (spawn-demand-matcher demand-spec supply-spec - increase-handler - [decrease-handler unexpected-supply-decrease] + start-task + [on-task-exit #f] + #:task-supervisor [task-supervisor default-task-supervisor] #:name [name #f] #:meta-level [meta-level 0]) (define d (make-demand-matcher (prepend-at-meta demand-spec meta-level) (prepend-at-meta supply-spec meta-level) - (lambda (acs . rs) (cons (apply increase-handler rs) acs)) - (lambda (acs . rs) (cons (apply decrease-handler rs) acs)))) + (lambda (acs . rs) (cons (apply start-task rs) acs)) + (if on-task-exit + (lambda (acs . rs) (cons (apply on-task-exit rs) acs)) + default-on-task-exit) + #:task-supervisor task-supervisor)) (spawn #:name name demand-matcher-handle-event d @@ -138,47 +223,6 @@ (sub (projection->pattern supply-spec) #:meta-level meta-level) (pub (projection->pattern supply-spec) #:meta-level meta-level)))) -;; (Trie (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action))) -;; Trie Projection ... -;; -> Action -;; Spawns a process that observes the given projections. Any time the -;; environment's interests change in a relevant way, calls -;; check-and-maybe-spawn-fn with the aggregate interests and the -;; projection results. If check-and-maybe-spawn-fn returns #f, -;; continues to wait; otherwise, takes the action(s) returned, and -;; quits. -(define (on-claim #:timeout-msec [timeout-msec #f] - #:on-timeout [timeout-handler (lambda () '())] - #:name [name #f] - check-and-maybe-spawn-fn - base-interests - . projections) - (define timer-id (gensym 'on-claim)) - (define (on-claim-handler e current-aggregate) - (match e - [(? patch? p) - (define new-aggregate (update-interests current-aggregate p)) - (define projection-results - (map (lambda (p) (trie-project/set #:take (projection-arity p) new-aggregate p)) - projections)) - (define maybe-spawn (apply check-and-maybe-spawn-fn - new-aggregate - projection-results)) - (if maybe-spawn - (quit maybe-spawn) - (transition new-aggregate '()))] - [(message (timer-expired (== timer-id) _)) - (quit (timeout-handler))] - [_ #f])) - (list - (when timeout-msec (message (set-timer timer-id timeout-msec 'relative))) - (spawn #:name name - on-claim-handler - trie-empty - (patch-seq (patch base-interests trie-empty) - (patch-seq* (map (lambda (p) (sub projection->pattern)) projections)) - (sub (timer-expired timer-id ?)))))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (pretty-print-demand-matcher s [p (current-output-port)]) @@ -186,13 +230,16 @@ supply-spec _demand-arity _supply-arity - _increase-handler - _decrease-handler + _start-task + _on-task-exit current-demand - current-supply) + current-supply + _task-supervisor + supervision-states) s) (fprintf p "DEMAND MATCHER:\n") (fprintf p " - demand-spec: ~v\n" demand-spec) (fprintf p " - supply-spec: ~v\n" supply-spec) (fprintf p " - demand: ~v\n" current-demand) - (fprintf p " - supply: ~v\n" current-supply)) + (fprintf p " - supply: ~v\n" current-supply) + (fprintf p " - supervision-states: ~v\n" supervision-states))