diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 58a841c..1edc460 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -407,6 +407,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; ID -> ... (define (assign-task [mngr : ID]) (start-facet this-facet + (assert (task-assignment mngr job-id t)) (on (retracted (task-manager mngr discard)) ;; our task manager has crashed (stop this-facet (select-a-task-manager))) @@ -417,19 +418,21 @@ The JobManager then performs the job and, when finished, asserts (job-finished I #;(take-slot! mngr) (start-facet take-slot (stop-when (asserted (task-state mngr job-id this-id discard)) - (received-answer! mngr))) - #;(task-assigner t job-id mngr - (lambda () - ;; need to find a new task manager - ;; don't think we need a release-slot! here, because if we've heard back from a task manager, - ;; they should have told us a different slot count since we tried to give them work - (log "JM overloaded manager ~a with task ~a" mngr this-id) - (stop-facet this-facet (select-a-task-manager))) - (lambda (results) - (log "JM receives the results of task ~a" this-id) - (stop-facet perform (k this-id results))))))) + (received-answer! mngr)))) + (on (asserted (task-state mngr job-id this-id (bind status TaskStateDesc))) + (match status + [ACCEPTED #f] + [RUNNING #f] + [OVERLOAD/ts + ;; need to find a new task manager + ;; don't think we need a release-slot! here, because if we've heard back from a task manager, + ;; they should have told us a different slot count since we tried to give them work + (log "JM overloaded manager ~a with task ~a" mngr this-id) + (stop this-facet (select-a-task-manager))] + [(finished (bind results TaskResult)) + (log "JM receives the results of task ~a" this-id) + (stop perform (k this-id results))])))) (on start (select-a-task-manager)))) #f)))) -