diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 1edc460..62e1916 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -164,7 +164,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (require/typed "flink-support.rkt" [string->words : (→fn String (List String))]) -(define (spawn-task-runner) +#;(define (spawn-task-runner) (define id (gensym 'task-runner)) (spawn τc (start-facet runner @@ -199,7 +199,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskManager -(define (spawn-task-manager) +#;(define (spawn-task-manager) (define id (gensym 'task-manager)) (spawn τc (start-facet tm @@ -334,17 +334,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [waiting-tasks (List PendingTask) not-ready] [tasks-in-progress Int 0]) - (begin/dataflow - (define slots (slots-available)) - (define-tuple (ts readys) - (split-at/lenient (ref ready-tasks) slots)) - (for ([t ts]) - #f - #;(perform-task t push-results)) - (unless (empty? ts) - ;; the empty? check may be necessary to avoid a dataflow loop - (set! ready-tasks readys))) - ;; Task -> Void (define (add-ready-task! [t : ConcreteTask]) ;; TODO - use functional-queue.rkt from ../../ @@ -379,16 +368,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Task (ID TaskResult -> Void) -> Void ;; Requires (task-ready? t) - (define (perform-task [t : ConcreteTask] - [k : (→fn TaskID TaskResult (Tuple))] - -> ★/t) + (define (∀ (ρ) (perform-task [t : ConcreteTask] + [k : (proc TaskID TaskResult -> ★/t + #:roles (ρ))])) (start-facet perform (on start (set! tasks-in-progress (add1 (ref tasks-in-progress)))) (on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress)))) (match-define (task (bind this-id TaskID) (bind desc ConcreteWork)) t) (log "JM begins on task ~a" this-id) - (define (select-a-task-manager) + (define (∀ (ρ2) (select-a-task-manager [assign-task : (proc ID -> ★/t + #:roles (ρ2))])) (start-facet this-facet (begin/dataflow (define mngr? @@ -399,7 +389,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [(some (bind mngr ID)) (take-slot! mngr) (stop this-facet - #;(assign-task mngr))] + (assign-task mngr))] [none #f]) #f))) @@ -410,7 +400,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-assignment mngr job-id t)) (on (retracted (task-manager mngr discard)) ;; our task manager has crashed - (stop this-facet (select-a-task-manager))) + (stop this-facet #;(select-a-task-manager))) (on start ;; N.B. when this line was here, and not after `(when mngr ...)` above, ;; things didn't work. I think that due to script scheduling, all ready @@ -428,11 +418,21 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; they should have told us a different slot count since we tried to give them work (log "JM overloaded manager ~a with task ~a" mngr this-id) - (stop this-facet (select-a-task-manager))] + (stop this-facet #;(select-a-task-manager))] [(finished (bind results TaskResult)) (log "JM receives the results of task ~a" this-id) (stop perform (k this-id results))])))) - (on start (select-a-task-manager)))) + (on start (select-a-task-manager assign-task)))) + + (begin/dataflow + (define slots (slots-available)) + (define-tuple (ts readys) + (split-at/lenient (ref ready-tasks) slots)) + (for ([t ts]) + (perform-task t push-results)) + (unless (empty? ts) + ;; the empty? check may be necessary to avoid a dataflow loop + (set! ready-tasks readys))) #f))))