diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 06b01ab..2f3b34c 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -5,7 +5,8 @@ set-count set-empty? set-first - set-remove)) + set-remove + set-add)) (require (only-in racket/list partition empty? @@ -38,14 +39,14 @@ of each TaskManager (TM) is contingent on the presence of a job manager. #| In turn, TaskManagers advertise their presence with (task-manager ID slots), where ID is a unique id, and slots is a natural number. The number of slots -dictates how many tasks the TM can take on. To reduce contention, we the JM +dictates how many tasks the TM can take on. To reduce contention, the JM should only assign a task to a TM if the TM actually has the resources to perform a task. |# (assertion-struct task-manager (id slots)) ;; an ID is a symbol or a natural number. ;; Any -> Bool -;; recognize ids +;; recognize IDs (define (id? x) (or (symbol? x) (exact-nonnegative-integer? x))) #| @@ -127,6 +128,50 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (log fmt . args) (displayln (apply format fmt args))) +;; --------------------------------------------------------------------------------------------------- +;; Generic Implementation of Task Delegation Protocol + +;; a TaskFun is a +;; (Task ID (TaskResults -> Void) ((U ACCEPTED OVERLOAD RUNNING) -> Void) -> Void) + +;; ID (-> Bool) TaskFun -> TaskPerformer +;; doesn't really account for long-running tasks +;; gonna need some effect polymorphism to type uses of this +(define (task-performer my-id can-accept? perform-task) + (react + (during (task-assignment my-id $job-id $task) + (field [status #f]) + (assert (task-state my-id job-id (task-id task) (status))) + (cond + [(can-accept?) + (status RUNNING) + (define (on-complete results) + (status (finished results))) + (perform-task task job-id on-complete status)] + [else + (status OVERLOAD)])))) + +;; Task +;; ID +;; ID +;; (-> Void) +;; (TaskResults -> Void) +;; -> TaskAssigner +(define (task-assigner tsk job-id performer + on-overload! + on-complete!) + (react + (assert (task-assignment performer job-id tsk)) + (on (asserted (task-state performer job-id (task-id tsk) $status)) + (match status + [(or (== ACCEPTED) + (== RUNNING)) + (void)] + [(== OVERLOAD) + (stop-current-facet (on-overload!))] + [(finished results) + (stop-current-facet (on-complete! results))])))) + ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner @@ -138,25 +183,24 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-runner id (status))) (begin/dataflow (log "task-runner ~v state is: ~a" id (status))) - (during (task-assignment id $job-id (task $tid $desc)) - (field [execution-state (if (idle?) RUNNING OVERLOAD)] - [word-count (hash)]) - (assert (task-state id job-id tid (execution-state))) - ;; we have to avoid asking a non-idle runner to do anything - (when (idle?) - (on-stop (status IDLE)) - (on-start - (status (executing tid)) - ;; since we currently finish everything in one turn, allow other actors to observe the changes in our - ;; task-runner state by flushing pending actions. - (flush!) 
- (match desc - [(map-work data) - (word-count (count-new-words (word-count) (string->words data))) - (execution-state (finished (word-count)))] - [(reduce-work left right) - (word-count (hash-union left right #:combine +)) - (execution-state (finished (word-count)))])))))) + ;; Task (TaskStateDesc -> Void) -> Void + (define (perform-task tsk job-id on-complete! update-status!) + (unless (idle?) + (error "tried to perform a task when not idle")) + ;; since we currently finish everything in one turn, these changes to status aren't + ;; actually visible. + (status RUNNING) + (match-define (task tid desc) tsk) + (match desc + [(map-work data) + (define wc (count-new-words (hash) (string->words data))) + (on-complete! wc)] + [(reduce-work left right) + (define wc (hash-union left right #:combine +)) + (on-complete! wc)]) + (status IDLE)) + (on-start + (task-performer id idle? perform-task)))) ;; (Hash String Nat) String -> (Hash String Nat) (define (word-count-increment h word) @@ -206,40 +250,25 @@ The JobManager then performs the job and, when finished, asserts (job-finished I #:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id)) (assert (task-manager id (set-count (idle-runners)))) (field [busy-runners (list)]) - (during (task-assignment id $job-id $t) - (match-define (task task-id desc) t) - #;(on-start (log "TM receives task ~a" task-id)) - (log "TM receives task ~a" task-id) - (on-stop (log "TM finished with task ~a" task-id)) - (field [status ACCEPTED]) - ;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner - ;; (also removing the first id from task-state) - (assert (task-state id job-id task-id (status))) - (cond - [(set-empty? (idle-runners)) - (log "TM can't run ~a right now" task-id) - (status OVERLOAD)] - [else - (define runner (set-first (idle-runners))) - ;; n.b. modifying a query set is questionable - ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. - ;; Could use the busy-runners field to avoid that - (idle-runners (set-remove (idle-runners) runner)) - (log "TM assigns task ~a to runner ~a" task-id runner) - (assert (task-assignment runner job-id t)) - (status RUNNING) - (on (asserted (task-state runner job-id task-id $state)) - (match state - [(or (== ACCEPTED) - (== RUNNING)) - ;; nothing to do - (void)] - [(== OVERLOAD) - (log "TM overloaded TR with task ~a" task-id) - (status OVERLOAD)] - [(finished results) - (log "TM receives the results of task ~a" task-id) - (status state)]))]))))) + (define (can-accept?) + (not (set-empty? (idle-runners)))) + (define (perform-task tsk job-id on-complete! update-status!) + (match-define (task task-id desc) tsk) + (define runner (set-first (idle-runners))) + ;; n.b. modifying a query set is questionable + ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. + ;; Could use the busy-runners field to avoid that + (idle-runners (set-remove (idle-runners) runner)) + (log "TM assigns task ~a to runner ~a" task-id runner) + ;; TODO - since we're both adding and removing from this set I'm not sure TRs + ;; need to be making assertions about their idleness + (on-stop (idle-runners (set-add (idle-runners) runner))) + (on-start + (task-assigner tsk job-id runner + (lambda () (update-status! OVERLOAD)) + (lambda (results) (on-complete! results))))) + (on-start + (task-performer id can-accept? 
perform-task))))) ;; --------------------------------------------------------------------------------------------------- ;; JobManager @@ -297,43 +326,49 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Requires (task-ready? t) (define (perform-task t k) (react + (define task-facet (current-facet-id)) (on-start (tasks-in-progress (add1 (tasks-in-progress)))) (on-stop (tasks-in-progress (sub1 (tasks-in-progress)))) (match-define (task this-id desc) t) (log "JM begins on task ~a" this-id) - (field [task-mngr #f]) - (begin/dataflow - ;; n.b. cyclic data-flow dependency - (unless (task-mngr) - (define mngr - (for/first ([(id slots) (in-hash (task-managers))] - #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) - id)) - (when mngr - (take-slot! mngr) - (react (stop-when (asserted (task-state mngr job-id this-id _)) - (received-answer! mngr))) - (task-mngr mngr)))) - ;; TODO - should respond if task manager dies - (assert #:when (task-mngr) - (task-assignment (task-mngr) job-id t)) - (on #:when (task-mngr) - (asserted (task-state (task-mngr) job-id this-id $state)) - (match state - [(or (== ACCEPTED) - (== RUNNING)) - ;; nothing to do - (void)] - [(== OVERLOAD) - ;; need to find a new task manager - ;; don't think we need a release-slot! here, because if we've heard back from a task manager, - ;; they should have told us a different slot count since we tried to give them work - (log "JM overloaded manager ~a with task ~a" (task-mngr) this-id) - (task-mngr #f)] - [(finished results) - (log "JM receives the results of task ~a" this-id) - (stop-current-facet (k this-id results))])))) + (define (select-a-task-manager) + (react + (begin/dataflow + (define mngr + (for/first ([(id slots) (in-hash (task-managers))] + #:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) + id)) + (when mngr + (take-slot! mngr) + (stop-current-facet (assign-task mngr)))))) + + ;; ID -> ... + (define (assign-task mngr) + (react + (define this-facet (current-facet-id)) + (on (retracted (task-manager mngr _)) + ;; our task manager has crashed + (stop-current-facet (select-a-task-manager))) + (on-start + ;; N.B. when this line was here, and not after `(when mngr ...)` above, + ;; things didn't work. I think that due to script scheduling, all ready + ;; tasks were being assigned to the manager + #;(take-slot! mngr) + (react (stop-when (asserted (task-state mngr job-id this-id _)) + (received-answer! mngr))) + (task-assigner t job-id mngr + (lambda () + ;; need to find a new task manager + ;; don't think we need a release-slot! here, because if we've heard back from a task manager, + ;; they should have told us a different slot count since we tried to give them work + (log "JM overloaded manager ~a with task ~a" mngr this-id) + (stop-facet this-facet (select-a-task-manager))) + (lambda (results) + (log "JM receives the results of task ~a" this-id) + (stop-facet task-facet (k this-id results))))))) + + (on-start (select-a-task-manager)))) ;; ID Data -> Void ;; Update any dependent tasks with the results of the given task, moving
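
For reference, the handshake that task-performer and task-assigner factor out of the TaskManager and JobManager above is small: the assigner asserts (task-assignment performer job-id task), the performer answers with a (task-state ...) status that is either OVERLOAD or RUNNING followed by (finished results), and the assigner dispatches on that status. The sketch below models the same transitions in plain #lang racket, without a Syndicate dataspace; make-performer, assign-task!, and notify-status! are made-up names standing in for the assertion-based plumbing, so treat it as an illustration of the control flow rather than the protocol implementation in the diff.

    #lang racket
    ;; Hypothetical, dataspace-free sketch of the delegation handshake.
    ;; notify-status! stands in for asserting (task-state ...); the keyword
    ;; callbacks mirror task-assigner's on-overload! / on-complete! parameters.

    (struct finished (results) #:transparent)

    ;; (-> Bool) (Task (Results -> Void) -> Void) -> performer
    ;; A performer either refuses work (OVERLOAD) or runs it and reports
    ;; (finished results), like task-performer's can-accept? branches.
    (define (make-performer can-accept? perform)
      (lambda (tsk notify-status!)
        (cond
          [(can-accept?)
           (notify-status! 'RUNNING)
           (perform tsk (lambda (results) (notify-status! (finished results))))]
          [else
           (notify-status! 'OVERLOAD)])))

    ;; Hand tsk to performer and dispatch on the statuses it reports,
    ;; mirroring the match in task-assigner (ACCEPTED is listed for parity
    ;; even though this sketch never produces it).
    (define (assign-task! performer tsk
                          #:on-overload on-overload!
                          #:on-complete on-complete!)
      (performer tsk
                 (lambda (status)
                   (match status
                     [(or 'ACCEPTED 'RUNNING) (void)]
                     ['OVERLOAD (on-overload!)]
                     [(finished results) (on-complete! results)]))))

    ;; Example: a performer that upcases strings and never refuses work.
    (define upcaser
      (make-performer (lambda () #t)
                      (lambda (tsk k) (k (string-upcase tsk)))))

    (assign-task! upcaser "hello"
                  #:on-overload (lambda () (displayln "overloaded"))
                  #:on-complete (lambda (results) (printf "done: ~a\n" results)))

In the diff itself these statuses travel through dataspace assertions and facets rather than direct calls, which is what additionally lets the JobManager react to a TaskManager crashing (the retraction of (task-manager mngr _)) by re-running select-a-task-manager.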