untyped flink: work on streamlining ids, demonstrating dataflow issue

This commit is contained in:
Sam Caldwell 2019-10-15 11:16:46 -04:00
parent 2610ceb541
commit 7374c8c506
1 changed file with 36 additions and 33 deletions

View File

@ -2,6 +2,7 @@
(require racket/set) (require racket/set)
(require (only-in racket/list (require (only-in racket/list
first
partition partition
empty? empty?
split-at)) split-at))
@ -71,17 +72,20 @@ Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP).
A TaskAssigner asserts the association of a Task with a particular TaskPerformer A TaskAssigner asserts the association of a Task with a particular TaskPerformer
through through
(task-assignment ID ID Task) (task-assignment ID Task)
where the first ID identifies the TP and the second identifies the job. where the ID identifies the TP
|# |#
(assertion-struct task-assignment (assignee job-id task)) (assertion-struct task-assignment (assignee task))
#| #|
A Task is a (task ID Work), where Work is one of A Task is a (task TaskID Work), where Work is one of
- (map-work String) - (map-work String)
- (reduce-work (U ID TaskResult) (U ID TaskResult)), referring to either the - (reduce-work (U ID TaskResult) (U ID TaskResult)), referring to either the
ID of the dependent task or its results. A reduce-work is ready to be executed ID of the dependent task or its results. A reduce-work is ready to be executed
when it has both results. when it has both results.
A TaskID is a (list ID ID), where the first ID is specific to the individual
task and the second identifies the job it belongs to.
A TaskResult is a (Hashof String Natural), counting the occurrences of words A TaskResult is a (Hashof String Natural), counting the occurrences of words
|# |#
(struct task (id desc) #:transparent) (struct task (id desc) #:transparent)
@ -91,9 +95,7 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words
#| #|
The TaskPerformer responds to a task-assignment by describing its state with respect The TaskPerformer responds to a task-assignment by describing its state with respect
to that task, to that task,
(task-state ID ID ID TaskStateDesc) (task-state TaskID TaskStateDesc)
where the first ID is that of the TP, the second is that of the job,
and the third that of the task.
A TaskStateDesc is one of A TaskStateDesc is one of
- ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently)
@ -101,7 +103,7 @@ A TaskStateDesc is one of
- RUNNING, indicating that the task is being performed - RUNNING, indicating that the task is being performed
- (finished TaskResult), describing the results - (finished TaskResult), describing the results
|# |#
(assertion-struct task-state (assignee job-id task-id desc)) (assertion-struct task-state (task-id desc))
(struct finished (data) #:transparent) (struct finished (data) #:transparent)
(define ACCEPTED 'accepted) (define ACCEPTED 'accepted)
(define RUNNING 'running) (define RUNNING 'running)
@ -138,30 +140,27 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; gonna need some effect polymorphism to type uses of this ;; gonna need some effect polymorphism to type uses of this
(define (task-performer my-id can-accept? perform-task) (define (task-performer my-id can-accept? perform-task)
(react (react
(during (task-assignment my-id $job-id $task) (during (task-assignment my-id $task)
(field [status #f]) (field [status #f])
(assert (task-state my-id job-id (task-id task) (status))) (assert (task-state (task-id task) (status)))
(cond (cond
[(can-accept?) [(can-accept?)
(status RUNNING) (status RUNNING)
(define (on-complete results) (define (on-complete results)
(status (finished results))) (status (finished results)))
(perform-task task job-id on-complete status)] (perform-task task on-complete status)]
[else [else
(status OVERLOAD)])))) (status OVERLOAD)]))))
;; Task ;; Task
;; ID ;; ID
;; ID
;; (-> Void) ;; (-> Void)
;; (TaskResult -> Void) ;; (TaskResult -> Void)
;; -> TaskAssigner ;; -> TaskAssigner
(define (task-assigner tsk job-id performer (define (task-assigner tsk performer on-overload! on-complete!)
on-overload!
on-complete!)
(react (react
(assert (task-assignment performer job-id tsk)) (assert (task-assignment performer tsk))
(on (asserted (task-state performer job-id (task-id tsk) $status)) (on (asserted (task-state (task-id tsk) $status))
(match status (match status
[(or (== ACCEPTED) [(or (== ACCEPTED)
(== RUNNING)) (== RUNNING))
@ -173,14 +172,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; --------------------------------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------------------------------
;; TaskRunner ;; TaskRunner
;; ID ID -> Spawn ;; ID ID -> Spawn
(define (spawn-task-runner id tm-id) (define (spawn-task-runner id tm-id)
(spawn #:name id (spawn #:name id
(assert (task-runner id)) (assert (task-runner id))
(stop-when (retracted (task-manager tm-id _))) (stop-when (retracted (task-manager tm-id _)))
;; Task (TaskStateDesc -> Void) -> Void ;; Task (TaskStateDesc -> Void) -> Void
(define (perform-task tsk job-id on-complete! update-status!) (define (perform-task tsk on-complete! update-status!)
(match-define (task tid desc) tsk) (match-define (task tid desc) tsk)
(match desc (match desc
[(map-work data) [(map-work data)
@ -219,7 +217,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(list "good" "eve" "ma'am")) (list "good" "eve" "ma'am"))
(check-equal? (string->words "please sir. may I have another?") (check-equal? (string->words "please sir. may I have another?")
(list "please" "sir" "may" "I" "have" "another")) (list "please" "sir" "may" "I" "have" "another"))
;; TODO - currently fails ;; currently fails, doesn't seem worth fixing
#;(check-equal? (string->words "but wait---there's more") #;(check-equal? (string->words "but wait---there's more")
(list "but" "wait" "there's" "more"))) (list "but" "wait" "there's" "more")))
@ -252,26 +250,30 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Assign incoming tasks ;; Assign incoming tasks
(field [busy-runners (set)]) (field [busy-runners (set)])
(define/dataflow idle-runners (define/dataflow idle-runners
(set-count (set-subtract (task-runners) (busy-runners)))) (set-count (set-subtract (task-runners) (busy-runners))))
(assert (task-manager id (idle-runners))) (assert (task-manager id (idle-runners)))
(define (can-accept?) (define (can-accept?)
(log "TM ~a idle-runners = ~a" id (idle-runners))
(positive? (idle-runners))) (positive? (idle-runners)))
(define (select-runner) (define (select-runner)
(define runner (for/first ([r (in-set (task-runners))] (define runner (for/first ([r (in-set (task-runners))]
#:unless (set-member? (busy-runners) r)) #:unless (set-member? (busy-runners) r))
r)) r))
(unless runner
(log "ERROR: TM ~a failed to select a runner.\nrunners: ~a\nbusy: ~a" id (task-runners) (busy-runners)))
(busy-runners (set-add (busy-runners) runner)) (busy-runners (set-add (busy-runners) runner))
runner) runner)
(define (perform-task tsk job-id on-complete! update-status!) (define (perform-task tsk on-complete! update-status!)
(match-define (task task-id desc) tsk) (match-define (task task-id desc) tsk)
(define runner (select-runner)) (define runner (select-runner))
(log "TM ~a assigns task ~a to runner ~a" id task-id runner) (log "TM ~a assigns task ~a to runner ~a" id task-id runner)
(on-stop (busy-runners (set-remove (busy-runners) runner))) (on-stop (busy-runners (set-remove (busy-runners) runner)))
(on-start (on-start
(task-assigner tsk job-id runner (task-assigner tsk runner
(lambda () (update-status! OVERLOAD)) (lambda () (update-status! OVERLOAD))
(lambda (results) (on-complete! results))))) (lambda (results) (on-complete! results)))))
(on-start (on-start
@ -362,9 +364,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; things didn't work. I think that due to script scheduling, all ready ;; things didn't work. I think that due to script scheduling, all ready
;; tasks were being assigned to the manager ;; tasks were being assigned to the manager
#;(take-slot! mngr) #;(take-slot! mngr)
(react (stop-when (asserted (task-state mngr job-id this-id _)) (react (stop-when (asserted (task-state this-id _))
(received-answer! mngr))) (received-answer! mngr)))
(task-assigner t job-id mngr (task-assigner t mngr
(lambda () (lambda ()
;; need to find a new task manager ;; need to find a new task manager
;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; don't think we need a release-slot! here, because if we've heard back from a task manager,
@ -373,7 +375,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(stop-facet this-facet (select-a-task-manager))) (stop-facet this-facet (select-a-task-manager)))
(lambda (results) (lambda (results)
(log "JM receives the results of task ~a" this-id) (log "JM receives the results of task ~a" this-id)
(stop-facet task-facet (k this-id results))))))) (stop-facet task-facet (k (first this-id) results)))))))
(on-start (select-a-task-manager)))) (on-start (select-a-task-manager))))
@ -443,7 +445,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn (spawn
(during (job-manager-alive) (during (job-manager-alive)
(during (task-manager $tm-id _) (during (task-manager $tm-id _)
(define/query-set requests (task-assignment tm-id _ (task $tid _)) tid) (define/query-set requests (task-assignment _ (task $tid _)) tid)
(field [high-water-mark 0]) (field [high-water-mark 0])
(on (asserted (task-manager tm-id $slots)) (on (asserted (task-manager tm-id $slots))
(when (> slots (high-water-mark)) (when (> slots (high-water-mark))
@ -500,7 +502,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; TaskTree -> (Listof Task) ;; TaskTree -> (Listof Task)
;; flatten a task tree by assigning job-unique IDs ;; flatten a task tree by assigning job-unique IDs
(define (task-tree->list tt) (define (task-tree->list tt job-id)
(define-values (tasks _) (define-values (tasks _)
;; TaskTree ID -> (Values (Listof Task) ID) ;; TaskTree ID -> (Values (Listof Task) ID)
;; the input id is for the current node of the tree ;; the input id is for the current node of the tree
@ -509,7 +511,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[next-id 0]) [next-id 0])
(match tt (match tt
[(map-work _) [(map-work _)
(values (list (task next-id tt)) (values (list (task (list next-id job-id) tt))
(add1 next-id))] (add1 next-id))]
[(reduce-work left right) [(reduce-work left right)
(define left-id (add1 next-id)) (define left-id (add1 next-id))
@ -517,8 +519,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(loop left left-id)) (loop left left-id))
(define-values (rights next) (define-values (rights next)
(loop right right-id)) (loop right right-id))
(values (cons (task next-id (reduce-work left-id right-id)) (values (cons (task (list next-id job-id) (reduce-work left-id right-id))
(append lefts rights)) (append lefts rights))
next)]))) next)])))
tasks) tasks)
@ -526,7 +528,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (create-job in) (define (create-job in)
(define job-id (gensym 'job)) (define job-id (gensym 'job))
(define input-lines (sequence->list (in-lines in))) (define input-lines (sequence->list (in-lines in)))
(define tasks (task-tree->list (create-task-tree input-lines))) (define tasks (task-tree->list (create-task-tree input-lines) job-id))
(job job-id tasks)) (job job-id tasks))
;; String -> Job ;; String -> Job
@ -556,7 +558,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(task mid2 (map-work data2))) (task mid2 (map-work data2)))
(check-true (id? left)) (check-true (id? left))
(check-true (id? right)) (check-true (id? right))
(check-equal? (set left right) (set mid1 mid2)) (check-equal? (set left right) (set (first mid1) (first mid2)))
(check-equal? (set data1 data2) (check-equal? (set data1 data2)
(set "a b c" "d e f"))] (set "a b c" "d e f"))]
[_ [_
@ -582,7 +584,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; expected: ;; expected:
;; #hash((a . 5) (b . 5) (c . 2)) ;; #hash((a . 5) (b . 5) (c . 2))
(spawn-client j #;(file->job "lorem.txt")) (spawn-client j)
(spawn-client (file->job "lorem.txt"))
(spawn-job-manager) (spawn-job-manager)
(spawn-task-manager 2) (spawn-task-manager 2)
(spawn-task-manager 3) (spawn-task-manager 3)