From 8f8f4c416fe2fbd069704bee23f2dc2fb0dba2d3 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Mon, 17 Jun 2019 11:25:09 -0400 Subject: [PATCH] replace some dataflow with internal events in typed flink --- racket/typed/examples/roles/flink.rkt | 75 ++++++++++++++++----------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 3aacc62..e8fa5a5 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -219,7 +219,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (spawn-task-manager) (define id (gensym 'task-manager)) (spawn τc - (print-role + (begin (start-facet tm (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) @@ -320,9 +320,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [(task $id (reduce-work $l $r)) (task id (reduce-work (left l) (left r)))])) +(assertion-struct assigned-task : SelectedTM (mngr)) +(message-struct tasks-finished : TasksFinished (id)) + (define (spawn-job-manager) (spawn τc - (begin + (print-role (start-facet jm (assert (job-manager-alive)) (log "Job Manager Up") @@ -375,6 +378,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (empty? (ref ready-tasks)) (empty? (ref waiting-tasks))) (log "JM finished with job ~a" job-id) + (realize! (tasks-finished job-id)) (start-facet done (assert (job-finished job-id data)))] [else ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. @@ -408,10 +412,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (assign-task [mngr : ID]) (start-facet assign (assert (task-assignment mngr job-id t)) - (on (retracted (task-manager mngr discard)) + (know (assigned-task mngr)) + (on (retracted (task-manager mngr _)) ;; our task manager has crashed - (stop assign - (set! task-mngr not-a-real-task-manager))) + (stop assign)) (on start ;; N.B. when this line was here, and not after `(when mngr ...)` above, ;; things didn't work. I think that due to script scheduling, all ready @@ -430,39 +434,52 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; they should have told us a different slot count since we tried to give them work (log "JM overloaded manager ~a with task ~a" mngr this-id) - (stop assign - (set! task-mngr not-a-real-task-manager))] + (stop assign)] [(finished $results) (log "JM receives the results of task ~a" this-id) (stop perform (k this-id results))])))) (define (select-a-task-manager) (start-facet select + + (field [mngr (Maybe ID) none]) + + (define (try-assign!) + (define mngr? + (for/first ([(id slots) (ref task-managers)] + #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) + id)) + (match mngr? + [(some $m) + (take-slot! m) + (set! mngr (some m)) + (assign-task m)] + [none + #f])) + (begin/dataflow - (when (equal? (ref task-mngr) not-a-real-task-manager) - (define mngr? - (for/first ([(id slots) (ref task-managers)] - #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) - id)) - (match mngr? - [(some $mngr) - (take-slot! mngr) - (set! task-mngr mngr) - (assign-task mngr)] - [none - #f]))))) + (when (equal? (ref mngr) none) + (try-assign!))) + + (on (forget (assigned-task $m:ID)) + (when (equal? (some m) (ref mngr)) + (set! mngr none))))) (on start (select-a-task-manager)))) - (begin/dataflow - (define slots (slots-available)) - (define-tuple (ts readys) - (split-at/lenient (ref ready-tasks) slots)) - (for ([t ts]) - (perform-task t push-results)) - (unless (empty? ts) - ;; the empty? check may be necessary to avoid a dataflow loop - (set! ready-tasks readys)))))))) + (on start + (start-facet delegate-tasks + (on (realize (tasks-finished job-id)) + (stop delegate-tasks)) + (begin/dataflow + (define slots (slots-available)) + (define-tuple (ts readys) + (split-at/lenient (ref ready-tasks) slots)) + (for ([t ts]) + (perform-task t push-results)) + (unless (empty? ts) + ;; the empty? check may be necessary to avoid a dataflow loop + (set! ready-tasks readys)))))))))) ;; --------------------------------------------------------------------------------------------------- ;; Client @@ -492,5 +509,5 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn-task-manager) (spawn-task-runner) (spawn-task-runner) - #;(spawn-client (file->job "lorem.txt")) + (spawn-client (file->job "lorem.txt")) (spawn-client (string->job INPUT)))