flink: remove use of dataflow

This commit is contained in:
Sam Caldwell 2020-02-24 13:34:22 -05:00
parent 5d5b827535
commit 0074fcb566
1 changed files with 9 additions and 9 deletions

View File

@ -288,6 +288,8 @@ The JobManager then performs the job and, when finished, asserts
;; tid is the TaskID, rid is a unique symbol to a particular request for a slot ;; tid is the TaskID, rid is a unique symbol to a particular request for a slot
(struct request-id (tid rid) #:prefab) (struct request-id (tid rid) #:prefab)
(message-struct task-is-ready (job-id task))
(define (spawn-job-manager) (define (spawn-job-manager)
(spawn (spawn
(assert (job-manager-alive)) (assert (job-manager-alive))
@ -350,21 +352,18 @@ The JobManager then performs the job and, when finished, asserts
(during (observe (job-completion $job-id $tasks _)) (during (observe (job-completion $job-id $tasks _))
(log "JM receives job ~a" job-id) (log "JM receives job ~a" job-id)
(define-values (ready not-ready) (partition task-ready? tasks)) (define-values (ready not-ready) (partition task-ready? tasks))
(field [ready-tasks ready] (field [waiting-tasks not-ready]
[waiting-tasks not-ready]
[tasks-in-progress 0]) [tasks-in-progress 0])
(begin/dataflow (on-start (for [(t ready)] (add-ready-task! t)))
(unless (empty? (ready-tasks)) (on (realize (task-is-ready job-id $t))
(for ([t (ready-tasks)]) (perform-task t push-results))
(perform-task t push-results))
(ready-tasks '())))
;; Task -> Void ;; Task -> Void
(define (add-ready-task! t) (define (add-ready-task! t)
;; TODO - use functional-queue.rkt from ../../ ;; TODO - use functional-queue.rkt from ../../
(log "JM marks task ~a as ready" (task-id t)) (log "JM marks task ~a as ready" (task-id t))
(ready-tasks (cons t (ready-tasks)))) (realize! (task-is-ready job-id t)))
;; Task (ID TaskResult -> Void) -> Void ;; Task (ID TaskResult -> Void) -> Void
;; Requires (task-ready? t) ;; Requires (task-ready? t)
@ -411,8 +410,9 @@ The JobManager then performs the job and, when finished, asserts
;; them to the ready queue when possible ;; them to the ready queue when possible
(define (push-results task-id data) (define (push-results task-id data)
(cond (cond
;; this is an interesting scenario wrt stop handlers running; this code is assuming
;; it runs after the on-stop above that decrements `tasks-in-progress`
[(and (zero? (tasks-in-progress)) [(and (zero? (tasks-in-progress))
(empty? (ready-tasks))
(empty? (waiting-tasks))) (empty? (waiting-tasks)))
(log "JM finished with job ~a" job-id) (log "JM finished with job ~a" job-id)
(react (assert (job-completion job-id tasks data)))] (react (assert (job-completion job-id tasks data)))]