From 9865fe4efd474e9547a9502d70f68c18de80245d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 3 Mar 2021 16:23:22 +0100 Subject: [PATCH] Better Worker state management, including proper shutdown --- src/wload.ts | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/wload.ts b/src/wload.ts index 28cbaf5..3d6bbf5 100644 --- a/src/wload.ts +++ b/src/wload.ts @@ -12,8 +12,15 @@ const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()( const STARTING_ACTOR_ID = (threadId & (2 ** 20 - 1)) * 1000000000; __setNextActorId(STARTING_ACTOR_ID); +type TaskState = + | { readonly state: "start_pending" } + | { readonly state: "starting" } + | { readonly state: "shutdown_pending" } + | { readonly state: "running", shutdownRef: Ref }; + Turn.for(new Actor(), t => { const p = parentPort!; + let taskState: TaskState = { state: "start_pending" }; spawnRelay(t, { nextLocalOid: STARTING_ACTOR_ID + 500000000, packetWriter: bs => p.postMessage(bs), @@ -22,29 +29,34 @@ Turn.for(new Actor(), t => { p.on('close', () => Turn.for(t.actor, t => t.quit())); }, initialRef: t.ref({ - handleMap: new IdentityMap(), - async assert(t, inst, handle) { + async assert(t, inst) { if (!Instance.isClassOf(inst)) return; + + if (taskState.state !== "start_pending") return; + taskState = { state: "starting" }; + const m = await import(Instance._.moduleName(inst)); t.freshen(t => t.spawn(t => { - const q = (t: Turn) => { - this.handleMap.delete(handle); + t.actor.atExit(() => { + console.log('Worker terminating'); + process.exit(0); + }); + if (taskState.state === "shutdown_pending") { t.quit(); - }; - if (this.handleMap.has(handle)) { - q(t); } else { - this.handleMap.set(handle, t.ref({ message: q })); + taskState = { + state: "running", + shutdownRef: t.ref({ message(t) { t.quit(); } }), + }; m.default(t, Instance._.arg(inst)); } })); }, - retract(t, handle) { - const r = this.handleMap.get(handle); - if (r === void 0) { - this.handleMap.set(handle, false); + retract(t) { + if (taskState.state === "running") { + t.message(taskState.shutdownRef, true); } else { - t.message(r as Ref, true); + taskState = { state: "shutdown_pending" }; } } }),