From abc8669b74456a803cd89404a43f4eb915eb96c7 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Tue, 26 Feb 2019 10:50:00 -0500 Subject: [PATCH] examples/flink: describe the protocol --- racket/syndicate/examples/actor/flink.rkt | 145 ++++++++++++++++------ 1 file changed, 106 insertions(+), 39 deletions(-) diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 5d0eb7f..b810c69 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -21,6 +21,112 @@ (module+ test (require rackunit)) +;; --------------------------------------------------------------------------------------------------- +;; Protocol + +#| +Conversations in the flink dataspace primarily concern two topics: presence and +task execution. + +Presence Protocol +----------------- + +The JobManager (JM) asserts its presence with (job-manager-alive). The operation +of each TaskManager (TM) is contingent on the presence of a job manager. +|# +(assertion-struct job-manager-alive ()) +#| +In turn, TaskManagers advertise their presence with (task-manager ID slots), +where ID is a unique id, and slots is a natural number. The number of slots +dictates how many tasks the TM can take on. To reduce contention, we the JM +should only assign a task to a TM if the TM actually has the resources to +perform a task. +|# +(assertion-struct task-manager (id slots)) +;; an ID is a symbol or a natural number. +;; Any -> Bool +;; recognize ids +(define (id? x) + (or (symbol? x) (exact-nonnegative-integer? x))) +#| +The resources available to a TM are its associated TaskRunners (TRs). TaskRunners +assert their presence with (task-runner ID Status), where Status is one of + - IDLE, when the TR is not executing a task + - (executing ID), when the TR is executing the task with the given ID + - OVERLOAD, when the TR has been asked to perform a task before it has + finished its previous assignment. For the purposes of this model, it indicates a + failure in the protocol; like the exchange between the JM and the TM, a TR + should only receive tasks when it is IDLE. +|# +(assertion-struct task-runner (id status)) +(define IDLE 'idle) +(define OVERLOAD 'overload) +(struct executing (id) #:transparent) + +#| +Task Execution Protocol +----------------------- + +When the JobManager receives a Job, it assigns its constituent Tasks to the +TaskManagers, subject to TM availability and the readiness of each Task. + +The JM asserts the association of a Task with a particular TM +through (submitted-task ID Task), where ID identifies the TM. +TODO - also need to correlate Job ID through here. +|# +(assertion-struct submitted-task (manager task)) +#| +A Task is a (task ID Work), where Work is one of + - (map-task String) + - (reduce-task (U ID TaskResult) (U ID TaskResult)), referring to either the + ID of the dependent task or its results. A reduce-task is ready to be executed + when it has both results. + +A TaskResult is a (Hashof String Natural), counting the occurrences of words +|# +(struct task (id desc) #:transparent) +(struct map-task (data) #:transparent) +(struct reduce-task (left right) #:transparent) + +#| +A TaskManager responds to a submitted-task by describing its state with respect +to that task, (task-state ID TaskStateDesc), where ID is that of the +submitted-task (TODO - that doesn't seem like enough). TaskStateDesc is one of + - ACCEPTED, when the TM can assign the Task to an available JR (TODO - not sure if this is ever visible, currently) + - OVERLOAD, when the TM does not have the resources to perform the task. + - RUNNING, indicating that the task has successfully been delegated to a JR + - (finished TaskResult), describing the results +|# +(assertion-struct task-state (id desc)) +(struct finished (data) #:transparent) +(define ACCEPTED 'accepted) +(define RUNNING 'running) +#| +Upon receipt, a TaskManager selects a TaskRunner to perform it. The TM asserts +the association, (run-task ID Task), where ID is that of the given TaskRunner. + +The TaskRunner shares its reaction to a task assignment with an assertion, +(task-execution-state Task ExecutionState). (TODO - the TR ID should be in there!). +The ExecutionState is one of + - RUNNING, indicating that the TR has accepted and is executing the task + - OVERLOAD, when the TR is overloaded + - (finished TaskResult), describing the results +TODO - merge this with TaskStateDesc +TODO TODO - merge the JR/TM and TM/TR task protocols with one another - TODO TODO +|# +(assertion-struct run-task (id task)) +(assertion-struct task-execution-state (task state)) + +#| +Job Submission Protocol +----------------------- + +Finally, Clients submit their jobs to the JobManager by asserting a Job, which is a (job ID (Listof Task)). +The JobManager then performs the job and, when finished, asserts (job-finished ID TaskResult) +|# +(assertion-struct job (id tasks)) +(assertion-struct job-finished (id data)) + ;; --------------------------------------------------------------------------------------------------- ;; Logging @@ -33,7 +139,6 @@ ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner - ;; (Hash String Nat) String -> (Hash String Nat) (define (word-count-increment h word) (hash-update h @@ -65,15 +170,8 @@ #;(check-equal? (string->words "but wait---there's more") (list "but" "wait" "there's" "more"))) -(assertion-struct task-runner (id status)) -(assertion-struct run-task (id task)) -(assertion-struct task-execution-state (task state)) (assertion-struct task-input (task-id task input-id data)) -(struct finished (data) #:transparent) -(struct executing (id) #:transparent) -(define IDLE 'idle) -(define RUNNING 'running) (define (spawn-task-runner) (define id (gensym 'task-runner)) @@ -120,22 +218,6 @@ ;; --------------------------------------------------------------------------------------------------- ;; TaskManager -(assertion-struct task-manager (id slots)) -(assertion-struct submitted-task (manager task)) -(assertion-struct job-manager-alive ()) - -;; a TaskState is (task-state ID TaskStateDesc) -;; where TaskStateDesc is one of -;; - ACCEPTED -;; - OVERLOAD -;; - RUNNING -;; - (finished data) -(assertion-struct task-state (id desc)) - -;; task states -(define ACCEPTED 'accepted) -(define OVERLOAD 'overload) - (define (spawn-task-manager) (define id (gensym 'task-manager)) (spawn #:name id @@ -189,13 +271,6 @@ ;; --------------------------------------------------------------------------------------------------- ;; JobManager -(assertion-struct job (id tasks)) -(assertion-struct job-finished (id data)) -(struct job-description (id tasks) #:transparent) -(struct map-task (data) #:transparent) -(struct reduce-task (left right) #:transparent) -(struct task (id desc) #:transparent) - (define (spawn-job-manager) (spawn (assert (job-manager-alive)) @@ -334,10 +409,6 @@ (task tid (reduce-task l data))] [_ t])) -;; Any -> Bool -;; recognize ids -(define (id? x) - (or (symbol? x) (exact-nonnegative-integer? x))) ;; (Listof A) Nat -> (Values (Listof A) (Listof A)) ;; like split-at but allow a number larger than the length of the list @@ -363,10 +434,6 @@ ;; --------------------------------------------------------------------------------------------------- ;; Creating a Job -;; a WorkDesc is one of -;; (map-task data) -;; (reduce-task WorkDesc WorkDesc) - ;; (Listof WordDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc)) ;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also ;; return the odd-one out