diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 756fb35..89ddcc9 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -174,10 +174,11 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner -(define (spawn-task-runner) - (define id (gensym 'task-runner)) +;; ID ID -> Spawn +(define (spawn-task-runner id tm-id) (spawn #:name id (assert (task-runner id)) + (stop-when (retracted (task-manager tm-id _))) ;; Task (TaskStateDesc -> Void) -> Void (define (perform-task tsk job-id on-complete! update-status!) (match-define (task tid desc) tsk) @@ -225,18 +226,36 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskManager -(define (spawn-task-manager) +;; PosInt -> Spawn +(define (spawn-task-manager num-task-runners) (define id (gensym 'task-manager)) (spawn #:name id (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - (define/query-set task-runners (task-runner $id) id - #:on-add (log "TM learns about task-runner ~a" id)) + + (field [task-runners (set)]) + + ;; Create & Monitor Task Runners + (on-start + (for ([_ (in-range num-task-runners)]) + (define tr-id (gensym 'task-runner)) + (react + (on-start (spawn-task-runner tr-id id)) + (on (asserted (task-runner tr-id)) + (log "TM successfully created task-runner ~a" id) + (task-runners (set-add (task-runners) tr-id))) + (on (retracted (task-runner tr-id)) + (log "Detected failure of task runner ~a, restarting" tr-id) + (spawn-task-runner tr-id id))))) + + ;; Assign incoming tasks (field [busy-runners (set)]) (define/dataflow idle-runners (set-count (set-subtract (task-runners) (busy-runners)))) + (assert (task-manager id (idle-runners))) + (define (can-accept?) (positive? (idle-runners))) (define (select-runner) @@ -564,9 +583,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn-client (file->job "lorem.txt")) (spawn-job-manager) -(spawn-task-manager) -(spawn-task-runner) -(spawn-task-runner) +(spawn-task-manager 2) (spawn-observer) (module+ main