typed flink: replace dataflow in job manager with internal events
This commit is contained in:
parent
01a544b0d9
commit
311108fbcf
|
@ -341,9 +341,16 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
[(task $id (reduce-work $l $r))
|
[(task $id (reduce-work $l $r))
|
||||||
(task id (reduce-work (left l) (left r)))]))
|
(task id (reduce-work (left l) (left r)))]))
|
||||||
|
|
||||||
(assertion-struct assigned-task : SelectedTM (mngr))
|
|
||||||
(message-struct tasks-finished : TasksFinished (id results))
|
(message-struct tasks-finished : TasksFinished (id results))
|
||||||
|
|
||||||
|
;; assertions used for internal slot-management protocol
|
||||||
|
(assertion-struct slots : Slots (v))
|
||||||
|
(assertion-struct slot-assignment : SlotAssignment (who mngr))
|
||||||
|
;; tid is the TaskID, rid is a unique symbol to a particular request for a slot
|
||||||
|
(define-constructor* (request-id : ReqID tid rid))
|
||||||
|
(define-type-alias RequestID (ReqID TaskID ID))
|
||||||
|
(message-struct task-is-ready : TaskIsReady (job-id task))
|
||||||
|
|
||||||
(define (spawn-job-manager)
|
(define (spawn-job-manager)
|
||||||
(spawn τc
|
(spawn τc
|
||||||
(begin
|
(begin
|
||||||
|
@ -351,35 +358,64 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
(assert (job-manager-alive))
|
(assert (job-manager-alive))
|
||||||
(log "Job Manager Up")
|
(log "Job Manager Up")
|
||||||
|
|
||||||
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
|
(on start
|
||||||
(define/query-hash task-managers (task-manager $id $slots) id slots
|
(start-facet slot-manager
|
||||||
#:on-add (log "JM learns that ~a has ~v slots" id (hash-ref (ref task-managers) id)))
|
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
|
||||||
|
(define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots
|
||||||
|
#:on-add (log "JM learns that ~a has ~v slots" id (hash-ref (ref task-managers) id)))
|
||||||
|
|
||||||
;; (Hashof TaskManagerID Nat)
|
(field ;; how many outstanding assignments there are for each task manager
|
||||||
;; to better understand the supply of slots for each task manager, keep track of the number
|
[requests-in-flight (Hash ID Int) (hash)]
|
||||||
;; of requested tasks that we have yet to hear back about
|
;; map a request's ID to the manager it is assigned to
|
||||||
(field [requests-in-flight (Hash ID Int) (hash)])
|
[assignments (Hash ID ID) (hash)])
|
||||||
(define (slots-available)
|
(define (slots-available)
|
||||||
(for/sum ([(id v) (ref task-managers)])
|
(for/sum ([(id v) (ref task-managers)])
|
||||||
(max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0)))))
|
(max 0 (- v (hash-ref/failure (ref requests-in-flight) id 0)))))
|
||||||
|
|
||||||
;; ID -> Void
|
(define (try-take-slot! [me : ID] -> (Maybe ID))
|
||||||
;; mark that we have requested the given task manager to perform a task
|
(define mngr?
|
||||||
(define (take-slot! [id : ID])
|
(for/first ([(id slots) (ref task-managers)]
|
||||||
(log "JM assigns a task to ~a" id)
|
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
|
||||||
(set! requests-in-flight (hash-update/failure (ref requests-in-flight) id add1 0)))
|
id))
|
||||||
;; ID -> Void
|
(match mngr?
|
||||||
;; mark that we have heard back from the given manager about a requested task
|
[(some $m)
|
||||||
(define (received-answer! [id : ID])
|
(set! assignments (hash-set (ref assignments) me m))
|
||||||
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
|
(set! requests-in-flight (hash-update/failure (ref requests-in-flight) m add1 0))]
|
||||||
|
[none
|
||||||
|
#f])
|
||||||
|
mngr?)
|
||||||
|
|
||||||
|
(know (slots (slots-available)))
|
||||||
|
|
||||||
|
(during (know (observe (slot-assignment (request-id $tid:TaskID $who:ID) _)))
|
||||||
|
(on start
|
||||||
|
(start-facet assign-manager
|
||||||
|
;; what if one manager gains a slot but another loses one, so n stays the same?
|
||||||
|
(on (know (slots $n:Int))
|
||||||
|
#;(log "Dispatcher request ~a learns there are ~a slots" tid n)
|
||||||
|
(unless (or (zero? n) (hash-has-key? (ref assignments) who))
|
||||||
|
(define mngr? (try-take-slot! who))
|
||||||
|
(match mngr?
|
||||||
|
[(some $mngr)
|
||||||
|
(stop assign-manager
|
||||||
|
(log "Dispatcher assigns task ~a to ~a" tid mngr)
|
||||||
|
(start-facet _ (know (slot-assignment (request-id tid who) mngr)))
|
||||||
|
(start-facet waiting-for-answer
|
||||||
|
(on (asserted (observe (task-performance mngr (task tid $x) _)))
|
||||||
|
(start-facet _ (on (asserted (task-performance mngr (task tid x) _))
|
||||||
|
(log "Dispatcher sees answer for ~a" tid)
|
||||||
|
(stop waiting-for-answer))))
|
||||||
|
(on stop
|
||||||
|
(set! requests-in-flight (hash-update (ref requests-in-flight) mngr sub1)))))]
|
||||||
|
[_ #f])))))
|
||||||
|
(on stop (set! assignments (hash-remove (ref assignments) who))))))
|
||||||
|
|
||||||
(during (observe (job-completion $job-id $tasks _))
|
(during (observe (job-completion $job-id $tasks _))
|
||||||
(log "JM receives job ~a" job-id)
|
(log "JM receives job ~a" job-id)
|
||||||
(define pending (for/list ([t tasks])
|
(define pending (for/list ([t tasks])
|
||||||
(input->pending-task t)))
|
(input->pending-task t)))
|
||||||
(define-tuple (not-ready ready) (partition-ready-tasks pending))
|
(define-tuple (not-ready ready) (partition-ready-tasks pending))
|
||||||
(field [ready-tasks (List ConcreteTask) ready]
|
(field [waiting-tasks (List PendingTask) not-ready]
|
||||||
[waiting-tasks (List PendingTask) not-ready]
|
|
||||||
[tasks-in-progress Int 0])
|
[tasks-in-progress Int 0])
|
||||||
|
|
||||||
;; Task -> Void
|
;; Task -> Void
|
||||||
|
@ -387,7 +423,7 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
;; TODO - use functional-queue.rkt from ../../
|
;; TODO - use functional-queue.rkt from ../../
|
||||||
(match-define (task $tid _) t)
|
(match-define (task $tid _) t)
|
||||||
(log "JM marks task ~a as ready" tid)
|
(log "JM marks task ~a as ready" tid)
|
||||||
(set! ready-tasks (cons t (ref ready-tasks))))
|
(realize! (task-is-ready job-id t)))
|
||||||
|
|
||||||
;; ID Data -> Void
|
;; ID Data -> Void
|
||||||
;; Update any dependent tasks with the results of the given task, moving
|
;; Update any dependent tasks with the results of the given task, moving
|
||||||
|
@ -396,7 +432,6 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
[data : TaskResult])
|
[data : TaskResult])
|
||||||
(cond
|
(cond
|
||||||
[(and (zero? (ref tasks-in-progress))
|
[(and (zero? (ref tasks-in-progress))
|
||||||
(empty? (ref ready-tasks))
|
|
||||||
(empty? (ref waiting-tasks)))
|
(empty? (ref waiting-tasks)))
|
||||||
(log "JM finished with job ~a" job-id)
|
(log "JM finished with job ~a" job-id)
|
||||||
(realize! (tasks-finished job-id data))]
|
(realize! (tasks-finished job-id data))]
|
||||||
|
@ -425,25 +460,13 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
(match-define (task $this-id $desc) t)
|
(match-define (task $this-id $desc) t)
|
||||||
(log "JM begins on task ~a" this-id)
|
(log "JM begins on task ~a" this-id)
|
||||||
|
|
||||||
(define not-a-real-task-manager (gensym 'FAKE))
|
|
||||||
(field [task-mngr ID not-a-real-task-manager])
|
|
||||||
|
|
||||||
;; ID -> ...
|
;; ID -> ...
|
||||||
(define (assign-task [mngr : ID])
|
(define (assign-task [mngr : ID]
|
||||||
|
[request-again! : (→fn ★/t)])
|
||||||
(start-facet assign
|
(start-facet assign
|
||||||
(know (assigned-task mngr))
|
|
||||||
(on (retracted (task-manager mngr _))
|
(on (retracted (task-manager mngr _))
|
||||||
;; our task manager has crashed
|
;; our task manager has crashed
|
||||||
(stop assign))
|
(stop assign (request-again!)))
|
||||||
(on start
|
|
||||||
;; N.B. when this line was here, and not after `(when mngr ...)` above,
|
|
||||||
;; things didn't work. I think that due to script scheduling, all ready
|
|
||||||
;; tasks were being assigned to the manager
|
|
||||||
#;(take-slot! mngr)
|
|
||||||
(start-facet take-slot
|
|
||||||
(on (asserted (task-performance mngr t _))
|
|
||||||
(stop take-slot
|
|
||||||
(received-answer! mngr)))))
|
|
||||||
(on (asserted (task-performance mngr t $status))
|
(on (asserted (task-performance mngr t $status))
|
||||||
(match status
|
(match status
|
||||||
[ACCEPTED #f]
|
[ACCEPTED #f]
|
||||||
|
@ -453,36 +476,17 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
|
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
|
||||||
;; they should have told us a different slot count since we tried to give them work
|
;; they should have told us a different slot count since we tried to give them work
|
||||||
(log "JM overloaded manager ~a with task ~a" mngr this-id)
|
(log "JM overloaded manager ~a with task ~a" mngr this-id)
|
||||||
(stop assign)]
|
(stop assign (request-again!))]
|
||||||
[(finished $results)
|
[(finished $results)
|
||||||
(log "JM receives the results of task ~a" this-id)
|
(log "JM receives the results of task ~a" this-id)
|
||||||
(stop perform (k this-id results))]))))
|
(stop perform (k this-id results))]))))
|
||||||
|
|
||||||
(define (select-a-task-manager)
|
(define (select-a-task-manager)
|
||||||
(start-facet select
|
(start-facet select
|
||||||
|
(field [req-id ID (gensym 'perform-task)])
|
||||||
(field [mngr (Maybe ID) none])
|
(define (request-again!) (set! req-id (gensym 'perform-task)))
|
||||||
|
(on (know (slot-assignment (request-id this-id (ref req-id)) $mngr:ID))
|
||||||
(define (try-assign!)
|
(assign-task mngr request-again!))))
|
||||||
(define mngr?
|
|
||||||
(for/first ([(id slots) (ref task-managers)]
|
|
||||||
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
|
|
||||||
id))
|
|
||||||
(match mngr?
|
|
||||||
[(some $m)
|
|
||||||
(take-slot! m)
|
|
||||||
(set! mngr (some m))
|
|
||||||
(assign-task m)]
|
|
||||||
[none
|
|
||||||
#f]))
|
|
||||||
|
|
||||||
(begin/dataflow
|
|
||||||
(when (equal? (ref mngr) none)
|
|
||||||
(try-assign!)))
|
|
||||||
|
|
||||||
(on (forget (assigned-task $m:ID))
|
|
||||||
(when (equal? (some m) (ref mngr))
|
|
||||||
(set! mngr none)))))
|
|
||||||
|
|
||||||
(on start (select-a-task-manager))))
|
(on start (select-a-task-manager))))
|
||||||
|
|
||||||
|
@ -491,15 +495,10 @@ The JobManager then performs the job and, when finished, asserts
|
||||||
(on (realize (tasks-finished job-id $data:TaskResult))
|
(on (realize (tasks-finished job-id $data:TaskResult))
|
||||||
(stop delegate-tasks
|
(stop delegate-tasks
|
||||||
(start-facet done (assert (job-completion job-id tasks data)))))
|
(start-facet done (assert (job-completion job-id tasks data)))))
|
||||||
(begin/dataflow
|
(on (realize (task-is-ready job-id $t))
|
||||||
(define slots (slots-available))
|
(perform-task t push-results)))
|
||||||
(define-tuple (ts readys)
|
(for ([t (in-list ready)])
|
||||||
(split-at/lenient (ref ready-tasks) slots))
|
(add-ready-task! t))))))))
|
||||||
(for ([t ts])
|
|
||||||
(perform-task t push-results))
|
|
||||||
(unless (empty? ts)
|
|
||||||
;; the empty? check may be necessary to avoid a dataflow loop
|
|
||||||
(set! ready-tasks readys))))))))))
|
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------------------------------
|
||||||
;; Client
|
;; Client
|
||||||
|
|
Loading…
Reference in New Issue