From 311108fbcfbada63bdc0ce6c3a071616d5ec4a68 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Wed, 26 Feb 2020 16:34:24 -0500 Subject: [PATCH] typed flink: replace dataflow in job manager with internal events --- racket/typed/examples/roles/flink.rkt | 143 +++++++++++++------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 63cb8c9..b550f68 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -341,9 +341,16 @@ The JobManager then performs the job and, when finished, asserts [(task $id (reduce-work $l $r)) (task id (reduce-work (left l) (left r)))])) -(assertion-struct assigned-task : SelectedTM (mngr)) (message-struct tasks-finished : TasksFinished (id results)) +;; assertions used for internal slot-management protocol +(assertion-struct slots : Slots (v)) +(assertion-struct slot-assignment : SlotAssignment (who mngr)) +;; tid is the TaskID, rid is a unique symbol to a particular request for a slot +(define-constructor* (request-id : ReqID tid rid)) +(define-type-alias RequestID (ReqID TaskID ID)) +(message-struct task-is-ready : TaskIsReady (job-id task)) + (define (spawn-job-manager) (spawn τc (begin @@ -351,35 +358,64 @@ The JobManager then performs the job and, when finished, asserts (assert (job-manager-alive)) (log "Job Manager Up") - ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. - (define/query-hash task-managers (task-manager $id $slots) id slots - #:on-add (log "JM learns that ~a has ~v slots" id (hash-ref (ref task-managers) id))) + (on start + (start-facet slot-manager + ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. + (define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots + #:on-add (log "JM learns that ~a has ~v slots" id (hash-ref (ref task-managers) id))) - ;; (Hashof TaskManagerID Nat) - ;; to better understand the supply of slots for each task manager, keep track of the number - ;; of requested tasks that we have yet to hear back about - (field [requests-in-flight (Hash ID Int) (hash)]) - (define (slots-available) - (for/sum ([(id v) (ref task-managers)]) - (max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0))))) + (field ;; how many outstanding assignments there are for each task manager + [requests-in-flight (Hash ID Int) (hash)] + ;; map a request's ID to the manager it is assigned to + [assignments (Hash ID ID) (hash)]) + (define (slots-available) + (for/sum ([(id v) (ref task-managers)]) + (max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0))))) - ;; ID -> Void - ;; mark that we have requested the given task manager to perform a task - (define (take-slot! [id : ID]) - (log "JM assigns a task to ~a" id) - (set! requests-in-flight (hash-update/failure (ref requests-in-flight) id add1 0))) - ;; ID -> Void - ;; mark that we have heard back from the given manager about a requested task - (define (received-answer! [id : ID]) - (set! requests-in-flight (hash-update (ref requests-in-flight) id sub1))) + (define (try-take-slot! [me : ID] -> (Maybe ID)) + (define mngr? + (for/first ([(id slots) (ref task-managers)] + #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) + id)) + (match mngr? + [(some $m) + (set! assignments (hash-set (ref assignments) me m)) + (set! requests-in-flight (hash-update/failure (ref requests-in-flight) m add1 0))] + [none + #f]) + mngr?) + + (know (slots (slots-available))) + + (during (know (observe (slot-assignment (request-id $tid:TaskID $who:ID) _))) + (on start + (start-facet assign-manager + ;; what if one manager gains a slot but another loses one, so n stays the same? + (on (know (slots $n:Int)) + #;(log "Dispatcher request ~a learns there are ~a slots" tid n) + (unless (or (zero? n) (hash-has-key? (ref assignments) who)) + (define mngr? (try-take-slot! who)) + (match mngr? + [(some $mngr) + (stop assign-manager + (log "Dispatcher assigns task ~a to ~a" tid mngr) + (start-facet _ (know (slot-assignment (request-id tid who) mngr))) + (start-facet waiting-for-answer + (on (asserted (observe (task-performance mngr (task tid $x) _))) + (start-facet _ (on (asserted (task-performance mngr (task tid x) _)) + (log "Dispatcher sees answer for ~a" tid) + (stop waiting-for-answer)))) + (on stop + (set! requests-in-flight (hash-update (ref requests-in-flight) mngr sub1)))))] + [_ #f]))))) + (on stop (set! assignments (hash-remove (ref assignments) who)))))) (during (observe (job-completion $job-id $tasks _)) (log "JM receives job ~a" job-id) (define pending (for/list ([t tasks]) (input->pending-task t))) (define-tuple (not-ready ready) (partition-ready-tasks pending)) - (field [ready-tasks (List ConcreteTask) ready] - [waiting-tasks (List PendingTask) not-ready] + (field [waiting-tasks (List PendingTask) not-ready] [tasks-in-progress Int 0]) ;; Task -> Void @@ -387,7 +423,7 @@ The JobManager then performs the job and, when finished, asserts ;; TODO - use functional-queue.rkt from ../../ (match-define (task $tid _) t) (log "JM marks task ~a as ready" tid) - (set! ready-tasks (cons t (ref ready-tasks)))) + (realize! (task-is-ready job-id t))) ;; ID Data -> Void ;; Update any dependent tasks with the results of the given task, moving @@ -396,7 +432,6 @@ The JobManager then performs the job and, when finished, asserts [data : TaskResult]) (cond [(and (zero? (ref tasks-in-progress)) - (empty? (ref ready-tasks)) (empty? (ref waiting-tasks))) (log "JM finished with job ~a" job-id) (realize! (tasks-finished job-id data))] @@ -425,25 +460,13 @@ The JobManager then performs the job and, when finished, asserts (match-define (task $this-id $desc) t) (log "JM begins on task ~a" this-id) - (define not-a-real-task-manager (gensym 'FAKE)) - (field [task-mngr ID not-a-real-task-manager]) - ;; ID -> ... - (define (assign-task [mngr : ID]) + (define (assign-task [mngr : ID] + [request-again! : (→fn ★/t)]) (start-facet assign - (know (assigned-task mngr)) (on (retracted (task-manager mngr _)) ;; our task manager has crashed - (stop assign)) - (on start - ;; N.B. when this line was here, and not after `(when mngr ...)` above, - ;; things didn't work. I think that due to script scheduling, all ready - ;; tasks were being assigned to the manager - #;(take-slot! mngr) - (start-facet take-slot - (on (asserted (task-performance mngr t _)) - (stop take-slot - (received-answer! mngr))))) + (stop assign (request-again!))) (on (asserted (task-performance mngr t $status)) (match status [ACCEPTED #f] @@ -453,36 +476,17 @@ The JobManager then performs the job and, when finished, asserts ;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; they should have told us a different slot count since we tried to give them work (log "JM overloaded manager ~a with task ~a" mngr this-id) - (stop assign)] + (stop assign (request-again!))] [(finished $results) (log "JM receives the results of task ~a" this-id) (stop perform (k this-id results))])))) (define (select-a-task-manager) (start-facet select - - (field [mngr (Maybe ID) none]) - - (define (try-assign!) - (define mngr? - (for/first ([(id slots) (ref task-managers)] - #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) - id)) - (match mngr? - [(some $m) - (take-slot! m) - (set! mngr (some m)) - (assign-task m)] - [none - #f])) - - (begin/dataflow - (when (equal? (ref mngr) none) - (try-assign!))) - - (on (forget (assigned-task $m:ID)) - (when (equal? (some m) (ref mngr)) - (set! mngr none))))) + (field [req-id ID (gensym 'perform-task)]) + (define (request-again!) (set! req-id (gensym 'perform-task))) + (on (know (slot-assignment (request-id this-id (ref req-id)) $mngr:ID)) + (assign-task mngr request-again!)))) (on start (select-a-task-manager)))) @@ -491,15 +495,10 @@ The JobManager then performs the job and, when finished, asserts (on (realize (tasks-finished job-id $data:TaskResult)) (stop delegate-tasks (start-facet done (assert (job-completion job-id tasks data))))) - (begin/dataflow - (define slots (slots-available)) - (define-tuple (ts readys) - (split-at/lenient (ref ready-tasks) slots)) - (for ([t ts]) - (perform-task t push-results)) - (unless (empty? ts) - ;; the empty? check may be necessary to avoid a dataflow loop - (set! ready-tasks readys)))))))))) + (on (realize (task-is-ready job-id $t)) + (perform-task t push-results))) + (for ([t (in-list ready)]) + (add-ready-task! t)))))))) ;; --------------------------------------------------------------------------------------------------- ;; Client