flink: task runners don't need a status
This commit is contained in:
parent
33b516b7a6
commit
42d025cc7f
|
@ -1,12 +1,6 @@
|
||||||
#lang syndicate
|
#lang syndicate
|
||||||
|
|
||||||
(require (only-in racket/set
|
(require racket/set)
|
||||||
set
|
|
||||||
set-count
|
|
||||||
set-empty?
|
|
||||||
set-first
|
|
||||||
set-remove
|
|
||||||
set-add))
|
|
||||||
(require (only-in racket/list
|
(require (only-in racket/list
|
||||||
partition
|
partition
|
||||||
empty?
|
empty?
|
||||||
|
@ -18,6 +12,7 @@
|
||||||
string-trim))
|
string-trim))
|
||||||
(require (only-in racket/sequence
|
(require (only-in racket/sequence
|
||||||
sequence->list))
|
sequence->list))
|
||||||
|
(require (only-in racket/function const))
|
||||||
|
|
||||||
(module+ test
|
(module+ test
|
||||||
(require rackunit))
|
(require rackunit))
|
||||||
|
@ -53,15 +48,17 @@ perform a task.
|
||||||
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
The resources available to a TM are its associated TaskRunners (TRs). TaskRunners
|
||||||
assert their presence with (task-runner ID)
|
assert their presence with (task-runner ID)
|
||||||
|
|
||||||
a Status is one of
|
|
||||||
- IDLE, when the TR is not executing a task
|
|
||||||
- (executing ID), when the TR is executing the task with the given ID
|
|
||||||
- OVERLOAD, when the TR has been asked to perform a task before it has
|
|
||||||
finished its previous assignment. For the purposes of this model, it indicates a
|
|
||||||
failure in the protocol; like the exchange between the JM and the TM, a TR
|
|
||||||
should only receive tasks when it is IDLE.
|
|
||||||
|#
|
|#
|
||||||
(assertion-struct task-runner (id status))
|
(assertion-struct task-runner (id))
|
||||||
|
#|
|
||||||
|
a Status is one of
|
||||||
|
- IDLE, when the TR is not executing a task
|
||||||
|
- (executing ID), when the TR is executing the task with the given ID
|
||||||
|
- OVERLOAD, when the TR has been asked to perform a task before it has
|
||||||
|
finished its previous assignment. For the purposes of this model, it indicates a
|
||||||
|
failure in the protocol; like the exchange between the JM and the TM, a TR
|
||||||
|
should only receive tasks when it is IDLE.
|
||||||
|
|#
|
||||||
(define IDLE 'idle)
|
(define IDLE 'idle)
|
||||||
(define OVERLOAD 'overload)
|
(define OVERLOAD 'overload)
|
||||||
(struct executing (id) #:transparent)
|
(struct executing (id) #:transparent)
|
||||||
|
@ -180,18 +177,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(define (spawn-task-runner)
|
(define (spawn-task-runner)
|
||||||
(define id (gensym 'task-runner))
|
(define id (gensym 'task-runner))
|
||||||
(spawn #:name id
|
(spawn #:name id
|
||||||
(field [status IDLE])
|
(assert (task-runner id))
|
||||||
(define (idle?) (equal? IDLE (status)))
|
|
||||||
(assert (task-runner id (status)))
|
|
||||||
(begin/dataflow
|
|
||||||
(log "task-runner ~v state is: ~a" id (status)))
|
|
||||||
;; Task (TaskStateDesc -> Void) -> Void
|
;; Task (TaskStateDesc -> Void) -> Void
|
||||||
(define (perform-task tsk job-id on-complete! update-status!)
|
(define (perform-task tsk job-id on-complete! update-status!)
|
||||||
(unless (idle?)
|
|
||||||
(error "tried to perform a task when not idle"))
|
|
||||||
;; since we currently finish everything in one turn, these changes to status aren't
|
|
||||||
;; actually visible.
|
|
||||||
(status RUNNING)
|
|
||||||
(match-define (task tid desc) tsk)
|
(match-define (task tid desc) tsk)
|
||||||
(match desc
|
(match desc
|
||||||
[(map-work data)
|
[(map-work data)
|
||||||
|
@ -199,10 +187,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(on-complete! wc)]
|
(on-complete! wc)]
|
||||||
[(reduce-work left right)
|
[(reduce-work left right)
|
||||||
(define wc (hash-union left right #:combine +))
|
(define wc (hash-union left right #:combine +))
|
||||||
(on-complete! wc)])
|
(on-complete! wc)]))
|
||||||
(status IDLE))
|
|
||||||
(on-start
|
(on-start
|
||||||
(task-performer id idle? perform-task))))
|
(task-performer id (const #t) perform-task))))
|
||||||
|
|
||||||
;; (Hash String Nat) String -> (Hash String Nat)
|
;; (Hash String Nat) String -> (Hash String Nat)
|
||||||
(define (word-count-increment h word)
|
(define (word-count-increment h word)
|
||||||
|
@ -244,27 +231,25 @@ The JobManager then performs the job and, when finished, asserts (job-finished I
|
||||||
(log "Task Manager (TM) ~a is running" id)
|
(log "Task Manager (TM) ~a is running" id)
|
||||||
(during (job-manager-alive)
|
(during (job-manager-alive)
|
||||||
(log "TM learns about JM")
|
(log "TM learns about JM")
|
||||||
(define/query-set task-runners (task-runner $id _) id
|
(define/query-set task-runners (task-runner $id) id
|
||||||
#:on-add (log "TM learns about task-runner ~a" id))
|
#:on-add (log "TM learns about task-runner ~a" id))
|
||||||
;; I wonder just how inefficient this is
|
(field [busy-runners (set)])
|
||||||
(define/query-set idle-runners (task-runner $id IDLE) id
|
(define/dataflow idle-runners
|
||||||
#:on-add (log "TM learns that task-runner ~a is IDLE" id)
|
(set-count (set-subtract (task-runners) (busy-runners))))
|
||||||
#:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id))
|
(assert (task-manager id (idle-runners)))
|
||||||
(assert (task-manager id (set-count (idle-runners))))
|
|
||||||
(field [busy-runners (list)])
|
|
||||||
(define (can-accept?)
|
(define (can-accept?)
|
||||||
(not (set-empty? (idle-runners))))
|
(positive? (idle-runners)))
|
||||||
|
(define (select-runner)
|
||||||
|
(define runner (for/first ([r (in-set (task-runners))]
|
||||||
|
#:unless (set-member? (busy-runners) r))
|
||||||
|
r))
|
||||||
|
(busy-runners (set-add (busy-runners) runner))
|
||||||
|
runner)
|
||||||
(define (perform-task tsk job-id on-complete! update-status!)
|
(define (perform-task tsk job-id on-complete! update-status!)
|
||||||
(match-define (task task-id desc) tsk)
|
(match-define (task task-id desc) tsk)
|
||||||
(define runner (set-first (idle-runners)))
|
(define runner (select-runner))
|
||||||
;; n.b. modifying a query set is questionable
|
|
||||||
;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner.
|
|
||||||
;; Could use the busy-runners field to avoid that
|
|
||||||
(idle-runners (set-remove (idle-runners) runner))
|
|
||||||
(log "TM assigns task ~a to runner ~a" task-id runner)
|
(log "TM assigns task ~a to runner ~a" task-id runner)
|
||||||
;; TODO - since we're both adding and removing from this set I'm not sure TRs
|
(on-stop (busy-runners (set-remove (busy-runners) runner)))
|
||||||
;; need to be making assertions about their idleness
|
|
||||||
(on-stop (idle-runners (set-add (idle-runners) runner)))
|
|
||||||
(on-start
|
(on-start
|
||||||
(task-assigner tsk job-id runner
|
(task-assigner tsk job-id runner
|
||||||
(lambda () (update-status! OVERLOAD))
|
(lambda () (update-status! OVERLOAD))
|
||||||
|
|
Loading…
Reference in New Issue