Much good progress
This commit is contained in:
parent
c7f5f487e7
commit
3a4a8a6ccb
77
actor.ts
77
actor.ts
|
@ -1,13 +1,11 @@
|
||||||
import {
|
import { Dictionary, IdentitySet, Record, Tuple, Value, is, preserves } from 'preserves';
|
||||||
Dictionary,
|
|
||||||
IdentitySet,
|
|
||||||
Record,
|
|
||||||
Value,
|
|
||||||
is,
|
|
||||||
} from 'preserves';
|
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
if ('stackTraceLimit' in Error) {
|
||||||
|
Error.stackTraceLimit = Infinity;
|
||||||
|
}
|
||||||
|
|
||||||
export type Assertion = Value<Ref>;
|
export type Assertion = Value<Ref>;
|
||||||
export type Handle = number;
|
export type Handle = number;
|
||||||
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
|
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
|
||||||
|
@ -69,7 +67,7 @@ export const Lit = Record.makeConstructor<{value: Assertion}, Ref>()(
|
||||||
|
|
||||||
export const _PCompound = Symbol.for('compound');
|
export const _PCompound = Symbol.for('compound');
|
||||||
export const PCompound =
|
export const PCompound =
|
||||||
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Pattern>}, Ref>()(
|
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Pattern, Ref>}, Ref>()(
|
||||||
_PCompound, ['ctor', 'members']);
|
_PCompound, ['ctor', 'members']);
|
||||||
|
|
||||||
export type Pattern =
|
export type Pattern =
|
||||||
|
@ -78,7 +76,7 @@ export type Pattern =
|
||||||
| Record<typeof _PAnd, [Pattern[]], Ref>
|
| Record<typeof _PAnd, [Pattern[]], Ref>
|
||||||
| Record<typeof _PNot, [Pattern], Ref>
|
| Record<typeof _PNot, [Pattern], Ref>
|
||||||
| Record<typeof _Lit, [Assertion], Ref>
|
| Record<typeof _Lit, [Assertion], Ref>
|
||||||
| Record<typeof _PCompound, [ConstructorSpec, Dictionary<Pattern>], Ref>;
|
| Record<typeof _PCompound, [ConstructorSpec, Dictionary<Pattern, Ref>], Ref>;
|
||||||
|
|
||||||
export const _TRef = Symbol.for('ref');
|
export const _TRef = Symbol.for('ref');
|
||||||
export const TRef = Record.makeConstructor<{name: string}, Ref>()(
|
export const TRef = Record.makeConstructor<{name: string}, Ref>()(
|
||||||
|
@ -86,13 +84,13 @@ export const TRef = Record.makeConstructor<{name: string}, Ref>()(
|
||||||
|
|
||||||
export const _TCompound = Symbol.for('compound');
|
export const _TCompound = Symbol.for('compound');
|
||||||
export const TCompound =
|
export const TCompound =
|
||||||
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Template>}, Ref>()(
|
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Template, Ref>}, Ref>()(
|
||||||
_TCompound, ['ctor', 'members']);
|
_TCompound, ['ctor', 'members']);
|
||||||
|
|
||||||
export type Template =
|
export type Template =
|
||||||
| Record<typeof _TRef, [string], Ref>
|
| Record<typeof _TRef, [string], Ref>
|
||||||
| Record<typeof _Lit, [Assertion], Ref>
|
| Record<typeof _Lit, [Assertion], Ref>
|
||||||
| Record<typeof _TCompound, [ConstructorSpec, Dictionary<Template>], Ref>;
|
| Record<typeof _TCompound, [ConstructorSpec, Dictionary<Template, Ref>], Ref>;
|
||||||
|
|
||||||
export type Bindings = { [name: string]: Assertion };
|
export type Bindings = { [name: string]: Assertion };
|
||||||
|
|
||||||
|
@ -102,18 +100,32 @@ export function isRef(v: any): v is Ref {
|
||||||
return 'relay' in v && v.relay instanceof Actor && 'target' in v;
|
return 'relay' in v && v.relay instanceof Actor && 'target' in v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let nextActorId = 0;
|
||||||
|
|
||||||
export class Actor {
|
export class Actor {
|
||||||
|
readonly id = nextActorId++;
|
||||||
readonly outbound: Map<Handle, Ref>;
|
readonly outbound: Map<Handle, Ref>;
|
||||||
exitReason: ExitReason = null;
|
exitReason: ExitReason = null;
|
||||||
|
// readonly exitHooks: Array<LocalAction> = [];
|
||||||
|
|
||||||
constructor(initialAssertions = new Map<Handle, Ref>()) {
|
constructor(initialAssertions = new Map<Handle, Ref>()) {
|
||||||
this.outbound = initialAssertions;
|
this.outbound = initialAssertions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// atExit(a: LocalAction): void {
|
||||||
|
// this.exitHooks.push(a);
|
||||||
|
// }
|
||||||
|
|
||||||
terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
|
terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
|
||||||
if (this.exitReason !== null) return;
|
if (this.exitReason !== null) return;
|
||||||
this.exitReason = reason;
|
this.exitReason = reason;
|
||||||
this.outbound.forEach((peer, h) => t._retract(peer, h));
|
if (!this.exitReason.ok) {
|
||||||
|
console.error(`Actor ${this.id} crashed:`, this.exitReason.err);
|
||||||
|
}
|
||||||
|
// this.exitHooks.forEach(hook => hook(t));
|
||||||
|
queueMicrotask(() =>
|
||||||
|
t.freshen(t =>
|
||||||
|
this.outbound.forEach((peer, h) => t._retract(peer, h))));
|
||||||
}
|
}
|
||||||
|
|
||||||
execute(proc: () => void): void {
|
execute(proc: () => void): void {
|
||||||
|
@ -122,7 +134,6 @@ export class Actor {
|
||||||
try {
|
try {
|
||||||
proc();
|
proc();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(Actor, err);
|
|
||||||
Turn.for(this, t => this.terminateWith(t, { ok: false, err }));
|
Turn.for(this, t => this.terminateWith(t, { ok: false, err }));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -130,26 +141,33 @@ export class Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
let nextHandle = 0;
|
let nextHandle = 0;
|
||||||
|
function allocateHandle(): Handle {
|
||||||
|
return nextHandle++;
|
||||||
|
}
|
||||||
|
|
||||||
export function _sync(turn: Turn, e: Partial<Entity>, peer: Ref): void {
|
export function _sync_impl(turn: Turn, e: Partial<Entity>, peer: Ref): void {
|
||||||
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
|
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let nextTurnId = 0;
|
||||||
|
|
||||||
export class Turn {
|
export class Turn {
|
||||||
|
readonly id = nextTurnId++;
|
||||||
readonly actor: Actor;
|
readonly actor: Actor;
|
||||||
readonly queues = new Map<Actor, LocalAction[]>();
|
queues: Map<Actor, LocalAction[]> | null = new Map();
|
||||||
|
|
||||||
static for(actor: Actor, f: LocalAction): void {
|
static for(actor: Actor, f: LocalAction): void {
|
||||||
const t = new Turn(actor);
|
const t = new Turn(actor);
|
||||||
f(t);
|
f(t);
|
||||||
t.queues.forEach((q, a) => a.execute(() => q.forEach(f => Turn.for(a, f))));
|
t.queues!.forEach((q, a) => a.execute(() => q.forEach(f => Turn.for(a, f))));
|
||||||
|
t.queues = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private constructor(actor: Actor) {
|
private constructor(actor: Actor) {
|
||||||
this.actor = actor;
|
this.actor = actor;
|
||||||
}
|
}
|
||||||
|
|
||||||
ref(e: Partial<Entity>): Ref {
|
ref<T extends Partial<Entity>>(e: T): Ref {
|
||||||
return { relay: this.actor, target: e };
|
return { relay: this.actor, target: e };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,8 +187,12 @@ export class Turn {
|
||||||
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true }));
|
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
crash(err: Error): void {
|
||||||
|
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: false, err }));
|
||||||
|
}
|
||||||
|
|
||||||
assert(ref: Ref, assertion: Assertion): Handle {
|
assert(ref: Ref, assertion: Assertion): Handle {
|
||||||
const h = nextHandle++;
|
const h = allocateHandle();
|
||||||
const a = runRewrites(ref.attenuation, assertion);
|
const a = runRewrites(ref.attenuation, assertion);
|
||||||
if (a !== null) {
|
if (a !== null) {
|
||||||
this.enqueue(ref.relay, t => {
|
this.enqueue(ref.relay, t => {
|
||||||
|
@ -201,8 +223,11 @@ export class Turn {
|
||||||
}
|
}
|
||||||
|
|
||||||
sync(ref: Ref): Promise<Turn> {
|
sync(ref: Ref): Promise<Turn> {
|
||||||
return new Promise(resolve => this.enqueue(ref.relay, t =>
|
return new Promise(resolve => this._sync(ref, this.ref({ message: resolve })));
|
||||||
_sync(t, ref.target, this.ref({ message: resolve }))));
|
}
|
||||||
|
|
||||||
|
_sync(ref: Ref, peer: Ref): void {
|
||||||
|
this.enqueue(ref.relay, t => _sync_impl(t, ref.target, peer));
|
||||||
}
|
}
|
||||||
|
|
||||||
message(ref: Ref, assertion: Assertion): void {
|
message(ref: Ref, assertion: Assertion): void {
|
||||||
|
@ -211,8 +236,18 @@ export class Turn {
|
||||||
}
|
}
|
||||||
|
|
||||||
enqueue(relay: Actor, a: LocalAction): void {
|
enqueue(relay: Actor, a: LocalAction): void {
|
||||||
|
if (this.queues === null) {
|
||||||
|
throw new Error("Attempt to reuse a committed Turn");
|
||||||
|
}
|
||||||
this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]);
|
this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
freshen(a: LocalAction): void {
|
||||||
|
if (this.queues !== null) {
|
||||||
|
throw new Error("Attempt to freshen a non-stale Turn");
|
||||||
|
}
|
||||||
|
Turn.for(this.actor, a);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
@ -249,7 +284,7 @@ export function match(p: Pattern, v: Assertion): Bindings | null {
|
||||||
const members = PCompound._.members(p);
|
const members = PCompound._.members(p);
|
||||||
switch (ctor.label) {
|
switch (ctor.label) {
|
||||||
case _CRec:
|
case _CRec:
|
||||||
if (!Record.isRecord<Assertion, any, Ref>(v)) return false;
|
if (!Record.isRecord<Assertion, Tuple<Assertion>, Ref>(v)) return false;
|
||||||
if (!is(CRec._.label(ctor), v.label)) return false;
|
if (!is(CRec._.label(ctor), v.label)) return false;
|
||||||
if (CRec._.arity(ctor) !== v.length) return false;
|
if (CRec._.arity(ctor) !== v.length) return false;
|
||||||
for (const [key, pp] of members) {
|
for (const [key, pp] of members) {
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
import { Ref } from "actor";
|
||||||
|
import { Record } from "preserves";
|
||||||
|
|
||||||
|
export const BoxState = Record.makeConstructor<{value: number}, Ref>()(
|
||||||
|
Symbol.for('BoxState'), ['value']);
|
||||||
|
export const SetBox = Record.makeConstructor<{newValue: number}, Ref>()(
|
||||||
|
Symbol.for('SetBox'), ['newValue']);
|
|
@ -0,0 +1,32 @@
|
||||||
|
import { BoxState, SetBox } from "./box-protocol.js";
|
||||||
|
import { Handle, Ref, Turn } from "./actor.js";
|
||||||
|
import { Observe } from "./dataspace.js";
|
||||||
|
|
||||||
|
let startTime = Date.now();
|
||||||
|
let prevValue = 0;
|
||||||
|
|
||||||
|
export default function (t: Turn, [ds, LIMIT, REPORT_EVERY]: [Ref, number, number]) {
|
||||||
|
t.spawn(t => {
|
||||||
|
console.log('Spawning Box');
|
||||||
|
let valueHandle: Handle | undefined;
|
||||||
|
function setValue(t: Turn, value: number) {
|
||||||
|
valueHandle = t.replace(ds, valueHandle, BoxState(value));
|
||||||
|
}
|
||||||
|
setValue(t, 0);
|
||||||
|
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
|
||||||
|
message(t: Turn, [newValue]: [number]): void {
|
||||||
|
// console.log(`Box: got ${newValue}`);
|
||||||
|
if (newValue % REPORT_EVERY === 0) {
|
||||||
|
const endTime = Date.now();
|
||||||
|
const delta = (endTime - startTime) / 1000.0;
|
||||||
|
const count = newValue - prevValue;
|
||||||
|
prevValue = newValue;
|
||||||
|
startTime = endTime;
|
||||||
|
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
|
||||||
|
}
|
||||||
|
if (newValue === LIMIT) t.quit();
|
||||||
|
setValue(t, newValue);
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
import { BoxState, SetBox } from "./box-protocol.js";
|
||||||
|
import { Observe } from "./dataspace.js";
|
||||||
|
import { Assertion, Handle, Ref, Turn } from "./actor.js";
|
||||||
|
|
||||||
|
export default function (t: Turn, ds: Ref) {
|
||||||
|
t.spawn(t => {
|
||||||
|
console.log('Spawning Client');
|
||||||
|
let count = 0;
|
||||||
|
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
||||||
|
assert(t: Turn, [currentValue]: [number]): void {
|
||||||
|
// console.log(`Client: got ${currentValue}`);
|
||||||
|
t.message(ds, SetBox(currentValue + 1));
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
||||||
|
assert(_t: Turn, _assertion: Assertion): void { count++; },
|
||||||
|
retract(t: Turn, _handle: Handle) {
|
||||||
|
if (--count === 0) {
|
||||||
|
console.log('Client: detected box termination');
|
||||||
|
t.quit();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})));
|
||||||
|
});
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
import { Assertion, Entity, Handle, Ref, Turn } from 'actor';
|
import { Assertion, Entity, Handle, Ref, Turn } from 'actor';
|
||||||
import { Dictionary, IdentityMap, is, Record } from 'preserves';
|
import { Dictionary, IdentityMap, is, preserves, Record, Tuple } from 'preserves';
|
||||||
import { Bag, ChangeDescription } from './bag';
|
import { Bag, ChangeDescription } from './bag';
|
||||||
|
|
||||||
// Q. Why keep "Observe"? Why not do the clever trick of asserting the
|
// Q. Why keep "Observe"? Why not do the clever trick of asserting the
|
||||||
|
@ -36,7 +36,8 @@ export class Dataspace implements Partial<Entity> {
|
||||||
readonly subscriptions: Dictionary<Map<Ref, Dictionary<Handle>>> = new Dictionary();
|
readonly subscriptions: Dictionary<Map<Ref, Dictionary<Handle>>> = new Dictionary();
|
||||||
|
|
||||||
assert(turn: Turn, rec: Assertion, handle: Handle): void {
|
assert(turn: Turn, rec: Assertion, handle: Handle): void {
|
||||||
if (!Record.isRecord<Assertion, any, Ref>(rec)) return;
|
// console.log(preserves`ds ${turn.actor.id} assert ${rec} ${handle}`);
|
||||||
|
if (!Record.isRecord<Assertion, Tuple<Assertion>, Ref>(rec)) return;
|
||||||
this.handleMap.set(handle, rec);
|
this.handleMap.set(handle, rec);
|
||||||
if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;
|
if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;
|
||||||
if (Observe.isClassOf(rec)) {
|
if (Observe.isClassOf(rec)) {
|
||||||
|
@ -55,6 +56,7 @@ export class Dataspace implements Partial<Entity> {
|
||||||
|
|
||||||
retract(turn: Turn, upstreamHandle: Handle): void {
|
retract(turn: Turn, upstreamHandle: Handle): void {
|
||||||
const rec = this.handleMap.get(upstreamHandle);
|
const rec = this.handleMap.get(upstreamHandle);
|
||||||
|
// console.log(preserves`ds ${turn.actor.id} retract ${rec} ${upstreamHandle}`);
|
||||||
if (rec === void 0) return;
|
if (rec === void 0) return;
|
||||||
this.handleMap.delete(upstreamHandle);
|
this.handleMap.delete(upstreamHandle);
|
||||||
if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;
|
if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;
|
||||||
|
@ -73,7 +75,8 @@ export class Dataspace implements Partial<Entity> {
|
||||||
}
|
}
|
||||||
|
|
||||||
message(turn: Turn, rec: Assertion): void {
|
message(turn: Turn, rec: Assertion): void {
|
||||||
if (!Record.isRecord<Assertion, any, Ref>(rec)) return;
|
// console.log(preserves`ds ${turn.actor.id} message ${rec}`);
|
||||||
|
if (!Record.isRecord<Assertion, Tuple<Assertion>, Ref>(rec)) return;
|
||||||
this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
|
this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
137
main.ts
137
main.ts
|
@ -1,98 +1,63 @@
|
||||||
import {
|
import { Actor, Assertion, Turn } from './actor.js';
|
||||||
Actor,
|
import { Record } from 'preserves';
|
||||||
Assertion,
|
import { Dataspace } from './dataspace.js';
|
||||||
Handle,
|
import { Worker } from 'worker_threads';
|
||||||
Ref,
|
import { Relay, spawnRelay } from './relay.js';
|
||||||
Turn,
|
|
||||||
attenuate,
|
|
||||||
rfilter,
|
|
||||||
PCompound,
|
|
||||||
CRec,
|
|
||||||
Lit,
|
|
||||||
} from './actor.js';
|
|
||||||
import { Dictionary, Record } from 'preserves';
|
|
||||||
import { Dataspace, Observe } from './dataspace.js';
|
|
||||||
|
|
||||||
const BoxState = Record.makeConstructor<{value: number}, Ref>()(
|
const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()(
|
||||||
Symbol.for('BoxState'), ['value']);
|
Symbol.for('Instance'), ['moduleName', 'arg']);
|
||||||
const SetBox = Record.makeConstructor<{newValue: number}, Ref>()(
|
|
||||||
Symbol.for('SetBox'), ['newValue']);
|
|
||||||
|
|
||||||
let startTime = Date.now();
|
function spawnWorker(t: Turn, moduleName: string, arg: Assertion) {
|
||||||
let prevValue = 0;
|
const w = new Worker('./wload.js');
|
||||||
const LIMIT = 500000;
|
spawnRelay(t, {
|
||||||
|
packetWriter: bs => w.postMessage(bs),
|
||||||
function spawnBox(t: Turn, ds: Ref) {
|
setup(t: Turn, r: Relay) {
|
||||||
t.spawn(t => {
|
w.on('message', bs => r.accept(bs));
|
||||||
console.log('Spawning Box');
|
w.on('error', err => Turn.for(t.actor, t => t.crash(err)));
|
||||||
let valueHandle: Handle | undefined;
|
w.on('exit', code => Turn.for(t.actor, t => {
|
||||||
function setValue(t: Turn, value: number) {
|
if (code === 0) {
|
||||||
valueHandle = t.replace(ds, valueHandle, BoxState(value));
|
|
||||||
}
|
|
||||||
setValue(t, 0);
|
|
||||||
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
|
|
||||||
message(t: Turn, [newValue]: [number]): void {
|
|
||||||
// console.log(`Box: got ${newValue}`);
|
|
||||||
if (newValue % 25000 === 0) {
|
|
||||||
const endTime = Date.now();
|
|
||||||
const delta = (endTime - startTime) / 1000.0;
|
|
||||||
const count = newValue - prevValue;
|
|
||||||
prevValue = newValue;
|
|
||||||
startTime = endTime;
|
|
||||||
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
|
|
||||||
}
|
|
||||||
if (newValue === LIMIT - 20000) t.quit();
|
|
||||||
setValue(t, newValue);
|
|
||||||
}
|
|
||||||
})));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function spawnClient(t: Turn, ds: Ref) {
|
|
||||||
t.spawn(t => {
|
|
||||||
console.log('Spawning Client');
|
|
||||||
let count = 0;
|
|
||||||
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
|
||||||
assert(t: Turn, [currentValue]: [number]): void {
|
|
||||||
// console.log(`Client: got ${currentValue}`);
|
|
||||||
if (currentValue === LIMIT) {
|
|
||||||
console.log(`Client: quitting at limit`);
|
|
||||||
t.quit();
|
t.quit();
|
||||||
} else {
|
} else {
|
||||||
t.message(ds, SetBox(currentValue + 1));
|
t.crash(new Error(`Worker crashed with code ${code}`));
|
||||||
}
|
}
|
||||||
}
|
}));
|
||||||
})));
|
},
|
||||||
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
initialOid: 0,
|
||||||
assert(_t: Turn, _assertion: Assertion): void { count++; },
|
}).then(factory => Turn.for(new Actor(), t => {
|
||||||
retract(t: Turn, _handle: Handle) {
|
t.assert(factory!, Instance(moduleName, arg));
|
||||||
if (--count === 0) {
|
}));
|
||||||
console.log('Client: detected box termination');
|
}
|
||||||
t.quit();
|
|
||||||
}
|
function spawnModule(t: Turn, moduleName: string, arg: Assertion) {
|
||||||
},
|
import(moduleName).then(m => t.freshen(t => t.spawn(t =>
|
||||||
})));
|
m.default(t, arg))));
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Turn.for(new Actor(), async (t: Turn) => {
|
Turn.for(new Actor(), async (t: Turn) => {
|
||||||
const ds = t.ref(new Dataspace());
|
const ds = t.ref(new Dataspace());
|
||||||
|
|
||||||
spawnBox(t, attenuate(
|
// spawnModule(t, './box.js', [ds, 500000, 25000]);
|
||||||
ds,
|
spawnWorker(t, './box.js', [ds, 50000, 2500]);
|
||||||
rfilter(PCompound(CRec(BoxState.constructorInfo.label,
|
|
||||||
BoxState.constructorInfo.arity),
|
spawnModule(t, './client.js', ds);
|
||||||
new Dictionary()),
|
// spawnWorker(t, './client.js', ds);
|
||||||
PCompound(CRec(Observe.constructorInfo.label,
|
|
||||||
Observe.constructorInfo.arity),
|
// spawnBox(t, attenuate(
|
||||||
new Dictionary([[0, Lit(SetBox.constructorInfo.label)]])))));
|
// ds,
|
||||||
|
// rfilter(PCompound(CRec(BoxState.constructorInfo.label,
|
||||||
|
// BoxState.constructorInfo.arity),
|
||||||
|
// new Dictionary()),
|
||||||
|
// PCompound(CRec(Observe.constructorInfo.label,
|
||||||
|
// Observe.constructorInfo.arity),
|
||||||
|
// new Dictionary([[0, Lit(SetBox.constructorInfo.label)]])))));
|
||||||
|
|
||||||
|
// spawnClient(t, attenuate(
|
||||||
|
// ds,
|
||||||
|
// rfilter(PCompound(CRec(SetBox.constructorInfo.label,
|
||||||
|
// SetBox.constructorInfo.arity),
|
||||||
|
// new Dictionary()),
|
||||||
|
// PCompound(CRec(Observe.constructorInfo.label,
|
||||||
|
// Observe.constructorInfo.arity),
|
||||||
|
// new Dictionary([[0, Lit(BoxState.constructorInfo.label)]])))));
|
||||||
|
|
||||||
spawnClient(t, attenuate(
|
|
||||||
ds,
|
|
||||||
rfilter(PCompound(CRec(SetBox.constructorInfo.label,
|
|
||||||
SetBox.constructorInfo.arity),
|
|
||||||
new Dictionary()),
|
|
||||||
PCompound(CRec(Observe.constructorInfo.label,
|
|
||||||
Observe.constructorInfo.arity),
|
|
||||||
new Dictionary([[0, Lit(BoxState.constructorInfo.label)]])))));
|
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
{
|
{
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
"@types/node": "^14.14.31",
|
||||||
"typescript": "^4.1.5"
|
"typescript": "^4.1.5"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
import { Handle, Ref } from 'actor';
|
||||||
|
import { Record, Value } from 'preserves';
|
||||||
|
|
||||||
|
export const _Assert = Symbol.for('assert');
|
||||||
|
export const Assert = Record.makeConstructor<{assertion: Value<WireRef>, handle: Handle}, WireRef>()(
|
||||||
|
_Assert, ['assertion', 'handle']);
|
||||||
|
|
||||||
|
export const _Retract = Symbol.for('retract');
|
||||||
|
export const Retract = Record.makeConstructor<{handle: Handle}, WireRef>()(
|
||||||
|
_Retract, ['handle']);
|
||||||
|
|
||||||
|
export const _Message = Symbol.for('message');
|
||||||
|
export const Message = Record.makeConstructor<{body: Value<WireRef>}, WireRef>()(
|
||||||
|
_Message, ['body']);
|
||||||
|
|
||||||
|
export const _Sync = Symbol.for('sync');
|
||||||
|
export const Sync = Record.makeConstructor<{peer: WireRef}, WireRef>()(
|
||||||
|
_Sync, ['peer']);
|
||||||
|
|
||||||
|
export type EntityMessage =
|
||||||
|
| Record<typeof _Assert, [Value<WireRef>, Handle], WireRef>
|
||||||
|
| Record<typeof _Retract, [Handle], WireRef>
|
||||||
|
| Record<typeof _Message, [Value<WireRef>], WireRef>
|
||||||
|
| Record<typeof _Sync, [WireRef], WireRef>;
|
||||||
|
|
||||||
|
export type TurnMessage = Array<[Oid, EntityMessage]>;
|
||||||
|
|
||||||
|
export type Oid = number;
|
||||||
|
export type RefLocation = "mine" | "your";
|
||||||
|
export type WireRef = { loc: RefLocation, oid: Oid };
|
||||||
|
export type WireSymbol = { name: WireRef, ref: Ref, count: number };
|
|
@ -0,0 +1,322 @@
|
||||||
|
import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
|
||||||
|
import { Bytes, decode, encode, IdentityMap, mapPointers, underlying, Value } from 'preserves';
|
||||||
|
import {
|
||||||
|
Oid,
|
||||||
|
Assert,
|
||||||
|
EntityMessage,
|
||||||
|
Message,
|
||||||
|
Retract,
|
||||||
|
Sync,
|
||||||
|
WireSymbol,
|
||||||
|
_Assert,
|
||||||
|
_Retract,
|
||||||
|
_Message,
|
||||||
|
_Sync,
|
||||||
|
WireRef,
|
||||||
|
TurnMessage,
|
||||||
|
} from './protocol.js';
|
||||||
|
|
||||||
|
export class SyncPeerEntity implements Entity {
|
||||||
|
readonly relay: Relay;
|
||||||
|
readonly peer: Ref;
|
||||||
|
readonly handleMap = new IdentityMap<Handle, Handle>();
|
||||||
|
e: WireSymbol | null = null;
|
||||||
|
|
||||||
|
constructor(relay: Relay, peer: Ref) {
|
||||||
|
this.relay = relay;
|
||||||
|
this.peer = peer;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
|
||||||
|
this.handleMap.set(handle, turn.assert(this.peer, assertion));
|
||||||
|
}
|
||||||
|
|
||||||
|
retract(turn: Turn, handle: Handle): void {
|
||||||
|
turn.retract(this.handleMap.get(handle)!);
|
||||||
|
this.handleMap.delete(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
message(turn: Turn, body: Assertion): void {
|
||||||
|
// We get to vanish from the indexes now
|
||||||
|
this.relay.releaseRefOut(this.e!);
|
||||||
|
turn.message(this.peer, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
sync(turn: Turn, peer: Ref): void {
|
||||||
|
turn._sync(this.peer, peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class RelayEntity implements Entity {
|
||||||
|
readonly relay: Relay;
|
||||||
|
readonly e: WireSymbol;
|
||||||
|
|
||||||
|
constructor(relay: Relay, e: WireSymbol) {
|
||||||
|
this.relay = relay;
|
||||||
|
this.e = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
send(m: EntityMessage): void {
|
||||||
|
this.relay.send(this.e.name.oid, m);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
|
||||||
|
this.send(Assert(this.relay.register(assertion, handle), handle));
|
||||||
|
}
|
||||||
|
|
||||||
|
retract(_turn: Turn, handle: Handle): void {
|
||||||
|
this.relay.deregister(handle);
|
||||||
|
this.send(Retract(handle));
|
||||||
|
}
|
||||||
|
|
||||||
|
message(_turn: Turn, body: Assertion): void {
|
||||||
|
this.send(Message(this.relay.register(body, null)));
|
||||||
|
}
|
||||||
|
|
||||||
|
sync(turn: Turn, peer: Ref): void {
|
||||||
|
const peerEntity = new SyncPeerEntity(this.relay, peer);
|
||||||
|
peerEntity.e = this.relay.rewriteRefOut(turn.ref(peerEntity), false, null);
|
||||||
|
this.send(Sync(peerEntity.e.name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Membrane {
|
||||||
|
readonly byOid = new IdentityMap<Oid, WireSymbol>();
|
||||||
|
readonly byRef = new IdentityMap<Ref, WireSymbol>();
|
||||||
|
|
||||||
|
grab<Table extends "byOid" | "byRef">(table: Table,
|
||||||
|
key: (Table extends "byOid" ? Oid : Ref),
|
||||||
|
transient: boolean,
|
||||||
|
f: () => WireSymbol): WireSymbol
|
||||||
|
{
|
||||||
|
let e =
|
||||||
|
(this[table] as IdentityMap<Table extends "byOid" ? Oid : Ref, WireSymbol>)
|
||||||
|
.get(key);
|
||||||
|
if (e === void 0) {
|
||||||
|
e = f();
|
||||||
|
this.byRef.set(e.ref, e);
|
||||||
|
this.byOid.set(e.name.oid, e);
|
||||||
|
}
|
||||||
|
if (!transient) e.count++;
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(e: WireSymbol): void {
|
||||||
|
e.count--;
|
||||||
|
if (e.count === 0) {
|
||||||
|
this.byOid.delete(e.name.oid);
|
||||||
|
this.byRef.delete(e.ref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const INERT_REF: Ref = {
|
||||||
|
relay: (() => {
|
||||||
|
const a = new Actor();
|
||||||
|
a.exitReason = { ok: true };
|
||||||
|
return a;
|
||||||
|
})(),
|
||||||
|
target: {},
|
||||||
|
};
|
||||||
|
|
||||||
|
export type PacketWriter = (bs: Uint8Array) => void;
|
||||||
|
|
||||||
|
export class Relay {
|
||||||
|
readonly actor: Actor;
|
||||||
|
readonly w: PacketWriter;
|
||||||
|
readonly inboundAssertions = new IdentityMap<Handle, {
|
||||||
|
localHandle: Handle,
|
||||||
|
imported: Array<WireSymbol>,
|
||||||
|
}>();
|
||||||
|
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
|
||||||
|
readonly exported = new Membrane();
|
||||||
|
readonly imported = new Membrane();
|
||||||
|
nextLocalOid: Oid = 0;
|
||||||
|
pendingTurn: TurnMessage = [];
|
||||||
|
debug: boolean;
|
||||||
|
|
||||||
|
constructor(actor: Actor, w: PacketWriter, debug: boolean) {
|
||||||
|
this.actor = actor;
|
||||||
|
this.w = w;
|
||||||
|
this.debug = debug;
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
|
||||||
|
{
|
||||||
|
const exported: Array<WireSymbol> = [];
|
||||||
|
const rewritten =
|
||||||
|
mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported).name);
|
||||||
|
return [rewritten, exported];
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>]
|
||||||
|
{
|
||||||
|
const imported: Array<WireSymbol> = [];
|
||||||
|
const rewritten = mapPointers(a, r => this.rewriteRefIn(t, r, imported));
|
||||||
|
return [rewritten, imported];
|
||||||
|
}
|
||||||
|
|
||||||
|
register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
|
||||||
|
const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
|
||||||
|
if (handle !== null) this.outboundAssertions.set(handle, exported);
|
||||||
|
return rewritten;
|
||||||
|
}
|
||||||
|
|
||||||
|
deregister(handle: Handle): void {
|
||||||
|
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol> | null): WireSymbol {
|
||||||
|
if (r.target instanceof RelayEntity && r.target.relay === this) {
|
||||||
|
return r.target.e;
|
||||||
|
} else {
|
||||||
|
const e = this.exported.grab("byRef", r, transient, () => {
|
||||||
|
if (transient) throw new Error("Cannot send transient reference");
|
||||||
|
return { name: { loc: "mine", oid: this.nextLocalOid++ }, ref: r, count: 0 };
|
||||||
|
});
|
||||||
|
exported?.push(e);
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseRefOut(e: WireSymbol) {
|
||||||
|
this.exported.drop(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol> | null): Ref {
|
||||||
|
switch (n.loc) {
|
||||||
|
case 'your':
|
||||||
|
return this.lookupLocal(n.oid);
|
||||||
|
case 'mine': {
|
||||||
|
const e = this.imported.grab("byOid", n.oid, false, () => {
|
||||||
|
const e: WireSymbol = {
|
||||||
|
name: { loc: 'your', oid: n.oid },
|
||||||
|
ref: null as any,
|
||||||
|
count: 0,
|
||||||
|
};
|
||||||
|
e.ref = t.ref(new RelayEntity(this, e as WireSymbol));
|
||||||
|
return e;
|
||||||
|
});
|
||||||
|
imported?.push(e);
|
||||||
|
return e.ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
send(remoteOid: Oid, m: EntityMessage): void {
|
||||||
|
if (this.pendingTurn.length === 0) {
|
||||||
|
queueMicrotask(() => {
|
||||||
|
if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText());
|
||||||
|
this.w(underlying(encode<WireRef>(this.pendingTurn, {
|
||||||
|
canonical: true,
|
||||||
|
encodePointer: n => {
|
||||||
|
switch (n.loc) {
|
||||||
|
case 'mine': return [0, n.oid];
|
||||||
|
case 'your': return [1, n.oid];
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})));
|
||||||
|
this.pendingTurn = [];
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.pendingTurn.push([remoteOid, m]);
|
||||||
|
}
|
||||||
|
|
||||||
|
lookupLocal(localOid: Oid): Ref {
|
||||||
|
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
|
||||||
|
}
|
||||||
|
|
||||||
|
accept(bs0: Uint8Array): void {
|
||||||
|
Turn.for(this.actor, t => {
|
||||||
|
const bs = Bytes.from(bs0);
|
||||||
|
const wireTurn = decode<WireRef>(bs, {
|
||||||
|
decodePointer: v => {
|
||||||
|
if (!Array.isArray(v) || v.length !== 2) {
|
||||||
|
throw new Error(
|
||||||
|
`Received invalid object reference ${v.asPreservesText()} from peer`);
|
||||||
|
}
|
||||||
|
const loc = v[0] === 0 ? 'mine' : 'your';
|
||||||
|
return { loc, oid: v[1] as Oid };
|
||||||
|
},
|
||||||
|
});
|
||||||
|
if (this.debug) console.log('IN', wireTurn.asPreservesText());
|
||||||
|
if (!Array.isArray(wireTurn)) invalidTopLevelMessage(wireTurn);
|
||||||
|
wireTurn.forEach(v => {
|
||||||
|
if (Array.isArray(v) && v.length === 2 && typeof v[0] === 'number') {
|
||||||
|
const [localOid, m] = v;
|
||||||
|
// TODO: deep check that m is EntityMessage
|
||||||
|
this.handle(t, this.lookupLocal(localOid), m as EntityMessage);
|
||||||
|
} else {
|
||||||
|
invalidTopLevelMessage(wireTurn);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
handle(t: Turn, r: Ref, m: EntityMessage) {
|
||||||
|
switch (m.label) {
|
||||||
|
case _Assert: {
|
||||||
|
const [a, imported] = this.rewriteIn(t, Assert._.assertion(m));
|
||||||
|
this.inboundAssertions.set(Assert._.handle(m), {
|
||||||
|
localHandle: t.assert(r, a),
|
||||||
|
imported,
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case _Retract: {
|
||||||
|
const remoteHandle = Retract._.handle(m);
|
||||||
|
const h = this.inboundAssertions.get(remoteHandle);
|
||||||
|
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
|
||||||
|
this.inboundAssertions.delete(remoteHandle);
|
||||||
|
h.imported.forEach(e => this.imported.drop(e));
|
||||||
|
t.retract(h.localHandle);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case _Message: {
|
||||||
|
const [a, imported] = this.rewriteIn(t, Message._.body(m));
|
||||||
|
if (imported.length > 0) throw new Error("Cannot receive transient reference");
|
||||||
|
t.message(r, a);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case _Sync: {
|
||||||
|
const imported: Array<WireSymbol> = [];
|
||||||
|
const k = this.rewriteRefIn(t, Sync._.peer(m), imported);
|
||||||
|
t.sync(r).then(t => {
|
||||||
|
t.message(k, true);
|
||||||
|
imported.forEach(e => this.imported.drop(e));
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function invalidTopLevelMessage(m: Value<WireRef>): never {
|
||||||
|
throw new Error(
|
||||||
|
`Received invalid top-level protocol message from peer: ${m.asPreservesText()}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
export type RelayOptions = {
|
||||||
|
packetWriter: PacketWriter,
|
||||||
|
setup(t: Turn, r: Relay): void,
|
||||||
|
initialOid?: Oid,
|
||||||
|
initialRef?: Ref,
|
||||||
|
debug?: boolean,
|
||||||
|
};
|
||||||
|
|
||||||
|
export function spawnRelay(t: Turn, options: RelayOptions): Promise<Ref | null> {
|
||||||
|
return new Promise(resolve => {
|
||||||
|
t.spawn(t => {
|
||||||
|
const relay = new Relay(t.actor, options.packetWriter, options.debug ?? false);
|
||||||
|
options.setup(t, relay);
|
||||||
|
if (options.initialRef !== void 0) {
|
||||||
|
relay.rewriteRefOut(options.initialRef, false, null);
|
||||||
|
}
|
||||||
|
if (options.initialOid !== void 0) {
|
||||||
|
resolve(relay.rewriteRefIn(t, { loc: 'mine', oid: options.initialOid }, null));
|
||||||
|
} else {
|
||||||
|
resolve(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
import { Assertion, Turn } from './actor';
|
||||||
|
|
||||||
|
console.log('hi')
|
||||||
|
|
||||||
|
export default function (_t: Turn, arg: Assertion) {
|
||||||
|
console.log('in exported main', arg);
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
// Web Worker loader
|
||||||
|
|
||||||
|
import { Actor, Turn, Assertion, Handle, Ref } from './actor.js';
|
||||||
|
import { IdentityMap, Record } from 'preserves';
|
||||||
|
import { parentPort } from 'worker_threads';
|
||||||
|
import { Relay, spawnRelay } from './relay.js';
|
||||||
|
import { Dataspace, Observe } from './dataspace.js';
|
||||||
|
|
||||||
|
const _Instance = Symbol.for('Instance');
|
||||||
|
const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()(
|
||||||
|
_Instance, ['moduleName', 'arg']);
|
||||||
|
|
||||||
|
Turn.for(new Actor(), t => {
|
||||||
|
const p = parentPort!;
|
||||||
|
const ds = t.ref(new Dataspace());
|
||||||
|
spawnRelay(t, {
|
||||||
|
packetWriter: bs => p.postMessage(bs),
|
||||||
|
setup(t: Turn, r: Relay) {
|
||||||
|
p.on('message', bs => r.accept(bs));
|
||||||
|
p.on('close', () => Turn.for(t.actor, t => t.quit()));
|
||||||
|
},
|
||||||
|
initialRef: ds,
|
||||||
|
// debug: true,
|
||||||
|
});
|
||||||
|
t.assert(ds, Observe(Instance.constructorInfo.label, t.ref({
|
||||||
|
handleMap: new IdentityMap<Handle, false | Ref>(),
|
||||||
|
async assert(t, inst0, handle) {
|
||||||
|
// console.log('+Factory:', handle, inst0);
|
||||||
|
const inst = inst0 as ReturnType<typeof Instance>;
|
||||||
|
const m = await import(Instance._.moduleName(inst));
|
||||||
|
t.freshen(t => t.spawn(t => {
|
||||||
|
const q = (t: Turn) => {
|
||||||
|
this.handleMap.delete(handle);
|
||||||
|
t.quit();
|
||||||
|
};
|
||||||
|
if (this.handleMap.has(handle)) {
|
||||||
|
q(t);
|
||||||
|
} else {
|
||||||
|
this.handleMap.set(handle, t.ref({ message: q }));
|
||||||
|
m.default(t, Instance._.arg(inst));
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
},
|
||||||
|
retract(t, handle) {
|
||||||
|
// console.log('-Factory:', handle);
|
||||||
|
const r = this.handleMap.get(handle);
|
||||||
|
if (r === void 0) {
|
||||||
|
this.handleMap.set(handle, false);
|
||||||
|
} else {
|
||||||
|
t.message(r as Ref, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
});
|
Loading…
Reference in New Issue