Simpler wload

This commit is contained in:
Tony Garnock-Jones 2021-03-02 16:42:53 +01:00
parent 9b904ee4df
commit b922a53d6a
5 changed files with 58 additions and 53 deletions

View File

@ -4,7 +4,7 @@
"typescript": "^4.1.5" "typescript": "^4.1.5"
}, },
"dependencies": { "dependencies": {
"preserves": "^0.6.0" "preserves": "^0.6.1"
}, },
"scripts": { "scripts": {
"compile": "npx tsc", "compile": "npx tsc",

View File

@ -102,9 +102,7 @@ export function isRef(v: any): v is Ref {
} }
let nextActorId = 0; let nextActorId = 0;
// export function __setNextActorId(v: number) { export const __setNextActorId = (v: number) => nextActorId = v;
// nextActorId = v;
// }
export class Actor { export class Actor {
readonly id = nextActorId++; readonly id = nextActorId++;

View File

@ -64,7 +64,7 @@ Turn.for(new Actor(), async (t: Turn) => {
const boxpath = path.join(__dirname, 'box.js'); const boxpath = path.join(__dirname, 'box.js');
const clientpath = path.join(__dirname, 'client.js'); const clientpath = path.join(__dirname, 'client.js');
spawnModule(t, boxpath, [ds, 500000, 25000]); spawnModule(t, boxpath, [ds_for_box, 500000, 25000]);
// spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]); // spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]);
spawnModule(t, clientpath, ds_for_client); spawnModule(t, clientpath, ds_for_client);

View File

@ -122,6 +122,13 @@ export const INERT_REF: Ref = {
export type PacketWriter = (bs: Uint8Array) => void; export type PacketWriter = (bs: Uint8Array) => void;
export interface RelayOptions {
packetWriter: PacketWriter;
setup(t: Turn, r: Relay): void;
debug?: boolean;
}
export class Relay { export class Relay {
readonly actor: Actor; readonly actor: Actor;
readonly w: PacketWriter; readonly w: PacketWriter;
@ -136,10 +143,11 @@ export class Relay {
pendingTurn: TurnMessage = []; pendingTurn: TurnMessage = [];
debug: boolean; debug: boolean;
constructor(actor: Actor, w: PacketWriter, debug: boolean) { constructor(t: Turn, options: RelayOptions) {
this.actor = actor; this.actor = t.actor;
this.w = w; this.w = options.packetWriter;
this.debug = debug; this.debug = options.debug ?? false;
options.setup(t, this);
} }
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>] rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
@ -297,19 +305,16 @@ function invalidTopLevelMessage(m: Value<WireRef>): never {
`Received invalid top-level protocol message from peer: ${m.asPreservesText()}`); `Received invalid top-level protocol message from peer: ${m.asPreservesText()}`);
} }
export type RelayOptions = { export interface RelayActorOptions extends RelayOptions {
packetWriter: PacketWriter, initialOid?: Oid;
setup(t: Turn, r: Relay): void, initialRef?: Ref;
initialOid?: Oid, nextLocalOid?: Oid;
initialRef?: Ref, }
debug?: boolean,
};
export function spawnRelay(t: Turn, options: RelayOptions): Promise<Ref | null> { export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | null> {
return new Promise(resolve => { return new Promise(resolve => {
t.spawn(t => { t.spawn(t => {
const relay = new Relay(t.actor, options.packetWriter, options.debug ?? false); const relay = new Relay(t, options);
options.setup(t, relay);
if (options.initialRef !== void 0) { if (options.initialRef !== void 0) {
relay.rewriteRefOut(options.initialRef, false, null); relay.rewriteRefOut(options.initialRef, false, null);
} }
@ -318,6 +323,9 @@ export function spawnRelay(t: Turn, options: RelayOptions): Promise<Ref | null>
} else { } else {
resolve(null); resolve(null);
} }
if (options.nextLocalOid !== void 0) {
relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid;
}
}); });
}); });
} }

View File

@ -1,54 +1,53 @@
// Web Worker loader // Web Worker loader
import { Actor, Turn, Assertion, Handle, Ref } from './actor.js'; import { Actor, Turn, Assertion, Handle, Ref, __setNextActorId } from './actor.js';
import { IdentityMap, Record } from 'preserves'; import { IdentityMap, Record } from 'preserves';
import { parentPort } from 'worker_threads'; import { parentPort, threadId } from 'worker_threads';
import { Relay, spawnRelay } from './relay.js'; import { Relay, spawnRelay } from './relay.js';
import { Dataspace, Observe } from './dataspace.js';
const _Instance = Symbol.for('Instance'); const _Instance = Symbol.for('Instance');
const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()( const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()(
_Instance, ['moduleName', 'arg']); _Instance, ['moduleName', 'arg']);
const STARTING_ACTOR_ID = (threadId & (2 ** 20 - 1)) * 1000000000;
__setNextActorId(STARTING_ACTOR_ID);
Turn.for(new Actor(), t => { Turn.for(new Actor(), t => {
const p = parentPort!; const p = parentPort!;
const ds = t.ref(new Dataspace());
spawnRelay(t, { spawnRelay(t, {
nextLocalOid: STARTING_ACTOR_ID + 500000000,
packetWriter: bs => p.postMessage(bs), packetWriter: bs => p.postMessage(bs),
setup(t: Turn, r: Relay) { setup(t: Turn, r: Relay) {
p.on('message', bs => r.accept(bs)); p.on('message', bs => r.accept(bs));
p.on('close', () => Turn.for(t.actor, t => t.quit())); p.on('close', () => Turn.for(t.actor, t => t.quit()));
}, },
initialRef: ds, initialRef: t.ref({
handleMap: new IdentityMap<Handle, false | Ref>(),
async assert(t, inst, handle) {
if (!Instance.isClassOf(inst)) return;
const m = await import(Instance._.moduleName(inst));
t.freshen(t => t.spawn(t => {
const q = (t: Turn) => {
this.handleMap.delete(handle);
t.quit();
};
if (this.handleMap.has(handle)) {
q(t);
} else {
this.handleMap.set(handle, t.ref({ message: q }));
m.default(t, Instance._.arg(inst));
}
}));
},
retract(t, handle) {
const r = this.handleMap.get(handle);
if (r === void 0) {
this.handleMap.set(handle, false);
} else {
t.message(r as Ref, true);
}
}
}),
// debug: true, // debug: true,
}); });
t.assert(ds, Observe(Instance.constructorInfo.label, t.ref({
handleMap: new IdentityMap<Handle, false | Ref>(),
async assert(t, inst0, handle) {
// console.log('+Factory:', handle, inst0);
const inst = inst0 as ReturnType<typeof Instance>;
const m = await import(Instance._.moduleName(inst));
t.freshen(t => t.spawn(t => {
const q = (t: Turn) => {
this.handleMap.delete(handle);
t.quit();
};
if (this.handleMap.has(handle)) {
q(t);
} else {
this.handleMap.set(handle, t.ref({ message: q }));
m.default(t, Instance._.arg(inst));
}
}));
},
retract(t, handle) {
// console.log('-Factory:', handle);
const r = this.handleMap.get(handle);
if (r === void 0) {
this.handleMap.set(handle, false);
} else {
t.message(r as Ref, true);
}
}
})));
}); });