Dispatch jobs based on advertised interest in job specs

This commit is contained in:
Tony Garnock-Jones 2018-12-14 13:02:48 +00:00
parent fb5f569342
commit bbbc1cc0a0
1 changed files with 49 additions and 13 deletions

View File

@ -1,4 +1,4 @@
const { currentFacet, Observe, List, Set } = require("@syndicate-lang/core");
const { currentFacet, Observe, List, Map, Set, Skeleton } = require("@syndicate-lang/core");
assertion type WorkItem(worker, item, result);
assertion type Job(item, result);
@ -11,14 +11,51 @@ export {
};
spawn named 'JobDispatcher' {
field this.readyWorkers = Set();
field this.readyWorkers = Map(); // Pattern -> Set(WorkerId)
field this.runningJobs = 0;
// dataflow {
// console.log(this.runningJobs, 'running jobs', this.readyWorkers.size, 'idle workers');
// dataflow console.log(this.runningJobs + ' runningJobs; readyWorkers: ' + this.readyWorkers);
during Observe(Observe(WorkItem($worker, $pattern, _))) {
if (!Skeleton.isCompletelyConcrete(pattern)) {
on start addWorker(pattern, worker);
on stop removeWorker(pattern, worker);
}
}
// TODO: reuse the dataspace's index somehow to get a more efficient
// job-dispatcher? Once quoting issues are solved, something like
//
// during Observe(Observe(WorkItem(_, $pattern, _))) {
// field this.readyWorkers = Set();
// during Observe(Observe(WorkItem($worker, pattern, _))) { ... }
// during Observe(Job(fixquoting(pattern), _)) { ... }
// }
on asserted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.add(w);
on retracted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.remove(w);
const addWorker = (pattern, worker) => {
const rw = this.readyWorkers;
this.readyWorkers = rw.set(pattern, rw.get(pattern, Set()).add(worker));
};
const removeWorker = (pattern, worker) => {
const rw = this.readyWorkers;
const ws = rw.get(pattern, Set()).remove(worker);
if (ws.isEmpty()) {
this.readyWorkers = rw.remove(pattern);
} else {
this.readyWorkers = rw.set(pattern, ws);
}
};
const findWorkerSet = (item) => {
const matches = this.readyWorkers.filter((ids, pat) => Skeleton.match(pat, item) !== false);
switch (matches.size) {
case 0: return [null, Set()];
default:
console.error('Multiple workers claiming job', item, List(matches.keys()));
/* FALL THROUGH */
case 1: return matches.entries().next().value;
}
};
during Observe(Job($item, _)) {
on start this.runningJobs++;
@ -34,9 +71,10 @@ spawn named 'JobDispatcher' {
react {
const facet = currentFacet();
dataflow {
const worker = this.readyWorkers.first();
if (worker !== void 0) {
this.readyWorkers = this.readyWorkers.remove(worker);
const [itemPattern, workers] = findWorkerSet(item);
if (!workers.isEmpty()) {
const worker = workers.first();
removeWorker(itemPattern, worker);
facet.stop(() => {
react {
stop on retracted Observe(Observe(WorkItem(worker, _, _))) {
@ -44,10 +82,8 @@ spawn named 'JobDispatcher' {
waitForWorker(retryCount + 1);
}
stop on asserted WorkItem(worker, item, $result) {
this.readyWorkers = this.readyWorkers.add(worker);
react {
assert Job(item, result);
}
addWorker(itemPattern, worker);
react assert Job(item, result);
}
}
});