diff --git a/packages/syntax-server/src/job.js b/packages/syntax-server/src/job.js index e8b66b1..1fd2229 100644 --- a/packages/syntax-server/src/job.js +++ b/packages/syntax-server/src/job.js @@ -1,4 +1,4 @@ -const { currentFacet, Observe, List, Set } = require("@syndicate-lang/core"); +const { currentFacet, Observe, List, Map, Set, Skeleton } = require("@syndicate-lang/core"); assertion type WorkItem(worker, item, result); assertion type Job(item, result); @@ -11,14 +11,51 @@ export { }; spawn named 'JobDispatcher' { - field this.readyWorkers = Set(); + field this.readyWorkers = Map(); // Pattern -> Set(WorkerId) field this.runningJobs = 0; - // dataflow { - // console.log(this.runningJobs, 'running jobs', this.readyWorkers.size, 'idle workers'); + // dataflow console.log(this.runningJobs + ' runningJobs; readyWorkers: ' + this.readyWorkers); + + during Observe(Observe(WorkItem($worker, $pattern, _))) { + if (!Skeleton.isCompletelyConcrete(pattern)) { + on start addWorker(pattern, worker); + on stop removeWorker(pattern, worker); + } + } + + // TODO: reuse the dataspace's index somehow to get a more efficient + // job-dispatcher? Once quoting issues are solved, something like + // + // during Observe(Observe(WorkItem(_, $pattern, _))) { + // field this.readyWorkers = Set(); + // during Observe(Observe(WorkItem($worker, pattern, _))) { ... } + // during Observe(Job(fixquoting(pattern), _)) { ... } // } - on asserted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.add(w); - on retracted Observe(Observe(WorkItem($w, _, _))) this.readyWorkers = this.readyWorkers.remove(w); + const addWorker = (pattern, worker) => { + const rw = this.readyWorkers; + this.readyWorkers = rw.set(pattern, rw.get(pattern, Set()).add(worker)); + }; + + const removeWorker = (pattern, worker) => { + const rw = this.readyWorkers; + const ws = rw.get(pattern, Set()).remove(worker); + if (ws.isEmpty()) { + this.readyWorkers = rw.remove(pattern); + } else { + this.readyWorkers = rw.set(pattern, ws); + } + }; + + const findWorkerSet = (item) => { + const matches = this.readyWorkers.filter((ids, pat) => Skeleton.match(pat, item) !== false); + switch (matches.size) { + case 0: return [null, Set()]; + default: + console.error('Multiple workers claiming job', item, List(matches.keys())); + /* FALL THROUGH */ + case 1: return matches.entries().next().value; + } + }; during Observe(Job($item, _)) { on start this.runningJobs++; @@ -34,9 +71,10 @@ spawn named 'JobDispatcher' { react { const facet = currentFacet(); dataflow { - const worker = this.readyWorkers.first(); - if (worker !== void 0) { - this.readyWorkers = this.readyWorkers.remove(worker); + const [itemPattern, workers] = findWorkerSet(item); + if (!workers.isEmpty()) { + const worker = workers.first(); + removeWorker(itemPattern, worker); facet.stop(() => { react { stop on retracted Observe(Observe(WorkItem(worker, _, _))) { @@ -44,10 +82,8 @@ spawn named 'JobDispatcher' { waitForWorker(retryCount + 1); } stop on asserted WorkItem(worker, item, $result) { - this.readyWorkers = this.readyWorkers.add(worker); - react { - assert Job(item, result); - } + addWorker(itemPattern, worker); + react assert Job(item, result); } } });