From ff1feff82b34380f45c32cd5b2d9d159ad3a8b17 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 16 Apr 2021 20:29:16 +0200 Subject: [PATCH] Facets, first draft --- src/actor.ts | 184 +++++++++++++++++++++++++++++--------- src/box.ts | 4 +- src/client.ts | 4 +- src/dataspace.ts | 8 +- src/main.ts | 16 ++-- src/relay.ts | 19 ++-- src/sandbox.ts | 18 ++-- src/secure-chat-client.ts | 2 +- src/server.ts | 14 +-- src/simple-chat.ts | 3 +- src/wload.ts | 11 +-- 11 files changed, 199 insertions(+), 84 deletions(-) diff --git a/src/actor.ts b/src/actor.ts index a010bfe..beb4bfc 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -21,7 +21,7 @@ export interface Entity { } export interface Ref { - readonly relay: Actor; + readonly relay: Facet; readonly target: Partial; readonly attenuation?: Attenuation; } @@ -29,24 +29,28 @@ export interface Ref { //--------------------------------------------------------------------------- export function isRef(v: any): v is Ref { - return 'relay' in v && v.relay instanceof Actor && 'target' in v; + return 'relay' in v && v.relay instanceof Facet && 'target' in v; } export function toRef(_v: any): Ref | undefined { return isRef(_v) ? _v : void 0; } +type OutboundAssertion = { handle: Handle, peer: Ref, established: boolean }; +type OutboundMap = Map; + let nextActorId = 0; export const __setNextActorId = (v: number) => nextActorId = v; export class Actor { readonly id = nextActorId++; - readonly outbound: Map; + readonly root: Facet; exitReason: ExitReason = null; readonly exitHooks: Array = []; - constructor(initialAssertions = new Map()) { - this.outbound = initialAssertions; + constructor(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()) { + this.root = new Facet(this, null, initialAssertions); + Turn.for(new Facet(this, this.root), stopIfInertAfter(bootProc)); } atExit(a: LocalAction): void { @@ -60,63 +64,145 @@ export class Actor { console.error(`Actor ${this.id} crashed:`, this.exitReason.err); } this.exitHooks.forEach(hook => hook(t)); - queueTask(() => Turn.for( - t.actor, - t => this.outbound.forEach((peer, h) => t._retract(peer, h)), - true)); + queueTask(() => Turn.for(this.root, t => this.root._terminate(t, false), true)); } } -let nextHandle = 0; +export class Facet { + readonly id = nextActorId++; + readonly actor: Actor; + readonly parent: Facet | null; + readonly children = new Set(); + readonly outbound: OutboundMap; + readonly shutdownActions: Array = []; + // ^ shutdownActions are not exitHooks - those run even on error. These are for clean shutdown + isLive = true; + inertCheckPreventers = 0; + + constructor(actor: Actor, parent: Facet | null, initialAssertions: OutboundMap = new Map()) { + this.actor = actor; + this.parent = parent; + if (parent) parent.children.add(this); + this.outbound = initialAssertions; + } + + onStop(a: LocalAction): void { + this.shutdownActions.push(a); + } + + isInert(): boolean { + return this.children.size === 0 && this.outbound.size === 0 && this.inertCheckPreventers === 0; + } + + preventInertCheck(): () => void { + let armed = true; + this.inertCheckPreventers++; + return () => { + if (!armed) return; + armed = false; + this.inertCheckPreventers--; + }; + } + + _terminate(t: Turn, orderly: boolean): void { + if (!this.isLive) return; + this.isLive = false; + + const parent = this.parent; + if (parent) parent.children.delete(this); + + t._inFacet(this, t => { + this.children.forEach(child => child._terminate(t, orderly)); + if (orderly) this.shutdownActions.forEach(a => a(t)); + this.outbound.forEach(e => t._retract(e)); + + if (orderly) { + queueTask(() => { + if (parent) { + if (parent.isInert()) { + Turn.for(parent, t => parent._terminate(t, true)); + } + } else { + Turn.for(this.actor.root, t => this.actor.terminateWith(t, { ok: true }), true); + } + }); + } + }); + } +} export function _sync_impl(turn: Turn, e: Partial, peer: Ref): void { e.sync ? e.sync!(turn, peer) : turn.message(peer, true); } +let nextHandle = 0; let nextTurnId = 0; export class Turn { readonly id = nextTurnId++; - readonly actor: Actor; - queues: Map | null = new Map(); + readonly activeFacet: Facet; + queues: Map | null; - static for(actor: Actor, f: LocalAction, zombieTurn = false): void { - if ((actor.exitReason === null) === zombieTurn) return; - const t = new Turn(actor); + static for(facet: Facet, f: LocalAction, zombieTurn = false): void { + if (!zombieTurn) { + if (facet.actor.exitReason !== null) return; + if (!facet.isLive) return; + } + const t = new Turn(facet); try { f(t); - t.queues!.forEach((q, a) => queueTask(() => q.forEach(f => Turn.for(a, f)))); + t.queues!.forEach((q, facet) => queueTask(() => q.forEach(f => Turn.for(facet, f)))); t.queues = null; } catch (err) { - Turn.for(actor, t => actor.terminateWith(t, { ok: false, err })); + Turn.for(facet.actor.root, t => facet.actor.terminateWith(t, { ok: false, err })); } } - private constructor(actor: Actor) { - this.actor = actor; + private constructor(facet: Facet, queues = new Map()) { + this.activeFacet = facet; + this.queues = queues; + } + + _inFacet(facet: Facet, f: LocalAction): void { + const t = new Turn(facet, this.queues!); + f(t); + t.queues = null; } ref>(e: T): Ref { - return { relay: this.actor, target: e }; + return { relay: this.activeFacet, target: e }; } - spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): void { - this.enqueue(this.actor, () => { - const newOutbound = new Map(); - initialAssertions.forEach(key => { - newOutbound.set(key, this.actor.outbound.get(key)!); // we trust initialAssertions - this.actor.outbound.delete(key); - }); - queueTask(() => Turn.for(new Actor(newOutbound), bootProc)); + facet(bootProc: LocalAction): Facet { + const newFacet = new Facet(this.activeFacet.actor, this.activeFacet); + this._inFacet(newFacet, stopIfInertAfter(bootProc)); + return newFacet; + } + + stop(facet: Facet = this.activeFacet, continuation?: LocalAction) { + this.enqueue(facet.parent!, t => { + facet._terminate(t, true); + if (continuation) continuation(t); }); } - quit(): void { - this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true })); + spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): void { + this.enqueue(this.activeFacet, () => { + const newOutbound: OutboundMap = new Map(); + initialAssertions.forEach(key => { + newOutbound.set(key, this.activeFacet.outbound.get(key)!); // we trust initialAssertions + this.activeFacet.outbound.delete(key); + }); + queueTask(() => new Actor(bootProc, newOutbound)); + }); + } + + stopActor(): void { + this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: true })); } crash(err: Error): void { - this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: false, err })); + this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: false, err })); } assert(ref: Ref, assertion: Assertion): Handle { @@ -128,8 +214,10 @@ export class Turn { _assert(ref: Ref, assertion: Assertion, h: Handle) { const a = runRewrites(ref.attenuation, assertion); if (a !== null) { + const e = { handle: h, peer: ref, established: false }; + this.activeFacet.outbound.set(h, e); this.enqueue(ref.relay, t => { - this.actor.outbound.set(h, ref); + e.established = true; ref.target.assert?.(t, a, h); }); } @@ -137,9 +225,9 @@ export class Turn { retract(h: Handle | undefined): void { if (h !== void 0) { - const peer = this.actor.outbound.get(h); - if (peer === void 0) return; - this._retract(peer, h); + const e = this.activeFacet.outbound.get(h); + if (e === void 0) return; + this._retract(e); } } @@ -149,10 +237,13 @@ export class Turn { return newHandle; } - _retract(ref: Ref, handle: Handle): void { - this.enqueue(ref.relay, t => { - this.actor.outbound.delete(handle); - ref.target.retract?.(t, handle); + _retract(e: OutboundAssertion): void { + this.activeFacet.outbound.delete(e.handle); + this.enqueue(e.peer.relay, t => { + if (e.established) { + e.established = false; + e.peer.target.retract?.(t, e.handle); + } }); } @@ -169,7 +260,7 @@ export class Turn { if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, assertion)); } - enqueue(relay: Actor, a: LocalAction): void { + enqueue(relay: Facet, a: LocalAction): void { if (this.queues === null) { throw new Error("Attempt to reuse a committed Turn"); } @@ -180,6 +271,17 @@ export class Turn { if (this.queues !== null) { throw new Error("Attempt to freshen a non-stale Turn"); } - Turn.for(this.actor, a); + Turn.for(this.activeFacet, a); } } + +function stopIfInertAfter(a: LocalAction): LocalAction { + return t => { + a(t); + t.enqueue(t.activeFacet, t => { + if ((t.activeFacet.parent && !t.activeFacet.parent.isLive) || t.activeFacet.isInert()) { + t.stop(); + } + }); + }; +} diff --git a/src/box.ts b/src/box.ts index 21f2ad4..8c761a5 100644 --- a/src/box.ts +++ b/src/box.ts @@ -26,9 +26,9 @@ export default function (t: Turn, arg: Assertion) { const count = newValue - prevValue; prevValue = newValue; startTime = endTime; - console.log(`Box ${t.actor.id}: got ${newValue} (${count / delta} Hz)`); + console.log(`Box ${t.activeFacet.id}: got ${newValue} (${count / delta} Hz)`); } - if (newValue === LIMIT) t.quit(); + if (newValue === LIMIT) t.stopActor(); setValue(t, newValue); } }) diff --git a/src/client.ts b/src/client.ts index 51113fc..6559879 100644 --- a/src/client.ts +++ b/src/client.ts @@ -23,8 +23,8 @@ export default function (t: Turn, ds: Ref) { }, retract(t: Turn) { if (--count === 0) { - console.log(`Client ${t.actor.id}: detected box termination`); - t.quit(); + console.log(`Client ${t.activeFacet.id}: detected box termination`); + t.stopActor(); } // console.log('dec to', count); }, diff --git a/src/dataspace.ts b/src/dataspace.ts index edf9238..b514eda 100644 --- a/src/dataspace.ts +++ b/src/dataspace.ts @@ -1,5 +1,5 @@ import { Assertion, Entity, Handle, LocalAction, Ref, Turn } from 'actor'; -import { Dictionary, IdentityMap, is, Record, Tuple } from '@preserves/core'; +import { Dictionary, IdentityMap, is, preserves, Record, Tuple } from '@preserves/core'; import { Bag, ChangeDescription } from './bag'; import { fromObserve, toObserve, Observe } from './gen/dataspace'; @@ -41,7 +41,7 @@ export class Dataspace implements Partial { readonly subscriptions = new Dictionary>>(); assert(turn: Turn, rec: Assertion, handle: Handle): void { - // console.log(preserves`ds ${turn.actor.id} assert ${rec} ${handle}`); + // console.log(preserves`ds ${turn.activeFacet.id} assert ${rec} ${handle}`); if (!Record.isRecord, Ref>(rec)) return; this.handleMap.set(handle, rec); if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return; @@ -62,7 +62,7 @@ export class Dataspace implements Partial { retract(turn: Turn, upstreamHandle: Handle): void { const rec = this.handleMap.get(upstreamHandle); - // console.log(preserves`ds ${turn.actor.id} retract ${rec} ${upstreamHandle}`); + // console.log(preserves`ds ${turn.activeFacet.id} retract ${rec} ${upstreamHandle}`); if (rec === void 0) return; this.handleMap.delete(upstreamHandle); if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return; @@ -81,7 +81,7 @@ export class Dataspace implements Partial { } message(turn: Turn, rec: Assertion): void { - // console.log(preserves`ds ${turn.actor.id} message ${rec}`); + // console.log(preserves`ds ${turn.activeFacet.id} message ${rec}`); if (!Record.isRecord, Ref>(rec)) return; this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec)); } diff --git a/src/main.ts b/src/main.ts index c79524f..71b3e56 100644 --- a/src/main.ts +++ b/src/main.ts @@ -14,23 +14,25 @@ const Instance = Record.makeConstructor<{moduleName: string, arg: Assertion}>()( function spawnWorker(t: Turn, moduleName: string, arg: Assertion) { const w = new Worker(path.join(__dirname, 'wload.js')); + const reenable = t.activeFacet.preventInertCheck(); spawnRelay(t, { packetWriter: bs => w.postMessage(bs), setup(t: Turn, r: Relay) { w.on('message', bs => r.accept(bs)); - w.on('error', err => Turn.for(t.actor, t => t.crash(err))); - w.on('exit', code => Turn.for(t.actor, t => { + w.on('error', err => Turn.for(t.activeFacet, t => t.crash(err))); + w.on('exit', code => Turn.for(t.activeFacet, t => { if (code === 0) { - t.quit(); + t.stopActor(); } else { t.crash(new Error(`Worker crashed with code ${code}`)); } })); }, initialOid: 0, - }).then(factory => Turn.for(new Actor(), t => { - t.assert(factory!, Instance(moduleName, arg)); - })); + }).then(factory => { + reenable(); + new Actor(t => t.assert(factory, Instance(moduleName, arg))); + }); } function spawnModule(t: Turn, moduleName: string, arg: Assertion) { @@ -39,7 +41,7 @@ function spawnModule(t: Turn, moduleName: string, arg: Assertion) { // __setNextActorId(1000); -Turn.for(new Actor(), async (t: Turn) => { +new Actor(async (t: Turn) => { const ds_unproxied = t.ref(new Dataspace()); const ds = ds_unproxied; diff --git a/src/relay.ts b/src/relay.ts index 009e3e0..c0088c8 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -1,4 +1,4 @@ -import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js'; +import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from './actor.js'; import { BytesLike, canonicalString, Decoder, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from '@preserves/core'; import * as IO from './gen/protocol.js'; import { myRef, toWireRef, WireRef, WireSymbol, yourRef } from './protocol.js'; @@ -106,9 +106,8 @@ export class Membrane { export const INERT_REF: Ref = { relay: (() => { - const a = new Actor(); - a.exitReason = { ok: true }; - return a; + const a = new Actor(t => t.stop()); + return a.root; })(), target: {}, }; @@ -123,7 +122,7 @@ export interface RelayOptions { } export class Relay { - readonly actor: Actor; + readonly facet: Facet; readonly w: PacketWriter; readonly inboundAssertions = new IdentityMap(d => toWireRef(d.next())); constructor(t: Turn, options: RelayOptions) { - this.actor = t.actor; + this.facet = t.activeFacet; this.w = options.packetWriter; this.debug = options.debug ?? false; this.trustPeer = options.trustPeer ?? true; + + this.facet.preventInertCheck(); options.setup(t, this); } @@ -229,7 +230,7 @@ export class Relay { send(remoteOid: IO.Oid, m: IO.Event): void { if (this.pendingTurn.length === 0) { queueTask(() => { - if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText()); + if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText()); this.w(underlying(encode(IO.fromTurn(this.pendingTurn), { canonical: true, encodePointer: (e, n) => { @@ -251,14 +252,14 @@ export class Relay { } accept(bs: BytesLike): void { - Turn.for(this.actor, t => { + Turn.for(this.facet, t => { this.decoder.write(bs); while (true) { const rawTurn = this.decoder.try_next(); if (rawTurn === void 0) break; const wireTurn = IO.toTurn(rawTurn); if (wireTurn === void 0) throw new Error("Bad IO.Turn"); - if (this.debug) console.log('IN', wireTurn.asPreservesText()); + if (this.debug) console.log('IN', rawTurn.asPreservesText()); wireTurn.forEach(v => { const { oid: localOid, event: m } = v; this.handle(t, this.lookupLocal(localOid), m); diff --git a/src/sandbox.ts b/src/sandbox.ts index 3816e5c..7668576 100644 --- a/src/sandbox.ts +++ b/src/sandbox.ts @@ -10,24 +10,30 @@ const [ moduleName, hexCap ] = process.argv.slice(2); const cap = sturdyDecode(Bytes.fromHex(hexCap ?? '')); const socket = net.createConnection({ port: 5999, host: 'localhost' }, () => { - Turn.for(new Actor(), t => { + new Actor(t => { let shutdownRef: Ref; + const reenable = t.activeFacet.preventInertCheck(); + const connectionClosedRef = t.ref({ + retract(t) { t.stopActor(); }, + }); spawnRelay(t, { packetWriter: bs => socket.write(bs), setup(t: Turn, r: Relay) { socket.on('error', err => t.freshen(t => - ((err as any).code === 'ECONNRESET') ? t.quit() : t.crash(err))); - socket.on('close', () => t.freshen(t => t.quit())); - socket.on('end', () => t.freshen(t => t.quit())); + ((err as any).code === 'ECONNRESET') ? t.stopActor() : t.crash(err))); + socket.on('close', () => t.freshen(t => t.stopActor())); + socket.on('end', () => t.freshen(t => t.stopActor())); socket.on('data', data => r.accept(data)); - t.actor.atExit(() => socket.destroy()); + t.activeFacet.actor.atExit(() => socket.destroy()); + t.assert(connectionClosedRef, true); shutdownRef = t.ref({ - retract(t) { t.quit(); } + retract(t) { t.stopActor(); } }); }, initialOid: 0, // debug: true, }).then(gatekeeper => import(moduleName).then(m => t.freshen(t => { + reenable(); t.assert(shutdownRef, true); t.assert(gatekeeper, fromResolve(Resolve({ sturdyref: asSturdyRef(cap), diff --git a/src/secure-chat-client.ts b/src/secure-chat-client.ts index ba57e03..53775c5 100644 --- a/src/secure-chat-client.ts +++ b/src/secure-chat-client.ts @@ -62,5 +62,5 @@ function runSession(t: Turn, uid: UserId, session: Ref) { } })); - rl.on('close', () => t.freshen(t => t.quit())); + rl.on('close', () => t.freshen(t => t.stopActor())); } diff --git a/src/server.ts b/src/server.ts index f418f9c..45e9ae0 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15,7 +15,8 @@ mint('syndicate', secretKey).then(v => { console.log(sturdyEncode(fromSturdyRef(v)).toHex()); }); -Turn.for(new Actor(), t => { +new Actor(t => { + t.activeFacet.preventInertCheck(); const ds = t.ref(new Dataspace()); function spawnConnection(t: Turn, socket: net.Socket) { @@ -24,11 +25,11 @@ Turn.for(new Actor(), t => { packetWriter: bs => socket.write(bs), setup(t: Turn, r: Relay) { socket.on('error', err => t.freshen(t => - ((err as any).code === 'ECONNRESET') ? t.quit() : t.crash(err))); - socket.on('close', () => t.freshen(t => t.quit())); - socket.on('end', () => t.freshen(t => t.quit())); + ((err as any).code === 'ECONNRESET') ? t.stopActor() : t.crash(err))); + socket.on('close', () => t.freshen(t => t.stopActor())); + socket.on('end', () => t.freshen(t => t.stopActor())); socket.on('data', data => r.accept(data)); - t.actor.atExit(() => socket.destroy()); + t.activeFacet.actor.atExit(() => socket.destroy()); }, initialRef: t.ref({ handleMap: new IdentityMap(), @@ -60,7 +61,8 @@ Turn.for(new Actor(), t => { const server = net.createServer(socket => t.freshen(t => spawnConnection(t, socket))); server.on('error', err => t.freshen(t => t.crash(err))); server.listen(5999, '0.0.0.0', 512); - t.actor.atExit(() => { + t.activeFacet.preventInertCheck(); + t.activeFacet.actor.atExit(() => { try { server.close(); } catch (e) { diff --git a/src/simple-chat.ts b/src/simple-chat.ts index 418e5dc..75bb0b9 100644 --- a/src/simple-chat.ts +++ b/src/simple-chat.ts @@ -27,6 +27,7 @@ export default function (t: Turn, ds: Ref) { }); const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); + t.activeFacet.actor.atExit(_t => rl.close()); rl.on('line', (line: string) => t.freshen(t => { if (line.toLowerCase().startsWith('/nick ')) { @@ -36,5 +37,5 @@ export default function (t: Turn, ds: Ref) { } })); - rl.on('close', () => t.freshen(t => t.quit())); + rl.on('close', () => t.freshen(t => t.stopActor())); } diff --git a/src/wload.ts b/src/wload.ts index 7aebaed..49be4d5 100644 --- a/src/wload.ts +++ b/src/wload.ts @@ -18,15 +18,16 @@ type TaskState = | { readonly state: "shutdown_pending" } | { readonly state: "running", shutdownRef: Ref }; -Turn.for(new Actor(), t => { +new Actor(t => { const p = parentPort!; let taskState: TaskState = { state: "start_pending" }; + t.activeFacet.preventInertCheck(); spawnRelay(t, { nextLocalOid: STARTING_ACTOR_ID + 500000000, packetWriter: bs => p.postMessage(bs), setup(t: Turn, r: Relay) { p.on('message', bs => r.accept(bs)); - p.on('close', () => Turn.for(t.actor, t => t.quit())); + p.on('close', () => Turn.for(t.activeFacet, t => t.stopActor())); }, initialRef: t.ref({ async assert(t, inst) { @@ -37,16 +38,16 @@ Turn.for(new Actor(), t => { const m = await import(Instance._.moduleName(inst)); t.freshen(t => t.spawn(t => { - t.actor.atExit(() => { + t.activeFacet.actor.atExit(() => { console.log('Worker terminating'); process.exit(0); }); if (taskState.state === "shutdown_pending") { - t.quit(); + t.stopActor(); } else { taskState = { state: "running", - shutdownRef: t.ref({ message(t) { t.quit(); } }), + shutdownRef: t.ref({ message(t) { t.stopActor(); } }), }; m.default(t, Instance._.arg(inst)); }