diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index b810c69..368317d 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -130,15 +130,42 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; Logging -#;(define-logger flink) - (define (log fmt . args) - (displayln (apply format fmt args)) - #;(log-message flink-logger 'warning #f (apply format fmt args))) + (displayln (apply format fmt args))) ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner +(define (spawn-task-runner) + (define id (gensym 'task-runner)) + (spawn #:name id + (field [status IDLE]) + (assert (task-runner id (status))) + (begin/dataflow + (log "task-runner ~v state is: ~a" id (status))) + ;; this only does map tasks atm + (during (run-task id (task $tid $desc)) + (field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)] + [word-count (hash)]) + ;; TODO - may need to include more correlation info in here to properly describe state when overloaded + (assert (task-execution-state tid (execution-state))) + ;; I think we have to avoid asking a non-idle runner to do anything + (when (equal? IDLE (status)) + (on-stop (status IDLE))) + (on-start + (when (equal? IDLE (status)) + (status (executing tid)) + ;; since we currently finish everything in one turn, allow other actors to observe the changes in our + ;; task-runner state by flushing pending actions. + (flush!) + (match desc + [(map-task data) + (word-count (count-new-words (word-count) (string->words data))) + (execution-state (finished (word-count)))] + [(reduce-task left right) + (word-count (hash-union left right #:combine +)) + (execution-state (finished (word-count)))])))))) + ;; (Hash String Nat) String -> (Hash String Nat) (define (word-count-increment h word) (hash-update h @@ -170,51 +197,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I #;(check-equal? (string->words "but wait---there's more") (list "but" "wait" "there's" "more"))) -(assertion-struct task-input (task-id task input-id data)) - - -(define (spawn-task-runner) - (define id (gensym 'task-runner)) - (spawn #:name id - (field [status IDLE]) - (assert (task-runner id (status))) - (begin/dataflow - (log "task-runner ~v state is: ~a" id (status))) - ;; this only does map tasks atm - (during (run-task id (task $tid $desc)) - (field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)] - [word-count (hash)]) - ;; TODO - may need to include more correlation info in here to properly describe state when overloaded - (assert (task-execution-state tid (execution-state))) - ;; I think we have to avoid asking a non-idle runner to do anything - (when (equal? IDLE (status)) - (on-stop (status IDLE))) - (on-start - (when (equal? IDLE (status)) - (status (executing tid)) - ;; since we currently finish everything in one turn, allow other actors to observe the changes in our - ;; task-runner state by flushing pending actions. - (flush!) - (match desc - [(map-task data) - (word-count (count-new-words (word-count) (string->words data))) - (execution-state (finished (word-count))) - #;(status IDLE)] - [(reduce-task left right) - (word-count (hash-union left right #:combine +)) - (execution-state (finished (word-count))) - #;(status IDLE)]) - ;; don't think Jonathan has any examples w/ input streaming - #;(on (asserted (task-input id task $chunk $words)) - ;; not currently worried about seeing the same chunk multiple times - (cond - [(null? words) - (execution-state (finished (word-count))) - ;; n.b. if we are asked to do multiple tasks at the same time this state update is not enough - (status IDLE)] - [else (word-count (count-new-words (word-count) words))])) - ))))) - ;; --------------------------------------------------------------------------------------------------- ;; TaskManager @@ -415,6 +397,16 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (split-at/lenient lst n) (split-at lst (min n (length lst)))) +;; --------------------------------------------------------------------------------------------------- +;; Client + +;; Job -> Void +(define (spawn-client j) + (spawn + (assert j) + (on (asserted (job-finished (job-id j) $data)) + (printf "job done!\n~a\n" data)))) + ;; --------------------------------------------------------------------------------------------------- ;; Observe interaction between task and job manager @@ -434,9 +426,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; Creating a Job -;; (Listof WordDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc)) +;; (Listof WorkDesc) -> (Values (Listof WorkDesc) (Optionof WorkDesc)) ;; Pair up elements of the input list into a list of reduce tasks, and if the input list is odd also -;; return the odd-one out +;; return the odd-one out. +;; Conceptually, it does something like this: +;; '(a b c d) => '((a b) (c d)) +;; '(a b c d e) => '((a b) (c d) e) (define (pair-up ls) (let loop ([ls ls] [reductions '()]) @@ -549,15 +544,6 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (check-equal? (task-desc (car tasks)) (map-task "")))) -;; --------------------------------------------------------------------------------------------------- -;; Client - -;; Job -> Void -(define (spawn-client j) - (spawn - (assert j) - (on (asserted (job-finished (job-id j) $data)) - (printf "job done!\n~a\n" data)))) ;; --------------------------------------------------------------------------------------------------- ;; Main