examples/flink: tidy up a bit

This commit is contained in:
Sam Caldwell 2019-02-26 12:13:37 -05:00
parent abc8669b74
commit a6d6ceaa7c
1 changed files with 46 additions and 60 deletions

View File

@ -130,15 +130,42 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ---------------------------------------------------------------------------------------------------
;; Logging
#;(define-logger flink)
(define (log fmt . args)
(displayln (apply format fmt args))
#;(log-message flink-logger 'warning #f (apply format fmt args)))
(displayln (apply format fmt args)))
;; ---------------------------------------------------------------------------------------------------
;; TaskRunner
(define (spawn-task-runner)
(define id (gensym 'task-runner))
(spawn #:name id
(field [status IDLE])
(assert (task-runner id (status)))
(begin/dataflow
(log "task-runner ~v state is: ~a" id (status)))
;; this only does map tasks atm
(during (run-task id (task $tid $desc))
(field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)]
[word-count (hash)])
;; TODO - may need to include more correlation info in here to properly describe state when overloaded
(assert (task-execution-state tid (execution-state)))
;; I think we have to avoid asking a non-idle runner to do anything
(when (equal? IDLE (status))
(on-stop (status IDLE)))
(on-start
(when (equal? IDLE (status))
(status (executing tid))
;; since we currently finish everything in one turn, allow other actors to observe the changes in our
;; task-runner state by flushing pending actions.
(flush!)
(match desc
[(map-task data)
(word-count (count-new-words (word-count) (string->words data)))
(execution-state (finished (word-count)))]
[(reduce-task left right)
(word-count (hash-union left right #:combine +))
(execution-state (finished (word-count)))]))))))
;; (Hash String Nat) String -> (Hash String Nat)
(define (word-count-increment h word)
(hash-update h
@ -170,51 +197,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
#;(check-equal? (string->words "but wait---there's more")
(list "but" "wait" "there's" "more")))
(assertion-struct task-input (task-id task input-id data))
(define (spawn-task-runner)
(define id (gensym 'task-runner))
(spawn #:name id
(field [status IDLE])
(assert (task-runner id (status)))
(begin/dataflow
(log "task-runner ~v state is: ~a" id (status)))
;; this only does map tasks atm
(during (run-task id (task $tid $desc))
(field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)]
[word-count (hash)])
;; TODO - may need to include more correlation info in here to properly describe state when overloaded
(assert (task-execution-state tid (execution-state)))
;; I think we have to avoid asking a non-idle runner to do anything
(when (equal? IDLE (status))
(on-stop (status IDLE)))
(on-start
(when (equal? IDLE (status))
(status (executing tid))
;; since we currently finish everything in one turn, allow other actors to observe the changes in our
;; task-runner state by flushing pending actions.
(flush!)
(match desc
[(map-task data)
(word-count (count-new-words (word-count) (string->words data)))
(execution-state (finished (word-count)))
#;(status IDLE)]
[(reduce-task left right)
(word-count (hash-union left right #:combine +))
(execution-state (finished (word-count)))
#;(status IDLE)])
;; don't think Jonathan has any examples w/ input streaming
#;(on (asserted (task-input id task $chunk $words))
;; not currently worried about seeing the same chunk multiple times
(cond
[(null? words)
(execution-state (finished (word-count)))
;; n.b. if we are asked to do multiple tasks at the same time this state update is not enough
(status IDLE)]
[else (word-count (count-new-words (word-count) words))]))
)))))
;; ---------------------------------------------------------------------------------------------------
;; TaskManager
@ -415,6 +397,16 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (split-at/lenient lst n)
(split-at lst (min n (length lst))))
;; ---------------------------------------------------------------------------------------------------
;; Client
;; Job -> Void
(define (spawn-client j)
(spawn
(assert j)
(on (asserted (job-finished (job-id j) $data))
(printf "job done!\n~a\n" data))))
;; ---------------------------------------------------------------------------------------------------
;; Observe interaction between task and job manager
@ -434,9 +426,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ---------------------------------------------------------------------------------------------------
;; Creating a Job
;; (Listof WordDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc))
;; (Listof WorkDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc))
;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also
;; return the odd-one out
;; return the odd-one out.
;; Conceptually, it does something like this:
;; '(a b c d) => '((a b) (c d))
;; '(a b c d e) => '((a b) (c d) e)
(define (pair-up ls)
(let loop ([ls ls]
[reductions '()])
@ -549,15 +544,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(check-equal? (task-desc (car tasks))
(map-task ""))))
;; ---------------------------------------------------------------------------------------------------
;; Client
;; Job -> Void
(define (spawn-client j)
(spawn
(assert j)
(on (asserted (job-finished (job-id j) $data))
(printf "job done!\n~a\n" data))))
;; ---------------------------------------------------------------------------------------------------
;; Main