untyped flink: finish streamlining ids, resolve dataflow issue

This commit is contained in:
Sam Caldwell 2019-10-15 11:40:55 -04:00
parent 7374c8c506
commit 5e61e9941b
1 changed files with 13 additions and 11 deletions

View File

@ -251,13 +251,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Assign incoming tasks
(field [busy-runners (set)])
(define/dataflow idle-runners
(define (idle-runners)
(set-count (set-subtract (task-runners) (busy-runners))))
(assert (task-manager id (idle-runners)))
(define (can-accept?)
(log "TM ~a idle-runners = ~a" id (idle-runners))
(positive? (idle-runners)))
(define (select-runner)
(define runner (for/first ([r (in-set (task-runners))]
@ -301,7 +300,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ID -> Void
;; mark that we have requested the given task manager to perform a task
(define (take-slot! id)
(log "JM assigns a task to ~a" id)
(requests-in-flight (hash-update (requests-in-flight) id add1 0)))
;; ID -> Void
;; mark that we have heard back from the given manager about a requested task
@ -343,14 +341,18 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (select-a-task-manager)
(react
(field [selection #f])
(begin/dataflow
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
id))
(when mngr
(stop-current-facet (take-slot! mngr)
(assign-task mngr))))))
(unless (selection)
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
id))
(when mngr
(selection mngr)
(take-slot! mngr)
(log "JM assigns task ~a to ~a" this-id mngr)
(stop-current-facet (assign-task mngr)))))))
;; ID -> ...
(define (assign-task mngr)
@ -445,7 +447,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn
(during (job-manager-alive)
(during (task-manager $tm-id _)
(define/query-set requests (task-assignment _ (task $tid _)) tid)
(define/query-set requests (task-assignment tm-id (task $tid _)) tid)
(field [high-water-mark 0])
(on (asserted (task-manager tm-id $slots))
(when (> slots (high-water-mark))