From c37c060dc99a4310cb3802083536b5dffa5adb7f Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Fri, 24 May 2019 11:25:29 -0400 Subject: [PATCH] fancify patterns in flink --- racket/typed/examples/roles/flink.rkt | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index 4b95a4e..e8dbce6 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -173,10 +173,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-runner id (ref status))) (begin/dataflow (log "task-runner ~v state is: ~a" id (ref status))) - (during (task-assignment id - $job-id:ID - (task $task-id:TaskID - $desc:ConcreteWork)) + (during (task-assignment id $job-id (task $task-id $desc)) (field [state TaskStateDesc ACCEPTED]) (assert (task-state id job-id task-id (ref state))) (cond @@ -186,10 +183,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (set! state RUNNING) (set! status (executing task-id)) (match desc - [(map-work (bind data String)) + [(map-work $data) (define wc (count-new-words (ann (hash) WordCount) (string->words data))) (set! state (finished wc))] - [(reduce-work (bind left WordCount) (bind right WordCount)) + [(reduce-work $left $right) (define wc (hash-union/combine left right +)) (set! state (finished wc))]) (set! status IDLE)] @@ -206,20 +203,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - (define/query-set task-runners (task-runner $id:ID _) id + (define/query-set task-runners (task-runner $id _) id #;#:on-add #;(log "TM learns about task-runner ~a" id)) ;; I wonder just how inefficient this is - (define/query-set idle-runners (task-runner $id:ID IDLE) id + (define/query-set idle-runners (task-runner $id IDLE) id #;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id) #;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id)) (assert (task-manager id (set-count (ref idle-runners)))) (field [busy-runners (List ID) (list)]) (define (can-accept?) (not (set-empty? (ref idle-runners)))) - (during (task-assignment id - $job-id:ID - (task $task-id:TaskID - $desc:ConcreteWork)) + (during (task-assignment id $job-id (task $task-id $desc)) (define status0 : TaskStateDesc (if (can-accept?) RUNNING @@ -237,7 +231,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; need to be making assertions about their idleness (on stop (set! idle-runners (set-add (ref idle-runners) runner))) (assert (task-assignment runner job-id (task task-id desc))) - (on (asserted (task-state runner job-id task-id $st:TaskStateDesc)) + (on (asserted (task-state runner job-id task-id $st)) (match st [ACCEPTED #f] [RUNNING #f] @@ -304,7 +298,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Job Manager Up") ;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned. - (define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots + (define/query-hash task-managers (task-manager $id $slots) id slots #;#:on-add #;(log "JM learns that ~a has ~v slots" id slots)) ;; (Hashof TaskManagerID Nat) @@ -325,7 +319,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (received-answer! [id : ID]) (set! requests-in-flight (hash-update (ref requests-in-flight) id sub1))) - (during (job $job-id:ID ($ tasks (List PendingTask))) + (during (job $job-id $tasks) (log "JM receives job ~a" job-id) (define-tuple (not-ready ready) (partition-ready-tasks tasks)) (field [ready-tasks (List ConcreteTask) ready] @@ -408,7 +402,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (on (asserted (task-state mngr job-id this-id _)) (stop take-slot (received-answer! mngr))))) - (on (asserted (task-state mngr job-id this-id $status:TaskStateDesc)) + (on (asserted (task-state mngr job-id this-id $status)) (match status [ACCEPTED #f] [RUNNING #f]