progress on flink

This commit is contained in:
Sam Caldwell 2019-05-21 17:23:45 -04:00
parent 96e9431e15
commit deca0a82be
1 changed files with 68 additions and 60 deletions

View File

@ -130,7 +130,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(TaskManager ID Int)
(Observe (TaskManager ★/t ★/t))
(Job ID (List PendingTask))
(Observe (Job ★/t ★/t))))
(Observe (Job ★/t ★/t))
(JobFinished ID TaskResult)))
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
@ -328,66 +329,96 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(during (job (bind job-id ID) (bind tasks (List PendingTask)))
(log "JM receives job ~a" job-id)
(define n-r/r (partition-ready-tasks tasks))
(define ready (select 1 n-r/r))
(define not-ready (select 0 n-r/r))
#;(define-values (ready not-ready) (partition task-ready? tasks))
(define-tuple (not-ready ready) (partition-ready-tasks tasks))
(field [ready-tasks (List ConcreteTask) ready]
[waiting-tasks (List PendingTask) not-ready]
[tasks-in-progress Int 0])
#;(begin/dataflow
(begin/dataflow
(define slots (slots-available))
(define-values (ts readys)
(split-at/lenient (ready-tasks) slots))
(define-tuple (ts readys)
(split-at/lenient (ref ready-tasks) slots))
(for ([t ts])
(perform-task t push-results))
#f
#;(perform-task t push-results))
(unless (empty? ts)
;; the empty? check may be necessary to avoid a dataflow loop
(ready-tasks readys)))
(set! ready-tasks readys)))
;; Task -> Void
#;(define (add-ready-task! t)
(define (add-ready-task! [t : ConcreteTask])
;; TODO - use functional-queue.rkt from ../../
(log "JM marks task ~a as ready" (task-id t))
(ready-tasks (cons t (ready-tasks))))
(match-define (task (bind tid TaskID) discard) t)
(log "JM marks task ~a as ready" tid)
(set! ready-tasks (cons t (ref ready-tasks))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving
;; them to the ready queue when possible
(define (push-results [task-id : TaskID]
[data : TaskResult])
(cond
[(and (zero? (ref tasks-in-progress))
(empty? (ref ready-tasks))
(empty? (ref waiting-tasks)))
(log "JM finished with job ~a" job-id)
(start-facet done (assert (job-finished job-id data)))]
[else
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
(define still-waiting
(for/fold ([ts : (List PendingTask) (list)])
([t (ref waiting-tasks)])
(define t+ (task+data t task-id data))
(match (task-ready? t+)
[(some (bind ready ConcreteTask))
(add-ready-task! ready)
ts]
[discard
(cons t+ ts)])))
(set! waiting-tasks still-waiting)]))
;; Task (ID TaskResult -> Void) -> Void
;; Requires (task-ready? t)
#;(define (perform-task t k)
(react
(define task-facet (current-facet-id))
(on-start (tasks-in-progress (add1 (tasks-in-progress))))
(on-stop (tasks-in-progress (sub1 (tasks-in-progress))))
(match-define (task this-id desc) t)
(define (perform-task [t : ConcreteTask]
[k : (→fn TaskID TaskResult (Tuple))]
-> ★/t)
(start-facet perform
(on start (set! tasks-in-progress (add1 (ref tasks-in-progress))))
(on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress))))
(match-define (task (bind this-id TaskID) (bind desc ConcreteWork)) t)
(log "JM begins on task ~a" this-id)
(define (select-a-task-manager)
(react
(start-facet this-facet
(begin/dataflow
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
(define mngr?
(for/first ([(id slots) (ref task-managers)]
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
id))
(when mngr
(take-slot! mngr)
(stop-current-facet (assign-task mngr))))))
(match mngr?
[(some (bind mngr ID))
(take-slot! mngr)
(stop this-facet
#;(assign-task mngr))]
[none
#f])
#f)))
;; ID -> ...
(define (assign-task mngr)
(react
(define this-facet (current-facet-id))
(on (retracted (task-manager mngr _))
(define (assign-task [mngr : ID])
(start-facet this-facet
(on (retracted (task-manager mngr discard))
;; our task manager has crashed
(stop-current-facet (select-a-task-manager)))
(on-start
(stop this-facet (select-a-task-manager)))
(on start
;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(react (stop-when (asserted (task-state mngr job-id this-id _))
(received-answer! mngr)))
(task-assigner t job-id mngr
(start-facet take-slot
(stop-when (asserted (task-state mngr job-id this-id discard))
(received-answer! mngr)))
#;(task-assigner t job-id mngr
(lambda ()
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
@ -396,32 +427,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(stop-facet this-facet (select-a-task-manager)))
(lambda (results)
(log "JM receives the results of task ~a" this-id)
(stop-facet task-facet (k this-id results)))))))
(stop-facet perform (k this-id results)))))))
(on-start (select-a-task-manager))))
(on start (select-a-task-manager))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving
;; them to the ready queue when possible
#;(define (push-results task-id data)
(cond
[(and (zero? (tasks-in-progress))
(empty? (ready-tasks))
(empty? (waiting-tasks)))
(log "JM finished with job ~a" job-id)
(react (assert (job-finished job-id data)))]
[else
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
(define still-waiting
(for/fold ([ts '()])
([t (in-list (waiting-tasks))])
(define t+ (task+data t task-id data))
(cond
[(task-ready? t+)
(add-ready-task! t+)
ts]
[else
(cons t+ ts)])))
(waiting-tasks still-waiting)]))
#f))))