fix bug in flink

This commit is contained in:
Sam Caldwell 2019-06-21 16:48:16 -04:00
parent 27abf8ab1e
commit 16175c7bb4
1 changed files with 5 additions and 5 deletions

View File

@ -321,7 +321,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(task id (reduce-work (left l) (left r)))]))
(assertion-struct assigned-task : SelectedTM (mngr))
(message-struct tasks-finished : TasksFinished (id))
(message-struct tasks-finished : TasksFinished (id results))
(define (spawn-job-manager)
(spawn τc
@ -378,8 +378,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(empty? (ref ready-tasks))
(empty? (ref waiting-tasks)))
(log "JM finished with job ~a" job-id)
(realize! (tasks-finished job-id))
(start-facet done (assert (job-finished job-id data)))]
(realize! (tasks-finished job-id data))]
[else
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
(define still-waiting
@ -469,8 +468,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(on start
(start-facet delegate-tasks
(on (realize (tasks-finished job-id))
(stop delegate-tasks))
(on (realize (tasks-finished job-id $data:TaskResult))
(stop delegate-tasks
(start-facet done (assert (job-finished job-id data)))))
(begin/dataflow
(define slots (slots-available))
(define-tuple (ts readys)