typed flink: associate task runners with a particular task manager
This commit is contained in:
parent
35827c970c
commit
5f472b5402
|
@ -187,12 +187,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(require/typed "flink-support.rkt"
|
||||
[string->words : (→fn String (List String))])
|
||||
|
||||
(define (spawn-task-runner)
|
||||
(define id (gensym 'task-runner))
|
||||
(define (spawn-task-runner [id : ID] [tm-id : ID])
|
||||
(spawn τc
|
||||
(begin
|
||||
(start-facet runner
|
||||
(assert (task-runner id))
|
||||
(on (retracted (task-manager tm-id _))
|
||||
(stop runner))
|
||||
(during (task-assignment id $job-id (task $task-id $desc))
|
||||
(field [state TaskStateDesc ACCEPTED])
|
||||
(assert (task-state id job-id task-id (ref state)))
|
||||
|
@ -210,16 +211,31 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; TaskManager
|
||||
|
||||
(define (spawn-task-manager)
|
||||
(define (spawn-task-manager [num-task-runners : Int])
|
||||
(define id (gensym 'task-manager))
|
||||
(spawn τc
|
||||
(begin
|
||||
(start-facet tm
|
||||
(log "Task Manager (TM) ~a is running" id)
|
||||
(during (job-manager-alive)
|
||||
(log "TM learns about JM")
|
||||
(define/query-set task-runners (task-runner $id) id
|
||||
#:on-add (log "TM learns about task-runner ~a" id))
|
||||
(log "TM ~a learns about JM" id)
|
||||
|
||||
(field [task-runners (Set ID) (set)])
|
||||
|
||||
(on start
|
||||
(for ([_ (in-range num-task-runners)])
|
||||
(define tr-id (gensym 'task-runner))
|
||||
(start-facet monitor-task-runner
|
||||
(on start (spawn-task-runner tr-id id))
|
||||
(on (asserted (task-runner tr-id))
|
||||
(log "TM ~a successfully created task-runner ~a" id tr-id)
|
||||
(set! task-runners (set-add (ref task-runners) tr-id)))
|
||||
(on (retracted (task-runner tr-id))
|
||||
(log "TM ~a detected failure of task runner ~a, restarting" id tr-id)
|
||||
(set! task-runners (set-remove (ref task-runners) tr-id))
|
||||
(spawn-task-runner tr-id id)))))
|
||||
|
||||
|
||||
(field [busy-runners (Set ID) (set)])
|
||||
(define/dataflow idle-runners
|
||||
(set-count (set-subtract (ref task-runners) (ref busy-runners))))
|
||||
|
@ -249,7 +265,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(assert (task-state id job-id task-id (ref status)))
|
||||
(when (can-accept?)
|
||||
(define runner (select-runner))
|
||||
(log "TM assigns task ~a to runner ~a" task-id runner)
|
||||
(log "TM ~a assigns task ~a to runner ~a" id task-id runner)
|
||||
(on stop (set! busy-runners (set-remove (ref busy-runners) runner)))
|
||||
(assert (task-assignment runner job-id (task task-id desc)))
|
||||
(on (asserted (task-state runner job-id task-id $st))
|
||||
|
@ -325,7 +341,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
|
||||
(define (spawn-job-manager)
|
||||
(spawn τc
|
||||
(print-role
|
||||
(begin
|
||||
(start-facet jm
|
||||
(assert (job-manager-alive))
|
||||
(log "Job Manager Up")
|
||||
|
@ -506,8 +522,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
|
||||
(run-ground-dataspace τc
|
||||
(spawn-job-manager)
|
||||
(spawn-task-manager)
|
||||
(spawn-task-runner)
|
||||
(spawn-task-runner)
|
||||
(spawn-task-manager 2)
|
||||
(spawn-task-manager 3)
|
||||
(spawn-client (file->job "lorem.txt"))
|
||||
#;(spawn-client (string->job INPUT)))
|
||||
|
|
Loading…
Reference in New Issue