Better Worker state management, including proper shutdown

This commit is contained in:
Tony Garnock-Jones 2021-03-03 16:23:22 +01:00
parent 5c3408f488
commit 9865fe4efd
1 changed files with 25 additions and 13 deletions

View File

@ -12,8 +12,15 @@ const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()(
const STARTING_ACTOR_ID = (threadId & (2 ** 20 - 1)) * 1000000000; const STARTING_ACTOR_ID = (threadId & (2 ** 20 - 1)) * 1000000000;
__setNextActorId(STARTING_ACTOR_ID); __setNextActorId(STARTING_ACTOR_ID);
type TaskState =
| { readonly state: "start_pending" }
| { readonly state: "starting" }
| { readonly state: "shutdown_pending" }
| { readonly state: "running", shutdownRef: Ref };
Turn.for(new Actor(), t => { Turn.for(new Actor(), t => {
const p = parentPort!; const p = parentPort!;
let taskState: TaskState = { state: "start_pending" };
spawnRelay(t, { spawnRelay(t, {
nextLocalOid: STARTING_ACTOR_ID + 500000000, nextLocalOid: STARTING_ACTOR_ID + 500000000,
packetWriter: bs => p.postMessage(bs), packetWriter: bs => p.postMessage(bs),
@ -22,29 +29,34 @@ Turn.for(new Actor(), t => {
p.on('close', () => Turn.for(t.actor, t => t.quit())); p.on('close', () => Turn.for(t.actor, t => t.quit()));
}, },
initialRef: t.ref({ initialRef: t.ref({
handleMap: new IdentityMap<Handle, false | Ref>(), async assert(t, inst) {
async assert(t, inst, handle) {
if (!Instance.isClassOf(inst)) return; if (!Instance.isClassOf(inst)) return;
if (taskState.state !== "start_pending") return;
taskState = { state: "starting" };
const m = await import(Instance._.moduleName(inst)); const m = await import(Instance._.moduleName(inst));
t.freshen(t => t.spawn(t => { t.freshen(t => t.spawn(t => {
const q = (t: Turn) => { t.actor.atExit(() => {
this.handleMap.delete(handle); console.log('Worker terminating');
process.exit(0);
});
if (taskState.state === "shutdown_pending") {
t.quit(); t.quit();
};
if (this.handleMap.has(handle)) {
q(t);
} else { } else {
this.handleMap.set(handle, t.ref({ message: q })); taskState = {
state: "running",
shutdownRef: t.ref({ message(t) { t.quit(); } }),
};
m.default(t, Instance._.arg(inst)); m.default(t, Instance._.arg(inst));
} }
})); }));
}, },
retract(t, handle) { retract(t) {
const r = this.handleMap.get(handle); if (taskState.state === "running") {
if (r === void 0) { t.message(taskState.shutdownRef, true);
this.handleMap.set(handle, false);
} else { } else {
t.message(r as Ref, true); taskState = { state: "shutdown_pending" };
} }
} }
}), }),