examples/flink: small cleanups

This commit is contained in:
Sam Caldwell 2019-03-05 10:53:30 -05:00
parent 0da903e438
commit f85203ac73
1 changed files with 14 additions and 23 deletions

View File

@ -138,16 +138,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id (status)))
(begin/dataflow
(log "task-runner ~v state is: ~a" id (status)))
;; this only does map tasks atm
(during (task-assignment id $job-id (task $tid $desc))
(field [execution-state (if (idle?) RUNNING OVERLOAD)]
[word-count (hash)])
(assert (task-state id job-id tid (execution-state)))
;; I think we have to avoid asking a non-idle runner to do anything
;; we have to avoid asking a non-idle runner to do anything
(when (idle?)
(on-stop (status IDLE)))
(on-start
(when (equal? IDLE (status))
(on-stop (status IDLE))
(on-start
(status (executing tid))
;; since we currently finish everything in one turn, allow other actors to observe the changes in our
;; task-runner state by flushing pending actions.
@ -200,7 +198,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive)
(log "TM learns about JM")
;; SUSPICION - these two query sets interfere with one another
(define/query-set task-runners (task-runner $id _) id
#:on-add (log "TM learns about task-runner ~a" id))
;; I wonder just how inefficient this is
@ -213,9 +210,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(match-define (task task-id desc) t)
#;(on-start (log "TM receives task ~a" task-id))
(log "TM receives task ~a" task-id)
(on-stop (log "TM finished with task ~a" task-id)
(when (= task-id 6)
(log "TM idle-runners: ~a" (idle-runners))))
(on-stop (log "TM finished with task ~a" task-id))
(field [status ACCEPTED])
;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner
;; (also removing the first id from task-state)
@ -235,16 +230,16 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(status RUNNING)
(on (asserted (task-state runner job-id task-id $state))
(match state
[(== RUNNING)
[(or (== ACCEPTED)
(== RUNNING))
;; nothing to do
(void)]
[(== OVERLOAD)
(log "TM overloaded TR with task ~a" task-id)
(status OVERLOAD)]
[(finished results)
(log "TM receives the results of task ~a" task-id)
(status state)]
[_
;; TODO
;; need input maybe?
#f]))])))))
(status state)]))])))))
;; ---------------------------------------------------------------------------------------------------
;; JobManager
@ -280,8 +275,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define-values (ready not-ready) (partition task-ready? tasks))
(field [ready-tasks ready]
[waiting-tasks not-ready]
[tasks-in-progress 0]
[data-partitions (hash)])
[tasks-in-progress 0])
(begin/dataflow
(define slots (slots-available))
@ -327,11 +321,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(on #:when (task-mngr)
(asserted (task-state (task-mngr) job-id this-id $state))
(match state
[(== ACCEPTED)
#f]
[(== RUNNING)
[(or (== ACCEPTED)
(== RUNNING))
;; nothing to do
#f]
(void)]
[(== OVERLOAD)
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
@ -339,7 +332,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(log "JM overloaded manager ~a with task ~a" (task-mngr) this-id)
(task-mngr #f)]
[(finished results)
;; TODO - guess-timation of what this should look like
(log "JM receives the results of task ~a" this-id)
(stop-current-facet (k this-id results))]))))
@ -351,7 +343,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[(and (zero? (tasks-in-progress))
(empty? (ready-tasks))
(empty? (waiting-tasks)))
;; TODO - also need to ensure there are no tasks in progress
(log "JM finished with job ~a" job-id)
(react (assert (job-finished job-id data)))]
[else