diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 17be6af..e6a3aa3 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -251,13 +251,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Assign incoming tasks (field [busy-runners (set)]) - (define/dataflow idle-runners + (define (idle-runners) (set-count (set-subtract (task-runners) (busy-runners)))) (assert (task-manager id (idle-runners))) (define (can-accept?) - (log "TM ~a idle-runners = ~a" id (idle-runners)) (positive? (idle-runners))) (define (select-runner) (define runner (for/first ([r (in-set (task-runners))] @@ -301,7 +300,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; ID -> Void ;; mark that we have requested the given task manager to perform a task (define (take-slot! id) - (log "JM assigns a task to ~a" id) (requests-in-flight (hash-update (requests-in-flight) id add1 0))) ;; ID -> Void ;; mark that we have heard back from the given manager about a requested task @@ -343,14 +341,18 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (select-a-task-manager) (react + (field [selection #f]) (begin/dataflow - (define mngr - (for/first ([(id slots) (in-hash (task-managers))] - #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) - id)) - (when mngr - (stop-current-facet (take-slot! mngr) - (assign-task mngr)))))) + (unless (selection) + (define mngr + (for/first ([(id slots) (in-hash (task-managers))] + #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) + id)) + (when mngr + (selection mngr) + (take-slot! mngr) + (log "JM assigns task ~a to ~a" this-id mngr) + (stop-current-facet (assign-task mngr))))))) ;; ID -> ... (define (assign-task mngr) @@ -445,7 +447,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn (during (job-manager-alive) (during (task-manager $tm-id _) - (define/query-set requests (task-assignment _ (task $tid _)) tid) + (define/query-set requests (task-assignment tm-id (task $tid _)) tid) (field [high-water-mark 0]) (on (asserted (task-manager tm-id $slots)) (when (> slots (high-water-mark))