typed flink - task runner
This commit is contained in:
parent
68f14919d7
commit
1590687e7a
|
@ -23,20 +23,23 @@ perform a task.
|
||||||
|#
|
|#
|
||||||
(assertion-struct task-manager : TaskManager (id slots))
|
(assertion-struct task-manager : TaskManager (id slots))
|
||||||
;; an ID is a symbol
|
;; an ID is a symbol
|
||||||
|
(define-type-alias ID Symbol)
|
||||||
#|
|
#|
|
||||||
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
||||||
assert their presence with (task-runner ID Status), where Status is one of
|
assert their presence with (task-runner ID Status), where Status is one of
|
||||||
- IDLE, when the TR is not executing a task
|
- IDLE, when the TR is not executing a task
|
||||||
- (executing ID), when the TR is executing the task with the given ID
|
- (executing TaskID), when the TR is executing the task with the given TaskID
|
||||||
- OVERLOAD, when the TR has been asked to perform a task before it has
|
- OVERLOAD, when the TR has been asked to perform a task before it has
|
||||||
finished its previous assignment. For the purposes of this model, it indicates a
|
finished its previous assignment. For the purposes of this model, it indicates a
|
||||||
failure in the protocol; like the exchange between the JM and the TM, a TR
|
failure in the protocol; like the exchange between the JM and the TM, a TR
|
||||||
should only receive tasks when it is IDLE.
|
should only receive tasks when it is IDLE.
|
||||||
|#
|
|#
|
||||||
(assertion-struct task-runner : TaskRunner (id status))
|
(assertion-struct task-runner : TaskRunner (id status))
|
||||||
(define IDLE 'idle)
|
|
||||||
(define OVERLOAD 'overload)
|
|
||||||
(define-constructor* (executing : Executing id))
|
(define-constructor* (executing : Executing id))
|
||||||
|
(define-type-alias TaskID Int)
|
||||||
|
(define-type-alias Status (U Symbol (Executing TaskID)))
|
||||||
|
(define IDLE : Status 'idle)
|
||||||
|
(define OVERLOAD : Status 'overload)
|
||||||
|
|
||||||
#|
|
#|
|
||||||
Task Delegation Protocol
|
Task Delegation Protocol
|
||||||
|
@ -64,7 +67,18 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
||||||
(define-constructor* (task : Task id desc))
|
(define-constructor* (task : Task id desc))
|
||||||
(define-constructor* (map-work : MapWork data))
|
(define-constructor* (map-work : MapWork data))
|
||||||
(define-constructor* (reduce-work : ReduceWork left right))
|
(define-constructor* (reduce-work : ReduceWork left right))
|
||||||
|
(define-type-alias WordCount (Hash String Int))
|
||||||
|
(define-type-alias TaskResult WordCount)
|
||||||
|
(define-type-alias Reduce
|
||||||
|
(ReduceWork (U TaskID TaskResult)
|
||||||
|
(U TaskID TaskResult)))
|
||||||
|
(define-type-alias Work
|
||||||
|
(U Reduce (MapWork String)))
|
||||||
|
(define-type-alias ConcreteWork
|
||||||
|
(U (ReduceWork TaskResult TaskResult)
|
||||||
|
(MapWork String)))
|
||||||
|
(define-type-alias ConcreteTask
|
||||||
|
(Task TaskID ConcreteWork))
|
||||||
#|
|
#|
|
||||||
The TaskPerformer responds to a task-assignment by describing its state with respect
|
The TaskPerformer responds to a task-assignment by describing its state with respect
|
||||||
to that task,
|
to that task,
|
||||||
|
@ -80,8 +94,10 @@ A TaskStateDesc is one of
|
||||||
|#
|
|#
|
||||||
(assertion-struct task-state : TaskState (assignee job-id task-id desc))
|
(assertion-struct task-state : TaskState (assignee job-id task-id desc))
|
||||||
(define-constructor* (finished : Finished data))
|
(define-constructor* (finished : Finished data))
|
||||||
(define ACCEPTED 'accepted)
|
(define-type-alias TaskStateDesc
|
||||||
(define RUNNING 'running)
|
(U Symbol (Finished TaskResult)))
|
||||||
|
(define ACCEPTED : TaskStateDesc 'accepted)
|
||||||
|
(define RUNNING : TaskStateDesc 'running)
|
||||||
#|
|
#|
|
||||||
Two instances of the Task Delegation Protocol take place: one between the
|
Two instances of the Task Delegation Protocol take place: one between the
|
||||||
JobManager and the TaskManager, and one between the TaskManager and its
|
JobManager and the TaskManager, and one between the TaskManager and its
|
||||||
|
@ -97,3 +113,65 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
|#
|
|#
|
||||||
(assertion-struct job : Job (id tasks))
|
(assertion-struct job : Job (id tasks))
|
||||||
(assertion-struct job-finished : JobFinished (id data))
|
(assertion-struct job-finished : JobFinished (id data))
|
||||||
|
|
||||||
|
(define-type-alias τc
|
||||||
|
(U (TaskRunner ID Status)
|
||||||
|
(TaskAssignment ID ID ConcreteTask)
|
||||||
|
(Observe (TaskAssignment ID ★/t ★/t))
|
||||||
|
(TaskState ID ID TaskID TaskStateDesc)
|
||||||
|
))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
|
;; TaskRunner
|
||||||
|
|
||||||
|
(define (word-count-increment [h : WordCount]
|
||||||
|
[word : String]
|
||||||
|
-> WordCount)
|
||||||
|
(hash-update h
|
||||||
|
word
|
||||||
|
add1
|
||||||
|
#;(λ x 0)))
|
||||||
|
|
||||||
|
(define (count-new-words [word-count : WordCount]
|
||||||
|
[words : (List String)]
|
||||||
|
-> WordCount)
|
||||||
|
(for/fold ([result word-count])
|
||||||
|
([word words])
|
||||||
|
(word-count-increment result word)))
|
||||||
|
|
||||||
|
(require/typed "flink-support.rkt"
|
||||||
|
[string->words : (→fn String (List String))])
|
||||||
|
|
||||||
|
(define (spawn-task-runner)
|
||||||
|
(define id (gensym 'task-runner))
|
||||||
|
(spawn τc
|
||||||
|
(start-facet runner
|
||||||
|
(field [status Status IDLE])
|
||||||
|
(define (idle?) (equal? IDLE (ref status)))
|
||||||
|
(assert (task-runner id (ref status)))
|
||||||
|
(begin/dataflow
|
||||||
|
(printf "task-runner ~v state is: ~a" id (ref status)))
|
||||||
|
(during (task-assignment id
|
||||||
|
(bind job-id ID)
|
||||||
|
(task (bind task-id TaskID)
|
||||||
|
(bind desc ConcreteWork)))
|
||||||
|
(field [state TaskStateDesc ACCEPTED])
|
||||||
|
(assert (task-state id job-id task-id (ref state)))
|
||||||
|
(cond
|
||||||
|
[(idle?)
|
||||||
|
;; since we currently finish everything in one turn, these changes to status aren't
|
||||||
|
;; actually visible.
|
||||||
|
(set! state RUNNING)
|
||||||
|
(set! status (executing task-id))
|
||||||
|
(match desc
|
||||||
|
[(map-work (bind data String))
|
||||||
|
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
|
||||||
|
(set! state (finished wc))]
|
||||||
|
[(reduce-work (bind left WordCount) (bind right WordCount))
|
||||||
|
;; TODO - this kind of hash-union
|
||||||
|
#;(define wc (hash-union left right #:combine +))
|
||||||
|
(define wc left)
|
||||||
|
(set! state (finished wc))])
|
||||||
|
(set! status IDLE)]
|
||||||
|
[#t
|
||||||
|
(set! status OVERLOAD)])))))
|
||||||
|
|
Loading…
Reference in New Issue