examples/flink: avoid asking the task manager to do more than it is capable of
This commit is contained in:
Sam Caldwell 2019-02-21 15:10:42 -05:00
parent 702c53f7d1
commit a98ba7baab
1 changed file with 47 additions and 10 deletions

View File

@ -123,6 +123,13 @@
;; (task-manager id slots): a task manager's announcement of its ID together with a
;; slot count; the job manager records `slots` per `id`.
(assertion-struct task-manager (id slots))
;; (submitted-task manager task): pairs a task with the ID of a task manager
;; (presumably the one the task was handed to -- confirm against the assigning code).
(assertion-struct submitted-task (manager task))
;; (job-manager-alive): asserted by the job manager actor once it has been spawned.
(assertion-struct job-manager-alive ())
;; a TaskState is (task-state ID TaskStateDesc)
;; where TaskStateDesc is one of
;; - ACCEPTED
;; - OVERLOAD
;; - RUNNING
;; - (finished data)
(assertion-struct task-state (id desc))
;; task states
@ -193,17 +200,27 @@
(spawn
(assert (job-manager-alive))
(log "Job Manager Up")
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
(define/query-hash task-managers (task-manager $id $slots) id slots
#:on-add (log "JM learns that ~a has ~v slots" id slots))
;; (Hashof TaskManagerID Nat)
;; to better understand the supply of slots for each task manager, keep track of the number
;; of requested tasks that we have yet to hear back about
(field [requests-in-flight (hash)])
(define (slots-available)
(for/sum ([v (in-hash-values (task-managers))])
v))
(for/sum ([(id v) (in-hash (task-managers))])
(max 0 (- v (hash-ref (requests-in-flight) id 0)))))
;; ID -> Void
;; mark that we have requested the given task manager to perform a task
(define (take-slot! id)
;; make local changes to task-managers to reflect tasks delegated in the current turn
(log "JM assigns a task to ~a" id)
(task-managers (hash-update (task-managers) id sub1)))
(requests-in-flight (hash-update (requests-in-flight) id add1 0)))
;; ID -> Void
;; Record that the given task manager has replied about one outstanding request,
;; lowering its in-flight count by one.
(define (received-answer! id)
  (define pending (requests-in-flight))
  (requests-in-flight (hash-update pending id sub1)))
(during (job $job-id $tasks)
(log "JM receives job ~a" job-id)
@ -229,7 +246,6 @@
(log "JM marks task ~a as ready" (task-id t))
(ready-tasks (cons t (ready-tasks))))
;; need to parcel out tasks
;; Task (ID TaskResult -> Void) -> Void
;; Requires (task-ready? t)
(define (perform-task t k)
@ -245,10 +261,12 @@
(unless (task-mngr)
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:unless (zero? slots))
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
id))
(when mngr
(take-slot! mngr)
(react (stop-when (asserted (task-state this-id _))
(received-answer! mngr)))
(task-mngr mngr))))
;; TODO - should respond if task manager dies
(assert #:when (task-mngr)
@ -256,6 +274,11 @@
(on #:when (task-mngr)
(asserted (task-state this-id $state))
(match state
[(== ACCEPTED)
#f]
[(== RUNNING)
;; nothing to do
#f]
[(== OVERLOAD)
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
@ -265,10 +288,7 @@
[(finished results)
;; TODO - guess-timation of what this should look like
(log "JM receives the results of task ~a" this-id)
(stop-current-facet (k this-id results))]
[_
;; TODO - needs more data?
#f]))))
(stop-current-facet (k this-id results))]))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving
@ -324,6 +344,22 @@
(define (split-at/lenient lst n)
  ;; Clamp the split position to the list's length so that `split-at` never
  ;; sees an index past the end of `lst`.
  (let ([cut (min n (length lst))])
    (split-at lst cut)))
;; ---------------------------------------------------------------------------------------------------
;; Observe interaction between task and job manager
;; -> Void
;; Spawn a monitoring actor that logs a warning when the number of tasks submitted
;; to a task manager exceeds the largest slot count that manager has advertised.
(define (spawn-observer)
(spawn
;; only observe while the job manager is announcing itself
(during (job-manager-alive)
;; one sub-facet per task manager ID; the slot count is ignored in this pattern
(during (task-manager $tm-id _)
;; IDs of the tasks currently asserted as submitted to this task manager
(define/query-set requests (submitted-task tm-id (task $tid _)) tid)
;; largest slot count seen so far in a task-manager assertion for tm-id
(field [high-water-mark 0])
(on (asserted (task-manager tm-id $slots))
;; only ever raise the mark; smaller slot counts leave it unchanged
(when (> slots (high-water-mark))
(high-water-mark slots)))
;; NOTE(review): the comparison is against the highest capacity ever advertised,
;; not the currently advertised slot count -- presumably because advertised slots
;; shrink as tasks are accepted; confirm against the task manager implementation.
(begin/dataflow
(when (> (set-count (requests)) (high-water-mark))
(log "!! DEMAND > SUPPLY !!")))))))
;; ---------------------------------------------------------------------------------------------------
;; Creating a Job
@ -469,3 +505,4 @@
;; start one task manager and two task runners
(spawn-task-manager)
(spawn-task-runner)
(spawn-task-runner)
;; start the observer that logs when submitted tasks exceed a task manager's advertised slots
(spawn-observer)