examples/flink.rkt: work on job manager and utilities

This commit is contained in:
Sam Caldwell 2019-05-20 17:03:21 -04:00
parent fc220a4e16
commit c8a1253d7b
1 changed files with 190 additions and 5 deletions

@ -70,13 +70,15 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
(define-type-alias WordCount (Hash String Int))
(define-type-alias TaskResult WordCount)
(define-type-alias Reduce
(ReduceWork (U TaskID TaskResult)
(U TaskID TaskResult)))
(ReduceWork (Either TaskID TaskResult)
(Either TaskID TaskResult)))
(define-type-alias Work
(U Reduce (MapWork String)))
(define-type-alias ConcreteWork
(U (ReduceWork TaskResult TaskResult)
(MapWork String)))
(define-type-alias PendingTask
(Task TaskID Work))
(define-type-alias ConcreteTask
(Task TaskID ConcreteWork))
@ -121,12 +123,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(TaskAssignment ID ID ConcreteTask)
(Observe (TaskAssignment ID ★/t ★/t))
(TaskState ID ID TaskID TaskStateDesc)
(Observe (TaskState ID ID TaskID ★/t))
(Observe (JobManagerAlive))
(Observe (TaskRunner ★/t ★/t))
(TaskManager ID Int)
(Observe (TaskState ID ID TaskID ★/t))
(Observe (TaskManager ★/t ★/t))
(Job ID (List PendingTask))
(Observe (Job ★/t ★/t))))
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
@ -147,7 +151,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(hash-update/failure h
(lambda () 0)))
(define (count-new-words [word-count : WordCount]
[words : (List String)]
@ -240,3 +244,184 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(set! status OVERLOAD/ts)]
[(finished discard)
(set! status st)]))))))))
;; ---------------------------------------------------------------------------------------------------
;; JobManager
;; Task -> Bool
;; Test if the task is ready to run
(define (task-ready? [t : PendingTask] -> (Maybe ConcreteTask))
(match t
[(task (bind tid TaskID) (map-work (bind s String)))
;; having to re-produce this is directly bc of no occurrence typing
(some (task tid (map-work s)))]
[(task (bind tid TaskID) (reduce-work (right (bind v1 TaskResult))
(right (bind v2 TaskResult))))
(some (task tid (reduce-work v1 v2)))]
;; Task Id Any -> Task
;; If the given task is waiting for this data, replace the waiting ID with the data
(define (task+data [t : PendingTask]
[id : TaskID]
[data : TaskResult]
-> PendingTask)
(match t
[(task (bind tid TaskID)
(reduce-work (left id) (bind r (Either TaskID TaskResult))))
(task tid (reduce-work (right data) r))]
[(task (bind tid TaskID)
(reduce-work (bind l (Either TaskID TaskResult)) (left id)))
(task tid (reduce-work l (right data)))]
[discard t]))
(require/typed "flink-support.rkt"
[split-at/lenient- : ( (X) (→fn (List X) Int (List (List X))))])
(define ( (X) (split-at/lenient [xs : (List X)]
[n : Int]
-> (Tuple (List X) (List X))))
(define l (split-at/lenient- xs n))
(tuple (first l) (second l)))
(define (partition-ready-tasks [tasks : (List PendingTask)]
-> (Tuple (List PendingTask)
(List ConcreteTask)))
(define part (inst partition/either PendingTask PendingTask ConcreteTask))
(part tasks
(lambda ([t : PendingTask])
(match (task-ready? t)
[(some (bind ct ConcreteTask))
(right ct)]
(left t)]))))
(define (spawn-job-manager)
(spawn τc
(start-facet jm
(assert (job-manager-alive))
(log "Job Manager Up")
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
(define/query-hash task-managers (task-manager (bind id ID) (bind slots Int)) id slots
#;#:on-add #;(log "JM learns that ~a has ~v slots" id slots))
;; (Hashof TaskManagerID Nat)
;; to better understand the supply of slots for each task manager, keep track of the number
;; of requested tasks that we have yet to hear back about
(field [requests-in-flight (Hash ID Int) (hash)])
(define (slots-available)
(for/sum ([(id v) (ref task-managers)])
(max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0)))))
;; ID -> Void
;; mark that we have requested the given task manager to perform a task
(define (take-slot! [id : ID])
(log "JM assigns a task to ~a" id)
(set! requests-in-flight (hash-update/failure (ref requests-in-flight) id add1 0)))
;; ID -> Void
;; mark that we have heard back from the given manager about a requested task
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
(during (job (bind job-id ID) (bind tasks (List PendingTask)))
(log "JM receives job ~a" job-id)
(define n-r/r (partition-ready-tasks tasks))
(define ready (select 1 n-r/r))
(define not-ready (select 0 n-r/r))
#;(define-values (ready not-ready) (partition task-ready? tasks))
(field [ready-tasks (List ConcreteTask) ready]
[waiting-tasks (List PendingTask) not-ready]
[tasks-in-progress Int 0])
(define slots (slots-available))
(define-values (ts readys)
(split-at/lenient (ready-tasks) slots))
(for ([t ts])
(perform-task t push-results))
(unless (empty? ts)
;; the empty? check may be necessary to avoid a dataflow loop
(ready-tasks readys)))
;; Task -> Void
#;(define (add-ready-task! t)
;; TODO - use functional-queue.rkt from ../../
(log "JM marks task ~a as ready" (task-id t))
(ready-tasks (cons t (ready-tasks))))
;; Task (ID TaskResult -> Void) -> Void
;; Requires (task-ready? t)
#;(define (perform-task t k)
(define task-facet (current-facet-id))
(on-start (tasks-in-progress (add1 (tasks-in-progress))))
(on-stop (tasks-in-progress (sub1 (tasks-in-progress))))
(match-define (task this-id desc) t)
(log "JM begins on task ~a" this-id)
(define (select-a-task-manager)
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
(when mngr
(take-slot! mngr)
(stop-current-facet (assign-task mngr))))))
;; ID -> ...
(define (assign-task mngr)
(define this-facet (current-facet-id))
(on (retracted (task-manager mngr _))
;; our task manager has crashed
(stop-current-facet (select-a-task-manager)))
;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(react (stop-when (asserted (task-state mngr job-id this-id _))
(received-answer! mngr)))
(task-assigner t job-id mngr
(lambda ()
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop-facet this-facet (select-a-task-manager)))
(lambda (results)
(log "JM receives the results of task ~a" this-id)
(stop-facet task-facet (k this-id results)))))))
(on-start (select-a-task-manager))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving
;; them to the ready queue when possible
#;(define (push-results task-id data)
[(and (zero? (tasks-in-progress))
(empty? (ready-tasks))
(empty? (waiting-tasks)))
(log "JM finished with job ~a" job-id)
(react (assert (job-finished job-id data)))]
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
(define still-waiting
(for/fold ([ts '()])
([t (in-list (waiting-tasks))])
(define t+ (task+data t task-id data))
[(task-ready? t+)
(add-ready-task! t+)
(cons t+ ts)])))
(waiting-tasks still-waiting)]))