flink: replace a lot of dataflow in job manager with internal events
This commit is contained in:
parent
220d112393
commit
5d5b827535
|
@ -282,30 +282,70 @@ The JobManager then performs the job and, when finished, asserts
|
|||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; JobManager
|
||||
|
||||
;; assertions used for internal slot-management protocol
|
||||
(assertion-struct slots (v))
|
||||
(assertion-struct slot-assignment (who mngr))
|
||||
;; tid is the TaskID, rid is a unique symbol to a particular request for a slot
|
||||
(struct request-id (tid rid) #:prefab)
|
||||
|
||||
(define (spawn-job-manager)
|
||||
(spawn
|
||||
(assert (job-manager-alive))
|
||||
(log "Job Manager Up")
|
||||
|
||||
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
|
||||
(define/query-hash task-managers (task-manager $id $slots) id slots
|
||||
#:on-add (log "JM learns that ~a has ~v slots" id slots))
|
||||
(on-start
|
||||
(react
|
||||
|
||||
;; (Hashof TaskManagerID Nat)
|
||||
;; to better understand the supply of slots for each task manager, keep track of the number
|
||||
;; of requested tasks that we have yet to hear back about
|
||||
(field [requests-in-flight (hash)])
|
||||
(define (slots-available)
|
||||
(for/sum ([(id v) (in-hash (task-managers))])
|
||||
(max 0 (- v (hash-ref (requests-in-flight) id 0)))))
|
||||
;; ID -> Void
|
||||
;; mark that we have requested the given task manager to perform a task
|
||||
(define (take-slot! id)
|
||||
(requests-in-flight (hash-update (requests-in-flight) id add1 0)))
|
||||
;; ID -> Void
|
||||
;; mark that we have heard back from the given manager about a requested task
|
||||
(define (received-answer! id)
|
||||
(requests-in-flight (hash-update (requests-in-flight) id sub1)))
|
||||
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
|
||||
;; (Hashof TaskManagerID Nat)
|
||||
(define/query-hash task-managers (task-manager $id $slots) id slots
|
||||
#:on-add (begin (log "JM learns that ~a has ~v slots" id slots)
|
||||
#;(requests-in-flight (hash-set (requests-in-flight) id 0))))
|
||||
|
||||
(field [waiting-tasks '()] ;; (Listof ID)
|
||||
[requests-in-flight (hash)] ;; (Hashof ID Nat)
|
||||
[assignments (hash)]) ;; (Hashof ID ID) request ID to manager ID
|
||||
|
||||
;; to better understand the supply of slots for each task manager, keep track of the number
|
||||
;; of requested tasks that we have yet to hear back about
|
||||
(define (slots-available)
|
||||
(for/sum ([(id v) (in-hash (task-managers))])
|
||||
(max 0 (- v (hash-ref (requests-in-flight) id 0)))))
|
||||
|
||||
;; ID -> (U #f ID)
|
||||
(define (try-take-slot! me)
|
||||
(define mngr
|
||||
(for/first ([(id slots) (in-hash (task-managers))]
|
||||
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
|
||||
id))
|
||||
(when mngr
|
||||
(assignments (hash-set (assignments) me mngr))
|
||||
(requests-in-flight (hash-update (requests-in-flight) mngr add1 0)))
|
||||
mngr)
|
||||
|
||||
(know (slots (slots-available)))
|
||||
|
||||
(during (know (observe (slot-assignment (request-id $tid $who) _)))
|
||||
(on-start
|
||||
(react
|
||||
;; what if one manager gains a slot but another loses one, so n stays the same?
|
||||
(on (know (slots $n))
|
||||
#;(log "Dispatcher request ~a learns there are ~a slots" tid n)
|
||||
(unless (or (zero? n) (hash-has-key? (assignments) who))
|
||||
(define mngr (try-take-slot! who))
|
||||
(when mngr
|
||||
(stop-current-facet
|
||||
(log "Dispatcher assigns task ~a to ~a" tid mngr)
|
||||
(react (know (slot-assignment (request-id tid who) mngr)))
|
||||
(react
|
||||
(define waiting-for-answer (current-facet-id))
|
||||
(on (asserted (observe (task-performance mngr (task tid $x) _)))
|
||||
(react (on (asserted (task-performance mngr (task tid x) _))
|
||||
(log "Dispatcher sees answer for ~a" tid)
|
||||
(stop-facet waiting-for-answer))))
|
||||
(on-stop
|
||||
(requests-in-flight (hash-update (requests-in-flight) mngr sub1))))))))))
|
||||
(on-stop (assignments (hash-remove (assignments) who))))))
|
||||
|
||||
(during (observe (job-completion $job-id $tasks _))
|
||||
(log "JM receives job ~a" job-id)
|
||||
|
@ -315,14 +355,10 @@ The JobManager then performs the job and, when finished, asserts
|
|||
[tasks-in-progress 0])
|
||||
|
||||
(begin/dataflow
|
||||
(define slots (slots-available))
|
||||
(define-values (ts readys)
|
||||
(split-at/lenient (ready-tasks) slots))
|
||||
(for ([t ts])
|
||||
(perform-task t push-results))
|
||||
(unless (empty? ts)
|
||||
;; the empty? check may be necessary to avoid a dataflow loop
|
||||
(ready-tasks readys)))
|
||||
(unless (empty? (ready-tasks))
|
||||
(for ([t (ready-tasks)])
|
||||
(perform-task t push-results))
|
||||
(ready-tasks '())))
|
||||
|
||||
;; Task -> Void
|
||||
(define (add-ready-task! t)
|
||||
|
@ -340,35 +376,23 @@ The JobManager then performs the job and, when finished, asserts
|
|||
(match-define (task this-id desc) t)
|
||||
(log "JM begins on task ~a" this-id)
|
||||
|
||||
|
||||
(define (select-a-task-manager)
|
||||
(react
|
||||
(field [selection #f])
|
||||
(begin/dataflow
|
||||
(unless (selection)
|
||||
(define mngr
|
||||
(for/first ([(id slots) (in-hash (task-managers))]
|
||||
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
|
||||
id))
|
||||
(when mngr
|
||||
(selection mngr)
|
||||
(take-slot! mngr)
|
||||
(log "JM assigns task ~a to ~a" this-id mngr)
|
||||
(stop-current-facet (assign-task mngr)))))))
|
||||
(define req-id (gensym 'perform-task))
|
||||
(on (know (slot-assignment (request-id this-id req-id) $mngr))
|
||||
(assign-task mngr))))
|
||||
|
||||
;; ID -> ...
|
||||
(define (assign-task mngr)
|
||||
(define this-facet (current-facet-id))
|
||||
(react
|
||||
(define this-facet (current-facet-id))
|
||||
#;(define this-facet (current-facet-id))
|
||||
(on (retracted (task-manager mngr _))
|
||||
;; our task manager has crashed
|
||||
(stop-current-facet (select-a-task-manager)))
|
||||
(on-start
|
||||
;; N.B. when this line was here, and not after `(when mngr ...)` above,
|
||||
;; things didn't work. I think that due to script scheduling, all ready
|
||||
;; tasks were being assigned to the manager
|
||||
#;(take-slot! mngr)
|
||||
(react (stop-when (asserted (task-performance mngr t _))
|
||||
(received-answer! mngr)))
|
||||
(log "JM assigns task ~a to manager ~a" this-id mngr)
|
||||
(task-assigner t mngr
|
||||
(lambda ()
|
||||
;; need to find a new task manager
|
||||
|
|
Loading…
Reference in New Issue