syndicate-2017/racket/typed/examples/roles/flink.rkt

176 lines
6.5 KiB
Racket

#lang typed/syndicate/roles
;; ---------------------------------------------------------------------------------------------------
;; Protocol
#|
Conversations in the flink dataspace primarily concern two topics: presence and
task execution.
Presence Protocol
-----------------
The JobManager (JM) asserts its presence with (job-manager-alive). The operation
of each TaskManager (TM) is contingent on the presence of a job manager.
|#
(assertion-struct job-manager-alive : JobManagerAlive ())
#|
In turn, TaskManagers advertise their presence with (task-manager ID slots),
where ID is a unique id, and slots is a natural number. The number of slots
dictates how many tasks the TM can take on. To reduce contention, the JM
should only assign a task to a TM if the TM actually has the resources to
perform a task.
|#
(assertion-struct task-manager : TaskManager (id slots))
;; an ID is a symbol
(define-type-alias ID Symbol)
#|
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
assert their presence with (task-runner ID Status), where Status is one of
- IDLE, when the TR is not executing a task
- (executing TaskID), when the TR is executing the task with the given TaskID
- OVERLOAD, when the TR has been asked to perform a task before it has
finished its previous assignment. For the purposes of this model, it indicates a
failure in the protocol; like the exchange between the JM and the TM, a TR
should only receive tasks when it is IDLE.
|#
(assertion-struct task-runner : TaskRunner (id status))
(define-constructor* (executing : Executing id))
(define-type-alias TaskID Int)
(define-type-alias Status (U Symbol (Executing TaskID)))
(define IDLE : Status 'idle)
(define OVERLOAD : Status 'overload)
#|
Task Delegation Protocol
-----------------------
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through
(task-assignment ID ID Task)
where the first ID identifies the TP and the second identifies the job.
|#
(assertion-struct task-assignment : TaskAssignment (assignee job-id task))
#|
A Task is a (task TaskID Work), where Work is one of
- (map-work String)
- (reduce-work (U TaskID TaskResult) (U TaskID TaskResult)), referring to either the
ID of the dependent task or its results. A reduce-work is ready to be executed
when it has both results.
A TaskID is a natural number
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|#
(define-constructor* (task : Task id desc))
(define-constructor* (map-work : MapWork data))
(define-constructor* (reduce-work : ReduceWork left right))
(define-type-alias WordCount (Hash String Int))
(define-type-alias TaskResult WordCount)
(define-type-alias Reduce
(ReduceWork (U TaskID TaskResult)
(U TaskID TaskResult)))
(define-type-alias Work
(U Reduce (MapWork String)))
(define-type-alias ConcreteWork
(U (ReduceWork TaskResult TaskResult)
(MapWork String)))
(define-type-alias ConcreteTask
(Task TaskID ConcreteWork))
#|
The TaskPerformer responds to a task-assignment by describing its state with respect
to that task,
(task-state ID ID ID TaskStateDesc)
where the first ID is that of the TP, the second is that of the job,
and the third that of the task.
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
- OVERLOAD, when the TP does not have the resources to perform the task.
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
|#
(assertion-struct task-state : TaskState (assignee job-id task-id desc))
(define-constructor* (finished : Finished data))
(define-type-alias TaskStateDesc
(U Symbol (Finished TaskResult)))
(define ACCEPTED : TaskStateDesc 'accepted)
(define RUNNING : TaskStateDesc 'running)
#|
Two instances of the Task Delegation Protocol take place: one between the
JobManager and the TaskManager, and one between the TaskManager and its
TaskRunners.
|#
#|
Job Submission Protocol
-----------------------
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
|#
(assertion-struct job : Job (id tasks))
(assertion-struct job-finished : JobFinished (id data))
(define-type-alias τc
(U (TaskRunner ID Status)
(TaskAssignment ID ID ConcreteTask)
(Observe (TaskAssignment ID ★/t ★/t))
(TaskState ID ID TaskID TaskStateDesc)
))
;; ---------------------------------------------------------------------------------------------------
;; TaskRunner
(define (word-count-increment [h : WordCount]
[word : String]
-> WordCount)
(hash-update/failure h
word
add1
(lambda () 0)))
(define (count-new-words [word-count : WordCount]
[words : (List String)]
-> WordCount)
(for/fold ([result word-count])
([word words])
(word-count-increment result word)))
(require/typed "flink-support.rkt"
[string->words : (→fn String (List String))])
(define (spawn-task-runner)
(define id (gensym 'task-runner))
(spawn τc
(start-facet runner
(field [status Status IDLE])
(define (idle?) (equal? IDLE (ref status)))
(assert (task-runner id (ref status)))
(begin/dataflow
(printf "task-runner ~v state is: ~a" id (ref status)))
(during (task-assignment id
(bind job-id ID)
(task (bind task-id TaskID)
(bind desc ConcreteWork)))
(field [state TaskStateDesc ACCEPTED])
(assert (task-state id job-id task-id (ref state)))
(cond
[(idle?)
;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible.
(set! state RUNNING)
(set! status (executing task-id))
(match desc
[(map-work (bind data String))
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
(set! state (finished wc))]
[(reduce-work (bind left WordCount) (bind right WordCount))
(define wc (hash-union/combine left right +))
(set! state (finished wc))])
(set! status IDLE)]
[#t
(set! status OVERLOAD)])))))