examples/flink: merge task assignment and delegation protocols

This commit is contained in:
Sam Caldwell 2019-02-26 15:48:26 -05:00
parent 22bd143025
commit 0da903e438
1 changed files with 36 additions and 40 deletions

View File

@ -64,17 +64,17 @@ assert their presence with (task-runner ID Status), where Status is one of
(struct executing (id) #:transparent)
#|
Task Execution Protocol
Task Delegation Protocol
-----------------------
When the JobManager receives a Job, it assigns its constituent Tasks to the
TaskManagers, subject to TM availability and the readiness of each Task.
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
The JM asserts the association of a Task with a particular TM
through (submitted-task ID Task), where ID identifies the TM.
TODO - also need to correlate Job ID through here.
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through
(task-assignment ID ID Task)
where the first ID identifies the TP and the second identifies the job.
|#
(assertion-struct submitted-task (manager task))
(assertion-struct task-assignment (assignee job-id task))
#|
A Task is a (task ID Work), where Work is one of
- (map-work String)
@ -89,33 +89,27 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
(struct reduce-work (left right) #:transparent)
#|
A TaskManager responds to a submitted-task by describing its state with respect
to that task, (task-state ID TaskStateDesc), where ID is that of the
submitted-task (TODO - that doesn't seem like enough). TaskStateDesc is one of
- ACCEPTED, when the TM can assign the Task to an available JR (TODO - not sure if this is ever visible, currently)
- OVERLOAD, when the TM does not have the resources to perform the task.
- RUNNING, indicating that the task has successfully been delegated to a JR
The TaskPerformer responds to a task-assignment by describing its state with respect
to that task,
(task-state ID ID ID TaskStateDesc)
where the first ID is that of the TP, the second is that of the job,
and the third that of the task.
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
- OVERLOAD, when the TP does not have the resources to perform the task.
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
|#
(assertion-struct task-state (id desc))
(assertion-struct task-state (assignee job-id task-id desc))
(struct finished (data) #:transparent)
(define ACCEPTED 'accepted)
(define RUNNING 'running)
#|
Upon receipt, a TaskManager selects a TaskRunner to perform it. The TM asserts
the association, (run-task ID Task), where ID is that of the given TaskRunner.
The TaskRunner shares its reaction to a task assignment with an assertion,
(task-execution-state Task ExecutionState). (TODO - the TR ID should be in there!).
The ExecutionState is one of
- RUNNING, indicating that the TR has accepted and is executing the task
- OVERLOAD, when the TR is overloaded
- (finished TaskResult), describing the results
TODO - merge this with TaskStateDesc
TODO TODO - merge the JR/TM and TM/TR task protocols with one another - TODO TODO
Two instances of the Task Delegation Protocol take place: one between the
JobManager and the TaskManager, and one between the TaskManager and its
TaskRunners.
|#
(assertion-struct run-task (id task))
(assertion-struct task-execution-state (task state))
#|
Job Submission Protocol
@ -140,17 +134,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define id (gensym 'task-runner))
(spawn #:name id
(field [status IDLE])
(define (idle?) (equal? IDLE (status)))
(assert (task-runner id (status)))
(begin/dataflow
(log "task-runner ~v state is: ~a" id (status)))
;; this only does map tasks atm
(during (run-task id (task $tid $desc))
(field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)]
(during (task-assignment id $job-id (task $tid $desc))
(field [execution-state (if (idle?) RUNNING OVERLOAD)]
[word-count (hash)])
;; TODO - may need to include more correlation info in here to properly describe state when overloaded
(assert (task-execution-state tid (execution-state)))
(assert (task-state id job-id tid (execution-state)))
;; I think we have to avoid asking a non-idle runner to do anything
(when (equal? IDLE (status))
(when (idle?)
(on-stop (status IDLE)))
(on-start
(when (equal? IDLE (status))
@ -215,7 +209,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (idle-runners))))
(field [busy-runners (list)])
(during (submitted-task id $t)
(during (task-assignment id $job-id $t)
(match-define (task task-id desc) t)
#;(on-start (log "TM receives task ~a" task-id))
(log "TM receives task ~a" task-id)
@ -223,7 +217,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(when (= task-id 6)
(log "TM idle-runners: ~a" (idle-runners))))
(field [status ACCEPTED])
(assert (task-state task-id (status)))
;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner
;; (also removing the first id from task-state)
(assert (task-state id job-id task-id (status)))
(cond
[(set-empty? (idle-runners))
(log "TM can't run ~a right now" task-id)
@ -235,9 +231,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Could use the busy-runners field to avoid that
(idle-runners (set-remove (idle-runners) runner))
(log "TM assigns task ~a to runner ~a" task-id runner)
(assert (run-task runner t))
(assert (task-assignment runner job-id t))
(status RUNNING)
(on (asserted (task-execution-state task-id $state))
(on (asserted (task-state runner job-id task-id $state))
(match state
[(== RUNNING)
;; nothing to do
@ -322,14 +318,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
id))
(when mngr
(take-slot! mngr)
(react (stop-when (asserted (task-state this-id _))
(react (stop-when (asserted (task-state mngr job-id this-id _))
(received-answer! mngr)))
(task-mngr mngr))))
;; TODO - should respond if task manager dies
(assert #:when (task-mngr)
(submitted-task (task-mngr) t))
(task-assignment (task-mngr) job-id t))
(on #:when (task-mngr)
(asserted (task-state this-id $state))
(asserted (task-state (task-mngr) job-id this-id $state))
(match state
[(== ACCEPTED)
#f]
@ -414,7 +410,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn
(during (job-manager-alive)
(during (task-manager $tm-id _)
(define/query-set requests (submitted-task tm-id (task $tid _)) tid)
(define/query-set requests (task-assignment tm-id _ (task $tid _)) tid)
(field [high-water-mark 0])
(on (asserted (task-manager tm-id $slots))
(when (> slots (high-water-mark))