diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 62e1916..4b95a4e 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -164,7 +164,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (require/typed "flink-support.rkt" [string->words : (→fn String (List String))]) -#;(define (spawn-task-runner) +(define (spawn-task-runner) (define id (gensym 'task-runner)) (spawn τc (start-facet runner @@ -174,9 +174,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (begin/dataflow (log "task-runner ~v state is: ~a" id (ref status))) (during (task-assignment id - (bind job-id ID) - (task (bind task-id TaskID) - (bind desc ConcreteWork))) + $job-id:ID + (task $task-id:TaskID + $desc:ConcreteWork)) (field [state TaskStateDesc ACCEPTED]) (assert (task-state id job-id task-id (ref state))) (cond @@ -199,17 +199,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; --------------------------------------------------------------------------------------------------- ;; TaskManager -#;(define (spawn-task-manager) +(define (spawn-task-manager) (define id (gensym 'task-manager)) (spawn τc (start-facet tm (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - (define/query-set task-runners (task-runner (bind id ID) discard) id + (define/query-set task-runners (task-runner $id:ID _) id #;#:on-add #;(log "TM learns about task-runner ~a" id)) ;; I wonder just how inefficient this is - (define/query-set idle-runners (task-runner (bind id ID) IDLE) id + (define/query-set idle-runners (task-runner $id:ID IDLE) id #;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id) #;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id)) (assert (task-manager id (set-count (ref idle-runners)))) @@ -217,9 +217,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (can-accept?) (not (set-empty? (ref idle-runners)))) (during (task-assignment id - (bind job-id ID) - (task (bind task-id TaskID) - (bind desc ConcreteWork))) + $job-id:ID + (task $task-id:TaskID + $desc:ConcreteWork)) (define status0 : TaskStateDesc (if (can-accept?) RUNNING @@ -237,7 +237,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; need to be making assertions about their idleness (on stop (set! idle-runners (set-add (ref idle-runners) runner))) (assert (task-assignment runner job-id (task task-id desc))) - (on (asserted (task-state runner job-id task-id (bind st TaskStateDesc))) + (on (asserted (task-state runner job-id task-id $st:TaskStateDesc)) (match st [ACCEPTED #f] [RUNNING #f] @@ -253,13 +253,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Test if the task is ready to run (define (task-ready? [t : PendingTask] -> (Maybe ConcreteTask)) (match t - [(task (bind tid TaskID) (map-work (bind s String))) + [(task $tid (map-work $s)) ;; having to re-produce this is directly bc of no occurrence typing (some (task tid (map-work s)))] - [(task (bind tid TaskID) (reduce-work (right (bind v1 TaskResult)) - (right (bind v2 TaskResult)))) + [(task $tid (reduce-work (right $v1) + (right $v2))) (some (task tid (reduce-work v1 v2)))] - [discard + [_ none])) ;; Task Id Any -> Task @@ -269,13 +269,11 @@ The JobManager then performs the job and, when finished, asserts (job-finished I [data : TaskResult] -> PendingTask) (match t - [(task (bind tid TaskID) - (reduce-work (left id) (bind r (Either TaskID TaskResult)))) + [(task $tid (reduce-work (left id) $r)) (task tid (reduce-work (right data) r))] - [(task (bind tid TaskID) - (reduce-work (bind l (Either TaskID TaskResult)) (left id))) + [(task $tid (reduce-work $l (left id))) (task tid (reduce-work l (right data)))] - [discard t])) + [_ t])) (require/typed "flink-support.rkt" @@ -294,7 +292,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (part tasks (lambda ([t : PendingTask]) (match (task-ready? t) - [(some (bind ct ConcreteTask)) + [(some $ct) (right ct)] [none (left t)])))) @@ -306,7 +304,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Job Manager Up") ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. - (define/query-hash task-managers (task-manager (bind id ID) (bind slots Int)) id slots + (define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots #;#:on-add #;(log "JM learns that ~a has ~v slots" id slots)) ;; (Hashof TaskManagerID Nat) @@ -327,7 +325,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (received-answer! [id : ID]) (set! requests-in-flight (hash-update (ref requests-in-flight) id sub1))) - (during (job (bind job-id ID) (bind tasks (List PendingTask))) + (during (job $job-id:ID ($ tasks (List PendingTask))) (log "JM receives job ~a" job-id) (define-tuple (not-ready ready) (partition-ready-tasks tasks)) (field [ready-tasks (List ConcreteTask) ready] @@ -337,7 +335,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; Task -> Void (define (add-ready-task! [t : ConcreteTask]) ;; TODO - use functional-queue.rkt from ../../ - (match-define (task (bind tid TaskID) discard) t) + (match-define (task $tid _) t) (log "JM marks task ~a as ready" tid) (set! ready-tasks (cons t (ref ready-tasks)))) @@ -359,10 +357,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ([t (ref waiting-tasks)]) (define t+ (task+data t task-id data)) (match (task-ready? t+) - [(some (bind ready ConcreteTask)) + [(some $ready) (add-ready-task! ready) ts] - [discard + [_ (cons t+ ts)]))) (set! waiting-tasks still-waiting)])) @@ -374,7 +372,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (start-facet perform (on start (set! tasks-in-progress (add1 (ref tasks-in-progress)))) (on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress)))) - (match-define (task (bind this-id TaskID) (bind desc ConcreteWork)) t) + (match-define (task $this-id $desc) t) (log "JM begins on task ~a" this-id) (define (∀ (ρ2) (select-a-task-manager [assign-task : (proc ID -> ★/t @@ -386,7 +384,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) id)) (match mngr? - [(some (bind mngr ID)) + [(some $mngr) (take-slot! mngr) (stop this-facet (assign-task mngr))] @@ -407,9 +405,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; tasks were being assigned to the manager #;(take-slot! mngr) (start-facet take-slot - (stop-when (asserted (task-state mngr job-id this-id discard)) - (received-answer! mngr)))) - (on (asserted (task-state mngr job-id this-id (bind status TaskStateDesc))) + (on (asserted (task-state mngr job-id this-id _)) + (stop take-slot + (received-answer! mngr))))) + (on (asserted (task-state mngr job-id this-id $status:TaskStateDesc)) (match status [ACCEPTED #f] [RUNNING #f] @@ -419,7 +418,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; they should have told us a different slot count since we tried to give them work (log "JM overloaded manager ~a with task ~a" mngr this-id) (stop this-facet #;(select-a-task-manager))] - [(finished (bind results TaskResult)) + [(finished $results) (log "JM receives the results of task ~a" this-id) (stop perform (k this-id results))]))))