From f460011a5dd511bdc5e8d2b291052445bd2ea70d Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Wed, 15 May 2019 17:00:21 -0400 Subject: [PATCH] typed flink - task runner --- racket/typed/examples/roles/flink.rkt | 90 +++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 6 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 39b2096..b0292f1 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -23,20 +23,23 @@ perform a task. |# (assertion-struct task-manager : TaskManager (id slots)) ;; an ID is a symbol +(define-type-alias ID Symbol) #| The resources available to a TM are its associated TaskRunners (TRs). TaskRunners assert their presence with (task-runner ID Status), where Status is one of - IDLE, when the TR is not executing a task - - (executing ID), when the TR is executing the task with the given ID + - (executing TaskID), when the TR is executing the task with the given TaskID - OVERLOAD, when the TR has been asked to perform a task before it has finished its previous assignment. For the purposes of this model, it indicates a failure in the protocol; like the exchange between the JM and the TM, a TR should only receive tasks when it is IDLE. |# (assertion-struct task-runner : TaskRunner (id status)) -(define IDLE 'idle) -(define OVERLOAD 'overload) (define-constructor* (executing : Executing id)) +(define-type-alias TaskID Int) +(define-type-alias Status (U Symbol (Executing TaskID))) +(define IDLE : Status 'idle) +(define OVERLOAD : Status 'overload) #| Task Delegation Protocol @@ -64,7 +67,18 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words (define-constructor* (task : Task id desc)) (define-constructor* (map-work : MapWork data)) (define-constructor* (reduce-work : ReduceWork left right)) - +(define-type-alias WordCount (Hash String Int)) +(define-type-alias TaskResult WordCount) +(define-type-alias Reduce + (ReduceWork (U TaskID TaskResult) + (U TaskID TaskResult))) +(define-type-alias Work + (U Reduce (MapWork String))) +(define-type-alias ConcreteWork + (U (ReduceWork TaskResult TaskResult) + (MapWork String))) +(define-type-alias ConcreteTask + (Task TaskID ConcreteWork)) #| The TaskPerformer responds to a task-assignment by describing its state with respect to that task, @@ -80,8 +94,10 @@ A TaskStateDesc is one of |# (assertion-struct task-state : TaskState (assignee job-id task-id desc)) (define-constructor* (finished : Finished data)) -(define ACCEPTED 'accepted) -(define RUNNING 'running) +(define-type-alias TaskStateDesc + (U Symbol (Finished TaskResult))) +(define ACCEPTED : TaskStateDesc 'accepted) +(define RUNNING : TaskStateDesc 'running) #| Two instances of the Task Delegation Protocol take place: one between the JobManager and the TaskManager, and one between the TaskManager and its @@ -97,3 +113,65 @@ The JobManager then performs the job and, when finished, asserts (job-finished I |# (assertion-struct job : Job (id tasks)) (assertion-struct job-finished : JobFinished (id data)) + +(define-type-alias τc + (U (TaskRunner ID Status) + (TaskAssignment ID ID ConcreteTask) + (Observe (TaskAssignment ID ★/t ★/t)) + (TaskState ID ID TaskID TaskStateDesc) + )) + +;; --------------------------------------------------------------------------------------------------- +;; TaskRunner + +(define (word-count-increment [h : WordCount] + [word : String] + -> WordCount) + (hash-update h + word + add1 + #;(λ x 0))) + +(define (count-new-words [word-count : WordCount] + [words : (List String)] + -> WordCount) + (for/fold ([result word-count]) + ([word words]) + (word-count-increment result word))) + +(require/typed "flink-support.rkt" + [string->words : (→fn String (List String))]) + +(define (spawn-task-runner) + (define id (gensym 'task-runner)) + (spawn τc + (start-facet runner + (field [status Status IDLE]) + (define (idle?) (equal? IDLE (ref status))) + (assert (task-runner id (ref status))) + (begin/dataflow + (printf "task-runner ~v state is: ~a" id (ref status))) + (during (task-assignment id + (bind job-id ID) + (task (bind task-id TaskID) + (bind desc ConcreteWork))) + (field [state TaskStateDesc ACCEPTED]) + (assert (task-state id job-id task-id (ref state))) + (cond + [(idle?) + ;; since we currently finish everything in one turn, these changes to status aren't + ;; actually visible. + (set! state RUNNING) + (set! status (executing task-id)) + (match desc + [(map-work (bind data String)) + (define wc (count-new-words (ann (hash) WordCount) (string->words data))) + (set! state (finished wc))] + [(reduce-work (bind left WordCount) (bind right WordCount)) + ;; TODO - this kind of hash-union + #;(define wc (hash-union left right #:combine +)) + (define wc left) + (set! state (finished wc))]) + (set! status IDLE)] + [#t + (set! status OVERLOAD)])))))