From b922a53d6aad26116b847599aa61ae4868191840 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 2 Mar 2021 16:42:53 +0100 Subject: [PATCH] Simpler wload --- package.json | 2 +- src/actor.ts | 4 +--- src/main.ts | 2 +- src/relay.ts | 36 +++++++++++++++++----------- src/wload.ts | 67 ++++++++++++++++++++++++++-------------------------- 5 files changed, 58 insertions(+), 53 deletions(-) diff --git a/package.json b/package.json index 2e33807..221e62e 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "typescript": "^4.1.5" }, "dependencies": { - "preserves": "^0.6.0" + "preserves": "^0.6.1" }, "scripts": { "compile": "npx tsc", diff --git a/src/actor.ts b/src/actor.ts index 7992c36..9ce0ff9 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -102,9 +102,7 @@ export function isRef(v: any): v is Ref { } let nextActorId = 0; -// export function __setNextActorId(v: number) { -// nextActorId = v; -// } +export const __setNextActorId = (v: number) => nextActorId = v; export class Actor { readonly id = nextActorId++; diff --git a/src/main.ts b/src/main.ts index 39d5a79..5006b15 100644 --- a/src/main.ts +++ b/src/main.ts @@ -64,7 +64,7 @@ Turn.for(new Actor(), async (t: Turn) => { const boxpath = path.join(__dirname, 'box.js'); const clientpath = path.join(__dirname, 'client.js'); - spawnModule(t, boxpath, [ds, 500000, 25000]); + spawnModule(t, boxpath, [ds_for_box, 500000, 25000]); // spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]); spawnModule(t, clientpath, ds_for_client); diff --git a/src/relay.ts b/src/relay.ts index df80b87..8af4a0e 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -122,6 +122,13 @@ export const INERT_REF: Ref = { export type PacketWriter = (bs: Uint8Array) => void; +export interface RelayOptions { + packetWriter: PacketWriter; + setup(t: Turn, r: Relay): void; + debug?: boolean; +} + + export class Relay { readonly actor: Actor; readonly w: PacketWriter; @@ -136,10 +143,11 @@ export class Relay { pendingTurn: TurnMessage = []; debug: boolean; - constructor(actor: Actor, w: PacketWriter, debug: boolean) { - this.actor = actor; - this.w = w; - this.debug = debug; + constructor(t: Turn, options: RelayOptions) { + this.actor = t.actor; + this.w = options.packetWriter; + this.debug = options.debug ?? false; + options.setup(t, this); } rewriteOut(assertion: Assertion, transient: boolean): [Value, Array] @@ -297,19 +305,16 @@ function invalidTopLevelMessage(m: Value): never { `Received invalid top-level protocol message from peer: ${m.asPreservesText()}`); } -export type RelayOptions = { - packetWriter: PacketWriter, - setup(t: Turn, r: Relay): void, - initialOid?: Oid, - initialRef?: Ref, - debug?: boolean, -}; +export interface RelayActorOptions extends RelayOptions { + initialOid?: Oid; + initialRef?: Ref; + nextLocalOid?: Oid; +} -export function spawnRelay(t: Turn, options: RelayOptions): Promise { +export function spawnRelay(t: Turn, options: RelayActorOptions): Promise { return new Promise(resolve => { t.spawn(t => { - const relay = new Relay(t.actor, options.packetWriter, options.debug ?? false); - options.setup(t, relay); + const relay = new Relay(t, options); if (options.initialRef !== void 0) { relay.rewriteRefOut(options.initialRef, false, null); } @@ -318,6 +323,9 @@ export function spawnRelay(t: Turn, options: RelayOptions): Promise } else { resolve(null); } + if (options.nextLocalOid !== void 0) { + relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid; + } }); }); } diff --git a/src/wload.ts b/src/wload.ts index 684d8c7..28cbaf5 100644 --- a/src/wload.ts +++ b/src/wload.ts @@ -1,54 +1,53 @@ // Web Worker loader -import { Actor, Turn, Assertion, Handle, Ref } from './actor.js'; +import { Actor, Turn, Assertion, Handle, Ref, __setNextActorId } from './actor.js'; import { IdentityMap, Record } from 'preserves'; -import { parentPort } from 'worker_threads'; +import { parentPort, threadId } from 'worker_threads'; import { Relay, spawnRelay } from './relay.js'; -import { Dataspace, Observe } from './dataspace.js'; const _Instance = Symbol.for('Instance'); const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()( _Instance, ['moduleName', 'arg']); +const STARTING_ACTOR_ID = (threadId & (2 ** 20 - 1)) * 1000000000; +__setNextActorId(STARTING_ACTOR_ID); + Turn.for(new Actor(), t => { const p = parentPort!; - const ds = t.ref(new Dataspace()); spawnRelay(t, { + nextLocalOid: STARTING_ACTOR_ID + 500000000, packetWriter: bs => p.postMessage(bs), setup(t: Turn, r: Relay) { p.on('message', bs => r.accept(bs)); p.on('close', () => Turn.for(t.actor, t => t.quit())); }, - initialRef: ds, + initialRef: t.ref({ + handleMap: new IdentityMap(), + async assert(t, inst, handle) { + if (!Instance.isClassOf(inst)) return; + const m = await import(Instance._.moduleName(inst)); + t.freshen(t => t.spawn(t => { + const q = (t: Turn) => { + this.handleMap.delete(handle); + t.quit(); + }; + if (this.handleMap.has(handle)) { + q(t); + } else { + this.handleMap.set(handle, t.ref({ message: q })); + m.default(t, Instance._.arg(inst)); + } + })); + }, + retract(t, handle) { + const r = this.handleMap.get(handle); + if (r === void 0) { + this.handleMap.set(handle, false); + } else { + t.message(r as Ref, true); + } + } + }), // debug: true, }); - t.assert(ds, Observe(Instance.constructorInfo.label, t.ref({ - handleMap: new IdentityMap(), - async assert(t, inst0, handle) { - // console.log('+Factory:', handle, inst0); - const inst = inst0 as ReturnType; - const m = await import(Instance._.moduleName(inst)); - t.freshen(t => t.spawn(t => { - const q = (t: Turn) => { - this.handleMap.delete(handle); - t.quit(); - }; - if (this.handleMap.has(handle)) { - q(t); - } else { - this.handleMap.set(handle, t.ref({ message: q })); - m.default(t, Instance._.arg(inst)); - } - })); - }, - retract(t, handle) { - // console.log('-Factory:', handle); - const r = this.handleMap.get(handle); - if (r === void 0) { - this.handleMap.set(handle, false); - } else { - t.message(r as Ref, true); - } - } - }))); });