examples/flink: rename map and reduce structs for slightly less overloading on task
This commit is contained in:
parent
a6d6ceaa7c
commit
22bd143025
|
@ -77,16 +77,16 @@ TODO - also need to correlate Job ID through here.
|
|||
(assertion-struct submitted-task (manager task))
|
||||
#|
|
||||
A Task is a (task ID Work), where Work is one of
|
||||
- (map-task String)
|
||||
- (reduce-task (U ID TaskResult) (U ID TaskResult)), referring to either the
|
||||
ID of the dependent task or its results. A reduce-task is ready to be executed
|
||||
- (map-work String)
|
||||
- (reduce-work (U ID TaskResult) (U ID TaskResult)), referring to either the
|
||||
ID of the dependent task or its results. A reduce-work is ready to be executed
|
||||
when it has both results.
|
||||
|
||||
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
||||
|#
|
||||
(struct task (id desc) #:transparent)
|
||||
(struct map-task (data) #:transparent)
|
||||
(struct reduce-task (left right) #:transparent)
|
||||
(struct map-work (data) #:transparent)
|
||||
(struct reduce-work (left right) #:transparent)
|
||||
|
||||
#|
|
||||
A TaskManager responds to a submitted-task by describing its state with respect
|
||||
|
@ -159,10 +159,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; task-runner state by flushing pending actions.
|
||||
(flush!)
|
||||
(match desc
|
||||
[(map-task data)
|
||||
[(map-work data)
|
||||
(word-count (count-new-words (word-count) (string->words data)))
|
||||
(execution-state (finished (word-count)))]
|
||||
[(reduce-task left right)
|
||||
[(reduce-work left right)
|
||||
(word-count (hash-union left right #:combine +))
|
||||
(execution-state (finished (word-count)))]))))))
|
||||
|
||||
|
@ -377,7 +377,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; Test if the task is ready to run
|
||||
(define (task-ready? t)
|
||||
(match t
|
||||
[(task _ (reduce-task l r))
|
||||
[(task _ (reduce-work l r))
|
||||
(not (or (id? l) (id? r)))]
|
||||
[_ #t]))
|
||||
|
||||
|
@ -385,10 +385,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; If the given task is waiting for this data, replace the waiting ID with the data
|
||||
(define (task+data t id data)
|
||||
(match t
|
||||
[(task tid (reduce-task (== id) r))
|
||||
(task tid (reduce-task data r))]
|
||||
[(task tid (reduce-task l (== id)))
|
||||
(task tid (reduce-task l data))]
|
||||
[(task tid (reduce-work (== id) r))
|
||||
(task tid (reduce-work data r))]
|
||||
[(task tid (reduce-work l (== id)))
|
||||
(task tid (reduce-work l data))]
|
||||
[_ t]))
|
||||
|
||||
|
||||
|
@ -441,25 +441,25 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
[(list x)
|
||||
(values reductions x)]
|
||||
[(list-rest x y more)
|
||||
(loop more (cons (reduce-task x y) reductions))])))
|
||||
(loop more (cons (reduce-work x y) reductions))])))
|
||||
|
||||
|
||||
;; a TaskTree is one of
|
||||
;; (map-task data)
|
||||
;; (reduce-task TaskTree TaskTree)
|
||||
;; (map-work data)
|
||||
;; (reduce-work TaskTree TaskTree)
|
||||
|
||||
;; (Listof String) -> TaskTree
|
||||
;; Create a tree structure of tasks
|
||||
(define (create-task-tree lines)
|
||||
(define map-tasks
|
||||
(define map-works
|
||||
(for/list ([line (in-list lines)])
|
||||
(map-task line)))
|
||||
(map-work line)))
|
||||
;; build the tree up from the leaves
|
||||
(let loop ([nodes map-tasks])
|
||||
(let loop ([nodes map-works])
|
||||
(match nodes
|
||||
['()
|
||||
;; input was empty
|
||||
(map-task "")]
|
||||
(map-work "")]
|
||||
[(list x)
|
||||
x]
|
||||
[_
|
||||
|
@ -479,16 +479,16 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(let loop ([tt tt]
|
||||
[next-id 0])
|
||||
(match tt
|
||||
[(map-task _)
|
||||
[(map-work _)
|
||||
(values (list (task next-id tt))
|
||||
(add1 next-id))]
|
||||
[(reduce-task left right)
|
||||
[(reduce-work left right)
|
||||
(define left-id (add1 next-id))
|
||||
(define-values (lefts right-id)
|
||||
(loop left left-id))
|
||||
(define-values (rights next)
|
||||
(loop right right-id))
|
||||
(values (cons (task next-id (reduce-task left-id right-id))
|
||||
(values (cons (task next-id (reduce-work left-id right-id))
|
||||
(append lefts rights))
|
||||
next)])))
|
||||
tasks)
|
||||
|
@ -522,9 +522,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(check-true (list? tasks))
|
||||
(check-true (andmap task? tasks))
|
||||
(match tasks
|
||||
[(list-no-order (task rid (reduce-task left right))
|
||||
(task mid1 (map-task data1))
|
||||
(task mid2 (map-task data2)))
|
||||
[(list-no-order (task rid (reduce-work left right))
|
||||
(task mid1 (map-work data1))
|
||||
(task mid2 (map-work data2)))
|
||||
(check-true (id? left))
|
||||
(check-true (id? right))
|
||||
(check-equal? (set left right) (set mid1 mid2))
|
||||
|
@ -542,7 +542,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(check-true (list? tasks))
|
||||
(check-equal? (length tasks) 1)
|
||||
(check-equal? (task-desc (car tasks))
|
||||
(map-task ""))))
|
||||
(map-work ""))))
|
||||
|
||||
|
||||
;; ---------------------------------------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue