From 0074fcb566051298c42123d0bd6ca31014be6dae Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Mon, 24 Feb 2020 13:34:22 -0500 Subject: [PATCH] flink: remove use of dataflow --- racket/syndicate/examples/actor/flink.rkt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 6d8c34b..cbe0556 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -288,6 +288,8 @@ The JobManager then performs the job and, when finished, asserts ;; tid is the TaskID, rid is a unique symbol to a particular request for a slot (struct request-id (tid rid) #:prefab) +(message-struct task-is-ready (job-id task)) + (define (spawn-job-manager) (spawn (assert (job-manager-alive)) @@ -350,21 +352,18 @@ The JobManager then performs the job and, when finished, asserts (during (observe (job-completion $job-id $tasks _)) (log "JM receives job ~a" job-id) (define-values (ready not-ready) (partition task-ready? tasks)) - (field [ready-tasks ready] - [waiting-tasks not-ready] + (field [waiting-tasks not-ready] [tasks-in-progress 0]) - (begin/dataflow - (unless (empty? (ready-tasks)) - (for ([t (ready-tasks)]) - (perform-task t push-results)) - (ready-tasks '()))) + (on-start (for [(t ready)] (add-ready-task! t))) + (on (realize (task-is-ready job-id $t)) + (perform-task t push-results)) ;; Task -> Void (define (add-ready-task! t) ;; TODO - use functional-queue.rkt from ../../ (log "JM marks task ~a as ready" (task-id t)) - (ready-tasks (cons t (ready-tasks)))) + (realize! (task-is-ready job-id t))) ;; Task (ID TaskResult -> Void) -> Void ;; Requires (task-ready? t) @@ -411,8 +410,9 @@ The JobManager then performs the job and, when finished, asserts ;; them to the ready queue when possible (define (push-results task-id data) (cond + ;; this is an interesting scenario wrt stop handlers running; this code is assuming + ;; it runs after the on-stop above that decrements `tasks-in-progress` [(and (zero? (tasks-in-progress)) - (empty? (ready-tasks)) (empty? (waiting-tasks))) (log "JM finished with job ~a" job-id) (react (assert (job-completion job-id tasks data)))]