replace some dataflow with internal events in typed flink

This commit is contained in:
Sam Caldwell 2019-06-17 11:25:09 -04:00
parent 5f38b6cc94
commit 8f8f4c416f
1 changed file with 46 additions and 29 deletions

View File

@ -219,7 +219,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (spawn-task-manager) (define (spawn-task-manager)
(define id (gensym 'task-manager)) (define id (gensym 'task-manager))
(spawn τc (spawn τc
(print-role (begin
(start-facet tm (start-facet tm
(log "Task Manager (TM) ~a is running" id) (log "Task Manager (TM) ~a is running" id)
(during (job-manager-alive) (during (job-manager-alive)
@ -320,9 +320,12 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
[(task $id (reduce-work $l $r)) [(task $id (reduce-work $l $r))
(task id (reduce-work (left l) (left r)))])) (task id (reduce-work (left l) (left r)))]))
(assertion-struct assigned-task : SelectedTM (mngr))
(message-struct tasks-finished : TasksFinished (id))
(define (spawn-job-manager) (define (spawn-job-manager)
(spawn τc (spawn τc
(begin (print-role
(start-facet jm (start-facet jm
(assert (job-manager-alive)) (assert (job-manager-alive))
(log "Job Manager Up") (log "Job Manager Up")
@ -375,6 +378,7 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(empty? (ref ready-tasks)) (empty? (ref ready-tasks))
(empty? (ref waiting-tasks))) (empty? (ref waiting-tasks)))
(log "JM finished with job ~a" job-id) (log "JM finished with job ~a" job-id)
(realize! (tasks-finished job-id))
(start-facet done (assert (job-finished job-id data)))] (start-facet done (assert (job-finished job-id data)))]
[else [else
;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done. ;; TODO - in MapReduce, there should be either 1 waiting task, or 0, meaning the job is done.
@ -408,10 +412,10 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(define (assign-task [mngr : ID]) (define (assign-task [mngr : ID])
(start-facet assign (start-facet assign
(assert (task-assignment mngr job-id t)) (assert (task-assignment mngr job-id t))
(on (retracted (task-manager mngr discard)) (know (assigned-task mngr))
(on (retracted (task-manager mngr _))
;; our task manager has crashed ;; our task manager has crashed
(stop assign (stop assign))
(set! task-mngr not-a-real-task-manager)))
(on start (on start
;; N.B. when this line was here, and not after `(when mngr ...)` above, ;; N.B. when this line was here, and not after `(when mngr ...)` above,
;; things didn't work. I think that due to script scheduling, all ready ;; things didn't work. I think that due to script scheduling, all ready
@ -430,39 +434,52 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; don't think we need a release-slot! here, because if we've heard back from a task manager,
;; they should have told us a different slot count since we tried to give them work ;; they should have told us a different slot count since we tried to give them work
(log "JM overloaded manager ~a with task ~a" mngr this-id) (log "JM overloaded manager ~a with task ~a" mngr this-id)
(stop assign (stop assign)]
(set! task-mngr not-a-real-task-manager))]
[(finished $results) [(finished $results)
(log "JM receives the results of task ~a" this-id) (log "JM receives the results of task ~a" this-id)
(stop perform (k this-id results))])))) (stop perform (k this-id results))]))))
(define (select-a-task-manager) (define (select-a-task-manager)
(start-facet select (start-facet select
(field [mngr (Maybe ID) none])
(define (try-assign!)
(define mngr?
(for/first ([(id slots) (ref task-managers)]
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0))))
id))
(match mngr?
[(some $m)
(take-slot! m)
(set! mngr (some m))
(assign-task m)]
[none
#f]))
(begin/dataflow (begin/dataflow
(when (equal? (ref task-mngr) not-a-real-task-manager) (when (equal? (ref mngr) none)
(define mngr? (try-assign!)))
(for/first ([(id slots) (ref task-managers)]
#:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) (on (forget (assigned-task $m:ID))
id)) (when (equal? (some m) (ref mngr))
(match mngr? (set! mngr none)))))
[(some $mngr)
(take-slot! mngr)
(set! task-mngr mngr)
(assign-task mngr)]
[none
#f])))))
(on start (select-a-task-manager)))) (on start (select-a-task-manager))))
(begin/dataflow (on start
(define slots (slots-available)) (start-facet delegate-tasks
(define-tuple (ts readys) (on (realize (tasks-finished job-id))
(split-at/lenient (ref ready-tasks) slots)) (stop delegate-tasks))
(for ([t ts]) (begin/dataflow
(perform-task t push-results)) (define slots (slots-available))
(unless (empty? ts) (define-tuple (ts readys)
;; the empty? check may be necessary to avoid a dataflow loop (split-at/lenient (ref ready-tasks) slots))
(set! ready-tasks readys)))))))) (for ([t ts])
(perform-task t push-results))
(unless (empty? ts)
;; the empty? check may be necessary to avoid a dataflow loop
(set! ready-tasks readys))))))))))
;; --------------------------------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------------------------------
;; Client ;; Client
@ -492,5 +509,5 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
(spawn-task-manager) (spawn-task-manager)
(spawn-task-runner) (spawn-task-runner)
(spawn-task-runner) (spawn-task-runner)
#;(spawn-client (file->job "lorem.txt")) (spawn-client (file->job "lorem.txt"))
(spawn-client (string->job INPUT))) (spawn-client (string->job INPUT)))