From 8f92368d8f7c1df50966a5780fa9ff2670ac762d Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Thu, 16 May 2019 15:53:02 -0400 Subject: [PATCH] typed-flink: task manager --- racket/typed/examples/roles/flink.rkt | 71 ++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 6d44482..7fabbc7 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -88,7 +88,7 @@ and the third that of the task. A TaskStateDesc is one of - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) - - OVERLOAD, when the TP does not have the resources to perform the task. + - OVERLOAD/ts, when the TP does not have the resources to perform the task. - RUNNING, indicating that the task is being performed - (finished TaskResult), describing the results |# @@ -98,6 +98,8 @@ A TaskStateDesc is one of (U Symbol (Finished TaskResult))) (define ACCEPTED : TaskStateDesc 'accepted) (define RUNNING : TaskStateDesc 'running) +;; this is gross, it's needed in part because equal? requires two of args of the same type +(define OVERLOAD/ts : TaskStateDesc 'overload) #| Two instances of the Task Delegation Protocol take place: one between the JobManager and the TaskManager, and one between the TaskManager and its @@ -119,8 +121,23 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (TaskAssignment ID ID ConcreteTask) (Observe (TaskAssignment ID ★/t ★/t)) (TaskState ID ID TaskID TaskStateDesc) + (JobManagerAlive) + (Observe (JobManagerAlive)) + (Observe (TaskRunner ★/t ★/t)) + (TaskManager ID Int) + (Observe (TaskState ID ID TaskID ★/t)) )) +;; --------------------------------------------------------------------------------------------------- +;; Util Macros + +(require syntax/parse/define) + +(define-simple-macro (log fmt . args) + (begin + (printf fmt . args) + (printf "\n"))) + ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner @@ -150,7 +167,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (idle?) (equal? IDLE (ref status))) (assert (task-runner id (ref status))) (begin/dataflow - (printf "task-runner ~v state is: ~a" id (ref status))) + (log "task-runner ~v state is: ~a" id (ref status))) (during (task-assignment id (bind job-id ID) (task (bind task-id TaskID) @@ -173,3 +190,53 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (set! status IDLE)] [#t (set! status OVERLOAD)]))))) + +;; --------------------------------------------------------------------------------------------------- +;; TaskManager + +(define (spawn-task-manager) + (define id (gensym 'task-manager)) + (spawn τc + (start-facet tm + (log "Task Manager (TM) ~a is running" id) + (during (job-manager-alive) + (log "TM learns about JM") + (define/query-set task-runners (task-runner (bind id ID) discard) id + #;#:on-add #;(log "TM learns about task-runner ~a" id)) + ;; I wonder just how inefficient this is + (define/query-set idle-runners (task-runner (bind id ID) IDLE) id + #;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id) + #;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id)) + (assert (task-manager id (set-count (ref idle-runners)))) + (field [busy-runners (List ID) (list)]) + (define (can-accept?) + (not (set-empty? (ref idle-runners)))) + (during (task-assignment id + (bind job-id ID) + (task (bind task-id TaskID) + (bind desc ConcreteWork))) + (define status0 : TaskStateDesc + (if (can-accept?) + RUNNING + OVERLOAD/ts)) + (field [status TaskStateDesc status0]) + (assert (task-state id job-id task-id (ref status))) + (when (can-accept?) + (define runner (set-first (ref idle-runners))) + ;; n.b. modifying a query set is questionable + ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. + ;; Could use the busy-runners field to avoid that + (set! idle-runners (set-remove (ref idle-runners) runner)) + (log "TM assigns task ~a to runner ~a" task-id runner) + ;; TODO - since we're both adding and removing from this set I'm not sure TRs + ;; need to be making assertions about their idleness + (on stop (set! idle-runners (set-add (ref idle-runners) runner))) + (assert (task-assignment runner job-id (task task-id desc))) + (on (asserted (task-state runner job-id task-id (bind st TaskStateDesc))) + (match st + [ACCEPTED #f] + [RUNNING #f] + [OVERLOAD/ts + (set! status OVERLOAD/ts)] + [(finished discard) + (set! status st)]))))))))