diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 71843b6..6d8c34b 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -282,30 +282,70 @@ The JobManager then performs the job and, when finished, asserts ;; --------------------------------------------------------------------------------------------------- ;; JobManager +;; assertions used for internal slot-management protocol +(assertion-struct slots (v)) +(assertion-struct slot-assignment (who mngr)) +;; tid is the TaskID, rid is a unique symbol to a particular request for a slot +(struct request-id (tid rid) #:prefab) + (define (spawn-job-manager) (spawn (assert (job-manager-alive)) (log "Job Manager Up") - ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. - (define/query-hash task-managers (task-manager $id $slots) id slots - #:on-add (log "JM learns that ~a has ~v slots" id slots)) + (on-start + (react - ;; (Hashof TaskManagerID Nat) - ;; to better understand the supply of slots for each task manager, keep track of the number - ;; of requested tasks that we have yet to hear back about - (field [requests-in-flight (hash)]) - (define (slots-available) - (for/sum ([(id v) (in-hash (task-managers))]) - (max 0 (- v (hash-ref (requests-in-flight) id 0))))) - ;; ID -> Void - ;; mark that we have requested the given task manager to perform a task - (define (take-slot! id) - (requests-in-flight (hash-update (requests-in-flight) id add1 0))) - ;; ID -> Void - ;; mark that we have heard back from the given manager about a requested task - (define (received-answer! id) - (requests-in-flight (hash-update (requests-in-flight) id sub1))) + ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. + ;; (Hashof TaskManagerID Nat) + (define/query-hash task-managers (task-manager $id $slots) id slots + #:on-add (begin (log "JM learns that ~a has ~v slots" id slots) + #;(requests-in-flight (hash-set (requests-in-flight) id 0)))) + + (field [waiting-tasks '()] ;; (Listof ID) + [requests-in-flight (hash)] ;; (Hashof ID Nat) + [assignments (hash)]) ;; (Hashof ID ID) request ID to manager ID + + ;; to better understand the supply of slots for each task manager, keep track of the number + ;; of requested tasks that we have yet to hear back about + (define (slots-available) + (for/sum ([(id v) (in-hash (task-managers))]) + (max 0 (- v (hash-ref (requests-in-flight) id 0))))) + + ;; ID -> (U #f ID) + (define (try-take-slot! me) + (define mngr + (for/first ([(id slots) (in-hash (task-managers))] + #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) + id)) + (when mngr + (assignments (hash-set (assignments) me mngr)) + (requests-in-flight (hash-update (requests-in-flight) mngr add1 0))) + mngr) + + (know (slots (slots-available))) + + (during (know (observe (slot-assignment (request-id $tid $who) _))) + (on-start + (react + ;; what if one manager gains a slot but another loses one, so n stays the same? + (on (know (slots $n)) + #;(log "Dispatcher request ~a learns there are ~a slots" tid n) + (unless (or (zero? n) (hash-has-key? (assignments) who)) + (define mngr (try-take-slot! who)) + (when mngr + (stop-current-facet + (log "Dispatcher assigns task ~a to ~a" tid mngr) + (react (know (slot-assignment (request-id tid who) mngr))) + (react + (define waiting-for-answer (current-facet-id)) + (on (asserted (observe (task-performance mngr (task tid $x) _))) + (react (on (asserted (task-performance mngr (task tid x) _)) + (log "Dispatcher sees answer for ~a" tid) + (stop-facet waiting-for-answer)))) + (on-stop + (requests-in-flight (hash-update (requests-in-flight) mngr sub1)))))))))) + (on-stop (assignments (hash-remove (assignments) who)))))) (during (observe (job-completion $job-id $tasks _)) (log "JM receives job ~a" job-id) @@ -315,14 +355,10 @@ The JobManager then performs the job and, when finished, asserts [tasks-in-progress 0]) (begin/dataflow - (define slots (slots-available)) - (define-values (ts readys) - (split-at/lenient (ready-tasks) slots)) - (for ([t ts]) - (perform-task t push-results)) - (unless (empty? ts) - ;; the empty? check may be necessary to avoid a dataflow loop - (ready-tasks readys))) + (unless (empty? (ready-tasks)) + (for ([t (ready-tasks)]) + (perform-task t push-results)) + (ready-tasks '()))) ;; Task -> Void (define (add-ready-task! t) @@ -340,35 +376,23 @@ The JobManager then performs the job and, when finished, asserts (match-define (task this-id desc) t) (log "JM begins on task ~a" this-id) + (define (select-a-task-manager) (react - (field [selection #f]) - (begin/dataflow - (unless (selection) - (define mngr - (for/first ([(id slots) (in-hash (task-managers))] - #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) - id)) - (when mngr - (selection mngr) - (take-slot! mngr) - (log "JM assigns task ~a to ~a" this-id mngr) - (stop-current-facet (assign-task mngr))))))) + (define req-id (gensym 'perform-task)) + (on (know (slot-assignment (request-id this-id req-id) $mngr)) + (assign-task mngr)))) ;; ID -> ... (define (assign-task mngr) + (define this-facet (current-facet-id)) (react - (define this-facet (current-facet-id)) + #;(define this-facet (current-facet-id)) (on (retracted (task-manager mngr _)) ;; our task manager has crashed (stop-current-facet (select-a-task-manager))) (on-start - ;; N.B. when this line was here, and not after `(when mngr ...)` above, - ;; things didn't work. I think that due to script scheduling, all ready - ;; tasks were being assigned to the manager - #;(take-slot! mngr) - (react (stop-when (asserted (task-performance mngr t _)) - (received-answer! mngr))) + (log "JM assigns task ~a to manager ~a" this-id mngr) (task-assigner t mngr (lambda () ;; need to find a new task manager