untyped flink: unify task-assignment and task-state assertions

This commit is contained in:
Sam Caldwell 2019-10-16 16:13:19 -04:00
parent 123037ba51
commit d29afb6679
1 changed files with 11 additions and 13 deletions

View File

@ -70,12 +70,12 @@ Task Delegation Protocol
Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP). Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer A TaskAssigner requests the performance of a Task with a particular TaskPerformer
through through the assertion of interest
(task-assignment ID Task) (observe (task-performance ID Task ))
where the ID identifies the TP where the ID identifies the TP
|# |#
(assertion-struct task-assignment (assignee task)) (assertion-struct task-performance (assignee task desc))
#| #|
A Task is a (task TaskID Work), where Work is one of A Task is a (task TaskID Work), where Work is one of
- (map-work String) - (map-work String)
@ -93,9 +93,9 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
(struct reduce-work (left right) #:transparent) (struct reduce-work (left right) #:transparent)
#| #|
The TaskPerformer responds to a task-assignment by describing its state with respect The TaskPerformer responds to a request by describing its state with respect
to that task, to that task,
(task-state TaskID TaskStateDesc) (task-performance ID Task TaskStateDesc)
A TaskStateDesc is one of A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
@ -103,7 +103,6 @@ A TaskStateDesc is one of
- RUNNING, indicating that the task is being performed - RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results - (finished TaskResult), describing the results
|# |#
(assertion-struct task-state (task-id desc))
(struct finished (data) #:transparent) (struct finished (data) #:transparent)
(define ACCEPTED 'accepted) (define ACCEPTED 'accepted)
(define RUNNING 'running) (define RUNNING 'running)
@ -140,9 +139,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; gonna need some effect polymorphism to type uses of this ;; gonna need some effect polymorphism to type uses of this
(define (task-performer my-id can-accept? perform-task) (define (task-performer my-id can-accept? perform-task)
(react (react
(during (task-assignment my-id $task) (during (observe (task-performance my-id $task _))
(field [status #f]) (field [status #f])
(assert (task-state (task-id task) (status))) (assert (task-performance my-id task (status)))
(cond (cond
[(can-accept?) [(can-accept?)
(status RUNNING) (status RUNNING)
@ -159,8 +158,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; -> TaskAssigner ;; -> TaskAssigner
(define (task-assigner tsk performer on-overload! on-complete!) (define (task-assigner tsk performer on-overload! on-complete!)
(react (react
(assert (task-assignment performer tsk)) (on (asserted (task-performance performer tsk $status))
(on (asserted (task-state (task-id tsk) $status))
(match status (match status
[(or (== ACCEPTED) [(or (== ACCEPTED)
(== RUNNING)) (== RUNNING))
@ -366,7 +364,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; things didn't work. I think that due to script scheduling, all ready ;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager ;; tasks were being assigned to the manager
#;(take-slot! mngr) #;(take-slot! mngr)
(react (stop-when (asserted (task-state this-id _)) (react (stop-when (asserted (task-performance mngr t _))
(received-answer! mngr))) (received-answer! mngr)))
(task-assigner t mngr (task-assigner t mngr
(lambda () (lambda ()
@ -447,7 +445,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn (spawn
(during (job-manager-alive) (during (job-manager-alive)
(during (task-manager $tm-id _) (during (task-manager $tm-id _)
(define/query-set requests (task-assignment tm-id (task $tid _)) tid) (define/query-set requests (observe (task-performance tm-id (task $tid _) _)) tid)
(field [high-water-mark 0]) (field [high-water-mark 0])
(on (asserted (task-manager tm-id $slots)) (on (asserted (task-manager tm-id $slots))
(when (> slots (high-water-mark)) (when (> slots (high-water-mark))