untyped flink: fiddle with race in task manager
This commit is contained in:
parent
a8e890ab30
commit
2610ceb541
|
@ -232,7 +232,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(spawn #:name id
|
(spawn #:name id
|
||||||
(log "Task Manager (TM) ~a is running" id)
|
(log "Task Manager (TM) ~a is running" id)
|
||||||
(during (job-manager-alive)
|
(during (job-manager-alive)
|
||||||
(log "TM learns about JM")
|
(log "TM ~a learns about JM" id)
|
||||||
|
|
||||||
(field [task-runners (set)])
|
(field [task-runners (set)])
|
||||||
|
|
||||||
|
@ -243,10 +243,11 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(react
|
(react
|
||||||
(on-start (spawn-task-runner tr-id id))
|
(on-start (spawn-task-runner tr-id id))
|
||||||
(on (asserted (task-runner tr-id))
|
(on (asserted (task-runner tr-id))
|
||||||
(log "TM successfully created task-runner ~a" id)
|
(log "TM ~a successfully created task-runner ~a" id tr-id)
|
||||||
(task-runners (set-add (task-runners) tr-id)))
|
(task-runners (set-add (task-runners) tr-id)))
|
||||||
(on (retracted (task-runner tr-id))
|
(on (retracted (task-runner tr-id))
|
||||||
(log "Detected failure of task runner ~a, restarting" tr-id)
|
(log "TM ~a detected failure of task runner ~a, restarting" id tr-id)
|
||||||
|
(task-runners (set-remove (task-runners) tr-id))
|
||||||
(spawn-task-runner tr-id id)))))
|
(spawn-task-runner tr-id id)))))
|
||||||
|
|
||||||
;; Assign incoming tasks
|
;; Assign incoming tasks
|
||||||
|
@ -267,7 +268,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(define (perform-task tsk job-id on-complete! update-status!)
|
(define (perform-task tsk job-id on-complete! update-status!)
|
||||||
(match-define (task task-id desc) tsk)
|
(match-define (task task-id desc) tsk)
|
||||||
(define runner (select-runner))
|
(define runner (select-runner))
|
||||||
(log "TM assigns task ~a to runner ~a" task-id runner)
|
(log "TM ~a assigns task ~a to runner ~a" id task-id runner)
|
||||||
(on-stop (busy-runners (set-remove (busy-runners) runner)))
|
(on-stop (busy-runners (set-remove (busy-runners) runner)))
|
||||||
(on-start
|
(on-start
|
||||||
(task-assigner tsk job-id runner
|
(task-assigner tsk job-id runner
|
||||||
|
@ -346,8 +347,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
|
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
|
||||||
id))
|
id))
|
||||||
(when mngr
|
(when mngr
|
||||||
(take-slot! mngr)
|
(stop-current-facet (take-slot! mngr)
|
||||||
(stop-current-facet (assign-task mngr))))))
|
(assign-task mngr))))))
|
||||||
|
|
||||||
;; ID -> ...
|
;; ID -> ...
|
||||||
(define (assign-task mngr)
|
(define (assign-task mngr)
|
||||||
|
@ -581,9 +582,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
;; expected:
|
;; expected:
|
||||||
;; #hash((a . 5) (b . 5) (c . 2))
|
;; #hash((a . 5) (b . 5) (c . 2))
|
||||||
|
|
||||||
(spawn-client (file->job "lorem.txt"))
|
(spawn-client j #;(file->job "lorem.txt"))
|
||||||
(spawn-job-manager)
|
(spawn-job-manager)
|
||||||
(spawn-task-manager 2)
|
(spawn-task-manager 2)
|
||||||
|
(spawn-task-manager 3)
|
||||||
(spawn-observer)
|
(spawn-observer)
|
||||||
|
|
||||||
(module+ main
|
(module+ main
|
||||||
|
|
Loading…
Reference in New Issue