Facets, first draft

This commit is contained in:
Tony Garnock-Jones 2021-04-16 20:29:16 +02:00
parent 6224a4dbf3
commit ff1feff82b
11 changed files with 199 additions and 84 deletions

View File

@ -21,7 +21,7 @@ export interface Entity {
}
export interface Ref {
readonly relay: Actor;
readonly relay: Facet;
readonly target: Partial<Entity>;
readonly attenuation?: Attenuation;
}
@ -29,24 +29,28 @@ export interface Ref {
//---------------------------------------------------------------------------
export function isRef(v: any): v is Ref {
return 'relay' in v && v.relay instanceof Actor && 'target' in v;
return 'relay' in v && v.relay instanceof Facet && 'target' in v;
}
export function toRef(_v: any): Ref | undefined {
return isRef(_v) ? _v : void 0;
}
type OutboundAssertion = { handle: Handle, peer: Ref, established: boolean };
type OutboundMap = Map<Handle, OutboundAssertion>;
let nextActorId = 0;
export const __setNextActorId = (v: number) => nextActorId = v;
export class Actor {
readonly id = nextActorId++;
readonly outbound: Map<Handle, Ref>;
readonly root: Facet;
exitReason: ExitReason = null;
readonly exitHooks: Array<LocalAction> = [];
constructor(initialAssertions = new Map<Handle, Ref>()) {
this.outbound = initialAssertions;
constructor(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()) {
this.root = new Facet(this, null, initialAssertions);
Turn.for(new Facet(this, this.root), stopIfInertAfter(bootProc));
}
atExit(a: LocalAction): void {
@ -60,63 +64,145 @@ export class Actor {
console.error(`Actor ${this.id} crashed:`, this.exitReason.err);
}
this.exitHooks.forEach(hook => hook(t));
queueTask(() => Turn.for(
t.actor,
t => this.outbound.forEach((peer, h) => t._retract(peer, h)),
true));
queueTask(() => Turn.for(this.root, t => this.root._terminate(t, false), true));
}
}
let nextHandle = 0;
export class Facet {
readonly id = nextActorId++;
readonly actor: Actor;
readonly parent: Facet | null;
readonly children = new Set<Facet>();
readonly outbound: OutboundMap;
readonly shutdownActions: Array<LocalAction> = [];
// ^ shutdownActions are not exitHooks - those run even on error. These are for clean shutdown
isLive = true;
inertCheckPreventers = 0;
constructor(actor: Actor, parent: Facet | null, initialAssertions: OutboundMap = new Map()) {
this.actor = actor;
this.parent = parent;
if (parent) parent.children.add(this);
this.outbound = initialAssertions;
}
onStop(a: LocalAction): void {
this.shutdownActions.push(a);
}
isInert(): boolean {
return this.children.size === 0 && this.outbound.size === 0 && this.inertCheckPreventers === 0;
}
preventInertCheck(): () => void {
let armed = true;
this.inertCheckPreventers++;
return () => {
if (!armed) return;
armed = false;
this.inertCheckPreventers--;
};
}
_terminate(t: Turn, orderly: boolean): void {
if (!this.isLive) return;
this.isLive = false;
const parent = this.parent;
if (parent) parent.children.delete(this);
t._inFacet(this, t => {
this.children.forEach(child => child._terminate(t, orderly));
if (orderly) this.shutdownActions.forEach(a => a(t));
this.outbound.forEach(e => t._retract(e));
if (orderly) {
queueTask(() => {
if (parent) {
if (parent.isInert()) {
Turn.for(parent, t => parent._terminate(t, true));
}
} else {
Turn.for(this.actor.root, t => this.actor.terminateWith(t, { ok: true }), true);
}
});
}
});
}
}
export function _sync_impl(turn: Turn, e: Partial<Entity>, peer: Ref): void {
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
}
let nextHandle = 0;
let nextTurnId = 0;
export class Turn {
readonly id = nextTurnId++;
readonly actor: Actor;
queues: Map<Actor, LocalAction[]> | null = new Map();
readonly activeFacet: Facet;
queues: Map<Facet, LocalAction[]> | null;
static for(actor: Actor, f: LocalAction, zombieTurn = false): void {
if ((actor.exitReason === null) === zombieTurn) return;
const t = new Turn(actor);
static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
if (!zombieTurn) {
if (facet.actor.exitReason !== null) return;
if (!facet.isLive) return;
}
const t = new Turn(facet);
try {
f(t);
t.queues!.forEach((q, a) => queueTask(() => q.forEach(f => Turn.for(a, f))));
t.queues!.forEach((q, facet) => queueTask(() => q.forEach(f => Turn.for(facet, f))));
t.queues = null;
} catch (err) {
Turn.for(actor, t => actor.terminateWith(t, { ok: false, err }));
Turn.for(facet.actor.root, t => facet.actor.terminateWith(t, { ok: false, err }));
}
}
private constructor(actor: Actor) {
this.actor = actor;
private constructor(facet: Facet, queues = new Map<Facet, LocalAction[]>()) {
this.activeFacet = facet;
this.queues = queues;
}
_inFacet(facet: Facet, f: LocalAction): void {
const t = new Turn(facet, this.queues!);
f(t);
t.queues = null;
}
ref<T extends Partial<Entity>>(e: T): Ref {
return { relay: this.actor, target: e };
return { relay: this.activeFacet, target: e };
}
spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): void {
this.enqueue(this.actor, () => {
const newOutbound = new Map<Handle, Ref>();
initialAssertions.forEach(key => {
newOutbound.set(key, this.actor.outbound.get(key)!); // we trust initialAssertions
this.actor.outbound.delete(key);
});
queueTask(() => Turn.for(new Actor(newOutbound), bootProc));
facet(bootProc: LocalAction): Facet {
const newFacet = new Facet(this.activeFacet.actor, this.activeFacet);
this._inFacet(newFacet, stopIfInertAfter(bootProc));
return newFacet;
}
stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
this.enqueue(facet.parent!, t => {
facet._terminate(t, true);
if (continuation) continuation(t);
});
}
quit(): void {
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true }));
spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): void {
this.enqueue(this.activeFacet, () => {
const newOutbound: OutboundMap = new Map();
initialAssertions.forEach(key => {
newOutbound.set(key, this.activeFacet.outbound.get(key)!); // we trust initialAssertions
this.activeFacet.outbound.delete(key);
});
queueTask(() => new Actor(bootProc, newOutbound));
});
}
stopActor(): void {
this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: true }));
}
crash(err: Error): void {
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: false, err }));
this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: false, err }));
}
assert(ref: Ref, assertion: Assertion): Handle {
@ -128,8 +214,10 @@ export class Turn {
_assert(ref: Ref, assertion: Assertion, h: Handle) {
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) {
const e = { handle: h, peer: ref, established: false };
this.activeFacet.outbound.set(h, e);
this.enqueue(ref.relay, t => {
this.actor.outbound.set(h, ref);
e.established = true;
ref.target.assert?.(t, a, h);
});
}
@ -137,9 +225,9 @@ export class Turn {
retract(h: Handle | undefined): void {
if (h !== void 0) {
const peer = this.actor.outbound.get(h);
if (peer === void 0) return;
this._retract(peer, h);
const e = this.activeFacet.outbound.get(h);
if (e === void 0) return;
this._retract(e);
}
}
@ -149,10 +237,13 @@ export class Turn {
return newHandle;
}
_retract(ref: Ref, handle: Handle): void {
this.enqueue(ref.relay, t => {
this.actor.outbound.delete(handle);
ref.target.retract?.(t, handle);
_retract(e: OutboundAssertion): void {
this.activeFacet.outbound.delete(e.handle);
this.enqueue(e.peer.relay, t => {
if (e.established) {
e.established = false;
e.peer.target.retract?.(t, e.handle);
}
});
}
@ -169,7 +260,7 @@ export class Turn {
if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, assertion));
}
enqueue(relay: Actor, a: LocalAction): void {
enqueue(relay: Facet, a: LocalAction): void {
if (this.queues === null) {
throw new Error("Attempt to reuse a committed Turn");
}
@ -180,6 +271,17 @@ export class Turn {
if (this.queues !== null) {
throw new Error("Attempt to freshen a non-stale Turn");
}
Turn.for(this.actor, a);
Turn.for(this.activeFacet, a);
}
}
function stopIfInertAfter(a: LocalAction): LocalAction {
return t => {
a(t);
t.enqueue(t.activeFacet, t => {
if ((t.activeFacet.parent && !t.activeFacet.parent.isLive) || t.activeFacet.isInert()) {
t.stop();
}
});
};
}

View File

@ -26,9 +26,9 @@ export default function (t: Turn, arg: Assertion) {
const count = newValue - prevValue;
prevValue = newValue;
startTime = endTime;
console.log(`Box ${t.actor.id}: got ${newValue} (${count / delta} Hz)`);
console.log(`Box ${t.activeFacet.id}: got ${newValue} (${count / delta} Hz)`);
}
if (newValue === LIMIT) t.quit();
if (newValue === LIMIT) t.stopActor();
setValue(t, newValue);
}
})

View File

@ -23,8 +23,8 @@ export default function (t: Turn, ds: Ref) {
},
retract(t: Turn) {
if (--count === 0) {
console.log(`Client ${t.actor.id}: detected box termination`);
t.quit();
console.log(`Client ${t.activeFacet.id}: detected box termination`);
t.stopActor();
}
// console.log('dec to', count);
},

View File

@ -1,5 +1,5 @@
import { Assertion, Entity, Handle, LocalAction, Ref, Turn } from 'actor';
import { Dictionary, IdentityMap, is, Record, Tuple } from '@preserves/core';
import { Dictionary, IdentityMap, is, preserves, Record, Tuple } from '@preserves/core';
import { Bag, ChangeDescription } from './bag';
import { fromObserve, toObserve, Observe } from './gen/dataspace';
@ -41,7 +41,7 @@ export class Dataspace implements Partial<Entity> {
readonly subscriptions = new Dictionary<Ref, Map<Ref, Dictionary<Ref, Handle>>>();
assert(turn: Turn, rec: Assertion, handle: Handle): void {
// console.log(preserves`ds ${turn.actor.id} assert ${rec} ${handle}`);
// console.log(preserves`ds ${turn.activeFacet.id} assert ${rec} ${handle}`);
if (!Record.isRecord<Assertion, Tuple<Assertion>, Ref>(rec)) return;
this.handleMap.set(handle, rec);
if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;
@ -62,7 +62,7 @@ export class Dataspace implements Partial<Entity> {
retract(turn: Turn, upstreamHandle: Handle): void {
const rec = this.handleMap.get(upstreamHandle);
// console.log(preserves`ds ${turn.actor.id} retract ${rec} ${upstreamHandle}`);
// console.log(preserves`ds ${turn.activeFacet.id} retract ${rec} ${upstreamHandle}`);
if (rec === void 0) return;
this.handleMap.delete(upstreamHandle);
if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;
@ -81,7 +81,7 @@ export class Dataspace implements Partial<Entity> {
}
message(turn: Turn, rec: Assertion): void {
// console.log(preserves`ds ${turn.actor.id} message ${rec}`);
// console.log(preserves`ds ${turn.activeFacet.id} message ${rec}`);
if (!Record.isRecord<Assertion, Tuple<Assertion>, Ref>(rec)) return;
this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
}

View File

@ -14,23 +14,25 @@ const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()(
function spawnWorker(t: Turn, moduleName: string, arg: Assertion) {
const w = new Worker(path.join(__dirname, 'wload.js'));
const reenable = t.activeFacet.preventInertCheck();
spawnRelay(t, {
packetWriter: bs => w.postMessage(bs),
setup(t: Turn, r: Relay) {
w.on('message', bs => r.accept(bs));
w.on('error', err => Turn.for(t.actor, t => t.crash(err)));
w.on('exit', code => Turn.for(t.actor, t => {
w.on('error', err => Turn.for(t.activeFacet, t => t.crash(err)));
w.on('exit', code => Turn.for(t.activeFacet, t => {
if (code === 0) {
t.quit();
t.stopActor();
} else {
t.crash(new Error(`Worker crashed with code ${code}`));
}
}));
},
initialOid: 0,
}).then(factory => Turn.for(new Actor(), t => {
t.assert(factory!, Instance(moduleName, arg));
}));
}).then(factory => {
reenable();
new Actor(t => t.assert(factory, Instance(moduleName, arg)));
});
}
function spawnModule(t: Turn, moduleName: string, arg: Assertion) {
@ -39,7 +41,7 @@ function spawnModule(t: Turn, moduleName: string, arg: Assertion) {
// __setNextActorId(1000);
Turn.for(new Actor(), async (t: Turn) => {
new Actor(async (t: Turn) => {
const ds_unproxied = t.ref(new Dataspace());
const ds = ds_unproxied;

View File

@ -1,4 +1,4 @@
import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from './actor.js';
import { BytesLike, canonicalString, Decoder, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from '@preserves/core';
import * as IO from './gen/protocol.js';
import { myRef, toWireRef, WireRef, WireSymbol, yourRef } from './protocol.js';
@ -106,9 +106,8 @@ export class Membrane {
export const INERT_REF: Ref = {
relay: (() => {
const a = new Actor();
a.exitReason = { ok: true };
return a;
const a = new Actor(t => t.stop());
return a.root;
})(),
target: {},
};
@ -123,7 +122,7 @@ export interface RelayOptions {
}
export class Relay {
readonly actor: Actor;
readonly facet: Facet;
readonly w: PacketWriter;
readonly inboundAssertions = new IdentityMap<Handle, {
localHandle: Handle,
@ -141,10 +140,12 @@ export class Relay {
.replacePointerDecoder<WireRef>(d => toWireRef(d.next()));
constructor(t: Turn, options: RelayOptions) {
this.actor = t.actor;
this.facet = t.activeFacet;
this.w = options.packetWriter;
this.debug = options.debug ?? false;
this.trustPeer = options.trustPeer ?? true;
this.facet.preventInertCheck();
options.setup(t, this);
}
@ -229,7 +230,7 @@ export class Relay {
send(remoteOid: IO.Oid, m: IO.Event): void {
if (this.pendingTurn.length === 0) {
queueTask(() => {
if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText());
if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText());
this.w(underlying(encode<WireRef>(IO.fromTurn(this.pendingTurn), {
canonical: true,
encodePointer: (e, n) => {
@ -251,14 +252,14 @@ export class Relay {
}
accept(bs: BytesLike): void {
Turn.for(this.actor, t => {
Turn.for(this.facet, t => {
this.decoder.write(bs);
while (true) {
const rawTurn = this.decoder.try_next();
if (rawTurn === void 0) break;
const wireTurn = IO.toTurn(rawTurn);
if (wireTurn === void 0) throw new Error("Bad IO.Turn");
if (this.debug) console.log('IN', wireTurn.asPreservesText());
if (this.debug) console.log('IN', rawTurn.asPreservesText());
wireTurn.forEach(v => {
const { oid: localOid, event: m } = v;
this.handle(t, this.lookupLocal(localOid), m);

View File

@ -10,24 +10,30 @@ const [ moduleName, hexCap ] = process.argv.slice(2);
const cap = sturdyDecode(Bytes.fromHex(hexCap ?? ''));
const socket = net.createConnection({ port: 5999, host: 'localhost' }, () => {
Turn.for(new Actor(), t => {
new Actor(t => {
let shutdownRef: Ref;
const reenable = t.activeFacet.preventInertCheck();
const connectionClosedRef = t.ref({
retract(t) { t.stopActor(); },
});
spawnRelay(t, {
packetWriter: bs => socket.write(bs),
setup(t: Turn, r: Relay) {
socket.on('error', err => t.freshen(t =>
((err as any).code === 'ECONNRESET') ? t.quit() : t.crash(err)));
socket.on('close', () => t.freshen(t => t.quit()));
socket.on('end', () => t.freshen(t => t.quit()));
((err as any).code === 'ECONNRESET') ? t.stopActor() : t.crash(err)));
socket.on('close', () => t.freshen(t => t.stopActor()));
socket.on('end', () => t.freshen(t => t.stopActor()));
socket.on('data', data => r.accept(data));
t.actor.atExit(() => socket.destroy());
t.activeFacet.actor.atExit(() => socket.destroy());
t.assert(connectionClosedRef, true);
shutdownRef = t.ref({
retract(t) { t.quit(); }
retract(t) { t.stopActor(); }
});
},
initialOid: 0,
// debug: true,
}).then(gatekeeper => import(moduleName).then(m => t.freshen(t => {
reenable();
t.assert(shutdownRef, true);
t.assert(gatekeeper, fromResolve(Resolve({
sturdyref: asSturdyRef(cap),

View File

@ -62,5 +62,5 @@ function runSession(t: Turn, uid: UserId, session: Ref) {
}
}));
rl.on('close', () => t.freshen(t => t.quit()));
rl.on('close', () => t.freshen(t => t.stopActor()));
}

View File

@ -15,7 +15,8 @@ mint('syndicate', secretKey).then(v => {
console.log(sturdyEncode(fromSturdyRef(v)).toHex());
});
Turn.for(new Actor(), t => {
new Actor(t => {
t.activeFacet.preventInertCheck();
const ds = t.ref(new Dataspace());
function spawnConnection(t: Turn, socket: net.Socket) {
@ -24,11 +25,11 @@ Turn.for(new Actor(), t => {
packetWriter: bs => socket.write(bs),
setup(t: Turn, r: Relay) {
socket.on('error', err => t.freshen(t =>
((err as any).code === 'ECONNRESET') ? t.quit() : t.crash(err)));
socket.on('close', () => t.freshen(t => t.quit()));
socket.on('end', () => t.freshen(t => t.quit()));
((err as any).code === 'ECONNRESET') ? t.stopActor() : t.crash(err)));
socket.on('close', () => t.freshen(t => t.stopActor()));
socket.on('end', () => t.freshen(t => t.stopActor()));
socket.on('data', data => r.accept(data));
t.actor.atExit(() => socket.destroy());
t.activeFacet.actor.atExit(() => socket.destroy());
},
initialRef: t.ref({
handleMap: new IdentityMap<Handle, Handle>(),
@ -60,7 +61,8 @@ Turn.for(new Actor(), t => {
const server = net.createServer(socket => t.freshen(t => spawnConnection(t, socket)));
server.on('error', err => t.freshen(t => t.crash(err)));
server.listen(5999, '0.0.0.0', 512);
t.actor.atExit(() => {
t.activeFacet.preventInertCheck();
t.activeFacet.actor.atExit(() => {
try {
server.close();
} catch (e) {

View File

@ -27,6 +27,7 @@ export default function (t: Turn, ds: Ref) {
});
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
t.activeFacet.actor.atExit(_t => rl.close());
rl.on('line', (line: string) => t.freshen(t => {
if (line.toLowerCase().startsWith('/nick ')) {
@ -36,5 +37,5 @@ export default function (t: Turn, ds: Ref) {
}
}));
rl.on('close', () => t.freshen(t => t.quit()));
rl.on('close', () => t.freshen(t => t.stopActor()));
}

View File

@ -18,15 +18,16 @@ type TaskState =
| { readonly state: "shutdown_pending" }
| { readonly state: "running", shutdownRef: Ref };
Turn.for(new Actor(), t => {
new Actor(t => {
const p = parentPort!;
let taskState: TaskState = { state: "start_pending" };
t.activeFacet.preventInertCheck();
spawnRelay(t, {
nextLocalOid: STARTING_ACTOR_ID + 500000000,
packetWriter: bs => p.postMessage(bs),
setup(t: Turn, r: Relay) {
p.on('message', bs => r.accept(bs));
p.on('close', () => Turn.for(t.actor, t => t.quit()));
p.on('close', () => Turn.for(t.activeFacet, t => t.stopActor()));
},
initialRef: t.ref({
async assert(t, inst) {
@ -37,16 +38,16 @@ Turn.for(new Actor(), t => {
const m = await import(Instance._.moduleName(inst));
t.freshen(t => t.spawn(t => {
t.actor.atExit(() => {
t.activeFacet.actor.atExit(() => {
console.log('Worker terminating');
process.exit(0);
});
if (taskState.state === "shutdown_pending") {
t.quit();
t.stopActor();
} else {
taskState = {
state: "running",
shutdownRef: t.ref({ message(t) { t.quit(); } }),
shutdownRef: t.ref({ message(t) { t.stopActor(); } }),
};
m.default(t, Instance._.arg(inst));
}