fancify patterns in flink

This commit is contained in:
Sam Caldwell 2019-05-24 11:25:29 -04:00
parent c78b76b38c
commit c37c060dc9
1 changed files with 10 additions and 16 deletions

View File

@ -173,10 +173,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(assert (task-runner id (ref status)))
(begin/dataflow
(log "task-runner ~v state is: ~a" id (ref status)))
(during (task-assignment id
$job-id:ID
(task $task-id:TaskID
$desc:ConcreteWork))
(during (task-assignment id $job-id (task $task-id $desc))
(field [state TaskStateDesc ACCEPTED])
(assert (task-state id job-id task-id (ref state)))
(cond
@ -186,10 +183,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(set! state RUNNING)
(set! status (executing task-id))
(match desc
[(map-work (bind data String))
[(map-work $data)
(define wc (count-new-words (ann (hash) WordCount) (string->words data)))
(set! state (finished wc))]
[(reduce-work (bind left WordCount) (bind right WordCount))
[(reduce-work $left $right)
(define wc (hash-union/combine left right +))
(set! state (finished wc))])
(set! status IDLE)]
@ -206,20 +203,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive)
(log "TM learns about JM")
(define/query-set task-runners (task-runner $id:ID _) id
(define/query-set task-runners (task-runner $id _) id
#;#:on-add #;(log "TM learns about task-runner ~a" id))
;; I wonder just how inefficient this is
(define/query-set idle-runners (task-runner $id:ID IDLE) id
(define/query-set idle-runners (task-runner $id IDLE) id
#;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id)
#;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (ref idle-runners))))
(field [busy-runners (List ID) (list)])
(define (can-accept?)
(not (set-empty? (ref idle-runners))))
(during (task-assignment id
$job-id:ID
(task $task-id:TaskID
$desc:ConcreteWork))
(during (task-assignment id $job-id (task $task-id $desc))
(define status0 : TaskStateDesc
(if (can-accept?)
RUNNING
@ -237,7 +231,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; need to be making assertions about their idleness
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
(assert (task-assignment runner job-id (task task-id desc)))
(on (asserted (task-state runner job-id task-id $st:TaskStateDesc))
(on (asserted (task-state runner job-id task-id $st))
(match st
[ACCEPTED #f]
[RUNNING #f]
@ -304,7 +298,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(log "Job Manager Up")
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
(define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots
(define/query-hash task-managers (task-manager $id $slots) id slots
#;#:on-add #;(log "JM learns that ~a has ~v slots" id slots))
;; (Hashof TaskManagerID Nat)
@ -325,7 +319,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
(during (job $job-id:ID ($ tasks (List PendingTask)))
(during (job $job-id $tasks)
(log "JM receives job ~a" job-id)
(define-tuple (not-ready ready) (partition-ready-tasks tasks))
(field [ready-tasks (List ConcreteTask) ready]
@ -408,7 +402,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(on (asserted (task-state mngr job-id this-id _))
(stop take-slot
(received-answer! mngr)))))
(on (asserted (task-state mngr job-id this-id $status:TaskStateDesc))
(on (asserted (task-state mngr job-id this-id $status))
(match status
[ACCEPTED #f]
[RUNNING #f]