typed flink: unify task-state and task-assignment, job and job-finished

This commit is contained in:
Sam Caldwell 2019-10-21 12:22:10 -04:00
parent 80ebab5ed7
commit f5331eb24f
1 changed files with 37 additions and 36 deletions

View File

@ -50,12 +50,12 @@ Task Delegation Protocol
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through
(task-assignment ID Task)
A TaskAssigner requests the performance of a Task with a particular TaskPerformer
through the assertion of interest
(observe (task-performance ID Task ))
where the ID identifies the TP
|#
(assertion-struct task-assignment : TaskAssignment (assignee task))
(assertion-struct task-performance : TaskPerformance (assignee task desc))
#|
A Task is a (task TaskID Work), where Work is one of
- (map-work String)
@ -91,9 +91,9 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
(define-type-alias ConcreteTask
(Task TaskID ConcreteWork))
#|
The TaskPerformer responds to a task-assignment by describing its state with respect
The TaskPerformer responds to a request by describing its state with respect
to that task,
(task-state TaskID TaskStateDesc)
(task-performance ID Task TaskStateDesc)
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
@ -101,7 +101,6 @@ A TaskStateDesc is one of
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
|#
(assertion-struct task-state : TaskState (task-id desc))
(define-constructor* (finished : Finished data))
(define-type-alias TaskStateDesc
(U Symbol (Finished TaskResult)))
@ -117,44 +116,46 @@ TaskRunners.
(define-type-alias TaskAssigner
(Role (assign)
(Shares (TaskAssignment ID ConcreteTask))
(Shares (Observe (TaskPerformance ID ConcreteTask ★/t)))
;; would be nice to say how the TaskIDs relate to each other
(Reacts (Asserted (TaskState TaskID ★/t))
(Reacts (Asserted (TaskPerformance ID ConcreteTask ★/t))
(Branch (Stop assign)
(Effs)))))
(define-type-alias TaskPerformer
(Role (listen)
(During (TaskAssignment ID ConcreteTask)
(During (Observe (TaskPerformance ID ConcreteTask ★/t))
;; would be nice to say how the IDs and TaskIDs relate to each other
(Shares (TaskState TaskID TaskStateDesc)))))
(Shares (TaskPerformance TaskID TaskStateDesc)))))
#|
Job Submission Protocol
-----------------------
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
Finally, Clients submit their jobs to the JobManager by asserting interest
(observe (job-completion ID (Listof Task) ))
The JobManager then performs the job and, when finished, asserts
(job-completion ID (Listof Task) TaskResult)
|#
(require-struct job #:as Job #:from "flink-support.rkt")
(assertion-struct job-finished : JobFinished (id data))
(assertion-struct job-completion : JobCompletion (id data result))
(define-type-alias JobDesc (Job ID (List InputTask)))
(define-type-alias τc
(U (TaskRunner ID)
(TaskAssignment ID ConcreteTask)
(Observe (TaskAssignment ID ★/t))
(TaskState TaskID TaskStateDesc)
(Observe (TaskState TaskID ★/t))
(Observe (TaskPerformance ID ConcreteTask ★/t))
(TaskPerformance ID ConcreteTask TaskStateDesc)
(Observe (Observe (TaskPerformance ID ★/t ★/t)))
(JobManagerAlive)
(Observe (JobManagerAlive))
(Observe (TaskRunner ★/t))
(TaskManager ID Int)
(Observe (TaskManager ★/t ★/t))
JobDesc
(Observe (Job ★/t ★/t))
(JobFinished ID TaskResult)
(Observe (JobFinished ID ★/t))))
(JobCompletion ID (List InputTask) TaskResult)
(Observe (JobCompletion ID (List InputTask) ★/t))
(Observe (Observe (JobCompletion ★/t ★/t ★/t)))))
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
@ -194,9 +195,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id))
(on (retracted (task-manager tm-id _))
(stop runner))
(during (task-assignment id (task $task-id $desc))
(during (observe (task-performance id $t _))
(match-define (task $task-id $desc) t)
(field [state TaskStateDesc ACCEPTED])
(assert (task-state task-id (ref state)))
(assert (task-performance id t (ref state)))
;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible.
(set! state RUNNING)
@ -257,19 +259,19 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[none
(error "need to call can-accept? before selecting a runner")]))
(during (task-assignment id (task $task-id $desc))
(during (observe (task-performance id $t _))
(match-define (task $task-id $desc) t)
(define status0 : TaskStateDesc
(if (can-accept?)
RUNNING
OVERLOAD/ts))
(field [status TaskStateDesc status0])
(assert (task-state task-id (ref status)))
(assert (task-performance id t (ref status)))
(when (can-accept?)
(define runner (select-runner))
(log "TM ~a assigns task ~a to runner ~a" id task-id runner)
(on stop (set! busy-runners (set-remove (ref busy-runners) runner)))
(assert (task-assignment runner (task task-id desc)))
(on (asserted (task-state task-id $st))
(on (asserted (task-performance runner t $st))
(match st
[ACCEPTED #f]
[RUNNING #f]
@ -278,6 +280,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[(finished discard)
(set! status st)])))))))))
;; ---------------------------------------------------------------------------------------------------
;; JobManager
@ -369,7 +372,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
(during (job $job-id $tasks)
(during (observe (job-completion $job-id $tasks _))
(log "JM receives job ~a" job-id)
(define pending (for/list ([t tasks])
(input->pending-task t)))
@ -427,7 +430,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ID -> ...
(define (assign-task [mngr : ID])
(start-facet assign
(assert (task-assignment mngr t))
(know (assigned-task mngr))
(on (retracted (task-manager mngr _))
;; our task manager has crashed
@ -438,10 +440,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(start-facet take-slot
(on (asserted (task-state this-id _))
(on (asserted (task-performance mngr t _))
(stop take-slot
(received-answer! mngr)))))
(on (asserted (task-state this-id $status))
(on (asserted (task-performance mngr t $status))
(match status
[ACCEPTED #f]
[RUNNING #f]
@ -487,7 +489,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(start-facet delegate-tasks
(on (realize (tasks-finished job-id $data:TaskResult))
(stop delegate-tasks
(start-facet done (assert (job-finished job-id data)))))
(start-facet done (assert (job-completion job-id tasks data)))))
(begin/dataflow
(define slots (slots-available))
(define-tuple (ts readys)
@ -505,9 +507,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (spawn-client [j : JobDesc])
(spawn τc
(start-facet _
(match-define (job $id _) j)
(assert j)
(on (asserted (job-finished id $data))
(match-define (job $id $tasks) j)
(on (asserted (job-completion id tasks $data))
(printf "job done!\n~a\n" data)))))
;; ---------------------------------------------------------------------------------------------------