typed-flink: task manager

This commit is contained in:
Sam Caldwell 2019-05-16 15:53:02 -04:00
parent dc0e434caa
commit 8f92368d8f
1 changed files with 69 additions and 2 deletions

View File

@ -88,7 +88,7 @@ and the third that of the task.
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
- OVERLOAD, when the TP does not have the resources to perform the task.
- OVERLOAD/ts, when the TP does not have the resources to perform the task.
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
|#
@ -98,6 +98,8 @@ A TaskStateDesc is one of
(U Symbol (Finished TaskResult)))
(define ACCEPTED : TaskStateDesc 'accepted)
(define RUNNING : TaskStateDesc 'running)
;; this is gross, it's needed in part because equal? requires two of args of the same type
(define OVERLOAD/ts : TaskStateDesc 'overload)
#|
Two instances of the Task Delegation Protocol take place: one between the
JobManager and the TaskManager, and one between the TaskManager and its
@ -119,8 +121,23 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(TaskAssignment ID ID ConcreteTask)
(Observe (TaskAssignment ID ★/t ★/t))
(TaskState ID ID TaskID TaskStateDesc)
(JobManagerAlive)
(Observe (JobManagerAlive))
(Observe (TaskRunner ★/t ★/t))
(TaskManager ID Int)
(Observe (TaskState ID ID TaskID ★/t))
))
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
(require syntax/parse/define)
(define-simple-macro (log fmt . args)
(begin
(printf fmt . args)
(printf "\n")))
;; ---------------------------------------------------------------------------------------------------
;; TaskRunner
@ -150,7 +167,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (idle?) (equal? IDLE (ref status)))
(assert (task-runner id (ref status)))
(begin/dataflow
(printf "task-runner ~v state is: ~a" id (ref status)))
(log "task-runner ~v state is: ~a" id (ref status)))
(during (task-assignment id
(bind job-id ID)
(task (bind task-id TaskID)
@ -173,3 +190,53 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(set! status IDLE)]
[#t
(set! status OVERLOAD)])))))
;; ---------------------------------------------------------------------------------------------------
;; TaskManager
(define (spawn-task-manager)
(define id (gensym 'task-manager))
(spawn τc
(start-facet tm
(log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive)
(log "TM learns about JM")
(define/query-set task-runners (task-runner (bind id ID) discard) id
#;#:on-add #;(log "TM learns about task-runner ~a" id))
;; I wonder just how inefficient this is
(define/query-set idle-runners (task-runner (bind id ID) IDLE) id
#;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id)
#;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (ref idle-runners))))
(field [busy-runners (List ID) (list)])
(define (can-accept?)
(not (set-empty? (ref idle-runners))))
(during (task-assignment id
(bind job-id ID)
(task (bind task-id TaskID)
(bind desc ConcreteWork)))
(define status0 : TaskStateDesc
(if (can-accept?)
RUNNING
OVERLOAD/ts))
(field [status TaskStateDesc status0])
(assert (task-state id job-id task-id (ref status)))
(when (can-accept?)
(define runner (set-first (ref idle-runners)))
;; n.b. modifying a query set is questionable
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
;; Could use the busy-runners field to avoid that
(set! idle-runners (set-remove (ref idle-runners) runner))
(log "TM assigns task ~a to runner ~a" task-id runner)
;; TODO - since we're both adding and removing from this set I'm not sure TRs
;; need to be making assertions about their idleness
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
(assert (task-assignment runner job-id (task task-id desc)))
(on (asserted (task-state runner job-id task-id (bind st TaskStateDesc)))
(match st
[ACCEPTED #f]
[RUNNING #f]
[OVERLOAD/ts
(set! status OVERLOAD/ts)]
[(finished discard)
(set! status st)]))))))))