From eb56a1006f4104b334db7de49da6861c7051452f Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Thu, 3 Oct 2019 16:09:40 -0400 Subject: [PATCH] typed flink: task runners don't need a status --- racket/typed/examples/roles/flink.rkt | 96 +++++++++++++-------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 2601888..2c87a75 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -26,15 +26,19 @@ perform a task. (define-type-alias ID Symbol) #| The resources available to a TM are its associated TaskRunners (TRs). TaskRunners -assert their presence with (task-runner ID Status), where Status is one of - - IDLE, when the TR is not executing a task - - (executing TaskID), when the TR is executing the task with the given TaskID - - OVERLOAD, when the TR has been asked to perform a task before it has - finished its previous assignment. For the purposes of this model, it indicates a - failure in the protocol; like the exchange between the JM and the TM, a TR - should only receive tasks when it is IDLE. +assert their presence with (task-runner ID), +|# +(assertion-struct task-runner : TaskRunner (id)) + +#| +A Status is one of +- IDLE, when the TR is not executing a task +- (executing TaskID), when the TR is executing the task with the given TaskID +- OVERLOAD, when the TR has been asked to perform a task before it has +finished its previous assignment. For the purposes of this model, it indicates a +failure in the protocol; like the exchange between the JM and the TM, a TR +should only receive tasks when it is IDLE. |# -(assertion-struct task-runner : TaskRunner (id status)) (define-constructor* (executing : Executing id)) (define-type-alias TaskID Int) (define-type-alias Status (U Symbol (Executing TaskID))) @@ -137,14 +141,14 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define-type-alias JobDesc (Job ID (List InputTask))) (define-type-alias τc - (U (TaskRunner ID Status) + (U (TaskRunner ID) (TaskAssignment ID ID ConcreteTask) (Observe (TaskAssignment ID ★/t ★/t)) (TaskState ID ID TaskID TaskStateDesc) (Observe (TaskState ID ID TaskID ★/t)) (JobManagerAlive) (Observe (JobManagerAlive)) - (Observe (TaskRunner ★/t ★/t)) + (Observe (TaskRunner ★/t)) (TaskManager ID Int) (Observe (TaskManager ★/t ★/t)) JobDesc @@ -188,30 +192,20 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn τc (begin (start-facet runner - (field [status Status IDLE]) - (define (idle?) (equal? IDLE (ref status))) - (assert (task-runner id (ref status))) - (begin/dataflow - (log "task-runner ~v state is: ~a" id (ref status))) + (assert (task-runner id)) (during (task-assignment id $job-id (task $task-id $desc)) (field [state TaskStateDesc ACCEPTED]) (assert (task-state id job-id task-id (ref state))) - (cond - [(idle?) - ;; since we currently finish everything in one turn, these changes to status aren't - ;; actually visible. - (set! state RUNNING) - (set! status (executing task-id)) - (match desc - [(map-work $data) - (define wc (count-new-words (ann (hash) WordCount) (string->words data))) - (set! state (finished wc))] - [(reduce-work $left $right) - (define wc (hash-union/combine left right +)) - (set! state (finished wc))]) - (set! status IDLE)] - [#t - (set! status OVERLOAD)])))))) + ;; since we currently finish everything in one turn, these changes to status aren't + ;; actually visible. + (set! state RUNNING) + (match desc + [(map-work $data) + (define wc (count-new-words (ann (hash) WordCount) (string->words data))) + (set! state (finished wc))] + [(reduce-work $left $right) + (define wc (hash-union/combine left right +)) + (set! state (finished wc))])))))) ;; --------------------------------------------------------------------------------------------------- ;; TaskManager @@ -224,16 +218,28 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - (define/query-set task-runners (task-runner $id _) id + (define/query-set task-runners (task-runner $id) id #:on-add (log "TM learns about task-runner ~a" id)) - ;; I wonder just how inefficient this is - (define/query-set idle-runners (task-runner $id IDLE) id - #:on-add (log "TM learns that task-runner ~a is IDLE" id) - #:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id)) - (assert (task-manager id (set-count (ref idle-runners)))) - (field [busy-runners (List ID) (list)]) + (field [busy-runners (Set ID) (set)]) + (define/dataflow idle-runners + (set-count (set-subtract (ref task-runners) (ref busy-runners)))) + + (assert (task-manager id (ref idle-runners))) + (define (can-accept?) - (not (set-empty? (ref idle-runners)))) + (positive? (ref idle-runners))) + + (define (select-runner) + (define runner (for/first ([r (in-set (ref task-runners))] + #:unless (set-member? (ref busy-runners) r)) + r)) + (match runner + [(some $r) + (set! busy-runners (set-add (ref busy-runners) r)) + r] + [none + (error "need to call can-accept? before selecting a runner")])) + (during (task-assignment id $job-id (task $task-id $desc)) (define status0 : TaskStateDesc (if (can-accept?) @@ -242,15 +248,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (field [status TaskStateDesc status0]) (assert (task-state id job-id task-id (ref status))) (when (can-accept?) - (define runner (set-first (ref idle-runners))) - ;; n.b. modifying a query set is questionable - ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. - ;; Could use the busy-runners field to avoid that - (set! idle-runners (set-remove (ref idle-runners) runner)) + (define runner (select-runner)) (log "TM assigns task ~a to runner ~a" task-id runner) - ;; TODO - since we're both adding and removing from this set I'm not sure TRs - ;; need to be making assertions about their idleness - (on stop (set! idle-runners (set-add (ref idle-runners) runner))) + (on stop (set! busy-runners (set-remove (ref busy-runners) runner))) (assert (task-assignment runner job-id (task task-id desc))) (on (asserted (task-state runner job-id task-id $st)) (match st @@ -510,4 +510,4 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn-task-runner) (spawn-task-runner) (spawn-client (file->job "lorem.txt")) - (spawn-client (string->job INPUT))) + #;(spawn-client (string->job INPUT)))