diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index b95f6af..58a841c 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -130,7 +130,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (TaskManager ID Int) (Observe (TaskManager ★/t ★/t)) (Job ID (List PendingTask)) - (Observe (Job ★/t ★/t)))) + (Observe (Job ★/t ★/t)) + (JobFinished ID TaskResult))) ;; --------------------------------------------------------------------------------------------------- ;; Util Macros @@ -328,66 +329,96 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (during (job (bind job-id ID) (bind tasks (List PendingTask))) (log "JM receives job ~a" job-id) - (define n-r/r (partition-ready-tasks tasks)) - (define ready (select 1 n-r/r)) - (define not-ready (select 0 n-r/r)) - #;(define-values (ready not-ready) (partition task-ready? tasks)) + (define-tuple (not-ready ready) (partition-ready-tasks tasks)) (field [ready-tasks (List ConcreteTask) ready] [waiting-tasks (List PendingTask) not-ready] [tasks-in-progress Int 0]) - #;(begin/dataflow + (begin/dataflow (define slots (slots-available)) - (define-values (ts readys) - (split-at/lenient (ready-tasks) slots)) + (define-tuple (ts readys) + (split-at/lenient (ref ready-tasks) slots)) (for ([t ts]) - (perform-task t push-results)) + #f + #;(perform-task t push-results)) (unless (empty? ts) ;; the empty? check may be necessary to avoid a dataflow loop - (ready-tasks readys))) + (set! ready-tasks readys))) ;; Task -> Void - #;(define (add-ready-task! t) + (define (add-ready-task! [t : ConcreteTask]) ;; TODO - use functional-queue.rkt from ../../ - (log "JM marks task ~a as ready" (task-id t)) - (ready-tasks (cons t (ready-tasks)))) + (match-define (task (bind tid TaskID) discard) t) + (log "JM marks task ~a as ready" tid) + (set! ready-tasks (cons t (ref ready-tasks)))) + + ;; ID Data -> Void + ;; Update any dependent tasks with the results of the given task, moving + ;; them to the ready queue when possible + (define (push-results [task-id : TaskID] + [data : TaskResult]) + (cond + [(and (zero? (ref tasks-in-progress)) + (empty? (ref ready-tasks)) + (empty? (ref waiting-tasks))) + (log "JM finished with job ~a" job-id) + (start-facet done (assert (job-finished job-id data)))] + [else + ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. + (define still-waiting + (for/fold ([ts : (List PendingTask) (list)]) + ([t (ref waiting-tasks)]) + (define t+ (task+data t task-id data)) + (match (task-ready? t+) + [(some (bind ready ConcreteTask)) + (add-ready-task! ready) + ts] + [discard + (cons t+ ts)]))) + (set! waiting-tasks still-waiting)])) ;; Task (ID TaskResult -> Void) -> Void ;; Requires (task-ready? t) - #;(define (perform-task t k) - (react - (define task-facet (current-facet-id)) - (on-start (tasks-in-progress (add1 (tasks-in-progress)))) - (on-stop (tasks-in-progress (sub1 (tasks-in-progress)))) - (match-define (task this-id desc) t) + (define (perform-task [t : ConcreteTask] + [k : (→fn TaskID TaskResult (Tuple))] + -> ★/t) + (start-facet perform + (on start (set! tasks-in-progress (add1 (ref tasks-in-progress)))) + (on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress)))) + (match-define (task (bind this-id TaskID) (bind desc ConcreteWork)) t) (log "JM begins on task ~a" this-id) (define (select-a-task-manager) - (react + (start-facet this-facet (begin/dataflow - (define mngr - (for/first ([(id slots) (in-hash (task-managers))] - #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) + (define mngr? + (for/first ([(id slots) (ref task-managers)] + #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) id)) - (when mngr - (take-slot! mngr) - (stop-current-facet (assign-task mngr)))))) + (match mngr? + [(some (bind mngr ID)) + (take-slot! mngr) + (stop this-facet + #;(assign-task mngr))] + [none + #f]) + #f))) ;; ID -> ... - (define (assign-task mngr) - (react - (define this-facet (current-facet-id)) - (on (retracted (task-manager mngr _)) + (define (assign-task [mngr : ID]) + (start-facet this-facet + (on (retracted (task-manager mngr discard)) ;; our task manager has crashed - (stop-current-facet (select-a-task-manager))) - (on-start + (stop this-facet (select-a-task-manager))) + (on start ;; N.B. when this line was here, and not after `(when mngr ...)` above, ;; things didn't work. I think that due to script scheduling, all ready ;; tasks were being assigned to the manager #;(take-slot! mngr) - (react (stop-when (asserted (task-state mngr job-id this-id _)) - (received-answer! mngr))) - (task-assigner t job-id mngr + (start-facet take-slot + (stop-when (asserted (task-state mngr job-id this-id discard)) + (received-answer! mngr))) + #;(task-assigner t job-id mngr (lambda () ;; need to find a new task manager ;; don't think we need a release-slot! here, because if we've heard back from a task manager, @@ -396,32 +427,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (stop-facet this-facet (select-a-task-manager))) (lambda (results) (log "JM receives the results of task ~a" this-id) - (stop-facet task-facet (k this-id results))))))) + (stop-facet perform (k this-id results))))))) - (on-start (select-a-task-manager)))) + (on start (select-a-task-manager)))) - ;; ID Data -> Void - ;; Update any dependent tasks with the results of the given task, moving - ;; them to the ready queue when possible - #;(define (push-results task-id data) - (cond - [(and (zero? (tasks-in-progress)) - (empty? (ready-tasks)) - (empty? (waiting-tasks))) - (log "JM finished with job ~a" job-id) - (react (assert (job-finished job-id data)))] - [else - ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. - (define still-waiting - (for/fold ([ts '()]) - ([t (in-list (waiting-tasks))]) - (define t+ (task+data t task-id data)) - (cond - [(task-ready? t+) - (add-ready-task! t+) - ts] - [else - (cons t+ ts)]))) - (waiting-tasks still-waiting)])) #f))))