From 5cb0462ec4d874369cb385382e502b6deb57c49e Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Wed, 13 Feb 2019 15:52:39 -0500 Subject: [PATCH] examples: adapt Jonathan's flink exceprt to syndicate --- racket/syndicate/examples/actor/flink.rkt | 324 ++++++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 racket/syndicate/examples/actor/flink.rkt diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt new file mode 100644 index 0000000..ed10d2e --- /dev/null +++ b/racket/syndicate/examples/actor/flink.rkt @@ -0,0 +1,324 @@ +#lang syndicate + +(require (only-in racket/set + set-count + set-empty? + set-first + set-remove)) +(require (only-in racket/list + partition + empty? + split-at)) +(require (only-in racket/hash + hash-union)) + +;; --------------------------------------------------------------------------------------------------- +;; Logging + +#;(define-logger flink) + +(define (log fmt . args) + (displayln (apply format fmt args)) + #;(log-message flink-logger 'warning #f (apply format fmt args))) + +;; --------------------------------------------------------------------------------------------------- +;; TaskRunner + + +;; (Hash String Nat) String -> (Hash String Nat) +(define (word-count-increment h word) + (hash-update h + word + add1 + (λ x 0))) + +;; (Hash String Nat) (Listof String) -> (Hash String Nat) +(define (count-new-words word-count words) + (for/fold ([result word-count]) + ([word words]) + (word-count-increment result word))) + +(assertion-struct task-runner (id status)) +(assertion-struct run-task (id task)) +(assertion-struct task-execution-state (task state)) +(assertion-struct task-input (task-id task input-id data)) +(struct finished (data) #:transparent) +(struct executing (id) #:transparent) + +(define IDLE 'idle) +(define RUNNING 'running) + +(define (spawn-task-runner) + (define id (gensym 'task-runner)) + (spawn #:name id + (field [status IDLE]) + (assert (task-runner id (status))) + (begin/dataflow + (log "task-runner ~v state is: ~a" id (status))) + ;; this only does map tasks atm + (during (run-task id (task $tid $desc)) + (field [execution-state (if (equal? IDLE (status)) RUNNING OVERLOAD)] + [word-count (hash)]) + ;; TODO - may need to include more correlation info in here to properly describe state when overloaded + (assert (task-execution-state tid (execution-state))) + ;; I think we have to avoid asking a non-idle runner to do anything + (when (equal? IDLE (status)) + (on-stop (status IDLE))) + (on-start + (when (equal? IDLE (status)) + (status (executing tid)) + ;; since we currently finish everything in one turn, allow other actors to observe the changes in our + ;; task-runner state by flushing pending actions. + (flush!) + (match desc + [(map-task data) + (word-count (count-new-words (word-count) data)) + (execution-state (finished (word-count))) + #;(status IDLE)] + [(reduce-task left right) + (word-count (hash-union left right #:combine +)) + (execution-state (finished (word-count))) + #;(status IDLE)]) + ;; don't think Jonathan has any examples w/ input streaming + #;(on (asserted (task-input id task $chunk $words)) + ;; not currently worried about seeing the same chunk multiple times + (cond + [(null? words) + (execution-state (finished (word-count))) + ;; n.b. if we are asked to do multiple tasks at the same time this state update is not enough + (status IDLE)] + [else (word-count (count-new-words (word-count) words))])) + ))))) + +;; --------------------------------------------------------------------------------------------------- +;; TaskManager + +(assertion-struct task-manager (id slots)) +(assertion-struct submitted-task (manager task)) +(assertion-struct job-manager-alive ()) +(assertion-struct task-state (id desc)) + +;; task states +(define ACCEPTED 'accepted) +(define OVERLOAD 'overload) + +(define (spawn-task-manager) + (define id (gensym 'task-manager)) + (spawn #:name id + (log "Task Manager (TM) ~a is running" id) + (during (job-manager-alive) + (log "TM learns about JM") + ;; SUSPICION - these two query sets interfere with one another + (define/query-set task-runners (task-runner $id _) id + #:on-add (log "TM learns about task-runner ~a" id)) + ;; I wonder just how inefficient this is + (define/query-set idle-runners (task-runner $id IDLE) id + #:on-add (log "TM learns that task-runner ~a is IDLE" id) + #:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id)) + (assert (task-manager id (set-count (idle-runners)))) + (field [busy-runners (list)]) + (during (submitted-task id $t) + (match-define (task task-id desc) t) + #;(on-start (log "TM receives task ~a" task-id)) + (log "TM receives task ~a" task-id) + (on-stop (log "TM finished with task ~a" task-id) + (when (= task-id 6) + (log "TM idle-runners: ~a" (idle-runners)))) + (field [status ACCEPTED]) + (assert (task-state task-id (status))) + (cond + [(set-empty? (idle-runners)) + (log "TM can't run ~a right now" task-id) + (status OVERLOAD)] + [else + (define runner (set-first (idle-runners))) + ;; n.b. modifying a query set is questionable + ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. + ;; Could use the busy-runners field to avoid that + (idle-runners (set-remove (idle-runners) runner)) + (log "TM assigns task ~a to runner ~a" task-id runner) + (assert (run-task runner t)) + (status RUNNING) + (on (asserted (task-execution-state task-id $state)) + (match state + [(== RUNNING) + ;; nothing to do + (void)] + [(finished results) + (log "TM receives the results of task ~a" task-id) + (status state)] + [_ + ;; TODO + ;; need input maybe? + #f]))]))))) + +;; --------------------------------------------------------------------------------------------------- +;; JobManager + +(assertion-struct job (id tasks)) +(assertion-struct job-finished (id data)) +(struct job-description (id tasks) #:transparent) +(struct map-task (data) #:transparent) +(struct reduce-task (left right) #:transparent) +(struct task (id desc) #:transparent) + +(define (spawn-job-manager) + (spawn + (assert (job-manager-alive)) + (log "Job Manager Up") + ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. + (define/query-hash task-managers (task-manager $id $slots) id slots + #:on-add (log "JM learns that ~a has ~v slots" id slots)) + (define (slots-available) + (for/sum ([v (in-hash-values (task-managers))]) + v)) + ;; ID -> Void + (define (take-slot! id) + ;; make local changes to task-managers to reflect tasks delegated in the current turn + (log "JM assigns a task to ~a" id) + (task-managers (hash-update (task-managers) id sub1))) + + (during (job $job-id $tasks) + (log "JM receives job ~a" job-id) + (define-values (ready not-ready) (partition task-ready? tasks)) + (field [ready-tasks ready] + [waiting-tasks not-ready] + [tasks-in-progress 0] + [data-partitions (hash)]) + + (begin/dataflow + (define slots (slots-available)) + (define-values (ts readys) + (split-at/lenient (ready-tasks) slots)) + (for ([t ts]) + (perform-task t push-results)) + (unless (empty? ts) + ;; the empty? check may be necessary to avoid a dataflow loop + (ready-tasks readys))) + + ;; Task -> Void + (define (add-ready-task! t) + ;; TODO - use functional-queue.rkt from ../../ + (log "JM marks task ~a as ready" (task-id t)) + (ready-tasks (cons t (ready-tasks)))) + + ;; need to parcel out tasks + ;; Task (ID TaskResult -> Void) -> Void + ;; Requires (task-ready? t) + (define (perform-task t k) + (react + (on-start (tasks-in-progress (add1 (tasks-in-progress)))) + (on-stop (tasks-in-progress (sub1 (tasks-in-progress)))) + (match-define (task this-id desc) t) + (log "JM begins on task ~a" this-id) + + (field [task-mngr #f]) + (begin/dataflow + ;; n.b. cyclic data-flow dependency + (unless (task-mngr) + (define mngr + (for/first ([(id slots) (in-hash (task-managers))] + #:unless (zero? slots)) + id)) + (when mngr + (take-slot! mngr) + (task-mngr mngr)))) + ;; TODO - should respond if task manager dies + (assert #:when (task-mngr) + (submitted-task (task-mngr) t)) + (on #:when (task-mngr) + (asserted (task-state this-id $state)) + (match state + [(== OVERLOAD) + ;; need to find a new task manager + ;; don't think we need a release-slot! here, because if we've heard back from a task manager, + ;; they should have told us a different slot count since we tried to give them work + (log "JM overloaded manager ~a with task ~a" (task-mngr) this-id) + (task-mngr #f)] + [(finished results) + ;; TODO - guess-timation of what this should look like + (log "JM receives the results of task ~a" this-id) + (stop-current-facet (k this-id results))] + [_ + ;; TODO - needs more data? + #f])))) + + ;; ID Data -> Void + ;; Update any dependent tasks with the results of the given task, moving + ;; them to the ready queue when possible + (define (push-results task-id data) + (cond + [(and (zero? (tasks-in-progress)) + (empty? (ready-tasks)) + (empty? (waiting-tasks))) + ;; TODO - also need to ensure there are no tasks in progress + (log "JM finished with job ~a" job-id) + (react (assert (job-finished job-id data)))] + [else + ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. + (define still-waiting + (for/fold ([ts '()]) + ([t (in-list (waiting-tasks))]) + (define t+ (task+data t task-id data)) + (cond + [(task-ready? t+) + (add-ready-task! t+) + ts] + [else + (cons t+ ts)]))) + (waiting-tasks still-waiting)])) + #f))) + +;; Task -> Bool +;; Test if the task is ready to run +(define (task-ready? t) + (match t + [(task _ (reduce-task l r)) + (not (or (id? l) (id? r)))] + [_ #t])) + +;; Task Id Any -> Task +;; If the given task is waiting for this data, replace the waiting ID with the data +(define (task+data t id data) + (match t + [(task tid (reduce-task (== id) r)) + (task tid (reduce-task data r))] + [(task tid (reduce-task l (== id))) + (task tid (reduce-task l data))] + [_ t])) + +;; Any -> Bool +;; recognize ids +(define (id? x) + (or (symbol? x) (exact-nonnegative-integer? x))) + +;; (Listof A) Nat -> (Values (Listof A) (Listof A)) +;; like split-at but allow a number larger than the length of the list +(define (split-at/lenient lst n) + (split-at lst (min n (length lst)))) + +;; --------------------------------------------------------------------------------------------------- +;; Client + +(define (spawn-client) + (spawn + (define j (job 1 + (list (task 1 (map-task (list "a" "b" "c" "a" "b" "c"))) + (task 2 (map-task (list "a" "b"))) + (task 3 (map-task (list "a" "b"))) + (task 4 (map-task (list "a" "b"))) + (task 5 (reduce-task 1 2)) + (task 6 (reduce-task 3 4)) + (task 7 (reduce-task 5 6))))) + (assert j) + (on (asserted (job-finished (job-id j) $data)) + (printf "job done!\n~a\n" data)))) + +;; --------------------------------------------------------------------------------------------------- +;; Main + +(spawn-client) +(spawn-job-manager) +(spawn-task-manager) +(spawn-task-runner) +(spawn-task-runner)