typed flink: unify task-state and task-assignment, job and job-finished

This commit is contained in:
Sam Caldwell 2019-10-21 12:22:10 -04:00
parent 18fdcdeff7
commit 5823cf32c3
1 changed files with 37 additions and 36 deletions

View File

@ -50,12 +50,12 @@ Task Delegation Protocol
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
(task-assignment ID Task)
A TaskAssigner requests the performance of a Task with a particular TaskPerformer
through the assertion of interest
(observe (task-performance ID Task ))
where the ID identifies the TP
(assertion-struct task-assignment : TaskAssignment (assignee task))
(assertion-struct task-performance : TaskPerformance (assignee task desc))
A Task is a (task TaskID Work), where Work is one of
- (map-work String)
@ -91,9 +91,9 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
(define-type-alias ConcreteTask
(Task TaskID ConcreteWork))
The TaskPerformer responds to a task-assignment by describing its state with respect
The TaskPerformer responds to a request by describing its state with respect
to that task,
(task-state TaskID TaskStateDesc)
(task-performance ID Task TaskStateDesc)
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
@ -101,7 +101,6 @@ A TaskStateDesc is one of
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
(assertion-struct task-state : TaskState (task-id desc))
(define-constructor* (finished : Finished data))
(define-type-alias TaskStateDesc
(U Symbol (Finished TaskResult)))
@ -117,44 +116,46 @@ TaskRunners.
(define-type-alias TaskAssigner
(Role (assign)
(Shares (TaskAssignment ID ConcreteTask))
(Shares (Observe (TaskPerformance ID ConcreteTask ★/t)))
;; would be nice to say how the TaskIDs relate to each other
(Reacts (Asserted (TaskState TaskID ★/t))
(Reacts (Asserted (TaskPerformance ID ConcreteTask ★/t))
(Branch (Stop assign)
(define-type-alias TaskPerformer
(Role (listen)
(During (TaskAssignment ID ConcreteTask)
(During (Observe (TaskPerformance ID ConcreteTask ★/t))
;; would be nice to say how the IDs and TaskIDs relate to each other
(Shares (TaskState TaskID TaskStateDesc)))))
(Shares (TaskPerformance TaskID TaskStateDesc)))))
Job Submission Protocol
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
Finally, Clients submit their jobs to the JobManager by asserting interest
(observe (job-completion ID (Listof Task) ))
The JobManager then performs the job and, when finished, asserts
(job-completion ID (Listof Task) TaskResult)
(require-struct job #:as Job #:from "flink-support.rkt")
(assertion-struct job-finished : JobFinished (id data))
(assertion-struct job-completion : JobCompletion (id data result))
(define-type-alias JobDesc (Job ID (List InputTask)))
(define-type-alias τc
(U (TaskRunner ID)
(TaskAssignment ID ConcreteTask)
(Observe (TaskAssignment ID ★/t))
(TaskState TaskID TaskStateDesc)
(Observe (TaskState TaskID ★/t))
(Observe (TaskPerformance ID ConcreteTask ★/t))
(TaskPerformance ID ConcreteTask TaskStateDesc)
(Observe (Observe (TaskPerformance ID ★/t ★/t)))
(Observe (JobManagerAlive))
(Observe (TaskRunner ★/t))
(TaskManager ID Int)
(Observe (TaskManager ★/t ★/t))
(Observe (Job ★/t ★/t))
(JobFinished ID TaskResult)
(Observe (JobFinished ID ★/t))))
(JobCompletion ID (List InputTask) TaskResult)
(Observe (JobCompletion ID (List InputTask) ★/t))
(Observe (Observe (JobCompletion ★/t ★/t ★/t)))))
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
@ -194,9 +195,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id))
(on (retracted (task-manager tm-id _))
(stop runner))
(during (task-assignment id (task $task-id $desc))
(during (observe (task-performance id $t _))
(match-define (task $task-id $desc) t)
(field [state TaskStateDesc ACCEPTED])
(assert (task-state task-id (ref state)))
(assert (task-performance id t (ref state)))
;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible.
(set! state RUNNING)
@ -257,19 +259,19 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(error "need to call can-accept? before selecting a runner")]))
(during (task-assignment id (task $task-id $desc))
(during (observe (task-performance id $t _))
(match-define (task $task-id $desc) t)
(define status0 : TaskStateDesc
(if (can-accept?)
(field [status TaskStateDesc status0])
(assert (task-state task-id (ref status)))
(assert (task-performance id t (ref status)))
(when (can-accept?)
(define runner (select-runner))
(log "TM ~a assigns task ~a to runner ~a" id task-id runner)
(on stop (set! busy-runners (set-remove (ref busy-runners) runner)))
(assert (task-assignment runner (task task-id desc)))
(on (asserted (task-state task-id $st))
(on (asserted (task-performance runner t $st))
(match st
@ -278,6 +280,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[(finished discard)
(set! status st)])))))))))
;; ---------------------------------------------------------------------------------------------------
;; JobManager
@ -369,7 +372,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
(during (job $job-id $tasks)
(during (observe (job-completion $job-id $tasks _))
(log "JM receives job ~a" job-id)
(define pending (for/list ([t tasks])
(input->pending-task t)))
@ -427,7 +430,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ID -> ...
(define (assign-task [mngr : ID])
(start-facet assign
(assert (task-assignment mngr t))
(know (assigned-task mngr))
(on (retracted (task-manager mngr _))
;; our task manager has crashed
@ -438,10 +440,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(start-facet take-slot
(on (asserted (task-state this-id _))
(on (asserted (task-performance mngr t _))
(stop take-slot
(received-answer! mngr)))))
(on (asserted (task-state this-id $status))
(on (asserted (task-performance mngr t $status))
(match status
@ -487,7 +489,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(start-facet delegate-tasks
(on (realize (tasks-finished job-id $data:TaskResult))
(stop delegate-tasks
(start-facet done (assert (job-finished job-id data)))))
(start-facet done (assert (job-completion job-id tasks data)))))
(define slots (slots-available))
(define-tuple (ts readys)
@ -505,9 +507,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (spawn-client [j : JobDesc])
(spawn τc
(start-facet _
(match-define (job $id _) j)
(assert j)
(on (asserted (job-finished id $data))
(match-define (job $id $tasks) j)
(on (asserted (job-completion id tasks $data))
(printf "job done!\n~a\n" data)))))
;; ---------------------------------------------------------------------------------------------------