diff --git a/racket/typed/examples/roles/flink-support.rkt b/racket/typed/examples/roles/flink-support.rkt index c26d79f..5ece89c 100644 --- a/racket/typed/examples/roles/flink-support.rkt +++ b/racket/typed/examples/roles/flink-support.rkt @@ -1,7 +1,13 @@ #lang racket (provide string->words - split-at/lenient-) + split-at/lenient- + (struct-out job) + (struct-out task) + (struct-out map-work) + (struct-out reduce-work) + string->job + file->job) (require (only-in racket/list split-at)) @@ -30,3 +36,96 @@ (define-values (a b) (split-at lst (min n (length lst)))) (list a b)) + +;; --------------------------------------------------------------------------------------------------- +;; Creating a Job + +(struct job (id tasks) #:transparent) +(struct task (id desc) #:transparent) +(struct map-work (data) #:transparent) +(struct reduce-work (left right) #:transparent) + +;; (Listof WorkDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc)) +;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also +;; return the odd-one out. +;; Conceptually, it does something like this: +;; '(a b c d) => '((a b) (c d)) +;; '(a b c d e) => '((a b) (c d) e) +(define (pair-up ls) + (let loop ([ls ls] + [reductions '()]) + (match ls + ['() + (values reductions #f)] + [(list x) + (values reductions x)] + [(list-rest x y more) + (loop more (cons (reduce-work x y) reductions))]))) + + +;; a TaskTree is one of +;; (map-work data) +;; (reduce-work TaskTree TaskTree) + +;; (Listof String) -> TaskTree +;; Create a tree structure of tasks +(define (create-task-tree lines) + (define map-works + (for/list ([line (in-list lines)]) + (map-work line))) + ;; build the tree up from the leaves + (let loop ([nodes map-works]) + (match nodes + ['() + ;; input was empty + (map-work "")] + [(list x) + x] + [_ + (define-values (reductions left-over?) + (pair-up nodes)) + (loop (if left-over? + (cons left-over? reductions) + reductions))]))) + +;; TaskTree -> (Listof Task) +;; flatten a task tree by assigning job-unique IDs +(define (task-tree->list tt) + (define-values (tasks _) + ;; TaskTree ID -> (Values (Listof Task) ID) + ;; the input id is for the current node of the tree + ;; returned id is the "next available" id, given ids are assigned in strict ascending order + (let loop ([tt tt] + [next-id 0]) + (match tt + [(map-work _) + (values (list (task next-id tt)) + (add1 next-id))] + [(reduce-work left right) + (define left-id (add1 next-id)) + (define-values (lefts right-id) + (loop left left-id)) + (define-values (rights next) + (loop right right-id)) + (values (cons (task next-id (reduce-work left-id right-id)) + (append lefts rights)) + next)]))) + tasks) + +;; InputPort -> Job +(define (create-job in) + (define job-id (gensym 'job)) + (define input-lines (sequence->list (in-lines in))) + (define tasks (task-tree->list (create-task-tree input-lines))) + (job job-id tasks)) + +;; String -> Job +(define (string->job s) + (create-job (open-input-string s))) + +;; PathString -> Job +(define (file->job path) + (define in (open-input-file path)) + (define j (create-job in)) + (close-input-port in) + j) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 8e6c4f0..c6d016b 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -64,19 +64,23 @@ A TaskID is a natural number A TaskResult is a (Hashof String Natural), counting the occurrences of words |# -(define-constructor* (task : Task id desc)) -(define-constructor* (map-work : MapWork data)) -(define-constructor* (reduce-work : ReduceWork left right)) +(require-struct task #:as Task #:from "flink-support.rkt") +(require-struct map-work #:as MapWork #:from "flink-support.rkt") +(require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt") (define-type-alias WordCount (Hash String Int)) (define-type-alias TaskResult WordCount) (define-type-alias Reduce (ReduceWork (Either TaskID TaskResult) (Either TaskID TaskResult))) +(define-type-alias ReduceInput + (ReduceWork TaskID TaskID)) (define-type-alias Work (U Reduce (MapWork String))) (define-type-alias ConcreteWork (U (ReduceWork TaskResult TaskResult) (MapWork String))) +(define-type-alias InputTask + (Task TaskID (U ReduceInput (MapWork String)))) (define-type-alias PendingTask (Task TaskID Work)) (define-type-alias ConcreteTask @@ -115,8 +119,9 @@ Job Submission Protocol Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)). The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult) |# -(assertion-struct job : Job (id tasks)) +(require-struct job #:as Job #:from "flink-support.rkt") (assertion-struct job-finished : JobFinished (id data)) +(define-type-alias JobDesc (Job ID (List InputTask))) (define-type-alias τc (U (TaskRunner ID Status) @@ -129,9 +134,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (Observe (TaskRunner ★/t ★/t)) (TaskManager ID Int) (Observe (TaskManager ★/t ★/t)) - (Job ID (List PendingTask)) + JobDesc (Observe (Job ★/t ★/t)) - (JobFinished ID TaskResult))) + (JobFinished ID TaskResult) + (Observe (JobFinished ID ★/t)))) ;; --------------------------------------------------------------------------------------------------- ;; Util Macros @@ -291,6 +297,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [none (left t)])))) +(define (input->pending-task [t : InputTask] -> PendingTask) + (match t + [(task $id (map-work $s)) + (task id (map-work s))] + [(task $id (reduce-work $l $r)) + (task id (reduce-work (left l) (left r)))])) + (define (spawn-job-manager) (spawn τc (start-facet jm @@ -321,7 +334,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (during (job $job-id $tasks) (log "JM receives job ~a" job-id) - (define-tuple (not-ready ready) (partition-ready-tasks tasks)) + (define pending (for/list ([t tasks]) + (input->pending-task t))) + (define-tuple (not-ready ready) (partition-ready-tasks pending #;(map input->pending-task tasks))) (field [ready-tasks (List ConcreteTask) ready] [waiting-tasks (List PendingTask) not-ready] [tasks-in-progress Int 0]) @@ -433,3 +448,33 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (set! ready-tasks readys))) #f)))) + +;; --------------------------------------------------------------------------------------------------- +;; Client + +;; Job -> Void +(define (spawn-client [j : JobDesc]) + (spawn τc + (start-facet _ + (match-define (job $id _) j) + (assert j) + (on (asserted (job-finished id $data)) + (printf "job done!\n~a\n" data))))) + +;; --------------------------------------------------------------------------------------------------- +;; Main + +(require/typed "flink-support.rkt" + [string->job : (→fn String JobDesc)] + [file->job : (→fn String JobDesc)]) + +(define INPUT "a b c a b c\na b\n a b\na b") +;; expected: +;; #hash((a . 5) (b . 5) (c . 2)) + +(run-ground-dataspace τc + (spawn-job-manager) + (spawn-task-manager) + (spawn-task-runner) + (spawn-task-runner) + (spawn-client (string->job INPUT)))