From f5331eb24f8ae1b25150ebb005f89ea899b95bc5 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Mon, 21 Oct 2019 12:22:10 -0400 Subject: [PATCH] typed flink: unify task-state and task-assignment, job and job-finished --- racket/typed/examples/roles/flink.rkt | 73 ++++++++++++++------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 5ce9345..4f3792e 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -50,12 +50,12 @@ Task Delegation Protocol Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP). -A TaskAssigner asserts the association of a Task with a particular TaskPerformer -through - (task-assignment ID Task) +A TaskAssigner requests the performance of a Task with a particular TaskPerformer +through the assertion of interest +(observe (task-performance ID Task ★)) where the ID identifies the TP |# -(assertion-struct task-assignment : TaskAssignment (assignee task)) +(assertion-struct task-performance : TaskPerformance (assignee task desc)) #| A Task is a (task TaskID Work), where Work is one of - (map-work String) @@ -91,9 +91,9 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words (define-type-alias ConcreteTask (Task TaskID ConcreteWork)) #| -The TaskPerformer responds to a task-assignment by describing its state with respect +The TaskPerformer responds to a request by describing its state with respect to that task, - (task-state TaskID TaskStateDesc) +(task-performance ID Task TaskStateDesc) A TaskStateDesc is one of - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) @@ -101,7 +101,6 @@ A TaskStateDesc is one of - RUNNING, indicating that the task is being performed - (finished TaskResult), describing the results |# -(assertion-struct task-state : TaskState (task-id desc)) (define-constructor* (finished : Finished data)) (define-type-alias TaskStateDesc (U Symbol (Finished TaskResult))) @@ -117,44 +116,46 @@ TaskRunners. (define-type-alias TaskAssigner (Role (assign) - (Shares (TaskAssignment ID ConcreteTask)) + (Shares (Observe (TaskPerformance ID ConcreteTask ★/t))) ;; would be nice to say how the TaskIDs relate to each other - (Reacts (Asserted (TaskState TaskID ★/t)) + (Reacts (Asserted (TaskPerformance ID ConcreteTask ★/t)) (Branch (Stop assign) (Effs))))) (define-type-alias TaskPerformer (Role (listen) - (During (TaskAssignment ID ConcreteTask) + (During (Observe (TaskPerformance ID ConcreteTask ★/t)) ;; would be nice to say how the IDs and TaskIDs relate to each other - (Shares (TaskState TaskID TaskStateDesc))))) + (Shares (TaskPerformance TaskID TaskStateDesc))))) #| Job Submission Protocol ----------------------- -Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)). -The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult) +Finally, Clients submit their jobs to the JobManager by asserting interest + (observe (job-completion ID (Listof Task) ★)) + +The JobManager then performs the job and, when finished, asserts + (job-completion ID (Listof Task) TaskResult) + |# (require-struct job #:as Job #:from "flink-support.rkt") -(assertion-struct job-finished : JobFinished (id data)) +(assertion-struct job-completion : JobCompletion (id data result)) (define-type-alias JobDesc (Job ID (List InputTask))) (define-type-alias τc (U (TaskRunner ID) - (TaskAssignment ID ConcreteTask) - (Observe (TaskAssignment ID ★/t)) - (TaskState TaskID TaskStateDesc) - (Observe (TaskState TaskID ★/t)) + (Observe (TaskPerformance ID ConcreteTask ★/t)) + (TaskPerformance ID ConcreteTask TaskStateDesc) + (Observe (Observe (TaskPerformance ID ★/t ★/t))) (JobManagerAlive) (Observe (JobManagerAlive)) (Observe (TaskRunner ★/t)) (TaskManager ID Int) (Observe (TaskManager ★/t ★/t)) - JobDesc - (Observe (Job ★/t ★/t)) - (JobFinished ID TaskResult) - (Observe (JobFinished ID ★/t)))) + (JobCompletion ID (List InputTask) TaskResult) + (Observe (JobCompletion ID (List InputTask) ★/t)) + (Observe (Observe (JobCompletion ★/t ★/t ★/t))))) ;; --------------------------------------------------------------------------------------------------- ;; Util Macros @@ -194,9 +195,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-runner id)) (on (retracted (task-manager tm-id _)) (stop runner)) - (during (task-assignment id (task $task-id $desc)) + (during (observe (task-performance id $t _)) + (match-define (task $task-id $desc) t) (field [state TaskStateDesc ACCEPTED]) - (assert (task-state task-id (ref state))) + (assert (task-performance id t (ref state))) ;; since we currently finish everything in one turn, these changes to status aren't ;; actually visible. (set! state RUNNING) @@ -257,19 +259,19 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [none (error "need to call can-accept? before selecting a runner")])) - (during (task-assignment id (task $task-id $desc)) + (during (observe (task-performance id $t _)) + (match-define (task $task-id $desc) t) (define status0 : TaskStateDesc (if (can-accept?) RUNNING OVERLOAD/ts)) (field [status TaskStateDesc status0]) - (assert (task-state task-id (ref status))) + (assert (task-performance id t (ref status))) (when (can-accept?) (define runner (select-runner)) (log "TM ~a assigns task ~a to runner ~a" id task-id runner) (on stop (set! busy-runners (set-remove (ref busy-runners) runner))) - (assert (task-assignment runner (task task-id desc))) - (on (asserted (task-state task-id $st)) + (on (asserted (task-performance runner t $st)) (match st [ACCEPTED #f] [RUNNING #f] @@ -278,6 +280,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [(finished discard) (set! status st)]))))))))) + ;; --------------------------------------------------------------------------------------------------- ;; JobManager @@ -369,7 +372,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (received-answer! [id : ID]) (set! requests-in-flight (hash-update (ref requests-in-flight) id sub1))) - (during (job $job-id $tasks) + (during (observe (job-completion $job-id $tasks _)) (log "JM receives job ~a" job-id) (define pending (for/list ([t tasks]) (input->pending-task t))) @@ -427,7 +430,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; ID -> ... (define (assign-task [mngr : ID]) (start-facet assign - (assert (task-assignment mngr t)) (know (assigned-task mngr)) (on (retracted (task-manager mngr _)) ;; our task manager has crashed @@ -438,10 +440,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; tasks were being assigned to the manager #;(take-slot! mngr) (start-facet take-slot - (on (asserted (task-state this-id _)) + (on (asserted (task-performance mngr t _)) (stop take-slot (received-answer! mngr))))) - (on (asserted (task-state this-id $status)) + (on (asserted (task-performance mngr t $status)) (match status [ACCEPTED #f] [RUNNING #f] @@ -487,7 +489,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (start-facet delegate-tasks (on (realize (tasks-finished job-id $data:TaskResult)) (stop delegate-tasks - (start-facet done (assert (job-finished job-id data))))) + (start-facet done (assert (job-completion job-id tasks data))))) (begin/dataflow (define slots (slots-available)) (define-tuple (ts readys) @@ -505,9 +507,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (spawn-client [j : JobDesc]) (spawn τc (start-facet _ - (match-define (job $id _) j) - (assert j) - (on (asserted (job-finished id $data)) + (match-define (job $id $tasks) j) + (on (asserted (job-completion id tasks $data)) (printf "job done!\n~a\n" data))))) ;; ---------------------------------------------------------------------------------------------------