syndicate-2017/racket/typed/examples/roles/flink.rkt

485 lines
19 KiB
Racket
Raw Normal View History

2019-05-15 19:19:05 +00:00
#lang typed/syndicate/roles
;; ---------------------------------------------------------------------------------------------------
;; Protocol
#|
Conversations in the flink dataspace primarily concern two topics: presence and
task execution.
Presence Protocol
-----------------
The JobManager (JM) asserts its presence with (job-manager-alive). The operation
of each TaskManager (TM) is contingent on the presence of a job manager.
|#
(assertion-struct job-manager-alive : JobManagerAlive ())
#|
In turn, TaskManagers advertise their presence with (task-manager ID slots),
where ID is a unique id, and slots is a natural number. The number of slots
dictates how many tasks the TM can take on. To reduce contention, the JM
should only assign a task to a TM if the TM actually has the resources to
perform a task.
|#
(assertion-struct task-manager : TaskManager (id slots))
;; an ID is a symbol
2019-05-15 21:00:21 +00:00
(define-type-alias ID Symbol)
2019-05-15 19:19:05 +00:00
#|
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
assert their presence with (task-runner ID Status), where Status is one of
- IDLE, when the TR is not executing a task
2019-05-15 21:00:21 +00:00
- (executing TaskID), when the TR is executing the task with the given TaskID
2019-05-15 19:19:05 +00:00
- OVERLOAD, when the TR has been asked to perform a task before it has
finished its previous assignment. For the purposes of this model, it indicates a
failure in the protocol; like the exchange between the JM and the TM, a TR
should only receive tasks when it is IDLE.
|#
(assertion-struct task-runner : TaskRunner (id status))
(define-constructor* (executing : Executing id))
2019-05-15 21:00:21 +00:00
(define-type-alias TaskID Int)
(define-type-alias Status (U Symbol (Executing TaskID)))
(define IDLE : Status 'idle)
(define OVERLOAD : Status 'overload)
2019-05-15 19:19:05 +00:00
#|
Task Delegation Protocol
-----------------------
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through
(task-assignment ID ID Task)
where the first ID identifies the TP and the second identifies the job.
|#
(assertion-struct task-assignment : TaskAssignment (assignee job-id task))
#|
A Task is a (task TaskID Work), where Work is one of
- (map-work String)
- (reduce-work (U TaskID TaskResult) (U TaskID TaskResult)), referring to either the
ID of the dependent task or its results. A reduce-work is ready to be executed
when it has both results.
A TaskID is a natural number
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|#
2019-05-24 16:31:03 +00:00
(require-struct task #:as Task #:from "flink-support.rkt")
(require-struct map-work #:as MapWork #:from "flink-support.rkt")
(require-struct reduce-work #:as ReduceWork #:from "flink-support.rkt")
2019-05-15 21:00:21 +00:00
(define-type-alias WordCount (Hash String Int))
(define-type-alias TaskResult WordCount)
(define-type-alias Reduce
(ReduceWork (Either TaskID TaskResult)
(Either TaskID TaskResult)))
2019-05-24 16:31:03 +00:00
(define-type-alias ReduceInput
(ReduceWork TaskID TaskID))
2019-05-15 21:00:21 +00:00
(define-type-alias Work
(U Reduce (MapWork String)))
(define-type-alias ConcreteWork
(U (ReduceWork TaskResult TaskResult)
(MapWork String)))
2019-05-24 16:31:03 +00:00
(define-type-alias InputTask
(Task TaskID (U ReduceInput (MapWork String))))
(define-type-alias PendingTask
(Task TaskID Work))
2019-05-15 21:00:21 +00:00
(define-type-alias ConcreteTask
(Task TaskID ConcreteWork))
2019-05-15 19:19:05 +00:00
#|
The TaskPerformer responds to a task-assignment by describing its state with respect
to that task,
(task-state ID ID ID TaskStateDesc)
where the first ID is that of the TP, the second is that of the job,
and the third that of the task.
A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
2019-05-16 19:53:02 +00:00
- OVERLOAD/ts, when the TP does not have the resources to perform the task.
2019-05-15 19:19:05 +00:00
- RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results
|#
(assertion-struct task-state : TaskState (assignee job-id task-id desc))
(define-constructor* (finished : Finished data))
2019-05-15 21:00:21 +00:00
(define-type-alias TaskStateDesc
(U Symbol (Finished TaskResult)))
(define ACCEPTED : TaskStateDesc 'accepted)
(define RUNNING : TaskStateDesc 'running)
2019-05-16 19:53:02 +00:00
;; this is gross, it's needed in part because equal? requires two of args of the same type
(define OVERLOAD/ts : TaskStateDesc 'overload)
2019-05-15 19:19:05 +00:00
#|
Two instances of the Task Delegation Protocol take place: one between the
JobManager and the TaskManager, and one between the TaskManager and its
TaskRunners.
|#
#|
Job Submission Protocol
-----------------------
Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)).
The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult)
|#
2019-05-24 16:31:03 +00:00
(require-struct job #:as Job #:from "flink-support.rkt")
2019-05-15 19:19:05 +00:00
(assertion-struct job-finished : JobFinished (id data))
2019-05-24 16:31:03 +00:00
(define-type-alias JobDesc (Job ID (List InputTask)))
2019-05-15 21:00:21 +00:00
(define-type-alias τc
(U (TaskRunner ID Status)
(TaskAssignment ID ID ConcreteTask)
(Observe (TaskAssignment ID ★/t ★/t))
(TaskState ID ID TaskID TaskStateDesc)
(Observe (TaskState ID ID TaskID ★/t))
2019-05-16 19:53:02 +00:00
(JobManagerAlive)
(Observe (JobManagerAlive))
(Observe (TaskRunner ★/t ★/t))
(TaskManager ID Int)
(Observe (TaskManager ★/t ★/t))
2019-05-24 16:31:03 +00:00
JobDesc
2019-05-21 21:23:45 +00:00
(Observe (Job ★/t ★/t))
2019-05-24 16:31:03 +00:00
(JobFinished ID TaskResult)
(Observe (JobFinished ID ★/t))))
2019-05-15 21:00:21 +00:00
2019-05-16 19:53:02 +00:00
;; ---------------------------------------------------------------------------------------------------
;; Util Macros
(require syntax/parse/define)
(define-simple-macro (log fmt . args)
(begin
(printf fmt . args)
(printf "\n")))
2019-05-15 21:00:21 +00:00
;; ---------------------------------------------------------------------------------------------------
;; TaskRunner
(define (word-count-increment [h : WordCount]
[word : String]
-> WordCount)
2019-05-15 21:25:11 +00:00
(hash-update/failure h
word
add1
0))
2019-05-15 21:00:21 +00:00
(define (count-new-words [word-count : WordCount]
[words : (List String)]
-> WordCount)
(for/fold ([result word-count])
([word words])
(word-count-increment result word)))
(require/typed "flink-support.rkt"
[string->words : (→fn String (List String))])
2019-05-24 14:06:01 +00:00
(define (spawn-task-runner)
2019-05-15 21:00:21 +00:00
(define id (gensym 'task-runner))
(spawn τc
(start-facet runner
(field [status Status IDLE])
(define (idle?) (equal? IDLE (ref status)))
(assert (task-runner id (ref status)))
(begin/dataflow
2019-05-16 19:53:02 +00:00
(log "task-runner ~v state is: ~a" id (ref status)))
2019-05-24 15:25:29 +00:00
(during (task-assignment id $job-id (task $task-id $desc))
2019-05-15 21:00:21 +00:00
(field [state TaskStateDesc ACCEPTED])
(assert (task-state id job-id task-id (ref state)))
(cond
[(idle?)
;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible.
(set! state RUNNING)
(set! status (executing task-id))
(match desc
2019-05-24 15:25:29 +00:00
[(map-work $data)
2019-05-15 21:00:21 +00:00
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
(set! state (finished wc))]
2019-05-24 15:25:29 +00:00
[(reduce-work $left $right)
2019-05-15 21:25:11 +00:00
(define wc (hash-union/combine left right +))
2019-05-15 21:00:21 +00:00
(set! state (finished wc))])
(set! status IDLE)]
[#t
(set! status OVERLOAD)])))))
2019-05-16 19:53:02 +00:00
;; ---------------------------------------------------------------------------------------------------
;; TaskManager
2019-05-24 14:06:01 +00:00
(define (spawn-task-manager)
2019-05-16 19:53:02 +00:00
(define id (gensym 'task-manager))
(spawn τc
(start-facet tm
(log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive)
(log "TM learns about JM")
2019-05-24 15:25:29 +00:00
(define/query-set task-runners (task-runner $id _) id
2019-05-24 19:07:34 +00:00
#:on-add (log "TM learns about task-runner ~a" id))
2019-05-16 19:53:02 +00:00
;; I wonder just how inefficient this is
2019-05-24 15:25:29 +00:00
(define/query-set idle-runners (task-runner $id IDLE) id
2019-05-24 19:07:34 +00:00
#:on-add (log "TM learns that task-runner ~a is IDLE" id)
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
2019-05-16 19:53:02 +00:00
(assert (task-manager id (set-count (ref idle-runners))))
(field [busy-runners (List ID) (list)])
(define (can-accept?)
(not (set-empty? (ref idle-runners))))
2019-05-24 15:25:29 +00:00
(during (task-assignment id $job-id (task $task-id $desc))
2019-05-16 19:53:02 +00:00
(define status0 : TaskStateDesc
(if (can-accept?)
RUNNING
OVERLOAD/ts))
(field [status TaskStateDesc status0])
(assert (task-state id job-id task-id (ref status)))
(when (can-accept?)
(define runner (set-first (ref idle-runners)))
;; n.b. modifying a query set is questionable
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
;; Could use the busy-runners field to avoid that
(set! idle-runners (set-remove (ref idle-runners) runner))
(log "TM assigns task ~a to runner ~a" task-id runner)
;; TODO - since we're both adding and removing from this set I'm not sure TRs
;; need to be making assertions about their idleness
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
(assert (task-assignment runner job-id (task task-id desc)))
2019-05-24 15:25:29 +00:00
(on (asserted (task-state runner job-id task-id $st))
2019-05-16 19:53:02 +00:00
(match st
[ACCEPTED #f]
[RUNNING #f]
[OVERLOAD/ts
(set! status OVERLOAD/ts)]
[(finished discard)
(set! status st)]))))))))
;; ---------------------------------------------------------------------------------------------------
;; JobManager
;; Task -> Bool
;; Test if the task is ready to run
(define (task-ready? [t : PendingTask] -> (Maybe ConcreteTask))
(match t
2019-05-24 14:06:01 +00:00
[(task $tid (map-work $s))
;; having to re-produce this is directly bc of no occurrence typing
(some (task tid (map-work s)))]
2019-05-24 14:06:01 +00:00
[(task $tid (reduce-work (right $v1)
(right $v2)))
(some (task tid (reduce-work v1 v2)))]
2019-05-24 14:06:01 +00:00
[_
none]))
;; Task Id Any -> Task
;; If the given task is waiting for this data, replace the waiting ID with the data
(define (task+data [t : PendingTask]
[id : TaskID]
[data : TaskResult]
-> PendingTask)
(match t
2019-05-24 14:06:01 +00:00
[(task $tid (reduce-work (left id) $r))
(task tid (reduce-work (right data) r))]
2019-05-24 14:06:01 +00:00
[(task $tid (reduce-work $l (left id)))
(task tid (reduce-work l (right data)))]
2019-05-24 14:06:01 +00:00
[_ t]))
(require/typed "flink-support.rkt"
[split-at/lenient- : ( (X) (→fn (List X) Int (List (List X))))])
(define ( (X) (split-at/lenient [xs : (List X)]
[n : Int]
-> (Tuple (List X) (List X))))
(define l (split-at/lenient- xs n))
(tuple (first l) (second l)))
(define (partition-ready-tasks [tasks : (List PendingTask)]
-> (Tuple (List PendingTask)
(List ConcreteTask)))
(define part (inst partition/either PendingTask PendingTask ConcreteTask))
(part tasks
(lambda ([t : PendingTask])
(match (task-ready? t)
2019-05-24 14:06:01 +00:00
[(some $ct)
(right ct)]
[none
(left t)]))))
2019-05-24 16:31:03 +00:00
(define (input->pending-task [t : InputTask] -> PendingTask)
(match t
[(task $id (map-work $s))
(task id (map-work s))]
[(task $id (reduce-work $l $r))
(task id (reduce-work (left l) (left r)))]))
(define (spawn-job-manager)
(spawn τc
(start-facet jm
(assert (job-manager-alive))
(log "Job Manager Up")
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
2019-05-24 15:25:29 +00:00
(define/query-hash task-managers (task-manager $id $slots) id slots
2019-05-24 19:07:34 +00:00
#:on-add (log "JM learns that ~a has ~v slots" id (hash-ref (ref task-managers) id)))
;; (Hashof TaskManagerID Nat)
;; to better understand the supply of slots for each task manager, keep track of the number
;; of requested tasks that we have yet to hear back about
(field [requests-in-flight (Hash ID Int) (hash)])
(define (slots-available)
2019-05-24 19:07:34 +00:00
#;(printf "slots available!\n")
(for/sum ([(id v) (ref task-managers)])
2019-05-24 19:07:34 +00:00
#;(printf "(slots-available) ~a :: ~a\n" id v)
(max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0)))))
;; ID -> Void
;; mark that we have requested the given task manager to perform a task
(define (take-slot! [id : ID])
(log "JM assigns a task to ~a" id)
(set! requests-in-flight (hash-update/failure (ref requests-in-flight) id add1 0)))
;; ID -> Void
;; mark that we have heard back from the given manager about a requested task
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
2019-05-24 15:25:29 +00:00
(during (job $job-id $tasks)
(log "JM receives job ~a" job-id)
2019-05-24 16:31:03 +00:00
(define pending (for/list ([t tasks])
(input->pending-task t)))
(define-tuple (not-ready ready) (partition-ready-tasks pending #;(map input->pending-task tasks)))
(field [ready-tasks (List ConcreteTask) ready]
[waiting-tasks (List PendingTask) not-ready]
[tasks-in-progress Int 0])
;; Task -> Void
2019-05-21 21:23:45 +00:00
(define (add-ready-task! [t : ConcreteTask])
;; TODO - use functional-queue.rkt from ../../
2019-05-24 14:06:01 +00:00
(match-define (task $tid _) t)
2019-05-21 21:23:45 +00:00
(log "JM marks task ~a as ready" tid)
(set! ready-tasks (cons t (ref ready-tasks))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving
;; them to the ready queue when possible
(define (push-results [task-id : TaskID]
[data : TaskResult])
(cond
[(and (zero? (ref tasks-in-progress))
(empty? (ref ready-tasks))
(empty? (ref waiting-tasks)))
(log "JM finished with job ~a" job-id)
(start-facet done (assert (job-finished job-id data)))]
[else
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
(define still-waiting
(for/fold ([ts : (List PendingTask) (list)])
([t (ref waiting-tasks)])
(define t+ (task+data t task-id data))
(match (task-ready? t+)
2019-05-24 14:06:01 +00:00
[(some $ready)
2019-05-21 21:23:45 +00:00
(add-ready-task! ready)
ts]
2019-05-24 14:06:01 +00:00
[_
2019-05-21 21:23:45 +00:00
(cons t+ ts)])))
(set! waiting-tasks still-waiting)]))
;; Task (ID TaskResult -> Void) -> Void
;; Requires (task-ready? t)
2019-05-23 15:14:14 +00:00
(define ( (ρ) (perform-task [t : ConcreteTask]
[k : (proc TaskID TaskResult -> ★/t
#:roles (ρ))]))
2019-05-21 21:23:45 +00:00
(start-facet perform
(on start (set! tasks-in-progress (add1 (ref tasks-in-progress))))
(on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress))))
2019-05-24 14:06:01 +00:00
(match-define (task $this-id $desc) t)
(log "JM begins on task ~a" this-id)
(define not-a-real-task-manager (gensym 'FAKE))
(field [task-mngr ID not-a-real-task-manager])
;; ID -> ...
2019-05-21 21:23:45 +00:00
(define (assign-task [mngr : ID])
(start-facet this-facet
2019-05-22 14:24:02 +00:00
(assert (task-assignment mngr job-id t))
2019-05-21 21:23:45 +00:00
(on (retracted (task-manager mngr discard))
;; our task manager has crashed
(stop this-facet
(set! task-mngr not-a-real-task-manager)))
2019-05-21 21:23:45 +00:00
(on start
;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(start-facet take-slot
(on (asserted (task-state mngr job-id this-id _))
(stop take-slot
(received-answer! mngr)))))
2019-05-24 15:25:29 +00:00
(on (asserted (task-state mngr job-id this-id $status))
2019-05-22 14:24:02 +00:00
(match status
[ACCEPTED #f]
[RUNNING #f]
[OVERLOAD/ts
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop this-facet
(set! task-mngr not-a-real-task-manager))]
2019-05-24 14:06:01 +00:00
[(finished $results)
2019-05-22 14:24:02 +00:00
(log "JM receives the results of task ~a" this-id)
(stop perform (k this-id results))]))))
(define (select-a-task-manager)
(start-facet this-facet
(begin/dataflow
(when (equal? (ref task-mngr) not-a-real-task-manager)
(define mngr?
(for/first ([(id slots) (ref task-managers)]
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
id))
(match mngr?
[(some $mngr)
(take-slot! mngr)
(set! task-mngr mngr)
(assign-task mngr)]
[none
#f])))))
(on start (select-a-task-manager))))
2019-05-23 15:14:14 +00:00
(begin/dataflow
(define slots (slots-available))
(define-tuple (ts readys)
(split-at/lenient (ref ready-tasks) slots))
2019-05-24 19:07:34 +00:00
#;(printf "slots: ~a\n" slots)
#;(printf "ts: ~a\n" ts)
#;(printf "readys: ~a\n" readys)
2019-05-23 15:14:14 +00:00
(for ([t ts])
(perform-task t push-results))
(unless (empty? ts)
;; the empty? check may be necessary to avoid a dataflow loop
2019-05-24 19:07:34 +00:00
(set! ready-tasks readys)))))))
2019-05-24 16:31:03 +00:00
;; ---------------------------------------------------------------------------------------------------
;; Client
;; Job -> Void
(define (spawn-client [j : JobDesc])
(spawn τc
(start-facet _
(match-define (job $id _) j)
(assert j)
(on (asserted (job-finished id $data))
(printf "job done!\n~a\n" data)))))
;; ---------------------------------------------------------------------------------------------------
;; Main
(require/typed "flink-support.rkt"
[string->job : (→fn String JobDesc)]
[file->job : (→fn String JobDesc)])
(define INPUT "a b c a b c\na b\n a b\na b")
;; expected:
;; #hash((a . 5) (b . 5) (c . 2))
(run-ground-dataspace τc
(spawn-job-manager)
(spawn-task-manager)
(spawn-task-runner)
(spawn-task-runner)
2019-05-24 19:07:34 +00:00
(spawn-client (file->job "lorem.txt"))
2019-05-24 16:31:03 +00:00
(spawn-client (string->job INPUT)))