Update Racket demand-matcher to match improvements in the js version.

This commit is contained in:
Tony Garnock-Jones 2016-05-14 17:07:50 -04:00
parent c7d91ac37f
commit 7a4f1d8931
1 changed files with 135 additions and 88 deletions

View File

@ -10,46 +10,105 @@
(provide (except-out (struct-out demand-matcher) demand-matcher) (provide (except-out (struct-out demand-matcher) demand-matcher)
(rename-out [make-demand-matcher demand-matcher]) (rename-out [make-demand-matcher demand-matcher])
demand-matcher-update demand-matcher-update
spawn-demand-matcher spawn-demand-matcher)
on-claim)
;; A DemandMatcher keeps track of demand for services based on some ;; A DemandMatcher keeps track of demand for services based on some
;; Projection over a Trie, as well as a collection of functions ;; Projection over a Trie, as well as a collection of functions
;; that can be used to increase supply in response to increased ;; that can be used to increase supply in response to increased
;; demand, or handle a sudden drop in supply for which demand still ;; demand, or handle a sudden drop in supply for which demand still
;; exists. ;; exists.
(struct demand-matcher (demand-spec ;; CompiledProjection (struct demand-matcher (demand-spec ;; CompiledProjection
supply-spec ;; CompiledProjection supply-spec ;; CompiledProjection
demand-spec-arity ;; Natural demand-spec-arity ;; Natural
supply-spec-arity ;; Natural supply-spec-arity ;; Natural
increase-handler ;; ChangeHandler start-task ;; TaskCallback
decrease-handler ;; ChangeHandler on-task-exit ;; TaskCallback
current-demand ;; (Setof (Listof Any)) current-demand ;; (Setof (Listof Any))
current-supply) ;; (Setof (Listof Any)) current-supply ;; (Setof (Listof Any))
task-supervisor ;; TaskSupervisor
supervision-states) ;; (Hashtable (Listof Any) Any)
#:transparent #:transparent
#:methods gen:syndicate-pretty-printable #:methods gen:syndicate-pretty-printable
[(define (syndicate-pretty-print s [p (current-output-port)]) [(define (syndicate-pretty-print s [p (current-output-port)])
(pretty-print-demand-matcher s p))]) (pretty-print-demand-matcher s p))])
;; A ChangeHandler is a ((Constreeof Action) Any* -> (Constreeof Action)). ;; A TaskCallback is a ((Constreeof Action) Any* -> (Constreeof Action)).
;; It is called with an accumulator of actions so-far-computed as its ;; It is called with an accumulator of actions so-far-computed as its
;; first argument, and with a value for each capture in the ;; first argument, and with a value for each capture in the
;; DemandMatcher's projection as the remaining arguments. ;; DemandMatcher's projection as the remaining arguments.
;; ChangeHandler ;; A TaskRunState describes demand for or supply of some task at the
;; Default handler of unexpected supply decrease. ;; time of taking a transition where some aspect of the supervisor's
(define (default-decrease-handler state . removed-captures) ;; relationship to a particular task has changed, and is one of
state) ;; - 'low, meaning no demand or supply exists
;; - 'rising, demand or supply has just appeared
;; - 'high, demand or supply exists
;; - 'falling, demand or supply has just disappeared
(define (make-demand-matcher demand-spec supply-spec increase-handler decrease-handler) ;; A TaskSupervisor is a
;; ((Listof Any) TaskRunState TaskRunState Any TaskCallback TaskCallback (Constreeof Action)
;; -> (Transition Any))
(struct task-state (supply-increase-expected? supply-decrease-expected?) #:prefab)
(define default-initial-supervision-state (task-state #f #f))
;; TaskSupervisor. See ../../doc/demand-matcher.md.
(define (default-task-supervisor captures
demand-state
supply-state
supervision-state
start-task
on-task-exit
actions0)
(match-define (task-state old-i old-d) (or supervision-state default-initial-supervision-state))
(let*-values (((new-i) old-i)
((new-d) old-d)
((actions) actions0)
((new-i actions) (if (and (not old-i)
(or (eq? demand-state 'rising)
(and (eq? demand-state 'high)
(or (eq? supply-state 'low)
(eq? supply-state 'falling)))))
(if (and (eq? demand-state 'high)
(not old-d))
(values new-i (apply on-task-exit actions captures))
(values #t (apply start-task actions captures)))
(values new-i actions)))
((new-i) (if (eq? supply-state 'rising)
#f
new-i))
((new-d) (if (and (eq? demand-state 'falling)
(or (eq? supply-state 'rising)
(eq? supply-state 'high)
old-i))
#t
new-d))
((new-d) (if (eq? supply-state 'falling)
#f
new-d)))
(transition (and (or new-i new-d)
(task-state new-i new-d))
actions)))
(define (default-on-task-exit s . captures)
(log-error "demand-matcher: Unexpected drop in supply ~v" captures)
s)
(define (make-demand-matcher demand-spec supply-spec start-task
[on-task-exit default-on-task-exit]
#:task-supervisor [task-supervisor default-task-supervisor])
(demand-matcher demand-spec (demand-matcher demand-spec
supply-spec supply-spec
(projection-arity demand-spec) (projection-arity demand-spec)
(projection-arity supply-spec) (projection-arity supply-spec)
increase-handler start-task
decrease-handler on-task-exit
(set) (set)
(set))) (set)
task-supervisor
(hash)))
(define (ensure-non-wild s kind spec direction t) (define (ensure-non-wild s kind spec direction t)
(when (not s) (when (not s)
@ -60,19 +119,31 @@
direction direction
(trie->pretty-string t)))) (trie->pretty-string t))))
(define (compute-state tasks added-tasks removed-tasks captures)
(define present? (set-member? tasks captures))
(define changing? (or (set-member? added-tasks captures)
(set-member? removed-tasks captures)))
(match* (present? changing?)
[(#f #f) 'low]
[(#f #t) 'rising]
[(#t #f) 'high]
[(#t #t) 'falling]))
;; DemandMatcher (Constreeof Action) Patch -> (Transition DemandMatcher) ;; DemandMatcher (Constreeof Action) Patch -> (Transition DemandMatcher)
;; Given a Patch from the environment, projects it into supply and ;; Given a Patch from the environment, projects it into supply and
;; demand increase and decrease sets. Calls ChangeHandlers in response ;; demand increase and decrease sets. Calls ChangeHandlers in response
;; to increased unsatisfied demand and decreased demanded supply. ;; to increased unsatisfied demand and decreased demanded supply.
(define (demand-matcher-update d s p) (define (demand-matcher-update d actions0 p)
(match-define (demand-matcher demand-spec (match-define (demand-matcher demand-spec
supply-spec supply-spec
demand-arity demand-arity
supply-arity supply-arity
inc-h start-task
dec-h on-task-exit
demand demand
supply) d) supply
task-supervisor
supervision-states) d)
(define-values (added-demand removed-demand) (define-values (added-demand removed-demand)
(patch-project/set #:take demand-arity p demand-spec)) (patch-project/set #:take demand-arity p demand-spec))
(define-values (added-supply removed-supply) (define-values (added-supply removed-supply)
@ -92,18 +163,34 @@
(set! added-supply (set-subtract added-supply overlap)) (set! added-supply (set-subtract added-supply overlap))
(set! removed-supply (set-subtract removed-supply overlap))) (set! removed-supply (set-subtract removed-supply overlap)))
(set! supply (set-union supply added-supply)) (define all-changing-tasks
(set! demand (set-subtract demand removed-demand)) (set-union added-demand removed-demand added-supply removed-supply))
(for [(captures (in-set removed-supply))] (define-values (new-supervision-states actions)
(when (set-member? demand captures) (set! s (apply dec-h s captures)))) (for/fold [(supervision-states supervision-states) (actions actions0)]
(for [(captures (in-set added-demand))] [(captures (in-set all-changing-tasks))]
(when (not (set-member? supply captures)) (set! s (apply inc-h s captures)))) (define demand-state (compute-state demand added-demand removed-demand captures))
(define supply-state (compute-state supply added-supply removed-supply captures))
(match-define (transition new-supervision-state new-actions)
(task-supervisor captures
demand-state
supply-state
(hash-ref supervision-states captures #f)
start-task
on-task-exit
actions))
(values (if new-supervision-state
(hash-set supervision-states captures new-supervision-state)
(hash-remove supervision-states captures))
(cons actions new-actions))))
(set! supply (set-subtract supply removed-supply)) (transition (struct-copy demand-matcher d
(set! demand (set-union demand added-demand)) [current-demand
(set-subtract (set-union demand added-demand) removed-demand)]
(transition (struct-copy demand-matcher d [current-demand demand] [current-supply supply]) s)) [current-supply
(set-subtract (set-union supply added-supply) removed-supply)]
[supervision-states new-supervision-states])
actions))
;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher) ;; Behavior :> (Option Event) DemandMatcher -> (Transition DemandMatcher)
;; Handles events from the environment. Only cares about routing-updates. ;; Handles events from the environment. Only cares about routing-updates.
@ -113,24 +200,22 @@
(demand-matcher-update d '() p)] (demand-matcher-update d '() p)]
[_ #f])) [_ #f]))
;; Any* -> (Constreeof Action)
;; Default handler of unexpected supply decrease.
;; Ignores the situation.
(define (unexpected-supply-decrease . removed-captures)
'())
;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action ;; Projection Projection (Any* -> (Constreeof Action)) [(Any* -> (Constreeof Action))] -> Action
;; Spawns a demand matcher actor. ;; Spawns a demand matcher actor.
(define (spawn-demand-matcher demand-spec (define (spawn-demand-matcher demand-spec
supply-spec supply-spec
increase-handler start-task
[decrease-handler unexpected-supply-decrease] [on-task-exit #f]
#:task-supervisor [task-supervisor default-task-supervisor]
#:name [name #f] #:name [name #f]
#:meta-level [meta-level 0]) #:meta-level [meta-level 0])
(define d (make-demand-matcher (prepend-at-meta demand-spec meta-level) (define d (make-demand-matcher (prepend-at-meta demand-spec meta-level)
(prepend-at-meta supply-spec meta-level) (prepend-at-meta supply-spec meta-level)
(lambda (acs . rs) (cons (apply increase-handler rs) acs)) (lambda (acs . rs) (cons (apply start-task rs) acs))
(lambda (acs . rs) (cons (apply decrease-handler rs) acs)))) (if on-task-exit
(lambda (acs . rs) (cons (apply on-task-exit rs) acs))
default-on-task-exit)
#:task-supervisor task-supervisor))
(spawn #:name name (spawn #:name name
demand-matcher-handle-event demand-matcher-handle-event
d d
@ -138,47 +223,6 @@
(sub (projection->pattern supply-spec) #:meta-level meta-level) (sub (projection->pattern supply-spec) #:meta-level meta-level)
(pub (projection->pattern supply-spec) #:meta-level meta-level)))) (pub (projection->pattern supply-spec) #:meta-level meta-level))))
;; (Trie (Option (Setof (Listof Value))) ... -> (Option (Constreeof Action)))
;; Trie Projection ...
;; -> Action
;; Spawns a process that observes the given projections. Any time the
;; environment's interests change in a relevant way, calls
;; check-and-maybe-spawn-fn with the aggregate interests and the
;; projection results. If check-and-maybe-spawn-fn returns #f,
;; continues to wait; otherwise, takes the action(s) returned, and
;; quits.
(define (on-claim #:timeout-msec [timeout-msec #f]
#:on-timeout [timeout-handler (lambda () '())]
#:name [name #f]
check-and-maybe-spawn-fn
base-interests
. projections)
(define timer-id (gensym 'on-claim))
(define (on-claim-handler e current-aggregate)
(match e
[(? patch? p)
(define new-aggregate (update-interests current-aggregate p))
(define projection-results
(map (lambda (p) (trie-project/set #:take (projection-arity p) new-aggregate p))
projections))
(define maybe-spawn (apply check-and-maybe-spawn-fn
new-aggregate
projection-results))
(if maybe-spawn
(quit maybe-spawn)
(transition new-aggregate '()))]
[(message (timer-expired (== timer-id) _))
(quit (timeout-handler))]
[_ #f]))
(list
(when timeout-msec (message (set-timer timer-id timeout-msec 'relative)))
(spawn #:name name
on-claim-handler
trie-empty
(patch-seq (patch base-interests trie-empty)
(patch-seq* (map (lambda (p) (sub projection->pattern)) projections))
(sub (timer-expired timer-id ?))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(define (pretty-print-demand-matcher s [p (current-output-port)]) (define (pretty-print-demand-matcher s [p (current-output-port)])
@ -186,13 +230,16 @@
supply-spec supply-spec
_demand-arity _demand-arity
_supply-arity _supply-arity
_increase-handler _start-task
_decrease-handler _on-task-exit
current-demand current-demand
current-supply) current-supply
_task-supervisor
supervision-states)
s) s)
(fprintf p "DEMAND MATCHER:\n") (fprintf p "DEMAND MATCHER:\n")
(fprintf p " - demand-spec: ~v\n" demand-spec) (fprintf p " - demand-spec: ~v\n" demand-spec)
(fprintf p " - supply-spec: ~v\n" supply-spec) (fprintf p " - supply-spec: ~v\n" supply-spec)
(fprintf p " - demand: ~v\n" current-demand) (fprintf p " - demand: ~v\n" current-demand)
(fprintf p " - supply: ~v\n" current-supply)) (fprintf p " - supply: ~v\n" current-supply)
(fprintf p " - supervision-states: ~v\n" supervision-states))