From 123037ba51d3964d6ed565f3b498e16b6c6ced8d Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Wed, 16 Oct 2019 12:25:04 -0400 Subject: [PATCH] typed flink: streamline ids --- racket/typed/examples/roles/flink-support.rkt | 11 +-- racket/typed/examples/roles/flink.rkt | 75 ++++++++++--------- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/racket/typed/examples/roles/flink-support.rkt b/racket/typed/examples/roles/flink-support.rkt index 5ece89c..bb56888 100644 --- a/racket/typed/examples/roles/flink-support.rkt +++ b/racket/typed/examples/roles/flink-support.rkt @@ -88,9 +88,9 @@ (cons left-over? reductions) reductions))]))) -;; TaskTree -> (Listof Task) +;; TaskTree ID -> (Listof Task) ;; flatten a task tree by assigning job-unique IDs -(define (task-tree->list tt) +(define (task-tree->list tt job-id) (define-values (tasks _) ;; TaskTree ID -> (Values (Listof Task) ID) ;; the input id is for the current node of the tree @@ -99,7 +99,8 @@ [next-id 0]) (match tt [(map-work _) - (values (list (task next-id tt)) + ;; NOTE : utilizing knowledge of Tuple representation here + (values (list (task (list 'tuple next-id job-id) tt)) (add1 next-id))] [(reduce-work left right) (define left-id (add1 next-id)) @@ -107,7 +108,7 @@ (loop left left-id)) (define-values (rights next) (loop right right-id)) - (values (cons (task next-id (reduce-work left-id right-id)) + (values (cons (task (list 'tuple next-id job-id) (reduce-work left-id right-id)) (append lefts rights)) next)]))) tasks) @@ -116,7 +117,7 @@ (define (create-job in) (define job-id (gensym 'job)) (define input-lines (sequence->list (in-lines in))) - (define tasks (task-tree->list (create-task-tree input-lines))) + (define tasks (task-tree->list (create-task-tree input-lines) job-id)) (job job-id tasks)) ;; String -> Job diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index bf5c743..5ce9345 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -33,15 +33,14 @@ assert their presence with (task-runner ID), #| A Status is one of - IDLE, when the TR is not executing a task -- (executing TaskID), when the TR is executing the task with the given TaskID +- (executing Int), when the TR is executing the task with identified by the Int - OVERLOAD, when the TR has been asked to perform a task before it has finished its previous assignment. For the purposes of this model, it indicates a failure in the protocol; like the exchange between the JM and the TM, a TR should only receive tasks when it is IDLE. |# (define-constructor* (executing : Executing id)) -(define-type-alias TaskID Int) -(define-type-alias Status (U Symbol (Executing TaskID))) +(define-type-alias Status (U Symbol (Executing Int))) (define IDLE : Status 'idle) (define OVERLOAD : Status 'overload) @@ -53,31 +52,33 @@ Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP). A TaskAssigner asserts the association of a Task with a particular TaskPerformer through - (task-assignment ID ID Task) -where the first ID identifies the TP and the second identifies the job. + (task-assignment ID Task) +where the ID identifies the TP |# -(assertion-struct task-assignment : TaskAssignment (assignee job-id task)) +(assertion-struct task-assignment : TaskAssignment (assignee task)) #| A Task is a (task TaskID Work), where Work is one of - (map-work String) - - (reduce-work (U TaskID TaskResult) (U TaskID TaskResult)), referring to either the + - (reduce-work (U Int TaskResult) (U Int TaskResult)), referring to either the ID of the dependent task or its results. A reduce-work is ready to be executed when it has both results. -A TaskID is a natural number +A TaskID is a (Tuple Int ID), where the first Int is specific to the individual +task and the second identifies the job it belongs to. A TaskResult is a (Hashof String Natural), counting the occurrences of words |# (require-struct task #:as Task #:from "flink-support.rkt") (require-struct map-work #:as MapWork #:from "flink-support.rkt") (require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt") +(define-type-alias TaskID (Tuple Int ID)) (define-type-alias WordCount (Hash String Int)) (define-type-alias TaskResult WordCount) (define-type-alias Reduce - (ReduceWork (Either TaskID TaskResult) - (Either TaskID TaskResult))) + (ReduceWork (Either Int TaskResult) + (Either Int TaskResult))) (define-type-alias ReduceInput - (ReduceWork TaskID TaskID)) + (ReduceWork Int Int)) (define-type-alias Work (U Reduce (MapWork String))) (define-type-alias ConcreteWork @@ -92,9 +93,7 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words #| The TaskPerformer responds to a task-assignment by describing its state with respect to that task, - (task-state ID ID ID TaskStateDesc) -where the first ID is that of the TP, the second is that of the job, -and the third that of the task. + (task-state TaskID TaskStateDesc) A TaskStateDesc is one of - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) @@ -102,7 +101,7 @@ A TaskStateDesc is one of - RUNNING, indicating that the task is being performed - (finished TaskResult), describing the results |# -(assertion-struct task-state : TaskState (assignee job-id task-id desc)) +(assertion-struct task-state : TaskState (task-id desc)) (define-constructor* (finished : Finished data)) (define-type-alias TaskStateDesc (U Symbol (Finished TaskResult))) @@ -118,16 +117,17 @@ TaskRunners. (define-type-alias TaskAssigner (Role (assign) - (Shares (TaskAssignment ID ID ConcreteTask)) - ;; would be nice to say how the IDs relate to each other (first two are the same) - (Reacts (Asserted (TaskState ID ID TaskID ★/t)) + (Shares (TaskAssignment ID ConcreteTask)) + ;; would be nice to say how the TaskIDs relate to each other + (Reacts (Asserted (TaskState TaskID ★/t)) (Branch (Stop assign) (Effs))))) (define-type-alias TaskPerformer (Role (listen) - (During (TaskAssignment ID ID ConcreteTask) - (Shares (TaskState ID ID TaskID TaskStateDesc))))) + (During (TaskAssignment ID ConcreteTask) + ;; would be nice to say how the IDs and TaskIDs relate to each other + (Shares (TaskState TaskID TaskStateDesc))))) #| Job Submission Protocol @@ -142,10 +142,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define-type-alias τc (U (TaskRunner ID) - (TaskAssignment ID ID ConcreteTask) - (Observe (TaskAssignment ID ★/t ★/t)) - (TaskState ID ID TaskID TaskStateDesc) - (Observe (TaskState ID ID TaskID ★/t)) + (TaskAssignment ID ConcreteTask) + (Observe (TaskAssignment ID ★/t)) + (TaskState TaskID TaskStateDesc) + (Observe (TaskState TaskID ★/t)) (JobManagerAlive) (Observe (JobManagerAlive)) (Observe (TaskRunner ★/t)) @@ -194,9 +194,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-runner id)) (on (retracted (task-manager tm-id _)) (stop runner)) - (during (task-assignment id $job-id (task $task-id $desc)) + (during (task-assignment id (task $task-id $desc)) (field [state TaskStateDesc ACCEPTED]) - (assert (task-state id job-id task-id (ref state))) + (assert (task-state task-id (ref state))) ;; since we currently finish everything in one turn, these changes to status aren't ;; actually visible. (set! state RUNNING) @@ -237,6 +237,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (field [busy-runners (Set ID) (set)]) + (define/dataflow idle-runners (set-count (set-subtract (ref task-runners) (ref busy-runners)))) @@ -256,19 +257,19 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [none (error "need to call can-accept? before selecting a runner")])) - (during (task-assignment id $job-id (task $task-id $desc)) + (during (task-assignment id (task $task-id $desc)) (define status0 : TaskStateDesc (if (can-accept?) RUNNING OVERLOAD/ts)) (field [status TaskStateDesc status0]) - (assert (task-state id job-id task-id (ref status))) + (assert (task-state task-id (ref status))) (when (can-accept?) (define runner (select-runner)) (log "TM ~a assigns task ~a to runner ~a" id task-id runner) (on stop (set! busy-runners (set-remove (ref busy-runners) runner))) - (assert (task-assignment runner job-id (task task-id desc))) - (on (asserted (task-state runner job-id task-id $st)) + (assert (task-assignment runner (task task-id desc))) + (on (asserted (task-state task-id $st)) (match st [ACCEPTED #f] [RUNNING #f] @@ -293,10 +294,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [_ none])) -;; Task Id Any -> Task +;; Task Int Any -> Task ;; If the given task is waiting for this data, replace the waiting ID with the data (define (task+data [t : PendingTask] - [id : TaskID] + [id : Int] [data : TaskResult] -> PendingTask) (match t @@ -400,7 +401,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define still-waiting (for/fold ([ts : (List PendingTask) (list)]) ([t (ref waiting-tasks)]) - (define t+ (task+data t task-id data)) + (define t+ (task+data t (select 0 task-id) data)) (match (task-ready? t+) [(some $ready) (add-ready-task! ready) @@ -426,7 +427,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; ID -> ... (define (assign-task [mngr : ID]) (start-facet assign - (assert (task-assignment mngr job-id t)) + (assert (task-assignment mngr t)) (know (assigned-task mngr)) (on (retracted (task-manager mngr _)) ;; our task manager has crashed @@ -437,10 +438,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; tasks were being assigned to the manager #;(take-slot! mngr) (start-facet take-slot - (on (asserted (task-state mngr job-id this-id _)) + (on (asserted (task-state this-id _)) (stop take-slot (received-answer! mngr))))) - (on (asserted (task-state mngr job-id this-id $status)) + (on (asserted (task-state this-id $status)) (match status [ACCEPTED #f] [RUNNING #f] @@ -525,4 +526,4 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn-task-manager 2) (spawn-task-manager 3) (spawn-client (file->job "lorem.txt")) - #;(spawn-client (string->job INPUT))) + (spawn-client (string->job INPUT)))