# Patch summary (racket/syndicate/examples/actor/flink.rkt):
#   * Imports: the selective `only-in` import of racket/set is replaced by the
#     whole module, and `const` is imported from racket/function.
#   * Protocol: the `task-runner` assertion struct drops its `status` field and
#     now carries only `id`; the description of Status moves into its own block
#     comment placed after the struct definition.
#   * spawn-task-runner: removes the `status` field, the `idle?` predicate, the
#     begin/dataflow status log, the "not idle" error check and the status
#     updates around the task body; `task-performer` is now given `(const #t)`
#     where it was previously given `idle?`.
#   * Task manager: the `idle-runners` query set over `(task-runner $id IDLE)` is
#     removed. `busy-runners` becomes a set-valued field, and `idle-runners`
#     becomes a dataflow-defined count computed as
#     (set-count (set-subtract (task-runners) (busy-runners))).
#     A new `select-runner` picks the first member of `task-runners` that is not
#     in `busy-runners`, adds it to `busy-runners`, and returns it; the on-stop
#     handler removes that runner from `busy-runners` again.
#
# NOTE(review): `for/first` yields #f when no runner qualifies, in which case
#   `select-runner` would add #f to `busy-runners` and return #f. Whether
#   `perform-task` is only ever invoked while `can-accept?` holds is not visible
#   in this patch -- confirm against `task-performer`.
# NOTE(review): this patch text appears to have lost its original line breaks,
#   and the final hunk appears to end mid-form; re-export it before applying.
diff --git a/racket/syndicate/examples/actor/flink.rkt b/racket/syndicate/examples/actor/flink.rkt index afb9046..756fb35 100644 --- a/racket/syndicate/examples/actor/flink.rkt +++ b/racket/syndicate/examples/actor/flink.rkt @@ -1,12 +1,6 @@ #lang syndicate -(require (only-in racket/set - set - set-count - set-empty? - set-first - set-remove - set-add)) +(require racket/set) (require (only-in racket/list partition empty? @@ -18,6 +12,7 @@ string-trim)) (require (only-in racket/sequence sequence->list)) +(require (only-in racket/function const)) (module+ test (require rackunit)) @@ -53,15 +48,17 @@ perform a task. The resources available to a TM are its associated TaskRunners (TRs). TaskRunners assert their presence with (task-runner ID) -a Status is one of - - IDLE, when the TR is not executing a task - - (executing ID), when the TR is executing the task with the given ID - - OVERLOAD, when the TR has been asked to perform a task before it has - finished its previous assignment. For the purposes of this model, it indicates a - failure in the protocol; like the exchange between the JM and the TM, a TR - should only receive tasks when it is IDLE. |# -(assertion-struct task-runner (id status)) +(assertion-struct task-runner (id)) +#| +a Status is one of +- IDLE, when the TR is not executing a task +- (executing ID), when the TR is executing the task with the given ID +- OVERLOAD, when the TR has been asked to perform a task before it has +finished its previous assignment. For the purposes of this model, it indicates a +failure in the protocol; like the exchange between the JM and the TM, a TR +should only receive tasks when it is IDLE. +|# (define IDLE 'idle) (define OVERLOAD 'overload) (struct executing (id) #:transparent) @@ -180,18 +177,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (define (spawn-task-runner) (define id (gensym 'task-runner)) (spawn #:name id - (field [status IDLE]) - (define (idle?) (equal? 
IDLE (status))) - (assert (task-runner id (status))) - (begin/dataflow - (log "task-runner ~v state is: ~a" id (status))) + (assert (task-runner id)) ;; Task (TaskStateDesc -> Void) -> Void (define (perform-task tsk job-id on-complete! update-status!) - (unless (idle?) - (error "tried to perform a task when not idle")) - ;; since we currently finish everything in one turn, these changes to status aren't - ;; actually visible. - (status RUNNING) (match-define (task tid desc) tsk) (match desc [(map-work data) @@ -199,10 +187,9 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (on-complete! wc)] [(reduce-work left right) (define wc (hash-union left right #:combine +)) - (on-complete! wc)]) - (status IDLE)) + (on-complete! wc)])) (on-start - (task-performer id idle? perform-task)))) + (task-performer id (const #t) perform-task)))) ;; (Hash String Nat) String -> (Hash String Nat) (define (word-count-increment h word) @@ -244,27 +231,25 @@ The JobManager then performs the job and, when finished, asserts (job-finished I (log "Task Manager (TM) ~a is running" id) (during (job-manager-alive) (log "TM learns about JM") - (define/query-set task-runners (task-runner $id _) id + (define/query-set task-runners (task-runner $id) id #:on-add (log "TM learns about task-runner ~a" id)) - ;; I wonder just how inefficient this is - (define/query-set idle-runners (task-runner $id IDLE) id - #:on-add (log "TM learns that task-runner ~a is IDLE" id) - #:on-remove (log "TM learns that task-runner ~a is NOT IDLE" id)) - (assert (task-manager id (set-count (idle-runners)))) - (field [busy-runners (list)]) + (field [busy-runners (set)]) + (define/dataflow idle-runners + (set-count (set-subtract (task-runners) (busy-runners)))) + (assert (task-manager id (idle-runners))) (define (can-accept?) - (not (set-empty? (idle-runners)))) + (positive? 
(idle-runners))) + (define (select-runner) + (define runner (for/first ([r (in-set (task-runners))] + #:unless (set-member? (busy-runners) r)) + r)) + (busy-runners (set-add (busy-runners) runner)) + runner) (define (perform-task tsk job-id on-complete! update-status!) (match-define (task task-id desc) tsk) - (define runner (set-first (idle-runners))) - ;; n.b. modifying a query set is questionable - ;; but if we wait for the IDLE assertion to be retracted, we might assign multiple tasks to the same runner. - ;; Could use the busy-runners field to avoid that - (idle-runners (set-remove (idle-runners) runner)) + (define runner (select-runner)) (log "TM assigns task ~a to runner ~a" task-id runner) - ;; TODO - since we're both adding and removing from this set I'm not sure TRs - ;; need to be making assertions about their idleness - (on-stop (idle-runners (set-add (idle-runners) runner))) + (on-stop (busy-runners (set-remove (busy-runners) runner))) (on-start (task-assigner tsk job-id runner (lambda () (update-status! OVERLOAD))