From 98a779bdc1a742a6d636e0d7f3a395f751271a73 Mon Sep 17 00:00:00 2001 From: Sam Caldwell Date: Fri, 24 May 2019 11:42:21 -0400 Subject: [PATCH] resolve mutual dependency in flink via dataflow --- racket/typed/examples/roles/flink.rkt | 58 ++++++++++++++------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/racket/typed/examples/roles/flink.rkt b/racket/typed/examples/roles/flink.rkt index e8dbce6..8e6c4f0 100644 --- a/racket/typed/examples/roles/flink.rkt +++ b/racket/typed/examples/roles/flink.rkt @@ -369,22 +369,8 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (match-define (task $this-id $desc) t) (log "JM begins on task ~a" this-id) - (define (∀ (ρ2) (select-a-task-manager [assign-task : (proc ID -> ★/t - #:roles (ρ2))])) - (start-facet this-facet - (begin/dataflow - (define mngr? - (for/first ([(id slots) (ref task-managers)] - #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) - id)) - (match mngr? - [(some $mngr) - (take-slot! mngr) - (stop this-facet - (assign-task mngr))] - [none - #f]) - #f))) + (define not-a-real-task-manager (gensym 'FAKE)) + (field [task-mngr ID not-a-real-task-manager]) ;; ID -> ... (define (assign-task [mngr : ID]) @@ -392,16 +378,17 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (assert (task-assignment mngr job-id t)) (on (retracted (task-manager mngr discard)) ;; our task manager has crashed - (stop this-facet #;(select-a-task-manager))) + (stop this-facet + (set! task-mngr not-a-real-task-manager))) (on start - ;; N.B. when this line was here, and not after `(when mngr ...)` above, - ;; things didn't work. I think that due to script scheduling, all ready - ;; tasks were being assigned to the manager - #;(take-slot! mngr) - (start-facet take-slot - (on (asserted (task-state mngr job-id this-id _)) - (stop take-slot - (received-answer! mngr))))) + ;; N.B. when this line was here, and not after `(when mngr ...)` above, + ;; things didn't work. I think that due to script scheduling, all ready + ;; tasks were being assigned to the manager + #;(take-slot! mngr) + (start-facet take-slot + (on (asserted (task-state mngr job-id this-id _)) + (stop take-slot + (received-answer! mngr))))) (on (asserted (task-state mngr job-id this-id $status)) (match status [ACCEPTED #f] @@ -411,12 +398,29 @@ The JobManager then performs the job and, when finished, asserts (job-finished I ;; don't think we need a release-slot! here, because if we've heard back from a task manager, ;; they should have told us a different slot count since we tried to give them work (log "JM overloaded manager ~a with task ~a" mngr this-id) - (stop this-facet #;(select-a-task-manager))] + (stop this-facet + (set! task-mngr not-a-real-task-manager))] [(finished $results) (log "JM receives the results of task ~a" this-id) (stop perform (k this-id results))])))) - (on start (select-a-task-manager assign-task)))) + (define (select-a-task-manager) + (start-facet this-facet + (begin/dataflow + (when (equal? (ref task-mngr) not-a-real-task-manager) + (define mngr? + (for/first ([(id slots) (ref task-managers)] + #:when (positive? (- slots (hash-ref/failure (ref requests-in-flight) id 0)))) + id)) + (match mngr? + [(some $mngr) + (take-slot! mngr) + (set! task-mngr mngr) + (assign-task mngr)] + [none + #f]))))) + + (on start (select-a-task-manager)))) (begin/dataflow (define slots (slots-available))