typed-flink: task manager
This commit is contained in:
parent
16ce86c6c9
commit
93e1fea202
|
@ -88,7 +88,7 @@ and the third that of the task.
|
||||||
|
|
||||||
A TaskStateDesc is one of
|
A TaskStateDesc is one of
|
||||||
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
|
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
|
||||||
- OVERLOAD, when the TP does not have the resources to perform the task.
|
- OVERLOAD/ts, when the TP does not have the resources to perform the task.
|
||||||
- RUNNING, indicating that the task is being performed
|
- RUNNING, indicating that the task is being performed
|
||||||
- (finished TaskResult), describing the results
|
- (finished TaskResult), describing the results
|
||||||
|#
|
|#
|
||||||
|
@ -98,6 +98,8 @@ A TaskStateDesc is one of
|
||||||
(U Symbol (Finished TaskResult)))
|
(U Symbol (Finished TaskResult)))
|
||||||
(define ACCEPTED : TaskStateDesc 'accepted)
|
(define ACCEPTED : TaskStateDesc 'accepted)
|
||||||
(define RUNNING : TaskStateDesc 'running)
|
(define RUNNING : TaskStateDesc 'running)
|
||||||
|
;; this is gross, it's needed in part because equal? requires two of args of the same type
|
||||||
|
(define OVERLOAD/ts : TaskStateDesc 'overload)
|
||||||
#|
|
#|
|
||||||
Two instances of the Task Delegation Protocol take place: one between the
|
Two instances of the Task Delegation Protocol take place: one between the
|
||||||
JobManager and the TaskManager, and one between the TaskManager and its
|
JobManager and the TaskManager, and one between the TaskManager and its
|
||||||
|
@ -119,8 +121,23 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(TaskAssignment ID ID ConcreteTask)
|
(TaskAssignment ID ID ConcreteTask)
|
||||||
(Observe (TaskAssignment ID ★/t ★/t))
|
(Observe (TaskAssignment ID ★/t ★/t))
|
||||||
(TaskState ID ID TaskID TaskStateDesc)
|
(TaskState ID ID TaskID TaskStateDesc)
|
||||||
|
(JobManagerAlive)
|
||||||
|
(Observe (JobManagerAlive))
|
||||||
|
(Observe (TaskRunner ★/t ★/t))
|
||||||
|
(TaskManager ID Int)
|
||||||
|
(Observe (TaskState ID ID TaskID ★/t))
|
||||||
))
|
))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; Util Macros
|
||||||
|
|
||||||
|
(require syntax/parse/define)
|
||||||
|
|
||||||
|
(define-simple-macro (log fmt . args)
|
||||||
|
(begin
|
||||||
|
(printf fmt . args)
|
||||||
|
(printf "\n")))
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
;; TaskRunner
|
;; TaskRunner
|
||||||
|
|
||||||
|
@ -150,7 +167,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(define (idle?) (equal? IDLE (ref status)))
|
(define (idle?) (equal? IDLE (ref status)))
|
||||||
(assert (task-runner id (ref status)))
|
(assert (task-runner id (ref status)))
|
||||||
(begin/dataflow
|
(begin/dataflow
|
||||||
(printf "task-runner ~v state is: ~a" id (ref status)))
|
(log "task-runner ~v state is: ~a" id (ref status)))
|
||||||
(during (task-assignment id
|
(during (task-assignment id
|
||||||
(bind job-id ID)
|
(bind job-id ID)
|
||||||
(task (bind task-id TaskID)
|
(task (bind task-id TaskID)
|
||||||
|
@ -173,3 +190,53 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(set! status IDLE)]
|
(set! status IDLE)]
|
||||||
[#t
|
[#t
|
||||||
(set! status OVERLOAD)])))))
|
(set! status OVERLOAD)])))))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; TaskManager
|
||||||
|
|
||||||
|
(define (spawn-task-manager)
|
||||||
|
(define id (gensym 'task-manager))
|
||||||
|
(spawn τc
|
||||||
|
(start-facet tm
|
||||||
|
(log "Task Manager (TM) ~a is running" id)
|
||||||
|
(during (job-manager-alive)
|
||||||
|
(log "TM learns about JM")
|
||||||
|
(define/query-set task-runners (task-runner (bind id ID) discard) id
|
||||||
|
#;#:on-add #;(log "TM learns about task-runner ~a" id))
|
||||||
|
;; I wonder just how inefficient this is
|
||||||
|
(define/query-set idle-runners (task-runner (bind id ID) IDLE) id
|
||||||
|
#;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id)
|
||||||
|
#;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id))
|
||||||
|
(assert (task-manager id (set-count (ref idle-runners))))
|
||||||
|
(field [busy-runners (List ID) (list)])
|
||||||
|
(define (can-accept?)
|
||||||
|
(not (set-empty? (ref idle-runners))))
|
||||||
|
(during (task-assignment id
|
||||||
|
(bind job-id ID)
|
||||||
|
(task (bind task-id TaskID)
|
||||||
|
(bind desc ConcreteWork)))
|
||||||
|
(define status0 : TaskStateDesc
|
||||||
|
(if (can-accept?)
|
||||||
|
RUNNING
|
||||||
|
OVERLOAD/ts))
|
||||||
|
(field [status TaskStateDesc status0])
|
||||||
|
(assert (task-state id job-id task-id (ref status)))
|
||||||
|
(when (can-accept?)
|
||||||
|
(define runner (set-first (ref idle-runners)))
|
||||||
|
;; n.b. modifying a query set is questionable
|
||||||
|
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
|
||||||
|
;; Could use the busy-runners field to avoid that
|
||||||
|
(set! idle-runners (set-remove (ref idle-runners) runner))
|
||||||
|
(log "TM assigns task ~a to runner ~a" task-id runner)
|
||||||
|
;; TODO - since we're both adding and removing from this set I'm not sure TRs
|
||||||
|
;; need to be making assertions about their idleness
|
||||||
|
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
|
||||||
|
(assert (task-assignment runner job-id (task task-id desc)))
|
||||||
|
(on (asserted (task-state runner job-id task-id (bind st TaskStateDesc)))
|
||||||
|
(match st
|
||||||
|
[ACCEPTED #f]
|
||||||
|
[RUNNING #f]
|
||||||
|
[OVERLOAD/ts
|
||||||
|
(set! status OVERLOAD/ts)]
|
||||||
|
[(finished discard)
|
||||||
|
(set! status st)]))))))))
|
||||||
|
|
Loading…
Reference in New Issue