(require (only-in racket/list
In turn, TaskManagers advertise their presence with (task-manager ID slots),
where ID is a unique id, and slots is a natural number. The number of slots
dictates how many tasks the TM can take on. To reduce contention, the JM
dictates how many tasks the TM can take on. To reduce contention, the JM
should only assign a task to a TM if the TM actually has the resources to
perform a task.
(assertion-struct task-manager (id slots))
;; an ID is a symbol or a natural number.
;; Any -> Bool
;; recognize IDs
;; recognize IDs
(define (id? x)
(or (symbol? x) (exact-nonnegative-integer? x)))
(define (log fmt . args)
(displayln (apply format fmt args)))
;; Generic Implementation of Task Delegation Protocol
;; a TaskFun is a
;; (Task ID (TaskResults -> Void) ((U ACCEPTED OVERLOAD RUNNING) -> Void) -> Void)
;; ID (-> Bool) TaskFun -> TaskPerformer
;; doesn't really account for long-running tasks
;; gonna need some effect polymorphism to type uses of this
(define (task-performer my-id can-accept? perform-task)
(during (task-assignment my-id $job-id $task)
(field [status #f])
(assert (task-state my-id job-id (task-id task) (status)))
(status RUNNING)
(define (on-complete results)
(status (finished results)))
(perform-task task job-id on-complete status)]
(status OVERLOAD)]))))
;; Task
;; ID
;; ID
;; (-> Void)
;; (TaskResults -> Void)
;; -> TaskAssigner
(define (task-assigner tsk job-id performer
(assert (task-assignment performer job-id tsk))
(on (asserted (task-state performer job-id (task-id tsk) $status))
(match status
[(or (== ACCEPTED)
(stop-current-facet (on-overload!))]
[(finished results)
(stop-current-facet (on-complete! results))]))))
;; TaskRunner
(assert (task-runner id (status)))
(log "task-runner ~v state is: ~a" id (status)))
(during (task-assignment id $job-id (task $tid $desc))
(field [execution-state (if (idle?) RUNNING OVERLOAD)]
[word-count (hash)])
(assert (task-state id job-id tid (execution-state)))
;; we have to avoid asking a non-idle runner to do anything
(when (idle?)
(on-stop (status IDLE))
(status (executing tid))
;; since we currently finish everything in one turn, allow other actors to observe the changes in our
;; task-runner state by flushing pending actions.
;; Task (TaskStateDesc -> Void) -> Void
(define (perform-task tsk job-id on-complete! update-status!)
(unless (idle?)
(error "tried to perform a task when not idle"))
;; since we currently finish everything in one turn, these changes to status aren't
;; actually visible.
(status RUNNING)
(match-define (task tid desc) tsk)
(match desc
[(map-work data)
(word-count (count-new-words (word-count) (string->words data)))
(execution-state (finished (word-count)))]
(define wc (count-new-words (hash) (string->words data)))
(on-complete! wc)]
[(reduce-work left right)
(word-count (hash-union left right #:combine +))
(execution-state (finished (word-count)))]))))))
(define wc (hash-union left right #:combine +))
(on-complete! wc)])
(status IDLE))
(task-performer id idle? perform-task))))
;; (Hash String Nat) String -> (Hash String Nat)
(define (word-count-increment h word)
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (idle-runners))))
(field [busy-runners (list)])
(during (task-assignment id $job-id $t)
(match-define (task task-id desc) t)
#;(on-start (log "TM receives task ~a" task-id))
(log "TM receives task ~a" task-id)
(on-stop (log "TM finished with task ~a" task-id))
(field [status ACCEPTED])
;; TODO - could delegate this assertion, in the non-overloaded case, to TaskRunner
;; (also removing the first id from task-state)
(assert (task-state id job-id task-id (status)))
[(set-empty? (idle-runners))
(log "TM can't run ~a right now" task-id)
(status OVERLOAD)]
(define (can-accept?)
(not (set-empty? (idle-runners))))
(define (perform-task tsk job-id on-complete! update-status!)
(match-define (task task-id desc) tsk)
(define runner (set-first (idle-runners)))
;; n.b. modifying a query set is questionable
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
;; Could use the busy-runners field to avoid that
(idle-runners (set-remove (idle-runners) runner))
(log "TM assigns task ~a to runner ~a" task-id runner)
(assert (task-assignment runner job-id t))
(status RUNNING)
(on (asserted (task-state runner job-id task-id $state))
(match state
[(or (== ACCEPTED)
;; nothing to do
(log "TM overloaded TR with task ~a" task-id)
(status OVERLOAD)]
[(finished results)
(log "TM receives the results of task ~a" task-id)
(status state)]))])))))
;; TODO - since we're both adding and removing from this set I'm not sure TRs
;; need to be making assertions about their idleness
(on-stop (idle-runners (set-add (idle-runners) runner)))
(task-assigner tsk job-id runner
(lambda () (update-status! OVERLOAD))
(lambda (results) (on-complete! results)))))
(task-performer id can-accept? perform-task)))))
;; JobManager
;; Requires (task-ready? t)
(define (perform-task t k)
(define task-facet (current-facet-id))
(on-start (tasks-in-progress (add1 (tasks-in-progress))))
(on-stop (tasks-in-progress (sub1 (tasks-in-progress))))
(match-define (task this-id desc) t)
(log "JM begins on task ~a" this-id)
(field [task-mngr #f])
(define (select-a-task-manager)
;; n.b. cyclic data-flow dependency
(unless (task-mngr)
(define mngr
(for/first ([(id slots) (in-hash (task-managers))]
#:when (positive? (- slots (hash-ref (requests-in-flight) id 0))))
(when mngr
(take-slot! mngr)
(stop-current-facet (assign-task mngr))))))
;; ID -> ...
(define (assign-task mngr)
(define this-facet (current-facet-id))
(on (retracted (task-manager mngr _))
;; our task manager has crashed
(stop-current-facet (select-a-task-manager)))
;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(react (stop-when (asserted (task-state mngr job-id this-id _))
(received-answer! mngr)))
(task-mngr mngr))))
;; TODO - should respond if task manager dies
(assert #:when (task-mngr)
(task-assignment (task-mngr) job-id t))
(on #:when (task-mngr)
(asserted (task-state (task-mngr) job-id this-id $state))
(match state
[(or (== ACCEPTED)
;; nothing to do
(task-assigner t job-id mngr
(lambda ()
;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" (task-mngr) this-id)
(task-mngr #f)]
[(finished results)
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop-facet this-facet (select-a-task-manager)))
(lambda (results)
(log "JM receives the results of task ~a" this-id)
(stop-current-facet (k this-id results))]))))
(stop-facet task-facet (k this-id results)))))))
(on-start (select-a-task-manager))))
;; ID Data -> Void
;; Update any dependent tasks with the results of the given task, moving