diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index b098b0f..5d0eb7f 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -123,6 +123,13 @@ (assertion-struct task-manager (id slots)) (assertion-struct submitted-task (manager task)) (assertion-struct job-manager-alive ()) + +;; a TaskState is (task-state ID TaskStateDesc) +;; where TaskStateDesc is one of +;; - ACCEPTED +;; - OVERLOAD +;; - RUNNING +;; - (finished data) (assertion-struct task-state (id desc)) ;; task states @@ -193,17 +200,27 @@ (spawn (assert (job-manager-alive)) (log "Job Manager Up") + ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. (define/query-hash task-managers (task-manager $id $slots) id slots #:on-add (log "JM learns that ~a has ~v slots" id slots)) + + ;; (Hashof TaskManagerID Nat) + ;; to better understand the supply of slots for each task manager, keep track of the number + ;; of requested tasks that we have yet to hear back about + (field [requests-in-flight (hash)]) (define (slots-available) - (for/sum ([v (in-hash-values (task-managers))]) - v)) + (for/sum ([(id v) (in-hash (task-managers))]) + (max 0 (- v (hash-ref (requests-in-flight) id 0))))) ;; ID -> Void + ;; mark that we have requested the given task manager to perform a task (define (take-slot! id) - ;; make local changes to task-managers to reflect tasks delegated in the current turn (log "JM assigns a task to ~a" id) - (task-managers (hash-update (task-managers) id sub1))) + (requests-in-flight (hash-update (requests-in-flight) id add1 0))) + ;; ID -> Void + ;; mark that we have heard back from the given manager about a requested task + (define (received-answer! id) + (requests-in-flight (hash-update (requests-in-flight) id sub1))) (during (job $job-id $tasks) (log "JM receives job ~a" job-id) @@ -229,7 +246,6 @@ (log "JM marks task ~a as ready" (task-id t)) (ready-tasks (cons t (ready-tasks)))) - ;; need to parcel out tasks ;; Task (ID TaskResult -> Void) -> Void ;; Requires (task-ready? t) (define (perform-task t k) @@ -245,10 +261,12 @@ (unless (task-mngr) (define mngr (for/first ([(id slots) (in-hash (task-managers))] - #:unless (zero? slots)) + #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) id)) (when mngr (take-slot! mngr) + (react (stop-when (asserted (task-state this-id _)) + (received-answer! mngr))) (task-mngr mngr)))) ;; TODO - should respond if task manager dies (assert #:when (task-mngr) @@ -256,6 +274,11 @@ (on #:when (task-mngr) (asserted (task-state this-id $state)) (match state + [(== ACCEPTED) + #f] + [(== RUNNING) + ;; nothing to do + #f] [(== OVERLOAD) ;; need to find a new task manager ;; don't think we need a release-slot! here, because if we've heard back from a task manager, @@ -265,10 +288,7 @@ [(finished results) ;; TODO - guess-timation of what this should look like (log "JM receives the results of task ~a" this-id) - (stop-current-facet (k this-id results))] - [_ - ;; TODO - needs more data? - #f])))) + (stop-current-facet (k this-id results))])))) ;; ID Data -> Void ;; Update any dependent tasks with the results of the given task, moving @@ -324,6 +344,22 @@ (define (split-at/lenient lst n) (split-at lst (min n (length lst)))) +;; --------------------------------------------------------------------------------------------------- +;; Observe interaction between task and job manager + +(define (spawn-observer) + (spawn + (during (job-manager-alive) + (during (task-manager $tm-id _) + (define/query-set requests (submitted-task tm-id (task $tid _)) tid) + (field [high-water-mark 0]) + (on (asserted (task-manager tm-id $slots)) + (when (> slots (high-water-mark)) + (high-water-mark slots))) + (begin/dataflow + (when (> (set-count (requests)) (high-water-mark)) + (log "!! DEMAND > SUPPLY !!"))))))) + ;; --------------------------------------------------------------------------------------------------- ;; Creating a Job @@ -469,3 +505,4 @@ (spawn-task-manager) (spawn-task-runner) (spawn-task-runner) +(spawn-observer)