examples/flink: implement task delegation roles in terms of abstract

templates
This commit is contained in:
Sam Caldwell 2019-04-02 16:18:57 -04:00
parent e16db164df
commit 7815fad415
1 changed files with 123 additions and 88 deletions

View File

@ -5,7 +5,8 @@
set-count set-count
set-empty? set-empty?
set-first set-first
set-remove)) set-remove
set-add))
(require (only-in racket/list (require (only-in racket/list
partition partition
empty? empty?
@ -38,14 +39,14 @@ of each TaskManager (TM) is contingent on the presence of a job manager.
#| #|
In turn, TaskManagers advertise their presence with (task-manager ID slots), In turn, TaskManagers advertise their presence with (task-manager ID slots),
where ID is a unique id, and slots is a natural number. The number of slots where ID is a unique id, and slots is a natural number. The number of slots
dictates how many tasks the TM can take on. To reduce contention, we the JM dictates how many tasks the TM can take on. To reduce contention, the JM
should only assign a task to a TM if the TM actually has the resources to should only assign a task to a TM if the TM actually has the resources to
perform a task. perform a task.
|# |#
(assertion-struct task-manager (id slots)) (assertion-struct task-manager (id slots))
;; an ID is a symbol or a natural number. ;; an ID is a symbol or a natural number.
;; Any -> Bool ;; Any -> Bool
;; recognize ids ;; recognize IDs
(define (id? x) (define (id? x)
(or (symbol? x) (exact-nonnegative-integer? x))) (or (symbol? x) (exact-nonnegative-integer? x)))
#| #|
@ -127,6 +128,50 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (log fmt . args) (define (log fmt . args)
(displayln (apply format fmt args))) (displayln (apply format fmt args)))
;; ---------------------------------------------------------------------------------------------------
;; Generic Implementation of Task Delegation Protocol
;; a TaskFun is a
;; (Task ID (TaskResults -> Void) ((U ACCEPTED OVERLOAD RUNNING) -> Void) -> Void)
;; ID (-> Bool) TaskFun -> TaskPerformer
;; doesn't really account for long-running tasks
;; gonna need some effect polymorphism to type uses of this
(define (task-performer my-id can-accept? perform-task)
(react
(during (task-assignment my-id $job-id $task)
(field [status #f])
(assert (task-state my-id job-id (task-id task) (status)))
(cond
[(can-accept?)
(status RUNNING)
(define (on-complete results)
(status (finished results)))
(perform-task task job-id on-complete status)]
[else
(status OVERLOAD)]))))
;; Task
;; ID
;; ID
;; (-> Void)
;; (TaskResults -> Void)
;; -> TaskAssigner
(define (task-assigner tsk job-id performer
on-overload!
on-complete!)
(react
(assert (task-assignment performer job-id tsk))
(on (asserted (task-state performer job-id (task-id tsk) $status))
(match status
[(or (== ACCEPTED)
(== RUNNING))
(void)]
[(== OVERLOAD)
(stop-current-facet (on-overload!))]
[(finished results)
(stop-current-facet (on-complete! results))]))))
;; --------------------------------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------------------------------
;; TaskRunner ;; TaskRunner
@ -138,25 +183,24 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id (status))) (assert (task-runner id (status)))
(begin/dataflow (begin/dataflow
(log "task-runner ~v state is: ~a" id (status))) (log "task-runner ~v state is: ~a" id (status)))
(during (task-assignment id $job-id (task $tid $desc)) ;; Task (TaskStateDesc -> Void) -> Void
(field [execution-state (if (idle?) RUNNING OVERLOAD)] (define (perform-task tsk job-id on-complete! update-status!)
[word-count (hash)]) (unless (idle?)
(assert (task-state id job-id tid (execution-state))) (error "tried to perform a task when not idle"))
;; we have to avoid asking a non-idle runner to do anything ;; since we currently finish everything in one turn, these changes to status aren't
(when (idle?) ;; actually visible.
(on-stop (status IDLE)) (status RUNNING)
(on-start (match-define (task tid desc) tsk)
(status (executing tid))
;; since we currently finish everything in one turn, allow other actors to observe the changes in our
;; task-runner state by flushing pending actions.
(flush!)
(match desc (match desc
[(map-work data) [(map-work data)
(word-count (count-new-words (word-count) (string->words data))) (define wc (count-new-words (hash) (string->words data)))
(execution-state (finished (word-count)))] (on-complete! wc)]
[(reduce-work left right) [(reduce-work left right)
(word-count (hash-union left right #:combine +)) (define wc (hash-union left right #:combine +))
(execution-state (finished (word-count)))])))))) (on-complete! wc)])
(status IDLE))
(on-start
(task-performer id idle? perform-task))))
;; (Hash String Nat) String -> (Hash String Nat) ;; (Hash String Nat) String -> (Hash String Nat)
(define (word-count-increment h word) (define (word-count-increment h word)
@ -206,40 +250,25 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id)) #:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (idle-runners)))) (assert (task-manager id (set-count (idle-runners))))
(field [busy-runners (list)]) (field [busy-runners (list)])
(during (task-assignment id $job-id $t) (define (can-accept?)
(match-define (task task-id desc) t) (not (set-empty? (idle-runners))))
#;(on-start (log "TM receives task ~a" task-id)) (define (perform-task tsk job-id on-complete! update-status!)
(log "TM receives task ~a" task-id) (match-define (task task-id desc) tsk)
(on-stop (log "TM finished with task ~a" task-id))
(field [status ACCEPTED])
;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner
;; (also removing the first id from task-state)
(assert (task-state id job-id task-id (status)))
(cond
[(set-empty? (idle-runners))
(log "TM can't run ~a right now" task-id)
(status OVERLOAD)]
[else
(define runner (set-first (idle-runners))) (define runner (set-first (idle-runners)))
;; n.b. modifying a query set is questionable ;; n.b. modifying a query set is questionable
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
;; Could use the busy-runners field to avoid that ;; Could use the busy-runners field to avoid that
(idle-runners (set-remove (idle-runners) runner)) (idle-runners (set-remove (idle-runners) runner))
(log "TM assigns task ~a to runner ~a" task-id runner) (log "TM assigns task ~a to runner ~a" task-id runner)
(assert (task-assignment runner job-id t)) ;; TODO - since we're both adding and removing from this set I'm not sure TRs
(status RUNNING) ;; need to be making assertions about their idleness
(on (asserted (task-state runner job-id task-id $state)) (on-stop (idle-runners (set-add (idle-runners) runner)))
(match state (on-start
[(or (== ACCEPTED) (task-assigner tsk job-id runner
(== RUNNING)) (lambda () (update-status! OVERLOAD))
;; nothing to do (lambda (results) (on-complete! results)))))
(void)] (on-start
[(== OVERLOAD) (task-performer id can-accept? perform-task)))))
(log "TM overloaded TR with task ~a" task-id)
(status OVERLOAD)]
[(finished results)
(log "TM receives the results of task ~a" task-id)
(status state)]))])))))
;; --------------------------------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------------------------------
;; JobManager ;; JobManager
@ -297,43 +326,49 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Requires (task-ready? t) ;; Requires (task-ready? t)
(define (perform-task t k) (define (perform-task t k)
(react (react
(define task-facet (current-facet-id))
(on-start (tasks-in-progress (add1 (tasks-in-progress)))) (on-start (tasks-in-progress (add1 (tasks-in-progress))))
(on-stop (tasks-in-progress (sub1 (tasks-in-progress)))) (on-stop (tasks-in-progress (sub1 (tasks-in-progress))))
(match-define (task this-id desc) t) (match-define (task this-id desc) t)
(log "JM begins on task ~a" this-id) (log "JM begins on task ~a" this-id)
(field [task-mngr #f]) (define (select-a-task-manager)
(react
(begin/dataflow (begin/dataflow
;; n.b. cyclic data-flow dependency
(unless (task-mngr)
(define mngr (define mngr
(for/first ([(id slots) (in-hash (task-managers))] (for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0)))) #:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
id)) id))
(when mngr (when mngr
(take-slot! mngr) (take-slot! mngr)
(stop-current-facet (assign-task mngr))))))
;; ID -> ...
(define (assign-task mngr)
(react
(define this-facet (current-facet-id))
(on (retracted (task-manager mngr _))
;; our task manager has crashed
(stop-current-facet (select-a-task-manager)))
(on-start
;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(react (stop-when (asserted (task-state mngr job-id this-id _)) (react (stop-when (asserted (task-state mngr job-id this-id _))
(received-answer! mngr))) (received-answer! mngr)))
(task-mngr mngr)))) (task-assigner t job-id mngr
;; TODO - should respond if task manager dies (lambda ()
(assert #:when (task-mngr)
(task-assignment (task-mngr) job-id t))
(on #:when (task-mngr)
(asserted (task-state (task-mngr) job-id this-id $state))
(match state
[(or (== ACCEPTED)
(== RUNNING))
;; nothing to do
(void)]
[(== OVERLOAD)
;; need to find a new task manager ;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work ;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" (task-mngr) this-id) (log "JM overloaded manager ~a with task ~a" mngr this-id)
(task-mngr #f)] (stop-facet this-facet (select-a-task-manager)))
[(finished results) (lambda (results)
(log "JM receives the results of task ~a" this-id) (log "JM receives the results of task ~a" this-id)
(stop-current-facet (k this-id results))])))) (stop-facet task-facet (k this-id results)))))))
(on-start (select-a-task-manager))))
;; ID Data -> Void ;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving ;; Update any dependent tasks with the results of the given task, moving