diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index 879c8c2..17be6af 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -2,6 +2,7 @@ (require racket/set) (require (only-in racket/list + first partition empty? split-at)) @@ -71,17 +72,20 @@ Task Delegation has two roles, TaskAssigner (TA) and TaskPerformer (TP). A TaskAssigner asserts the association of a Task with a particular TaskPerformer through - (task-assignment ID ID Task) -where the first ID identifies the TP and the second identifies the job. + (task-assignment ID Task) +where the ID identifies the TP |# -(assertion-struct task-assignment (assignee job-id task)) +(assertion-struct task-assignment (assignee task)) #| -A Task is a (task ID Work), where Work is one of +A Task is a (task TaskID Work), where Work is one of - (map-work String) - (reduce-work (U ID TaskResult) (U ID TaskResult)), referring to either the ID of the dependent task or its results. A reduce-work is ready to be executed when it has both results. +A TaskID is a (list ID ID), where the first ID is specific to the individual +task and the second identifies the job it belongs to. + A TaskResult is a (Hashof String Natural), counting the occurrences of words |# (struct task (id desc) #:transparent) @@ -91,9 +95,7 @@ A TaskResult is a (Hashof String Natural), counting the occurrences of words #| The TaskPerformer responds to a task-assignment by describing its state with respect to that task, - (task-state ID ID ID TaskStateDesc) -where the first ID is that of the TP, the second is that of the job, -and the third that of the task. + (task-state TaskID TaskStateDesc) A TaskStateDesc is one of - ACCEPTED, when the TP has the resources to perform the task. (TODO - not sure if this is ever visible, currently) @@ -101,7 +103,7 @@ A TaskStateDesc is one of - RUNNING, indicating that the task is being performed - (finished TaskResult), describing the results |# -(assertion-struct task-state (assignee job-id task-id desc)) +(assertion-struct task-state (task-id desc)) (struct finished (data) #:transparent) (define ACCEPTED 'accepted) (define RUNNING 'running) @@ -138,30 +140,27 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; gonna need some effect polymorphism to type uses of this (define (task-performer my-id can-accept? perform-task) (react - (during (task-assignment my-id $job-id $task) + (during (task-assignment my-id $task) (field [status #f]) - (assert (task-state my-id job-id (task-id task) (status))) + (assert (task-state (task-id task) (status))) (cond [(can-accept?) (status RUNNING) (define (on-complete results) (status (finished results))) - (perform-task task job-id on-complete status)] + (perform-task task on-complete status)] [else (status OVERLOAD)])))) ;; Task ;; ID -;; ID ;; (-> Void) ;; (TaskResults -> Void) ;; -> TaskAssigner -(define (task-assigner tsk job-id performer - on-overload! - on-complete!) +(define (task-assigner tsk performer on-overload! on-complete!) (react - (assert (task-assignment performer job-id tsk)) - (on (asserted (task-state performer job-id (task-id tsk) $status)) + (assert (task-assignment performer tsk)) + (on (asserted (task-state (task-id tsk) $status)) (match status [(or (== ACCEPTED) (== RUNNING)) @@ -173,14 +172,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskRunner - ;; ID ID -> Spawn (define (spawn-task-runner id tm-id) (spawn #:name id (assert (task-runner id)) (stop-when (retracted (task-manager tm-id _))) ;; Task (TaskStateDesc -> Void) -> Void - (define (perform-task tsk job-id on-complete! update-status!) + (define (perform-task tsk on-complete! update-status!) (match-define (task tid desc) tsk) (match desc [(map-work data) @@ -219,7 +217,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (list "good" "eve" "ma'am")) (check-equal? (string->words "please sir. may I have another?") (list "please" "sir" "may" "I" "have" "another")) - ;; TODO - currently fails + ;; currently fails, doesn't seem worth fixing #;(check-equal? (string->words "but wait---there's more") (list "but" "wait" "there's" "more"))) @@ -252,26 +250,30 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Assign incoming tasks (field [busy-runners (set)]) + (define/dataflow idle-runners (set-count (set-subtract (task-runners) (busy-runners)))) (assert (task-manager id (idle-runners))) (define (can-accept?) + (log "TM ~a idle-runners = ~a" id (idle-runners)) (positive? (idle-runners))) (define (select-runner) (define runner (for/first ([r (in-set (task-runners))] #:unless (set-member? (busy-runners) r)) r)) + (unless runner + (log "ERROR: TM ~a failed to select a runner.\nrunners: ~a\nbusy: ~a" id (task-runners) (busy-runners))) (busy-runners (set-add (busy-runners) runner)) runner) - (define (perform-task tsk job-id on-complete! update-status!) + (define (perform-task tsk on-complete! update-status!) (match-define (task task-id desc) tsk) (define runner (select-runner)) (log "TM ~a assigns task ~a to runner ~a" id task-id runner) (on-stop (busy-runners (set-remove (busy-runners) runner))) (on-start - (task-assigner tsk job-id runner + (task-assigner tsk runner (lambda () (update-status! OVERLOAD)) (lambda (results) (on-complete! results))))) (on-start @@ -362,9 +364,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; things didn't work. I think that due to script scheduling, all ready ;; tasks were being assigned to the manager #;(take-slot! mngr) - (react (stop-when (asserted (task-state mngr job-id this-id _)) + (react (stop-when (asserted (task-state this-id _)) (received-answer! mngr))) - (task-assigner t job-id mngr + (task-assigner t mngr (lambda () ;; need to find a new task manager ;; don't think we need a release-slot! here, because if we've heard back from a task manager, @@ -373,7 +375,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (stop-facet this-facet (select-a-task-manager))) (lambda (results) (log "JM receives the results of task ~a" this-id) - (stop-facet task-facet (k this-id results))))))) + (stop-facet task-facet (k (first this-id) results))))))) (on-start (select-a-task-manager)))) @@ -443,7 +445,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (spawn (during (job-manager-alive) (during (task-manager $tm-id _) - (define/query-set requests (task-assignment tm-id _ (task $tid _)) tid) + (define/query-set requests (task-assignment _ (task $tid _)) tid) (field [high-water-mark 0]) (on (asserted (task-manager tm-id $slots)) (when (> slots (high-water-mark)) @@ -500,7 +502,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; TaskTree -> (Listof Task) ;; flatten a task tree by assigning job-unique IDs -(define (task-tree->list tt) +(define (task-tree->list tt job-id) (define-values (tasks _) ;; TaskTree ID -> (Values (Listof Task) ID) ;; the input id is for the current node of the tree @@ -509,7 +511,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [next-id 0]) (match tt [(map-work _) - (values (list (task next-id tt)) + (values (list (task (list next-id job-id) tt)) (add1 next-id))] [(reduce-work left right) (define left-id (add1 next-id)) @@ -517,8 +519,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (loop left left-id)) (define-values (rights next) (loop right right-id)) - (values (cons (task next-id (reduce-work left-id right-id)) - (append lefts rights)) + (values (cons (task (list next-id job-id) (reduce-work left-id right-id)) + (append lefts rights)) next)]))) tasks) @@ -526,7 +528,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (create-job in) (define job-id (gensym 'job)) (define input-lines (sequence->list (in-lines in))) - (define tasks (task-tree->list (create-task-tree input-lines))) + (define tasks (task-tree->list (create-task-tree input-lines) job-id)) (job job-id tasks)) ;; String -> Job @@ -556,7 +558,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (task mid2 (map-work data2))) (check-true (id? left)) (check-true (id? right)) - (check-equal? (set left right) (set mid1 mid2)) + (check-equal? (set left right) (set (first mid1) (first mid2))) (check-equal? (set data1 data2) (set "a b c" "d e f"))] [_ @@ -582,7 +584,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; expected: ;; #hash((a . 5) (b . 5) (c . 2)) -(spawn-client j #;(file->job "lorem.txt")) +(spawn-client j) +(spawn-client (file->job "lorem.txt")) (spawn-job-manager) (spawn-task-manager 2) (spawn-task-manager 3)