more flink

This commit is contained in:
Sam Caldwell 2019-05-22 10:24:02 -04:00
parent deca0a82be
commit 20693f234e
1 changed files with 15 additions and 12 deletions

View File

@ -407,6 +407,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ID -> ...
(define (assign-task [mngr : ID])
(start-facet this-facet
(assert (task-assignment mngr job-id t))
(on (retracted (task-manager mngr discard))
;; our task manager has crashed
(stop this-facet (select-a-task-manager)))
@ -417,19 +418,21 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
#;(take-slot! mngr)
(start-facet take-slot
(stop-when (asserted (task-state mngr job-id this-id discard))
(received-answer! mngr)))
#;(task-assigner t job-id mngr
(lambda ()
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop-facet this-facet (select-a-task-manager)))
(lambda (results)
(log "JM receives the results of task ~a" this-id)
(stop-facet perform (k this-id results)))))))
(received-answer! mngr))))
(on (asserted (task-state mngr job-id this-id (bind status TaskStateDesc)))
(match status
[ACCEPTED #f]
[RUNNING #f]
[OVERLOAD/ts
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop this-facet (select-a-task-manager))]
[(finished (bind results TaskResult))
(log "JM receives the results of task ~a" this-id)
(stop perform (k this-id results))]))))
(on start (select-a-task-manager))))
#f))))