From f85203ac73102782519b6a6797f25aafe1f8feed Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Tue, 5 Mar 2019 10:53:30 -0500 Subject: [PATCH] examples/flink: small cleanups --- racket/syndicate/examples/actor/flink.rkt | 37 +++++++++-------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 4a0fb05..06b01ab 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -138,16 +138,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-runner id (status))) (begin/dataflow (log "task-runner ~v state is: ~a" id (status))) - ;; this only does map tasks atm (during (task-assignment id $job-id (task $tid $desc)) (field [execution-state (if (idle?) RUNNING OVERLOAD)] [word-count (hash)]) (assert (task-state id job-id tid (execution-state))) - ;; I think we have to avoid asking a non-idle runner to do anything + ;; we have to avoid asking a non-idle runner to do anything (when (idle?) - (on-stop (status IDLE))) - (on-start - (when (equal? IDLE (status)) + (on-stop (status IDLE)) + (on-start (status (executing tid)) ;; since we currently finish everything in one turn, allow other actors to observe the changes in our ;; task-runner state by flushing pending actions. @@ -200,7 +198,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - ;; SUSPICION - these two query sets interfere with one another (define/query-set task-runners (task-runner $id _) id #:on-add (log "TM learns about task-runner ~a" id)) ;; I wonder just how inefficient this is @@ -213,9 +210,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (match-define (task task-id desc) t) #;(on-start (log "TM receives task ~a" task-id)) (log "TM receives task ~a" task-id) - (on-stop (log "TM finished with task ~a" task-id) - (when (= task-id 6) - (log "TM idle-runners: ~a" (idle-runners)))) + (on-stop (log "TM finished with task ~a" task-id)) (field [status ACCEPTED]) ;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner ;; (also removing the first id from task-state) @@ -235,16 +230,16 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (status RUNNING) (on (asserted (task-state runner job-id task-id $state)) (match state - [(== RUNNING) + [(or (== ACCEPTED) + (== RUNNING)) ;; nothing to do (void)] + [(== OVERLOAD) + (log "TM overloaded TR with task ~a" task-id) + (status OVERLOAD)] [(finished results) (log "TM receives the results of task ~a" task-id) - (status state)] - [_ - ;; TODO - ;; need input maybe? - #f]))]))))) + (status state)]))]))))) ;; --------------------------------------------------------------------------------------------------- ;; JobManager @@ -280,8 +275,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define-values (ready not-ready) (partition task-ready? tasks)) (field [ready-tasks ready] [waiting-tasks not-ready] - [tasks-in-progress 0] - [data-partitions (hash)]) + [tasks-in-progress 0]) (begin/dataflow (define slots (slots-available)) @@ -327,11 +321,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (on #:when (task-mngr) (asserted (task-state (task-mngr) job-id this-id $state)) (match state - [(== ACCEPTED) - #f] - [(== RUNNING) + [(or (== ACCEPTED) + (== RUNNING)) ;; nothing to do - #f] + (void)] [(== OVERLOAD) ;; need to find a new task manager ;; don't think we need a release-slot! here, because if we've heard back from a task manager, @@ -339,7 +332,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "JM overloaded manager ~a with task ~a" (task-mngr) this-id) (task-mngr #f)] [(finished results) - ;; TODO - guess-timation of what this should look like (log "JM receives the results of task ~a" this-id) (stop-current-facet (k this-id results))])))) @@ -351,7 +343,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [(and (zero? (tasks-in-progress)) (empty? (ready-tasks)) (empty? (waiting-tasks))) - ;; TODO - also need to ensure there are no tasks in progress (log "JM finished with job ~a" job-id) (react (assert (job-finished job-id data)))] [else