typed flink: task runners don't need a status
This commit is contained in:
parent
142206d8e3
commit
eb56a1006f
|
@ -26,15 +26,19 @@ perform a task.
|
||||||
(define-type-alias ID Symbol)
|
(define-type-alias ID Symbol)
|
||||||
#|
|
#|
|
||||||
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
||||||
assert their presence with (task-runner ID Status), where Status is one of
|
assert their presence with (task-runner ID),
|
||||||
- IDLE, when the TR is not executing a task
|
|#
|
||||||
- (executing TaskID), when the TR is executing the task with the given TaskID
|
(assertion-struct task-runner : TaskRunner (id))
|
||||||
- OVERLOAD, when the TR has been asked to perform a task before it has
|
|
||||||
finished its previous assignment. For the purposes of this model, it indicates a
|
#|
|
||||||
failure in the protocol; like the exchange between the JM and the TM, a TR
|
A Status is one of
|
||||||
should only receive tasks when it is IDLE.
|
- IDLE, when the TR is not executing a task
|
||||||
|
- (executing TaskID), when the TR is executing the task with the given TaskID
|
||||||
|
- OVERLOAD, when the TR has been asked to perform a task before it has
|
||||||
|
finished its previous assignment. For the purposes of this model, it indicates a
|
||||||
|
failure in the protocol; like the exchange between the JM and the TM, a TR
|
||||||
|
should only receive tasks when it is IDLE.
|
||||||
|#
|
|#
|
||||||
(assertion-struct task-runner : TaskRunner (id status))
|
|
||||||
(define-constructor* (executing : Executing id))
|
(define-constructor* (executing : Executing id))
|
||||||
(define-type-alias TaskID Int)
|
(define-type-alias TaskID Int)
|
||||||
(define-type-alias Status (U Symbol (Executing TaskID)))
|
(define-type-alias Status (U Symbol (Executing TaskID)))
|
||||||
|
@ -137,14 +141,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(define-type-alias JobDesc (Job ID (List InputTask)))
|
(define-type-alias JobDesc (Job ID (List InputTask)))
|
||||||
|
|
||||||
(define-type-alias τc
|
(define-type-alias τc
|
||||||
(U (TaskRunner ID Status)
|
(U (TaskRunner ID)
|
||||||
(TaskAssignment ID ID ConcreteTask)
|
(TaskAssignment ID ID ConcreteTask)
|
||||||
(Observe (TaskAssignment ID ★/t ★/t))
|
(Observe (TaskAssignment ID ★/t ★/t))
|
||||||
(TaskState ID ID TaskID TaskStateDesc)
|
(TaskState ID ID TaskID TaskStateDesc)
|
||||||
(Observe (TaskState ID ID TaskID ★/t))
|
(Observe (TaskState ID ID TaskID ★/t))
|
||||||
(JobManagerAlive)
|
(JobManagerAlive)
|
||||||
(Observe (JobManagerAlive))
|
(Observe (JobManagerAlive))
|
||||||
(Observe (TaskRunner ★/t ★/t))
|
(Observe (TaskRunner ★/t))
|
||||||
(TaskManager ID Int)
|
(TaskManager ID Int)
|
||||||
(Observe (TaskManager ★/t ★/t))
|
(Observe (TaskManager ★/t ★/t))
|
||||||
JobDesc
|
JobDesc
|
||||||
|
@ -188,30 +192,20 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(spawn τc
|
(spawn τc
|
||||||
(begin
|
(begin
|
||||||
(start-facet runner
|
(start-facet runner
|
||||||
(field [status Status IDLE])
|
(assert (task-runner id))
|
||||||
(define (idle?) (equal? IDLE (ref status)))
|
|
||||||
(assert (task-runner id (ref status)))
|
|
||||||
(begin/dataflow
|
|
||||||
(log "task-runner ~v state is: ~a" id (ref status)))
|
|
||||||
(during (task-assignment id $job-id (task $task-id $desc))
|
(during (task-assignment id $job-id (task $task-id $desc))
|
||||||
(field [state TaskStateDesc ACCEPTED])
|
(field [state TaskStateDesc ACCEPTED])
|
||||||
(assert (task-state id job-id task-id (ref state)))
|
(assert (task-state id job-id task-id (ref state)))
|
||||||
(cond
|
;; since we currently finish everything in one turn, these changes to status aren't
|
||||||
[(idle?)
|
;; actually visible.
|
||||||
;; since we currently finish everything in one turn, these changes to status aren't
|
(set! state RUNNING)
|
||||||
;; actually visible.
|
(match desc
|
||||||
(set! state RUNNING)
|
[(map-work $data)
|
||||||
(set! status (executing task-id))
|
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
|
||||||
(match desc
|
(set! state (finished wc))]
|
||||||
[(map-work $data)
|
[(reduce-work $left $right)
|
||||||
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
|
(define wc (hash-union/combine left right +))
|
||||||
(set! state (finished wc))]
|
(set! state (finished wc))]))))))
|
||||||
[(reduce-work $left $right)
|
|
||||||
(define wc (hash-union/combine left right +))
|
|
||||||
(set! state (finished wc))])
|
|
||||||
(set! status IDLE)]
|
|
||||||
[#t
|
|
||||||
(set! status OVERLOAD)]))))))
|
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
;; TaskManager
|
;; TaskManager
|
||||||
|
@ -224,16 +218,28 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(log "Task Manager (TM) ~a is running" id)
|
(log "Task Manager (TM) ~a is running" id)
|
||||||
(during (job-manager-alive)
|
(during (job-manager-alive)
|
||||||
(log "TM learns about JM")
|
(log "TM learns about JM")
|
||||||
(define/query-set task-runners (task-runner $id _) id
|
(define/query-set task-runners (task-runner $id) id
|
||||||
#:on-add (log "TM learns about task-runner ~a" id))
|
#:on-add (log "TM learns about task-runner ~a" id))
|
||||||
;; I wonder just how inefficient this is
|
(field [busy-runners (Set ID) (set)])
|
||||||
(define/query-set idle-runners (task-runner $id IDLE) id
|
(define/dataflow idle-runners
|
||||||
#:on-add (log "TM learns that task-runner ~a is IDLE" id)
|
(set-count (set-subtract (ref task-runners) (ref busy-runners))))
|
||||||
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
|
|
||||||
(assert (task-manager id (set-count (ref idle-runners))))
|
(assert (task-manager id (ref idle-runners)))
|
||||||
(field [busy-runners (List ID) (list)])
|
|
||||||
(define (can-accept?)
|
(define (can-accept?)
|
||||||
(not (set-empty? (ref idle-runners))))
|
(positive? (ref idle-runners)))
|
||||||
|
|
||||||
|
(define (select-runner)
|
||||||
|
(define runner (for/first ([r (in-set (ref task-runners))]
|
||||||
|
#:unless (set-member? (ref busy-runners) r))
|
||||||
|
r))
|
||||||
|
(match runner
|
||||||
|
[(some $r)
|
||||||
|
(set! busy-runners (set-add (ref busy-runners) r))
|
||||||
|
r]
|
||||||
|
[none
|
||||||
|
(error "need to call can-accept? before selecting a runner")]))
|
||||||
|
|
||||||
(during (task-assignment id $job-id (task $task-id $desc))
|
(during (task-assignment id $job-id (task $task-id $desc))
|
||||||
(define status0 : TaskStateDesc
|
(define status0 : TaskStateDesc
|
||||||
(if (can-accept?)
|
(if (can-accept?)
|
||||||
|
@ -242,15 +248,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(field [status TaskStateDesc status0])
|
(field [status TaskStateDesc status0])
|
||||||
(assert (task-state id job-id task-id (ref status)))
|
(assert (task-state id job-id task-id (ref status)))
|
||||||
(when (can-accept?)
|
(when (can-accept?)
|
||||||
(define runner (set-first (ref idle-runners)))
|
(define runner (select-runner))
|
||||||
;; n.b. modifying a query set is questionable
|
|
||||||
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
|
|
||||||
;; Could use the busy-runners field to avoid that
|
|
||||||
(set! idle-runners (set-remove (ref idle-runners) runner))
|
|
||||||
(log "TM assigns task ~a to runner ~a" task-id runner)
|
(log "TM assigns task ~a to runner ~a" task-id runner)
|
||||||
;; TODO - since we're both adding and removing from this set I'm not sure TRs
|
(on stop (set! busy-runners (set-remove (ref busy-runners) runner)))
|
||||||
;; need to be making assertions about their idleness
|
|
||||||
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
|
|
||||||
(assert (task-assignment runner job-id (task task-id desc)))
|
(assert (task-assignment runner job-id (task task-id desc)))
|
||||||
(on (asserted (task-state runner job-id task-id $st))
|
(on (asserted (task-state runner job-id task-id $st))
|
||||||
(match st
|
(match st
|
||||||
|
@ -510,4 +510,4 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(spawn-task-runner)
|
(spawn-task-runner)
|
||||||
(spawn-task-runner)
|
(spawn-task-runner)
|
||||||
(spawn-client (file->job "lorem.txt"))
|
(spawn-client (file->job "lorem.txt"))
|
||||||
(spawn-client (string->job INPUT)))
|
#;(spawn-client (string->job INPUT)))
|
||||||
|
|
Loading…
Reference in New Issue