clean up patterns in flink

This commit is contained in:
Sam Caldwell 2019-05-24 10:06:01 -04:00
parent 4fdce7fc0c
commit 296a77d714
1 changed files with 31 additions and 32 deletions

View File

@ -164,7 +164,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(require/typed "flink-support.rkt"
[string->words : (→fn String (List String))])
#;(define (spawn-task-runner)
(define (spawn-task-runner)
(define id (gensym 'task-runner))
(spawn τc
(start-facet runner
@ -174,9 +174,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(begin/dataflow
(log "task-runner ~v state is: ~a" id (ref status)))
(during (task-assignment id
(bind job-id ID)
(task (bind task-id TaskID)
(bind desc ConcreteWork)))
$job-id:ID
(task $task-id:TaskID
$desc:ConcreteWork))
(field [state TaskStateDesc ACCEPTED])
(assert (task-state id job-id task-id (ref state)))
(cond
@ -199,17 +199,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; ---------------------------------------------------------------------------------------------------
;; TaskManager
#;(define (spawn-task-manager)
(define (spawn-task-manager)
(define id (gensym 'task-manager))
(spawn τc
(start-facet tm
(log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive)
(log "TM learns about JM")
(define/query-set task-runners (task-runner (bind id ID) discard) id
(define/query-set task-runners (task-runner $id:ID _) id
#;#:on-add #;(log "TM learns about task-runner ~a" id))
;; I wonder just how inefficient this is
(define/query-set idle-runners (task-runner (bind id ID) IDLE) id
(define/query-set idle-runners (task-runner $id:ID IDLE) id
#;#:on-add #;(log "TM learns that task-runner ~a is IDLE" id)
#;#:on-remove #;(log "TM learns that task-runner ~a is NOT IDLE" id))
(assert (task-manager id (set-count (ref idle-runners))))
@ -217,9 +217,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (can-accept?)
(not (set-empty? (ref idle-runners))))
(during (task-assignment id
(bind job-id ID)
(task (bind task-id TaskID)
(bind desc ConcreteWork)))
$job-id:ID
(task $task-id:TaskID
$desc:ConcreteWork))
(define status0 : TaskStateDesc
(if (can-accept?)
RUNNING
@ -237,7 +237,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; need to be making assertions about their idleness
(on stop (set! idle-runners (set-add (ref idle-runners) runner)))
(assert (task-assignment runner job-id (task task-id desc)))
(on (asserted (task-state runner job-id task-id (bind st TaskStateDesc)))
(on (asserted (task-state runner job-id task-id $st:TaskStateDesc))
(match st
[ACCEPTED #f]
[RUNNING #f]
@ -253,13 +253,13 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Test if the task is ready to run
(define (task-ready? [t : PendingTask] -> (Maybe ConcreteTask))
(match t
[(task (bind tid TaskID) (map-work (bind s String)))
[(task $tid (map-work $s))
;; having to re-produce this is directly bc of no occurrence typing
(some (task tid (map-work s)))]
[(task (bind tid TaskID) (reduce-work (right (bind v1 TaskResult))
(right (bind v2 TaskResult))))
[(task $tid (reduce-work (right $v1)
(right $v2)))
(some (task tid (reduce-work v1 v2)))]
[discard
[_
none]))
;; Task Id Any -> Task
@ -269,13 +269,11 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[data : TaskResult]
-> PendingTask)
(match t
[(task (bind tid TaskID)
(reduce-work (left id) (bind r (Either TaskID TaskResult))))
[(task $tid (reduce-work (left id) $r))
(task tid (reduce-work (right data) r))]
[(task (bind tid TaskID)
(reduce-work (bind l (Either TaskID TaskResult)) (left id)))
[(task $tid (reduce-work $l (left id)))
(task tid (reduce-work l (right data)))]
[discard t]))
[_ t]))
(require/typed "flink-support.rkt"
@ -294,7 +292,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(part tasks
(lambda ([t : PendingTask])
(match (task-ready? t)
[(some (bind ct ConcreteTask))
[(some $ct)
(right ct)]
[none
(left t)]))))
@ -306,7 +304,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(log "Job Manager Up")
;; keep track of task managers, how many slots they say are open, and how many tasks we have assigned.
(define/query-hash task-managers (task-manager (bind id ID) (bind slots Int)) id slots
(define/query-hash task-managers (task-manager $id:ID $slots:Int) id slots
#;#:on-add #;(log "JM learns that ~a has ~v slots" id slots))
;; (Hashof TaskManagerID Nat)
@ -327,7 +325,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (received-answer! [id : ID])
(set! requests-in-flight (hash-update (ref requests-in-flight) id sub1)))
(during (job (bind job-id ID) (bind tasks (List PendingTask)))
(during (job $job-id:ID ($ tasks (List PendingTask)))
(log "JM receives job ~a" job-id)
(define-tuple (not-ready ready) (partition-ready-tasks tasks))
(field [ready-tasks (List ConcreteTask) ready]
@ -337,7 +335,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; Task -> Void
(define (add-ready-task! [t : ConcreteTask])
;; TODO - use functional-queue.rkt from ../../
(match-define (task (bind tid TaskID) discard) t)
(match-define (task $tid _) t)
(log "JM marks task ~a as ready" tid)
(set! ready-tasks (cons t (ref ready-tasks))))
@ -359,10 +357,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
([t (ref waiting-tasks)])
(define t+ (task+data t task-id data))
(match (task-ready? t+)
[(some (bind ready ConcreteTask))
[(some $ready)
(add-ready-task! ready)
ts]
[discard
[_
(cons t+ ts)])))
(set! waiting-tasks still-waiting)]))
@ -374,7 +372,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(start-facet perform
(on start (set! tasks-in-progress (add1 (ref tasks-in-progress))))
(on stop (set! tasks-in-progress (sub1 (ref tasks-in-progress))))
(match-define (task (bind this-id TaskID) (bind desc ConcreteWork)) t)
(match-define (task $this-id $desc) t)
(log "JM begins on task ~a" this-id)
(define ( (ρ2) (select-a-task-manager [assign-task : (proc ID -> ★/t
@ -386,7 +384,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
id))
(match mngr?
[(some (bind mngr ID))
[(some $mngr)
(take-slot! mngr)
(stop this-facet
(assign-task mngr))]
@ -407,9 +405,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; tasks were being assigned to the manager
#;(take-slot! mngr)
(start-facet take-slot
(stop-when (asserted (task-state mngr job-id this-id discard))
(received-answer! mngr))))
(on (asserted (task-state mngr job-id this-id (bind status TaskStateDesc)))
(on (asserted (task-state mngr job-id this-id _))
(stop take-slot
(received-answer! mngr)))))
(on (asserted (task-state mngr job-id this-id $status:TaskStateDesc))
(match status
[ACCEPTED #f]
[RUNNING #f]
@ -419,7 +418,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop this-facet #;(select-a-task-manager))]
[(finished (bind results TaskResult))
[(finished $results)
(log "JM receives the results of task ~a" this-id)
(stop perform (k this-id results))]))))