client and jobs (not working)
This commit is contained in:
parent
807e6bb8f7
commit
3aedb63a9c
|
@ -1,7 +1,13 @@
|
||||||
#lang racket
|
#lang racket
|
||||||
|
|
||||||
(provide string->words
|
(provide string->words
|
||||||
split-at/lenient-)
|
split-at/lenient-
|
||||||
|
(struct-out job)
|
||||||
|
(struct-out task)
|
||||||
|
(struct-out map-work)
|
||||||
|
(struct-out reduce-work)
|
||||||
|
string->job
|
||||||
|
file->job)
|
||||||
|
|
||||||
(require (only-in racket/list
|
(require (only-in racket/list
|
||||||
split-at))
|
split-at))
|
||||||
|
@ -30,3 +36,96 @@
|
||||||
(define-values (a b)
|
(define-values (a b)
|
||||||
(split-at lst (min n (length lst))))
|
(split-at lst (min n (length lst))))
|
||||||
(list a b))
|
(list a b))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; Creating a Job
|
||||||
|
|
||||||
|
(struct job (id tasks) #:transparent)
|
||||||
|
(struct task (id desc) #:transparent)
|
||||||
|
(struct map-work (data) #:transparent)
|
||||||
|
(struct reduce-work (left right) #:transparent)
|
||||||
|
|
||||||
|
;; (Listof WorkDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc))
|
||||||
|
;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also
|
||||||
|
;; return the odd-one out.
|
||||||
|
;; Conceptually, it does something like this:
|
||||||
|
;; '(a b c d) => '((a b) (c d))
|
||||||
|
;; '(a b c d e) => '((a b) (c d) e)
|
||||||
|
(define (pair-up ls)
|
||||||
|
(let loop ([ls ls]
|
||||||
|
[reductions '()])
|
||||||
|
(match ls
|
||||||
|
['()
|
||||||
|
(values reductions #f)]
|
||||||
|
[(list x)
|
||||||
|
(values reductions x)]
|
||||||
|
[(list-rest x y more)
|
||||||
|
(loop more (cons (reduce-work x y) reductions))])))
|
||||||
|
|
||||||
|
|
||||||
|
;; a TaskTree is one of
|
||||||
|
;; (map-work data)
|
||||||
|
;; (reduce-work TaskTree TaskTree)
|
||||||
|
|
||||||
|
;; (Listof String) -> TaskTree
|
||||||
|
;; Create a tree structure of tasks
|
||||||
|
(define (create-task-tree lines)
|
||||||
|
(define map-works
|
||||||
|
(for/list ([line (in-list lines)])
|
||||||
|
(map-work line)))
|
||||||
|
;; build the tree up from the leaves
|
||||||
|
(let loop ([nodes map-works])
|
||||||
|
(match nodes
|
||||||
|
['()
|
||||||
|
;; input was empty
|
||||||
|
(map-work "")]
|
||||||
|
[(list x)
|
||||||
|
x]
|
||||||
|
[_
|
||||||
|
(define-values (reductions left-over?)
|
||||||
|
(pair-up nodes))
|
||||||
|
(loop (if left-over?
|
||||||
|
(cons left-over? reductions)
|
||||||
|
reductions))])))
|
||||||
|
|
||||||
|
;; TaskTree -> (Listof Task)
|
||||||
|
;; flatten a task tree by assigning job-unique IDs
|
||||||
|
(define (task-tree->list tt)
|
||||||
|
(define-values (tasks _)
|
||||||
|
;; TaskTree ID -> (Values (Listof Task) ID)
|
||||||
|
;; the input id is for the current node of the tree
|
||||||
|
;; returned id is the "next available" id, given ids are assigned in strict ascending order
|
||||||
|
(let loop ([tt tt]
|
||||||
|
[next-id 0])
|
||||||
|
(match tt
|
||||||
|
[(map-work _)
|
||||||
|
(values (list (task next-id tt))
|
||||||
|
(add1 next-id))]
|
||||||
|
[(reduce-work left right)
|
||||||
|
(define left-id (add1 next-id))
|
||||||
|
(define-values (lefts right-id)
|
||||||
|
(loop left left-id))
|
||||||
|
(define-values (rights next)
|
||||||
|
(loop right right-id))
|
||||||
|
(values (cons (task next-id (reduce-work left-id right-id))
|
||||||
|
(append lefts rights))
|
||||||
|
next)])))
|
||||||
|
tasks)
|
||||||
|
|
||||||
|
;; InputPort -> Job
|
||||||
|
(define (create-job in)
|
||||||
|
(define job-id (gensym 'job))
|
||||||
|
(define input-lines (sequence->list (in-lines in)))
|
||||||
|
(define tasks (task-tree->list (create-task-tree input-lines)))
|
||||||
|
(job job-id tasks))
|
||||||
|
|
||||||
|
;; String -> Job
|
||||||
|
(define (string->job s)
|
||||||
|
(create-job (open-input-string s)))
|
||||||
|
|
||||||
|
;; PathString -> Job
|
||||||
|
(define (file->job path)
|
||||||
|
(define in (open-input-file path))
|
||||||
|
(define j (create-job in))
|
||||||
|
(close-input-port in)
|
||||||
|
j)
|
||||||
|
|
|
@ -64,19 +64,23 @@ A TaskID is a natural number
|
||||||
|
|
||||||
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
||||||
|#
|
|#
|
||||||
(define-constructor* (task : Task id desc))
|
(require-struct task #:as Task #:from "flink-support.rkt")
|
||||||
(define-constructor* (map-work : MapWork data))
|
(require-struct map-work #:as MapWork #:from "flink-support.rkt")
|
||||||
(define-constructor* (reduce-work : ReduceWork left right))
|
(require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt")
|
||||||
(define-type-alias WordCount (Hash String Int))
|
(define-type-alias WordCount (Hash String Int))
|
||||||
(define-type-alias TaskResult WordCount)
|
(define-type-alias TaskResult WordCount)
|
||||||
(define-type-alias Reduce
|
(define-type-alias Reduce
|
||||||
(ReduceWork (Either TaskID TaskResult)
|
(ReduceWork (Either TaskID TaskResult)
|
||||||
(Either TaskID TaskResult)))
|
(Either TaskID TaskResult)))
|
||||||
|
(define-type-alias ReduceInput
|
||||||
|
(ReduceWork TaskID TaskID))
|
||||||
(define-type-alias Work
|
(define-type-alias Work
|
||||||
(U Reduce (MapWork String)))
|
(U Reduce (MapWork String)))
|
||||||
(define-type-alias ConcreteWork
|
(define-type-alias ConcreteWork
|
||||||
(U (ReduceWork TaskResult TaskResult)
|
(U (ReduceWork TaskResult TaskResult)
|
||||||
(MapWork String)))
|
(MapWork String)))
|
||||||
|
(define-type-alias InputTask
|
||||||
|
(Task TaskID (U ReduceInput (MapWork String))))
|
||||||
(define-type-alias PendingTask
|
(define-type-alias PendingTask
|
||||||
(Task TaskID Work))
|
(Task TaskID Work))
|
||||||
(define-type-alias ConcreteTask
|
(define-type-alias ConcreteTask
|
||||||
|
@ -115,8 +119,9 @@ Job Submission Protocol
|
||||||
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
|
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
|
||||||
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
|
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
|
||||||
|#
|
|#
|
||||||
(assertion-struct job : Job (id tasks))
|
(require-struct job #:as Job #:from "flink-support.rkt")
|
||||||
(assertion-struct job-finished : JobFinished (id data))
|
(assertion-struct job-finished : JobFinished (id data))
|
||||||
|
(define-type-alias JobDesc (Job ID (List InputTask)))
|
||||||
|
|
||||||
(define-type-alias τc
|
(define-type-alias τc
|
||||||
(U (TaskRunner ID Status)
|
(U (TaskRunner ID Status)
|
||||||
|
@ -129,9 +134,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(Observe (TaskRunner ★/t ★/t))
|
(Observe (TaskRunner ★/t ★/t))
|
||||||
(TaskManager ID Int)
|
(TaskManager ID Int)
|
||||||
(Observe (TaskManager ★/t ★/t))
|
(Observe (TaskManager ★/t ★/t))
|
||||||
(Job ID (List PendingTask))
|
JobDesc
|
||||||
(Observe (Job ★/t ★/t))
|
(Observe (Job ★/t ★/t))
|
||||||
(JobFinished ID TaskResult)))
|
(JobFinished ID TaskResult)
|
||||||
|
(Observe (JobFinished ID ★/t))))
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
;; Util Macros
|
;; Util Macros
|
||||||
|
@ -291,6 +297,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
[none
|
[none
|
||||||
(left t)]))))
|
(left t)]))))
|
||||||
|
|
||||||
|
(define (input->pending-task [t : InputTask] -> PendingTask)
|
||||||
|
(match t
|
||||||
|
[(task $id (map-work $s))
|
||||||
|
(task id (map-work s))]
|
||||||
|
[(task $id (reduce-work $l $r))
|
||||||
|
(task id (reduce-work (left l) (left r)))]))
|
||||||
|
|
||||||
(define (spawn-job-manager)
|
(define (spawn-job-manager)
|
||||||
(spawn τc
|
(spawn τc
|
||||||
(start-facet jm
|
(start-facet jm
|
||||||
|
@ -321,7 +334,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
|
|
||||||
(during (job $job-id $tasks)
|
(during (job $job-id $tasks)
|
||||||
(log "JM receives job ~a" job-id)
|
(log "JM receives job ~a" job-id)
|
||||||
(define-tuple (not-ready ready) (partition-ready-tasks tasks))
|
(define pending (for/list ([t tasks])
|
||||||
|
(input->pending-task t)))
|
||||||
|
(define-tuple (not-ready ready) (partition-ready-tasks pending #;(map input->pending-task tasks)))
|
||||||
(field [ready-tasks (List ConcreteTask) ready]
|
(field [ready-tasks (List ConcreteTask) ready]
|
||||||
[waiting-tasks (List PendingTask) not-ready]
|
[waiting-tasks (List PendingTask) not-ready]
|
||||||
[tasks-in-progress Int 0])
|
[tasks-in-progress Int 0])
|
||||||
|
@ -433,3 +448,33 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(set! ready-tasks readys)))
|
(set! ready-tasks readys)))
|
||||||
|
|
||||||
#f))))
|
#f))))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; Client
|
||||||
|
|
||||||
|
;; Job -> Void
|
||||||
|
(define (spawn-client [j : JobDesc])
|
||||||
|
(spawn τc
|
||||||
|
(start-facet _
|
||||||
|
(match-define (job $id _) j)
|
||||||
|
(assert j)
|
||||||
|
(on (asserted (job-finished id $data))
|
||||||
|
(printf "job done!\n~a\n" data)))))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; Main
|
||||||
|
|
||||||
|
(require/typed "flink-support.rkt"
|
||||||
|
[string->job : (→fn String JobDesc)]
|
||||||
|
[file->job : (→fn String JobDesc)])
|
||||||
|
|
||||||
|
(define INPUT "a b c a b c\na b\n a b\na b")
|
||||||
|
;; expected:
|
||||||
|
;; #hash((a . 5) (b . 5) (c . 2))
|
||||||
|
|
||||||
|
(run-ground-dataspace τc
|
||||||
|
(spawn-job-manager)
|
||||||
|
(spawn-task-manager)
|
||||||
|
(spawn-task-runner)
|
||||||
|
(spawn-task-runner)
|
||||||
|
(spawn-client (string->job INPUT)))
|
||||||
|
|
Loading…
Reference in New Issue