typed flink: streamline ids

This commit is contained in:
Sam Caldwell 2019-10-16 12:25:04 -04:00
parent 5e61e9941b
commit 123037ba51
2 changed files with 44 additions and 42 deletions

View File

@ -88,9 +88,9 @@
(cons left-over? reductions) (cons left-over? reductions)
reductions))]))) reductions))])))
;; TaskTree -> (Listof Task) ;; TaskTree ID -> (Listof Task)
;; flatten a task tree by assigning job-unique IDs ;; flatten a task tree by assigning job-unique IDs
(define (task-tree->list tt) (define (task-tree->list tt job-id)
(define-values (tasks _) (define-values (tasks _)
;; TaskTree ID -> (Values (Listof Task) ID) ;; TaskTree ID -> (Values (Listof Task) ID)
;; the input id is for the current node of the tree ;; the input id is for the current node of the tree
@ -99,7 +99,8 @@
[next-id 0]) [next-id 0])
(match tt (match tt
[(map-work _) [(map-work _)
(values (list (task next-id tt)) ;; NOTE : utilizing knowledge of Tuple representation here
(values (list (task (list 'tuple next-id job-id) tt))
(add1 next-id))] (add1 next-id))]
[(reduce-work left right) [(reduce-work left right)
(define left-id (add1 next-id)) (define left-id (add1 next-id))
@ -107,7 +108,7 @@
(loop left left-id)) (loop left left-id))
(define-values (rights next) (define-values (rights next)
(loop right right-id)) (loop right right-id))
(values (cons (task next-id (reduce-work left-id right-id)) (values (cons (task (list 'tuple next-id job-id) (reduce-work left-id right-id))
(append lefts rights)) (append lefts rights))
next)]))) next)])))
tasks) tasks)
@ -116,7 +117,7 @@
(define (create-job in) (define (create-job in)
(define job-id (gensym 'job)) (define job-id (gensym 'job))
(define input-lines (sequence->list (in-lines in))) (define input-lines (sequence->list (in-lines in)))
(define tasks (task-tree->list (create-task-tree input-lines))) (define tasks (task-tree->list (create-task-tree input-lines) job-id))
(job job-id tasks)) (job job-id tasks))
;; String -> Job ;; String -> Job

View File

@ -33,15 +33,14 @@ assert their presence with (task-runner ID),
#| #|
A Status is one of A Status is one of
- IDLE, when the TR is not executing a task - IDLE, when the TR is not executing a task
- (executing TaskID), when the TR is executing the task with the given TaskID - (executing Int), when the TR is executing the task identified by the Int
- OVERLOAD, when the TR has been asked to perform a task before it has - OVERLOAD, when the TR has been asked to perform a task before it has
finished its previous assignment. For the purposes of this model, it indicates a finished its previous assignment. For the purposes of this model, it indicates a
failure in the protocol; like the exchange between the JM and the TM, a TR failure in the protocol; like the exchange between the JM and the TM, a TR
should only receive tasks when it is IDLE. should only receive tasks when it is IDLE.
|# |#
(define-constructor* (executing : Executing id)) (define-constructor* (executing : Executing id))
(define-type-alias TaskID Int) (define-type-alias Status (U Symbol (Executing Int)))
(define-type-alias Status (U Symbol (Executing TaskID)))
(define IDLE : Status 'idle) (define IDLE : Status 'idle)
(define OVERLOAD : Status 'overload) (define OVERLOAD : Status 'overload)
@ -53,31 +52,33 @@ Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through through
(task-assignment ID ID Task) (task-assignment ID Task)
where the first ID identifies the TP and the second identifies the job. where the ID identifies the TP.
|# |#
(assertion-struct task-assignment : TaskAssignment (assignee job-id task)) (assertion-struct task-assignment : TaskAssignment (assignee task))
#| #|
A Task is a (task TaskID Work), where Work is one of A Task is a (task TaskID Work), where Work is one of
- (map-work String) - (map-work String)
- (reduce-work (U TaskID TaskResult) (U TaskID TaskResult)), referring to either the - (reduce-work (U Int TaskResult) (U Int TaskResult)), referring to either the
ID of the dependent task or its results. A reduce-work is ready to be executed ID of the dependent task or its results. A reduce-work is ready to be executed
when it has both results. when it has both results.
A TaskID is a natural number A TaskID is a (Tuple Int ID), where the first Int is specific to the individual
task and the second identifies the job it belongs to.
A TaskResult is a (Hashof String Natural), counting the occurrences of words A TaskResult is a (Hashof String Natural), counting the occurrences of words
|# |#
(require-struct task #:as Task #:from "flink-support.rkt") (require-struct task #:as Task #:from "flink-support.rkt")
(require-struct map-work #:as MapWork #:from "flink-support.rkt") (require-struct map-work #:as MapWork #:from "flink-support.rkt")
(require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt") (require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt")
(define-type-alias TaskID (Tuple Int ID))
(define-type-alias WordCount (Hash String Int)) (define-type-alias WordCount (Hash String Int))
(define-type-alias TaskResult WordCount) (define-type-alias TaskResult WordCount)
(define-type-alias Reduce (define-type-alias Reduce
(ReduceWork (Either TaskID TaskResult) (ReduceWork (Either Int TaskResult)
(Either TaskID TaskResult))) (Either Int TaskResult)))
(define-type-alias ReduceInput (define-type-alias ReduceInput
(ReduceWork TaskID TaskID)) (ReduceWork Int Int))
(define-type-alias Work (define-type-alias Work
(U Reduce (MapWork String))) (U Reduce (MapWork String)))
(define-type-alias ConcreteWork (define-type-alias ConcreteWork
@ -92,9 +93,7 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
#| #|
The TaskPerformer responds to a task-assignment by describing its state with respect The TaskPerformer responds to a task-assignment by describing its state with respect
to that task, to that task,
(task-state ID ID ID TaskStateDesc) (task-state TaskID TaskStateDesc)
where the first ID is that of the TP, the second is that of the job,
and the third that of the task.
A TaskStateDesc is one of A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
@ -102,7 +101,7 @@ A TaskStateDesc is one of
- RUNNING, indicating that the task is being performed - RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results - (finished TaskResult), describing the results
|# |#
(assertion-struct task-state : TaskState (assignee job-id task-id desc)) (assertion-struct task-state : TaskState (task-id desc))
(define-constructor* (finished : Finished data)) (define-constructor* (finished : Finished data))
(define-type-alias TaskStateDesc (define-type-alias TaskStateDesc
(U Symbol (Finished TaskResult))) (U Symbol (Finished TaskResult)))
@ -118,16 +117,17 @@ TaskRunners.
(define-type-alias TaskAssigner (define-type-alias TaskAssigner
(Role (assign) (Role (assign)
(Shares (TaskAssignment ID ID ConcreteTask)) (Shares (TaskAssignment ID ConcreteTask))
;; would be nice to say how the IDs relate to each other (first two are the same) ;; would be nice to say how the TaskIDs relate to each other
(Reacts (Asserted (TaskState ID ID TaskID ★/t)) (Reacts (Asserted (TaskState TaskID ★/t))
(Branch (Stop assign) (Branch (Stop assign)
(Effs))))) (Effs)))))
(define-type-alias TaskPerformer (define-type-alias TaskPerformer
(Role (listen) (Role (listen)
(During (TaskAssignment ID ID ConcreteTask) (During (TaskAssignment ID ConcreteTask)
(Shares (TaskState ID ID TaskID TaskStateDesc))))) ;; would be nice to say how the IDs and TaskIDs relate to each other
(Shares (TaskState TaskID TaskStateDesc)))))
#| #|
Job Submission Protocol Job Submission Protocol
@ -142,10 +142,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define-type-alias τc (define-type-alias τc
(U (TaskRunner ID) (U (TaskRunner ID)
(TaskAssignment ID ID ConcreteTask) (TaskAssignment ID ConcreteTask)
(Observe (TaskAssignment ID ★/t ★/t)) (Observe (TaskAssignment ID ★/t))
(TaskState ID ID TaskID TaskStateDesc) (TaskState TaskID TaskStateDesc)
(Observe (TaskState ID ID TaskID ★/t)) (Observe (TaskState TaskID ★/t))
(JobManagerAlive) (JobManagerAlive)
(Observe (JobManagerAlive)) (Observe (JobManagerAlive))
(Observe (TaskRunner ★/t)) (Observe (TaskRunner ★/t))
@ -194,9 +194,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id)) (assert (task-runner id))
(on (retracted (task-manager tm-id _)) (on (retracted (task-manager tm-id _))
(stop runner)) (stop runner))
(during (task-assignment id $job-id (task $task-id $desc)) (during (task-assignment id (task $task-id $desc))
(field [state TaskStateDesc ACCEPTED]) (field [state TaskStateDesc ACCEPTED])
(assert (task-state id job-id task-id (ref state))) (assert (task-state task-id (ref state)))
;; since we currently finish everything in one turn, these changes to status aren't ;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible. ;; actually visible.
(set! state RUNNING) (set! state RUNNING)
@ -237,6 +237,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(field [busy-runners (Set ID) (set)]) (field [busy-runners (Set ID) (set)])
(define/dataflow idle-runners (define/dataflow idle-runners
(set-count (set-subtract (ref task-runners) (ref busy-runners)))) (set-count (set-subtract (ref task-runners) (ref busy-runners))))
@ -256,19 +257,19 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[none [none
(error "need to call can-accept? before selecting a runner")])) (error "need to call can-accept? before selecting a runner")]))
(during (task-assignment id $job-id (task $task-id $desc)) (during (task-assignment id (task $task-id $desc))
(define status0 : TaskStateDesc (define status0 : TaskStateDesc
(if (can-accept?) (if (can-accept?)
RUNNING RUNNING
OVERLOAD/ts)) OVERLOAD/ts))
(field [status TaskStateDesc status0]) (field [status TaskStateDesc status0])
(assert (task-state id job-id task-id (ref status))) (assert (task-state task-id (ref status)))
(when (can-accept?) (when (can-accept?)
(define runner (select-runner)) (define runner (select-runner))
(log "TM ~a assigns task ~a to runner ~a" id task-id runner) (log "TM ~a assigns task ~a to runner ~a" id task-id runner)
(on stop (set! busy-runners (set-remove (ref busy-runners) runner))) (on stop (set! busy-runners (set-remove (ref busy-runners) runner)))
(assert (task-assignment runner job-id (task task-id desc))) (assert (task-assignment runner (task task-id desc)))
(on (asserted (task-state runner job-id task-id $st)) (on (asserted (task-state task-id $st))
(match st (match st
[ACCEPTED #f] [ACCEPTED #f]
[RUNNING #f] [RUNNING #f]
@ -293,10 +294,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[_ [_
none])) none]))
;; Task Id Any -> Task ;; PendingTask Int TaskResult -> PendingTask
;; If the given task is waiting for this data, replace the waiting ID with the data ;; If the given task is waiting for this data, replace the waiting ID with the data
(define (task+data [t : PendingTask] (define (task+data [t : PendingTask]
[id : TaskID] [id : Int]
[data : TaskResult] [data : TaskResult]
-> PendingTask) -> PendingTask)
(match t (match t
@ -400,7 +401,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define still-waiting (define still-waiting
(for/fold ([ts : (List PendingTask) (list)]) (for/fold ([ts : (List PendingTask) (list)])
([t (ref waiting-tasks)]) ([t (ref waiting-tasks)])
(define t+ (task+data t task-id data)) (define t+ (task+data t (select 0 task-id) data))
(match (task-ready? t+) (match (task-ready? t+)
[(some $ready) [(some $ready)
(add-ready-task! ready) (add-ready-task! ready)
@ -426,7 +427,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ID -> ... ;; ID -> ...
(define (assign-task [mngr : ID]) (define (assign-task [mngr : ID])
(start-facet assign (start-facet assign
(assert (task-assignment mngr job-id t)) (assert (task-assignment mngr t))
(know (assigned-task mngr)) (know (assigned-task mngr))
(on (retracted (task-manager mngr _)) (on (retracted (task-manager mngr _))
;; our task manager has crashed ;; our task manager has crashed
@ -437,10 +438,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; tasks were being assigned to the manager ;; tasks were being assigned to the manager
#;(take-slot! mngr) #;(take-slot! mngr)
(start-facet take-slot (start-facet take-slot
(on (asserted (task-state mngr job-id this-id _)) (on (asserted (task-state this-id _))
(stop take-slot (stop take-slot
(received-answer! mngr))))) (received-answer! mngr)))))
(on (asserted (task-state mngr job-id this-id $status)) (on (asserted (task-state this-id $status))
(match status (match status
[ACCEPTED #f] [ACCEPTED #f]
[RUNNING #f] [RUNNING #f]
@ -525,4 +526,4 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn-task-manager 2) (spawn-task-manager 2)
(spawn-task-manager 3) (spawn-task-manager 3)
(spawn-client (file->job "lorem.txt")) (spawn-client (file->job "lorem.txt"))
#;(spawn-client (string->job INPUT))) (spawn-client (string->job INPUT)))