flink: remove use of dataflow
This commit is contained in:
parent
013ce19e68
commit
18932662de
|
@ -288,6 +288,8 @@ The JobManager then performs the job and, when finished, asserts
|
|||
;; tid is the TaskID, rid is a unique symbol to a particular request for a slot
|
||||
(struct request-id (tid rid) #:prefab)
|
||||
|
||||
(message-struct task-is-ready (job-id task))
|
||||
|
||||
(define (spawn-job-manager)
|
||||
(spawn
|
||||
(assert (job-manager-alive))
|
||||
|
@ -350,21 +352,18 @@ The JobManager then performs the job and, when finished, asserts
|
|||
(during (observe (job-completion $job-id $tasks _))
|
||||
(log "JM receives job ~a" job-id)
|
||||
(define-values (ready not-ready) (partition task-ready? tasks))
|
||||
(field [ready-tasks ready]
|
||||
[waiting-tasks not-ready]
|
||||
(field [waiting-tasks not-ready]
|
||||
[tasks-in-progress 0])
|
||||
|
||||
(begin/dataflow
|
||||
(unless (empty? (ready-tasks))
|
||||
(for ([t (ready-tasks)])
|
||||
(perform-task t push-results))
|
||||
(ready-tasks '())))
|
||||
(on-start (for [(t ready)] (add-ready-task! t)))
|
||||
(on (realize (task-is-ready job-id $t))
|
||||
(perform-task t push-results))
|
||||
|
||||
;; Task -> Void
|
||||
(define (add-ready-task! t)
|
||||
;; TODO - use functional-queue.rkt from ../../
|
||||
(log "JM marks task ~a as ready" (task-id t))
|
||||
(ready-tasks (cons t (ready-tasks))))
|
||||
(realize! (task-is-ready job-id t)))
|
||||
|
||||
;; Task (ID TaskResult -> Void) -> Void
|
||||
;; Requires (task-ready? t)
|
||||
|
@ -411,8 +410,9 @@ The JobManager then performs the job and, when finished, asserts
|
|||
;; them to the ready queue when possible
|
||||
(define (push-results task-id data)
|
||||
(cond
|
||||
;; this is an interesting scenario wrt stop handlers running; this code is assuming
|
||||
;; it runs after the on-stop above that decrements `tasks-in-progress`
|
||||
[(and (zero? (tasks-in-progress))
|
||||
(empty? (ready-tasks))
|
||||
(empty? (waiting-tasks)))
|
||||
(log "JM finished with job ~a" job-id)
|
||||
(react (assert (job-completion job-id tasks data)))]
|
||||
|
|
Loading…
Reference in New Issue