This commit is contained in:
Sam Caldwell 2019-12-30 16:27:29 -05:00
parent f5331eb24f
commit 8afed87e99
2 changed files with 276 additions and 388 deletions

View File

@ -126,7 +126,8 @@ TaskRunners.
(Role (listen)
(During (Observe (TaskPerformance ID ConcreteTask ★/t))
;; would be nice to say how the IDs and TaskIDs relate to each other
(Shares (TaskPerformance TaskID TaskStateDesc)))))
;; BUG in spec; ConcreteTask used to be just TaskID (when I streamlined protocol)
(Shares (TaskPerformance ID ConcreteTask TaskStateDesc)))))
#|
Job Submission Protocol

View File

@ -174,6 +174,7 @@
(define (build-transitions D effs)
(for*/set ([eff* (in-set effs)]
[txn (in-set (apply-effects eff* current ft roles#))]
;; TODO - why?
;; filter effect-free self-loops
#:unless (and (empty? (transition-effs txn))
(equal? (transition-dest txn) current)))
@ -2276,84 +2277,80 @@
;; ---------------------------------------------------------------------------
;; Flink Examples
(define job-manager-actual
(define task-assigner-spec
'(Role
(jm)
(Shares (JobManagerAlive))
(assign)
(Shares
(Observe
(TaskPerformance
Symbol
(Task
(Tuple Int Symbol)
(U (MapWork String) (ReduceWork (Hash String Int) (Hash String Int))))
★/t)))
(Reacts
(Asserted
(Job
(Bind Symbol)
(Bind (List (Task Int (U (MapWork String) (ReduceWork Int Int)))))))
(TaskPerformance
Symbol
(Task
(Tuple Int Symbol)
(U (MapWork String) (ReduceWork (Hash String Int) (Hash String Int))))
★/t))
(Branch (Stop assign) (Effs)))))
(module+ test
(test-case "parse and compile task-assigner-spec"
(check-true (Role? (parse-T task-assigner-spec)))
(check-true (role-graph? (compile (parse-T task-assigner-spec))))))
(define task-performer-spec
'(Role
(listen)
(Reacts
(Asserted
(Observe-
(TaskPerformance-
Symbol
(Task-
(Tuple- Int Symbol)
(U
(MapWork- String)
(ReduceWork- (Hash- String Int) (Hash- String Int))))
★/t)))
(Role
(during-inner)
(Reacts
OnDataflow
(Role
(perform)
(Reacts
OnStart
(Role
(select)
(Reacts
OnDataflow
(Branch
(Effs
(Branch
(Effs
(Role
(assign)
(Shares
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Reacts
(Asserted
(TaskState
Symbol
Symbol
Int
(Bind (U (Finished (Hash String Int)) Symbol))))
(Branch
(Effs)
(Effs)
(Effs (Stop assign))
(Effs
(Stop
perform
(Branch
(Effs
(Role
(done)
(Shares (JobFinished Symbol (Hash String Int)))))
(Effs))))))
(Reacts
OnStart
(Role
(take-slot)
(Reacts
(Asserted (TaskState Symbol Symbol Int Discard))
(Stop take-slot))))
(Reacts (Retracted (TaskManager Symbol Discard)) (Stop assign))))
(Effs)))
(Effs)))))
(Reacts OnStop)
(Reacts OnStart)))
(Reacts
(Retracted
(Job
(Observe-
(TaskPerformance-
Symbol
(List (Task Int (U (MapWork String) (ReduceWork Int Int))))))
(Stop during-inner))))
(Reacts (Retracted (TaskManager (Bind Symbol) (Bind Int))))
(Reacts (Asserted (TaskManager (Bind Symbol) (Bind Int))))))
(Task-
(Tuple- Int Symbol)
(U
(MapWork- String)
(ReduceWork- (Hash- String Int) (Hash- String Int))))
★/t)))
(Stop during-inner))
(Shares
(TaskPerformance-
Symbol
(Task-
(Tuple- Int Symbol)
(U
(MapWork- String)
(ReduceWork- (Hash- String Int) (Hash- String Int))))
(U (Finished- (Hash- String Int)) Symbol)))))))
(module+ test
(test-case "parse and compile task-performer-spec"
(check-true (Role? (parse-T task-performer-spec)))
(check-true (role-graph? (compile (parse-T task-performer-spec))))))
(define job-manager-actual
'())
#;(module+ test
(test-case
"job manager reads and compiles"
(define jmr (parse-T job-manager-actual))
@ -2362,162 +2359,62 @@
(check-true (role-graph? jm))
(check-true (simulates? jmr jmr))))
(define task-performer-spec
'(Role
(listen)
(Reacts
(Asserted
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Role
(during-inner)
(Reacts
(Retracted
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Stop during-inner))
(Shares
(TaskState
Symbol
Symbol
Int
(U (Finished (Hash String Int)) Symbol)))))))
(module+ test
(test-case "parse and compile task-performer-spec"
(check-true (Role? (parse-T task-performer-spec)))
(check-true (role-graph? (compile (parse-T task-performer-spec))))))
(define task-runner-ty
'(Role
(runner)
(Shares (TaskRunner Symbol (U (Executing Int) Symbol)))
(Shares (TaskRunner Symbol))
(Reacts
(Asserted
(TaskAssignment
(Observe
(TaskPerformance
Symbol
(Bind Symbol)
(Task
(Bind Int)
(Bind
(Task
(Tuple Int Symbol)
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int)))))))
(ReduceWork (Hash String Int) (Hash String Int)))))
Discard)))
(Role
(during-inner)
(Shares
(TaskState Symbol Symbol Int (U (Finished (Hash String Int)) Symbol)))
(Reacts
(Retracted
(TaskAssignment
Symbol
(TaskPerformance
Symbol
(Task
Int
(Tuple Int Symbol)
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(ReduceWork (Hash String Int) (Hash String Int))))
(U (Finished (Hash String Int)) Symbol)))
(Reacts
(Retracted
(Observe
(TaskPerformance
Symbol
(Task
(Tuple Int Symbol)
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))
Discard)))
(Stop during-inner))))
(Reacts OnDataflow)))
(Reacts (Retracted (TaskManager Symbol Discard)) (Stop runner))))
(module+ test
(test-case "parse and compile task-runner-ty"
(check-true (Role? (parse-T task-runner-ty)))
(check-true (role-graph? (compile (parse-T task-runner-ty))))
(check-true (simulates? (parse-T task-runner-ty)
(parse-T task-performer-spec)))))
(define task-assigner-spec
'(Role
(assign)
(Shares
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Reacts
(Asserted (TaskState Symbol Symbol Int ★/t))
())))
(module+ test
(test-case "parse and compile task-assigner-spec"
(check-true (Role? (parse-T task-assigner-spec)))
(check-true (role-graph? (compile (parse-T task-assigner-spec))))))
(check-true (role-graph? (compile (parse-T task-runner-ty)))))
(test-case "task-runner subgraph(s) simulate task-performer"
(define tr (parse-T task-runner-ty))
(define tpr (parse-T task-performer-spec))
(define ans (run/timeout (thunk (simulating-subgraphs tr tpr))))
(check-true (list? ans))
(check-false (empty? ans))))
(define task-manager-ty
'(Role
(tm)
(Reacts
(Asserted (JobManagerAlive))
(Role
(during-inner1)
(Shares (TaskManager Symbol Int))
(Reacts
(Asserted
(TaskAssignment
Symbol
(Bind Symbol)
(Task
(Bind Int)
(Bind
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int)))))))
(Role
(during-inner2)
(Shares
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Shares
(TaskState Symbol Symbol Int (U (Finished (Hash String Int)) Symbol)))
(Reacts
(Asserted
(TaskState
Symbol
Symbol
Int
(Bind (U (Finished (Hash String Int)) Symbol)))))
(Reacts OnStop)
(Reacts
(Retracted
(TaskAssignment
Symbol
Symbol
(Task
Int
(U
(MapWork String)
(ReduceWork (Hash String Int) (Hash String Int))))))
(Stop during-inner2))))
(Reacts (Retracted (TaskRunner (Bind Symbol) (U (Executing Int) Symbol))))
(Reacts (Asserted (TaskRunner (Bind Symbol) (U (Executing Int) Symbol))))
(Reacts (Retracted (TaskRunner (Bind Symbol) Discard)))
(Reacts (Asserted (TaskRunner (Bind Symbol) Discard)))
(Reacts (Retracted (JobManagerAlive)) (Stop during-inner1))))))
'())
(module+ test
#;(module+ test
(test-case "parse and compile task-manager-ty"
(check-true (Role? (parse-T task-manager-ty)))
(check-true (role-graph? (compile (parse-T task-manager-ty)))))
@ -2529,8 +2426,27 @@
(check-false (simulates? tm (parse-T task-assigner-spec)))
(check-false (simulates? tm (parse-T task-performer-spec)))))
;; has a bug with done facet dying too soon
(define job-manager-v2
#;(module+ test
(test-case
"job manager with internal events basic functionality"
(define jmr (run/timeout (thunk (label-internal-events (parse-T job-manager-v2)))))
(check-true (Role? jmr))
(define jmrg (compile jmr))
(check-true (role-graph? jmrg))
(check-true (simulates? jmr jmr)))
(test-case
"job manager subgraph(s) implement task assigner"
(define jmr (parse-T job-manager-v2))
(define tar (parse-T task-assigner-spec))
;; TODO - would be good to have a timeout
(define ans (simulating-subgraphs jmr tar))
(check-true (list? ans))
(check-false (empty? ans))))
(module+ done-facet-dying-too-soon
;; has a bug with done facet dying too soon
(define job-manager-v2
'(Role
(jm)
(Shares (JobManagerAlive))
@ -2617,25 +2533,8 @@
(Reacts (Retracted (TaskManager (Bind Symbol) (Bind Int))))
(Reacts (Asserted (TaskManager (Bind Symbol) (Bind Int))))))
(module+ test
(test-case
"job manager with internal events basic functionality"
(define jmr (run/timeout (thunk (label-internal-events (parse-T job-manager-v2)))))
(check-true (Role? jmr))
(define jmrg (compile jmr))
(check-true (role-graph? jmrg))
(check-true (simulates? jmr jmr)))
(test-case
"job manager subgraph(s) implement task assigner"
(define jmr (parse-T job-manager-v2))
(define tar (parse-T task-assigner-spec))
;; TODO - would be good to have a timeout
(define ans (simulating-subgraphs jmr tar))
(check-true (list? ans))
(check-false (empty? ans))))
;; fixed above bug
(define job-manager-v3
;; fixed above bug
(define job-manager-v3
'(Role
(jm)
(Shares (JobManagerAlive))
@ -2721,19 +2620,7 @@
(List (Task Int (U (MapWork String) (ReduceWork Int Int))))))
(Stop during-inner))))
(Reacts (Retracted (TaskManager (Bind Symbol) (Bind Int))))
(Reacts (Asserted (TaskManager (Bind Symbol) (Bind Int))))))
(module+ test
(test-case
"job manager v3 basic functionality"
(define jmr (run/timeout (thunk (parse-T job-manager-v3))))
(check-true (Role? jmr))
(define jmrg (run/timeout (thunk (compile jmr))))
(check-true (role-graph? jmrg))
(check-true (run/timeout (thunk (simulates? jmr jmr))))
(define jmrgi (run/timeout (thunk (compile/internal-events jmrg jmr))))
(check-true (role-graph? jmrgi))
(check-true (run/timeout (thunk (simulates?/rg jmrgi jmr jmrgi jmr))))))
(Reacts (Asserted (TaskManager (Bind Symbol) (Bind Int)))))))
;; ---------------------------------------------------------------------------
;; Message Examples/Tests