From c8a1253d7b07c6181ceee85b4fb2433dc33e0420 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Mon, 20 May 2019 17:03:21 -0400 Subject: [PATCH] examples/flink.rkt: work on job manager and utilities --- racket/typed/examples/roles/flink.rkt | 195 +++++++++++++++++++++++++- 1 file changed, 190 insertions(+), 5 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 7fabbc7..b95f6af 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -70,13 +70,15 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words (define-type-alias WordCount (Hash String Int)) (define-type-alias TaskResult WordCount) (define-type-alias Reduce - (ReduceWork (U TaskID TaskResult) - (U TaskID TaskResult))) + (ReduceWork (Either TaskID TaskResult) + (Either TaskID TaskResult))) (define-type-alias Work (U Reduce (MapWork String))) (define-type-alias ConcreteWork (U (ReduceWork TaskResult TaskResult) (MapWork String))) +(define-type-alias PendingTask + (Task TaskID Work)) (define-type-alias ConcreteTask (Task TaskID ConcreteWork)) #| @@ -121,12 +123,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (TaskAssignment ID ID ConcreteTask) (Observe (TaskAssignment ID ★/t ★/t)) (TaskState ID ID TaskID TaskStateDesc) + (Observe (TaskState ID ID TaskID ★/t)) (JobManagerAlive) (Observe (JobManagerAlive)) (Observe (TaskRunner ★/t ★/t)) (TaskManager ID Int) - (Observe (TaskState ID ID TaskID ★/t)) - )) + (Observe (TaskManager ★/t ★/t)) + (Job ID (List PendingTask)) + (Observe (Job ★/t ★/t)))) ;; --------------------------------------------------------------------------------------------------- ;; Util Macros @@ -147,7 +151,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (hash-update/failure h word add1 - (lambda () 0))) + 0)) (define (count-new-words [word-count : WordCount] [words : (List String)] @@ -240,3 +244,184 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (set! status OVERLOAD/ts)] [(finished discard) (set! status st)])))))))) + +;; --------------------------------------------------------------------------------------------------- +;; JobManager + +;; Task -> Bool +;; Test if the task is ready to run +(define (task-ready? [t : PendingTask] -> (Maybe ConcreteTask)) + (match t + [(task (bind tid TaskID) (map-work (bind s String))) + ;; having to re-produce this is directly bc of no occurrence typing + (some (task tid (map-work s)))] + [(task (bind tid TaskID) (reduce-work (right (bind v1 TaskResult)) + (right (bind v2 TaskResult)))) + (some (task tid (reduce-work v1 v2)))] + [discard + none])) + +;; Task Id Any -> Task +;; If the given task is waiting for this data, replace the waiting ID with the data +(define (task+data [t : PendingTask] + [id : TaskID] + [data : TaskResult] + -> PendingTask) + (match t + [(task (bind tid TaskID) + (reduce-work (left id) (bind r (Either TaskID TaskResult)))) + (task tid (reduce-work (right data) r))] + [(task (bind tid TaskID) + (reduce-work (bind l (Either TaskID TaskResult)) (left id))) + (task tid (reduce-work l (right data)))] + [discard t])) + + +(require/typed "flink-support.rkt" + [split-at/lenient- : (∀ (X) (→fn (List X) Int (List (List X))))]) + +(define (∀ (X) (split-at/lenient [xs : (List X)] + [n : Int] + -> (Tuple (List X) (List X)))) + (define l (split-at/lenient- xs n)) + (tuple (first l) (second l))) + +(define (partition-ready-tasks [tasks : (List PendingTask)] + -> (Tuple (List PendingTask) + (List ConcreteTask))) + (define part (inst partition/either PendingTask PendingTask ConcreteTask)) + (part tasks + (lambda ([t : PendingTask]) + (match (task-ready? t) + [(some (bind ct ConcreteTask)) + (right ct)] + [none + (left t)])))) + +(define (spawn-job-manager) + (spawn τc + (start-facet jm + (assert (job-manager-alive)) + (log "Job Manager Up") + + ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. + (define/query-hash task-managers (task-manager (bind id ID) (bind slots Int)) id slots + #;#:on-add #;(log "JM learns that ~a has ~v slots" id slots)) + + ;; (Hashof TaskManagerID Nat) + ;; to better understand the supply of slots for each task manager, keep track of the number + ;; of requested tasks that we have yet to hear back about + (field [requests-in-flight (Hash ID Int) (hash)]) + (define (slots-available) + (for/sum ([(id v) (ref task-managers)]) + (max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0))))) + + ;; ID -> Void + ;; mark that we have requested the given task manager to perform a task + (define (take-slot! [id : ID]) + (log "JM assigns a task to ~a" id) + (set! requests-in-flight (hash-update/failure (ref requests-in-flight) id add1 0))) + ;; ID -> Void + ;; mark that we have heard back from the given manager about a requested task + (define (received-answer! [id : ID]) + (set! requests-in-flight (hash-update (ref requests-in-flight) id sub1))) + + (during (job (bind job-id ID) (bind tasks (List PendingTask))) + (log "JM receives job ~a" job-id) + (define n-r/r (partition-ready-tasks tasks)) + (define ready (select 1 n-r/r)) + (define not-ready (select 0 n-r/r)) + #;(define-values (ready not-ready) (partition task-ready? tasks)) + (field [ready-tasks (List ConcreteTask) ready] + [waiting-tasks (List PendingTask) not-ready] + [tasks-in-progress Int 0]) + + #;(begin/dataflow + (define slots (slots-available)) + (define-values (ts readys) + (split-at/lenient (ready-tasks) slots)) + (for ([t ts]) + (perform-task t push-results)) + (unless (empty? ts) + ;; the empty? check may be necessary to avoid a dataflow loop + (ready-tasks readys))) + + ;; Task -> Void + #;(define (add-ready-task! t) + ;; TODO - use functional-queue.rkt from ../../ + (log "JM marks task ~a as ready" (task-id t)) + (ready-tasks (cons t (ready-tasks)))) + + ;; Task (ID TaskResult -> Void) -> Void + ;; Requires (task-ready? t) + #;(define (perform-task t k) + (react + (define task-facet (current-facet-id)) + (on-start (tasks-in-progress (add1 (tasks-in-progress)))) + (on-stop (tasks-in-progress (sub1 (tasks-in-progress)))) + (match-define (task this-id desc) t) + (log "JM begins on task ~a" this-id) + + (define (select-a-task-manager) + (react + (begin/dataflow + (define mngr + (for/first ([(id slots) (in-hash (task-managers))] + #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) + id)) + (when mngr + (take-slot! mngr) + (stop-current-facet (assign-task mngr)))))) + + ;; ID -> ... + (define (assign-task mngr) + (react + (define this-facet (current-facet-id)) + (on (retracted (task-manager mngr _)) + ;; our task manager has crashed + (stop-current-facet (select-a-task-manager))) + (on-start + ;; N.B. when this line was here, and not after `(when mngr ...)` above, + ;; things didn't work. I think that due to script scheduling, all ready + ;; tasks were being assigned to the manager + #;(take-slot! mngr) + (react (stop-when (asserted (task-state mngr job-id this-id _)) + (received-answer! mngr))) + (task-assigner t job-id mngr + (lambda () + ;; need to find a new task manager + ;; don't think we need a release-slot! here, because if we've heard back from a task manager, + ;; they should have told us a different slot count since we tried to give them work + (log "JM overloaded manager ~a with task ~a" mngr this-id) + (stop-facet this-facet (select-a-task-manager))) + (lambda (results) + (log "JM receives the results of task ~a" this-id) + (stop-facet task-facet (k this-id results))))))) + + (on-start (select-a-task-manager)))) + + ;; ID Data -> Void + ;; Update any dependent tasks with the results of the given task, moving + ;; them to the ready queue when possible + #;(define (push-results task-id data) + (cond + [(and (zero? (tasks-in-progress)) + (empty? (ready-tasks)) + (empty? (waiting-tasks))) + (log "JM finished with job ~a" job-id) + (react (assert (job-finished job-id data)))] + [else + ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. + (define still-waiting + (for/fold ([ts '()]) + ([t (in-list (waiting-tasks))]) + (define t+ (task+data t task-id data)) + (cond + [(task-ready? t+) + (add-ready-task! t+) + ts] + [else + (cons t+ ts)]))) + (waiting-tasks still-waiting)])) + #f)))) +