From a8e890ab3033806a7faef640d56b577d369c4ae7 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Thu, 10 Oct 2019 13:44:38 -0400 Subject: [PATCH] typed flink: associate task runners with a particular task manager --- racket/typed/examples/roles/flink.rkt | 37 +++++++++++++++++++-------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 2c87a75..bf5c743 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -187,12 +187,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (require/typed "flink-support.rkt" [string->words : (→fn String (List String))]) -(define (spawn-task-runner) - (define id (gensym 'task-runner)) +(define (spawn-task-runner [id : ID] [tm-id : ID]) (spawn τc (begin (start-facet runner (assert (task-runner id)) + (on (retracted (task-manager tm-id _)) + (stop runner)) (during (task-assignment id $job-id (task $task-id $desc)) (field [state TaskStateDesc ACCEPTED]) (assert (task-state id job-id task-id (ref state))) @@ -210,16 +211,31 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskManager -(define (spawn-task-manager) +(define (spawn-task-manager [num-task-runners : Int]) (define id (gensym 'task-manager)) (spawn τc (begin (start-facet tm (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) - (log "TM learns about JM") - (define/query-set task-runners (task-runner $id) id - #:on-add (log "TM learns about task-runner ~a" id)) + (log "TM ~a learns about JM" id) + + (field [task-runners (Set ID) (set)]) + + (on start + (for ([_ (in-range num-task-runners)]) + (define tr-id (gensym 'task-runner)) + (start-facet monitor-task-runner + (on start (spawn-task-runner tr-id id)) + (on (asserted (task-runner tr-id)) + (log "TM ~a successfully created task-runner ~a" id tr-id) + (set! task-runners (set-add (ref task-runners) tr-id))) + (on (retracted (task-runner tr-id)) + (log "TM ~a detected failure of task runner ~a, restarting" id tr-id) + (set! task-runners (set-remove (ref task-runners) tr-id)) + (spawn-task-runner tr-id id))))) + + (field [busy-runners (Set ID) (set)]) (define/dataflow idle-runners (set-count (set-subtract (ref task-runners) (ref busy-runners)))) @@ -249,7 +265,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-state id job-id task-id (ref status))) (when (can-accept?) (define runner (select-runner)) - (log "TM assigns task ~a to runner ~a" task-id runner) + (log "TM ~a assigns task ~a to runner ~a" id task-id runner) (on stop (set! busy-runners (set-remove (ref busy-runners) runner))) (assert (task-assignment runner job-id (task task-id desc))) (on (asserted (task-state runner job-id task-id $st)) @@ -325,7 +341,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (spawn-job-manager) (spawn τc - (print-role + (begin (start-facet jm (assert (job-manager-alive)) (log "Job Manager Up") @@ -506,8 +522,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (run-ground-dataspace τc (spawn-job-manager) - (spawn-task-manager) - (spawn-task-runner) - (spawn-task-runner) + (spawn-task-manager 2) + (spawn-task-manager 3) (spawn-client (file->job "lorem.txt")) #;(spawn-client (string->job INPUT)))