diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 5d206ce..71843b6 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -116,11 +116,14 @@ TaskRunners. Job Submission Protocol ----------------------- -Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)). -The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult) +Finally, Clients submit their jobs to the JobManager by asserting interest + (observe (job-completion ID (Listof Task) ★)) + +The JobManager then performs the job and, when finished, asserts + (job-completion ID (Listof Task) TaskResults) |# -(assertion-struct job (id tasks)) -(assertion-struct job-finished (id data)) +(struct job (id tasks) #:transparent) +(assertion-struct job-completion (id data result)) ;; --------------------------------------------------------------------------------------------------- ;; Logging @@ -304,7 +307,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (received-answer! id) (requests-in-flight (hash-update (requests-in-flight) id sub1))) - (during (job $job-id $tasks) + (during (observe (job-completion $job-id $tasks _)) (log "JM receives job ~a" job-id) (define-values (ready not-ready) (partition task-ready? tasks)) (field [ready-tasks ready] @@ -388,7 +391,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (empty? (ready-tasks)) (empty? (waiting-tasks))) (log "JM finished with job ~a" job-id) - (react (assert (job-finished job-id data)))] + (react (assert (job-completion job-id tasks data)))] [else ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. (define still-waiting @@ -434,8 +437,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Job -> Void (define (spawn-client j) (spawn - (assert j) - (on (asserted (job-finished (job-id j) $data)) + (on (asserted (job-completion (job-id j) (job-tasks j) $data)) (printf "job done!\n~a\n" data)))) ;; ---------------------------------------------------------------------------------------------------