untyped flink: work on streamlining ids, demonstrating dataflow issue
This commit is contained in:
parent
e3d9f93eca
commit
79277c91d3
|
@ -2,6 +2,7 @@
|
|||
|
||||
(require racket/set)
|
||||
(require (only-in racket/list
|
||||
first
|
||||
partition
|
||||
empty?
|
||||
split-at))
|
||||
|
@ -71,17 +72,20 @@ Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
|
|||
|
||||
A TaskAssigner asserts the association of a Task with a particular TaskPerformer
|
||||
through
|
||||
(task-assignment ID ID Task)
|
||||
where the first ID identifies the TP and the second identifies the job.
|
||||
(task-assignment ID Task)
|
||||
where the ID identifies the TP
|
||||
|#
|
||||
(assertion-struct task-assignment (assignee job-id task))
|
||||
(assertion-struct task-assignment (assignee task))
|
||||
#|
|
||||
A Task is a (task ID Work), where Work is one of
|
||||
A Task is a (task TaskID Work), where Work is one of
|
||||
- (map-work String)
|
||||
- (reduce-work (U ID TaskResult) (U ID TaskResult)), referring to either the
|
||||
ID of the dependent task or its results. A reduce-work is ready to be executed
|
||||
when it has both results.
|
||||
|
||||
A TaskID is a (list ID ID), where the first ID is specific to the individual
|
||||
task and the second identifies the job it belongs to.
|
||||
|
||||
A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
||||
|#
|
||||
(struct task (id desc) #:transparent)
|
||||
|
@ -91,9 +95,7 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
|
|||
#|
|
||||
The TaskPerformer responds to a task-assignment by describing its state with respect
|
||||
to that task,
|
||||
(task-state ID ID ID TaskStateDesc)
|
||||
where the first ID is that of the TP, the second is that of the job,
|
||||
and the third that of the task.
|
||||
(task-state TaskID TaskStateDesc)
|
||||
|
||||
A TaskStateDesc is one of
|
||||
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
|
||||
|
@ -101,7 +103,7 @@ A TaskStateDesc is one of
|
|||
- RUNNING, indicating that the task is being performed
|
||||
- (finished TaskResult), describing the results
|
||||
|#
|
||||
(assertion-struct task-state (assignee job-id task-id desc))
|
||||
(assertion-struct task-state (task-id desc))
|
||||
(struct finished (data) #:transparent)
|
||||
(define ACCEPTED 'accepted)
|
||||
(define RUNNING 'running)
|
||||
|
@ -138,30 +140,27 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; gonna need some effect polymorphism to type uses of this
|
||||
(define (task-performer my-id can-accept? perform-task)
|
||||
(react
|
||||
(during (task-assignment my-id $job-id $task)
|
||||
(during (task-assignment my-id $task)
|
||||
(field [status #f])
|
||||
(assert (task-state my-id job-id (task-id task) (status)))
|
||||
(assert (task-state (task-id task) (status)))
|
||||
(cond
|
||||
[(can-accept?)
|
||||
(status RUNNING)
|
||||
(define (on-complete results)
|
||||
(status (finished results)))
|
||||
(perform-task task job-id on-complete status)]
|
||||
(perform-task task on-complete status)]
|
||||
[else
|
||||
(status OVERLOAD)]))))
|
||||
|
||||
;; Task
|
||||
;; ID
|
||||
;; ID
|
||||
;; (-> Void)
|
||||
;; (TaskResult -> Void)
|
||||
;; -> TaskAssigner
|
||||
(define (task-assigner tsk job-id performer
|
||||
on-overload!
|
||||
on-complete!)
|
||||
(define (task-assigner tsk performer on-overload! on-complete!)
|
||||
(react
|
||||
(assert (task-assignment performer job-id tsk))
|
||||
(on (asserted (task-state performer job-id (task-id tsk) $status))
|
||||
(assert (task-assignment performer tsk))
|
||||
(on (asserted (task-state (task-id tsk) $status))
|
||||
(match status
|
||||
[(or (== ACCEPTED)
|
||||
(== RUNNING))
|
||||
|
@ -173,14 +172,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
|
||||
;; ---------------------------------------------------------------------------------------------------
|
||||
;; TaskRunner
|
||||
|
||||
;; ID ID -> Spawn
|
||||
(define (spawn-task-runner id tm-id)
|
||||
(spawn #:name id
|
||||
(assert (task-runner id))
|
||||
(stop-when (retracted (task-manager tm-id _)))
|
||||
;; Task (TaskResult -> Void) (TaskStateDesc -> Void) -> Void
|
||||
(define (perform-task tsk job-id on-complete! update-status!)
|
||||
(define (perform-task tsk on-complete! update-status!)
|
||||
(match-define (task tid desc) tsk)
|
||||
(match desc
|
||||
[(map-work data)
|
||||
|
@ -219,7 +217,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(list "good" "eve" "ma'am"))
|
||||
(check-equal? (string->words "please sir. may I have another?")
|
||||
(list "please" "sir" "may" "I" "have" "another"))
|
||||
;; TODO - currently fails
|
||||
;; currently fails, doesn't seem worth fixing
|
||||
#;(check-equal? (string->words "but wait---there's more")
|
||||
(list "but" "wait" "there's" "more")))
|
||||
|
||||
|
@ -252,26 +250,30 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
|
||||
;; Assign incoming tasks
|
||||
(field [busy-runners (set)])
|
||||
|
||||
;; Natural
;; The number of task runners that are not currently marked busy, i.e. the
;; size of (task-runners) with (busy-runners) removed.
;; NOTE(review): presumably define/dataflow recomputes this whenever the
;; values it reads change -- confirm against the Syndicate documentation.
(define/dataflow idle-runners
|
||||
(set-count (set-subtract (task-runners) (busy-runners))))
|
||||
|
||||
(assert (task-manager id (idle-runners)))
|
||||
|
||||
;; -> Boolean
;; Logs the current idle-runner count for this task manager, then reports
;; whether at least one runner is idle.
(define (can-accept?)
|
||||
(log "TM ~a idle-runners = ~a" id (idle-runners))
|
||||
(positive? (idle-runners)))
|
||||
;; -> (U ID #f)
;; Picks the first runner in (task-runners) that is not a member of
;; (busy-runners), records it in the busy-runners field, and returns it.
;; NOTE(review): when no runner is free, for/first yields #f; the error is
;; logged, but #f is still added to busy-runners and returned to the caller.
(define (select-runner)
|
||||
(define runner (for/first ([r (in-set (task-runners))]
|
||||
#:unless (set-member? (busy-runners) r))
|
||||
r))
|
||||
(unless runner
|
||||
(log "ERROR: TM ~a failed to select a runner.\nrunners: ~a\nbusy: ~a" id (task-runners) (busy-runners)))
|
||||
(busy-runners (set-add (busy-runners) runner))
|
||||
runner)
|
||||
(define (perform-task tsk job-id on-complete! update-status!)
|
||||
(define (perform-task tsk on-complete! update-status!)
|
||||
(match-define (task task-id desc) tsk)
|
||||
(define runner (select-runner))
|
||||
(log "TM ~a assigns task ~a to runner ~a" id task-id runner)
|
||||
(on-stop (busy-runners (set-remove (busy-runners) runner)))
|
||||
(on-start
|
||||
(task-assigner tsk job-id runner
|
||||
(task-assigner tsk runner
|
||||
(lambda () (update-status! OVERLOAD))
|
||||
(lambda (results) (on-complete! results)))))
|
||||
(on-start
|
||||
|
@ -362,9 +364,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; things didn't work. I think that due to script scheduling, all ready
|
||||
;; tasks were being assigned to the manager
|
||||
#;(take-slot! mngr)
|
||||
(react (stop-when (asserted (task-state mngr job-id this-id _))
|
||||
(react (stop-when (asserted (task-state this-id _))
|
||||
(received-answer! mngr)))
|
||||
(task-assigner t job-id mngr
|
||||
(task-assigner t mngr
|
||||
(lambda ()
|
||||
;; need to find a new task manager
|
||||
;; don't think we need a release-slot! here, because if we've heard back from a task manager,
|
||||
|
@ -373,7 +375,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(stop-facet this-facet (select-a-task-manager)))
|
||||
(lambda (results)
|
||||
(log "JM receives the results of task ~a" this-id)
|
||||
(stop-facet task-facet (k this-id results)))))))
|
||||
(stop-facet task-facet (k (first this-id) results)))))))
|
||||
|
||||
(on-start (select-a-task-manager))))
|
||||
|
||||
|
@ -443,7 +445,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(spawn
|
||||
(during (job-manager-alive)
|
||||
(during (task-manager $tm-id _)
|
||||
(define/query-set requests (task-assignment tm-id _ (task $tid _)) tid)
|
||||
;; The IDs of the tasks currently assigned to this particular task manager.
;; The assignee slot is pinned to tm-id (bound by the enclosing `during`);
;; a wildcard there would also collect assignments addressed to every other
;; performer, including the ones task managers make to their task runners.
(define/query-set requests (task-assignment tm-id (task $tid _)) tid)
|
||||
(field [high-water-mark 0])
|
||||
(on (asserted (task-manager tm-id $slots))
|
||||
(when (> slots (high-water-mark))
|
||||
|
@ -500,7 +502,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
|
||||
;; TaskTree ID -> (Listof Task)
|
||||
;; flatten a task tree, giving each task a TaskID made of a job-unique ID and the given job ID
|
||||
(define (task-tree->list tt)
|
||||
(define (task-tree->list tt job-id)
|
||||
(define-values (tasks _)
|
||||
;; TaskTree ID -> (Values (Listof Task) ID)
|
||||
;; the input id is for the current node of the tree
|
||||
|
@ -509,7 +511,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
[next-id 0])
|
||||
(match tt
|
||||
[(map-work _)
|
||||
(values (list (task next-id tt))
|
||||
(values (list (task (list next-id job-id) tt))
|
||||
(add1 next-id))]
|
||||
[(reduce-work left right)
|
||||
(define left-id (add1 next-id))
|
||||
|
@ -517,8 +519,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(loop left left-id))
|
||||
(define-values (rights next)
|
||||
(loop right right-id))
|
||||
(values (cons (task next-id (reduce-work left-id right-id))
|
||||
(append lefts rights))
|
||||
(values (cons (task (list next-id job-id) (reduce-work left-id right-id))
|
||||
(append lefts rights))
|
||||
next)])))
|
||||
tasks)
|
||||
|
||||
|
@ -526,7 +528,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(define (create-job in)
|
||||
(define job-id (gensym 'job))
|
||||
(define input-lines (sequence->list (in-lines in)))
|
||||
(define tasks (task-tree->list (create-task-tree input-lines)))
|
||||
(define tasks (task-tree->list (create-task-tree input-lines) job-id))
|
||||
(job job-id tasks))
|
||||
|
||||
;; String -> Job
|
||||
|
@ -556,7 +558,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
(task mid2 (map-work data2)))
|
||||
(check-true (id? left))
|
||||
(check-true (id? right))
|
||||
(check-equal? (set left right) (set mid1 mid2))
|
||||
(check-equal? (set left right) (set (first mid1) (first mid2)))
|
||||
(check-equal? (set data1 data2)
|
||||
(set "a b c" "d e f"))]
|
||||
[_
|
||||
|
@ -582,7 +584,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
|||
;; expected:
|
||||
;; #hash((a . 5) (b . 5) (c . 2))
|
||||
|
||||
(spawn-client j #;(file->job "lorem.txt"))
|
||||
(spawn-client j)
|
||||
(spawn-client (file->job "lorem.txt"))
|
||||
(spawn-job-manager)
|
||||
(spawn-task-manager 2)
|
||||
(spawn-task-manager 3)
|
||||
|
|
Loading…
Reference in New Issue