diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index e8fa5a5..2601888 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -321,7 +321,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (task id (reduce-work (left l) (left r)))])) (assertion-struct assigned-task : SelectedTM (mngr)) -(message-struct tasks-finished : TasksFinished (id)) +(message-struct tasks-finished : TasksFinished (id results)) (define (spawn-job-manager) (spawn τc @@ -378,8 +378,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (empty? (ref ready-tasks)) (empty? (ref waiting-tasks))) (log "JM finished with job ~a" job-id) - (realize! (tasks-finished job-id)) - (start-facet done (assert (job-finished job-id data)))] + (realize! (tasks-finished job-id data))] [else ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. (define still-waiting @@ -469,8 +468,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (on start (start-facet delegate-tasks - (on (realize (tasks-finished job-id)) - (stop delegate-tasks)) + (on (realize (tasks-finished job-id $data:TaskResult)) + (stop delegate-tasks + (start-facet done (assert (job-finished job-id data))))) (begin/dataflow (define slots (slots-available)) (define-tuple (ts readys)