/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2021 Tony Garnock-Jones

import { IdentitySet, Value, embeddedId, is } from '@preserves/core';
import { Cell, Field, Graph } from './dataflow.js';
import { Attenuation, runRewrites } from './rewrite.js';
import { queueTask } from './task.js';

export type AnyValue = Value<Ref>;

//---------------------------------------------------------------------------

if ('stackTraceLimit' in Error) {
    Error.stackTraceLimit = Infinity;
}

export type Assertion = Value<Ref>;
export type Handle = number;
/** `null` while an actor is still running; otherwise records how it exited. */
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
/** A unit of work executed within a `Turn`. */
export type LocalAction = (t: Turn) => void;

/**
 * Event callbacks that a `Ref` target may implement. `Turn` invokes them
 * through `Ref.target`, where every method is optional (`Partial<Entity>`).
 */
export interface Entity {
    assert(turn: Turn, assertion: Assertion, handle: Handle): void;
    retract(turn: Turn, handle: Handle): void;
    message(turn: Turn, body: Assertion): void;
    sync(turn: Turn, peer: Ref): void;
}

export type Cap = Ref;

/**
 * A reference to a (partial) entity, together with the facet in whose
 * context its callbacks are run and an optional attenuation that is applied
 * to values sent through the reference.
 */
export interface Ref {
    readonly relay: Facet;
    readonly target: Partial<Entity>;
    readonly attenuation?: Attenuation;
}

//---------------------------------------------------------------------------

/**
 * Type guard for `Ref`: true when `v` has a `relay` that is a `Facet` and a
 * `target` property. Primitives, `null` and `undefined` yield `false`
 * (the `in` operator would otherwise throw a TypeError on them).
 */
export function isRef(v: any): v is Ref {
    if (v === null || (typeof v !== 'object' && typeof v !== 'function')) return false;
    return 'relay' in v && v.relay instanceof Facet && 'target' in v;
}

/** Returns `_v` when it is a `Ref`, and `undefined` otherwise. */
export function toRef(_v: any): Ref | undefined {
    return isRef(_v) ? _v : void 0;
}

/** Bookkeeping for one assertion made by a facet toward `peer`. */
type OutboundAssertion = { handle: Handle, peer: Ref, established: boolean };
type OutboundMap = Map<Handle, OutboundAssertion>;

// Shared by Actor and Facet: both draw their `id` from this counter.
let nextActorId = 0;
export const __setNextActorId = (v: number) => nextActorId = v;

export type DataflowGraph = Graph<DataflowBlock, Cell>;
export type DataflowBlock = (t: Turn) => void;

/**
 * An actor owns a root facet, a lazily created dataflow graph, and a list of
 * exit hooks. Constructing an actor immediately runs `bootProc` in a fresh
 * child facet of the root.
 */
export class Actor {
    readonly id = nextActorId++;
    readonly root: Facet;
    _dataflowGraph: DataflowGraph | null = null;
    exitReason: ExitReason = null;
    readonly exitHooks: Array<LocalAction> = [];

    /**
     * @param bootProc action run in a new child facet of the root facet; that
     *   facet is stopped afterwards if it turns out to be inert
     * @param initialAssertions outbound assertions adopted by the root facet
     */
    constructor(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()) {
        this.root = new Facet(this, null, initialAssertions);
        Turn.for(new Facet(this, this.root), stopIfInertAfter(bootProc));
    }

    /** The actor's dataflow graph, created on first access. */
    get dataflowGraph(): DataflowGraph {
        if (this._dataflowGraph === null) {
            this._dataflowGraph =
                new Graph((b: DataflowBlock) => '' + embeddedId(b), Cell.canonicalizer);
        }
        return this._dataflowGraph;
    }

    /** Registers a hook that `terminateWith` runs, whether or not the exit is an error. */
    atExit(a: LocalAction): void {
        this.exitHooks.push(a);
    }

    /**
     * Records `reason` as the exit reason (first call wins; later calls are
     * ignored), logs crashes, runs the exit hooks, and schedules termination
     * of the root facet.
     */
    terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
        if (this.exitReason !== null) return;
        this.exitReason = reason;
        if (!reason.ok) {
            console.error(`Actor ${this.id} crashed:`, reason.err);
        }
        this.exitHooks.forEach(hook => hook(t));
        // zombieTurn = true: exitReason is already set, so a normal turn would be refused.
        queueTask(() => Turn.for(this.root, t => this.root._terminate(t, reason.ok), true));
    }

    /** Re-runs damaged dataflow blocks, if a dataflow graph has been created. */
    repairDataflowGraph(t: Turn) {
        if (this._dataflowGraph === null) return;
        this._dataflowGraph.repairDamage(block => block(t));
    }
}

/**
 * A node in an actor's facet tree. Tracks child facets, the facet's outbound
 * assertions, and actions to run on orderly shutdown.
 */
export class Facet {
    readonly id = nextActorId++;
    readonly actor: Actor;
    readonly parent: Facet | null;
    readonly children = new Set<Facet>();
    readonly outbound: OutboundMap;
    readonly shutdownActions: Array<LocalAction> = [];
    // ^ shutdownActions are not exitHooks - those run even on error. These are for clean shutdown
    isLive = true;
    inertCheckPreventers = 0;

    /** Creates a facet and, when `parent` is given, registers it among the parent's children. */
    constructor(actor: Actor, parent: Facet | null, initialAssertions: OutboundMap = new Map()) {
        this.actor = actor;
        this.parent = parent;
        if (parent) parent.children.add(this);
        this.outbound = initialAssertions;
    }

    /** Registers an action to run when this facet is terminated in an orderly way. */
    onStop(a: LocalAction): void {
        this.shutdownActions.push(a);
    }

    /** True when the facet has no children, no outbound assertions and no active preventers. */
    isInert(): boolean {
        return this.children.size === 0
            && this.outbound.size === 0
            && this.inertCheckPreventers === 0;
    }

    /**
     * Makes `isInert()` return false until the returned function is called.
     * The returned function is idempotent: only its first call has an effect.
     */
    preventInertCheck(): () => void {
        let armed = true;
        this.inertCheckPreventers++;
        return () => {
            if (!armed) return;
            armed = false;
            this.inertCheckPreventers--;
        };
    }

    /**
     * Terminates this facet (no-op when already terminated): detaches it from
     * its parent, terminates children, runs shutdown actions (orderly only)
     * and retracts all outbound assertions. After an orderly termination a
     * task is queued that stops the parent if it has become inert or, for a
     * parentless facet, terminates the actor with `{ ok: true }`.
     */
    _terminate(t: Turn, orderly: boolean): void {
        if (!this.isLive) return;
        this.isLive = false;

        const parent = this.parent;
        if (parent) parent.children.delete(this);

        t._inFacet(this, t => {
            this.children.forEach(child => child._terminate(t, orderly));
            if (orderly) this.shutdownActions.forEach(a => a(t));
            this.outbound.forEach(e => t._retract(e));

            if (orderly) {
                queueTask(() => {
                    if (parent) {
                        if (parent.isInert()) {
                            Turn.for(parent, t => parent._terminate(t, true));
                        }
                    } else {
                        Turn.for(this.actor.root, t => this.actor.terminateWith(t, { ok: true }), true);
                    }
                });
            }
        });
    }
}

/**
 * Default synchronisation behaviour: delegates to `e.sync` when the entity
 * provides it, and otherwise replies by sending `true` as a message to `peer`.
 */
export function _sync_impl(turn: Turn, e: Partial<Entity>, peer: Ref): void {
    if (e.sync) {
        e.sync(turn, peer);
    } else {
        turn.message(peer, true);
    }
}

let nextHandle = 0;
let nextTurnId = 0;

/**
 * A turn collects actions destined for (possibly other) actors while a local
 * action runs, and hands them off for execution when it is delivered.
 */
export class Turn {
    readonly id = nextTurnId++;
    _activeFacet: Facet;
    // Pending actions grouped by target actor; null once the turn has been delivered.
    queues: Map<Actor, LocalAction[]> | null;

    /**
     * Runs `f` in a fresh turn whose active facet is `facet`, then repairs the
     * actor's dataflow graph and delivers the queued actions. If anything in
     * that sequence throws, the actor is terminated with `{ ok: false, err }`.
     *
     * Unless `zombieTurn` is true, nothing happens when the actor has already
     * exited or the facet is no longer live.
     */
    static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
        if (!zombieTurn) {
            if (facet.actor.exitReason !== null) return;
            if (!facet.isLive) return;
        }
        const t = new Turn(facet);
        try {
            f(t);
            facet.actor.repairDataflowGraph(t);
            t.deliver();
        } catch (err) {
            Turn.for(facet.actor.root, t => facet.actor.terminateWith(t, { ok: false, err }));
        }
    }

    private constructor(facet: Facet, queues = new Map<Actor, LocalAction[]>()) {
        this._activeFacet = facet;
        this.queues = queues;
    }

    get activeFacet(): Facet {
        return this._activeFacet;
    }

    /**
     * Runs `f` with `facet` as the active facet and restores the previously
     * active facet afterwards, even if `f` throws.
     */
    _inFacet(facet: Facet, f: LocalAction): void {
        const saved = this._activeFacet;
        this._activeFacet = facet;
        try {
            f(this);
        } finally {
            this._activeFacet = saved;
        }
    }

    /** Creates an unattenuated `Ref` to `e`, relayed through the active facet. */
    ref<T extends Partial<Entity>>(e: T): Ref {
        return { relay: this.activeFacet, target: e };
    }

    /**
     * Creates a child of the active facet and runs `bootProc` inside it; the
     * new facet is stopped afterwards if it turns out to be inert.
     */
    facet(bootProc: LocalAction): Facet {
        const newFacet = new Facet(this.activeFacet.actor, this.activeFacet);
        this._inFacet(newFacet, stopIfInertAfter(bootProc));
        return newFacet;
    }

    /**
     * Queues orderly termination of `facet` (default: the active facet),
     * followed by `continuation`, to run in the context of the facet's parent.
     */
    stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
        // NOTE(review): `facet.parent!` is asserted non-null; for a facet without
        // a parent, `enqueue` would fail when reading `relay.actor`. Confirm that
        // callers never stop a root facet this way.
        this.enqueue(facet.parent!, t => {
            facet._terminate(t, true);
            if (continuation) continuation(t);
        });
    }

    /**
     * Queues creation of a new actor running `bootProc`. The outbound
     * assertions named by `initialAssertions` are moved from the facet that
     * was active when `spawn` was called into the new actor's root facet.
     */
    spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): void {
        // Capture the facet now: by the time the queued action runs, this turn's
        // active facet has been restored to whatever facet the turn started in.
        const spawningFacet = this.activeFacet;
        this.enqueue(spawningFacet, () => {
            const newOutbound: OutboundMap = new Map();
            initialAssertions.forEach(key => {
                newOutbound.set(key, spawningFacet.outbound.get(key)!); // we trust initialAssertions
                spawningFacet.outbound.delete(key);
            });
            queueTask(() => new Actor(bootProc, newOutbound));
        });
    }

    /** Queues orderly termination of the active facet's actor. */
    stopActor(): void {
        const actor = this.activeFacet.actor;
        this.enqueue(actor.root, t => actor.terminateWith(t, { ok: true }));
    }

    /** Queues termination of the active facet's actor with error `err`. */
    crash(err: Error): void {
        const actor = this.activeFacet.actor;
        this.enqueue(actor.root, t => actor.terminateWith(t, { ok: false, err }));
    }

    /** Creates a `Field` attached to the active actor's dataflow graph. */
    field<V extends Value<T>, T = Ref>(initial: V, name?: string): Field<V, T> {
        return new Field(this.activeFacet.actor.dataflowGraph, initial, name);
    }

    /**
     * Registers `a` as a dataflow block of the active facet and runs it once
     * immediately. The block only runs while the facet is live and is
     * forgotten by the graph when the facet stops in an orderly way.
     */
    dataflow(a: LocalAction) {
        const f = this.activeFacet;
        const b = (t: Turn) => f.isLive && t._inFacet(f, a);
        f.onStop(_t => f.actor.dataflowGraph.forgetSubject(b));
        f.actor.dataflowGraph.withSubject(b, () => b(this));
    }

    /**
     * Maintains an assertion computed by `assertionFunction` inside a dataflow
     * block: whenever the computed target or assertion changes, the previous
     * assertion is replaced by the new one.
     */
    assertDataflow(assertionFunction: (t: Turn) => {target: Ref, assertion: Assertion}) {
        let handle: Handle | undefined = void 0;
        let target: Ref | undefined = void 0;
        let assertion: Assertion | undefined = void 0;
        this.dataflow(t => {
            let {target: nextTarget, assertion: nextAssertion} = assertionFunction(t);
            if (target !== nextTarget || !is(assertion, nextAssertion)) {
                target = nextTarget;
                assertion = nextAssertion;
                handle = t.replace(target, handle, assertion);
            }
        });
    }

    /** Asserts `assertion` at `ref` under a freshly allocated handle, which is returned. */
    assert(ref: Ref, assertion: Assertion): Handle {
        const h = nextHandle++;
        this._assert(ref, assertion, h);
        return h;
    }

    /**
     * Applies the ref's attenuation to `assertion`; if the result is non-null,
     * records the assertion in the active facet's outbound map and queues
     * delivery of the rewritten value to the target.
     */
    _assert(ref: Ref, assertion: Assertion, h: Handle) {
        const a = runRewrites(ref.attenuation, assertion);
        if (a !== null) {
            const e = { handle: h, peer: ref, established: false };
            this.activeFacet.outbound.set(h, e);
            this.enqueue(ref.relay, t => {
                e.established = true;
                ref.target.assert?.(t, a, h);
            });
        }
    }

    /** Retracts the active facet's assertion `h`; no-op for `undefined` or unknown handles. */
    retract(h: Handle | undefined): void {
        if (h !== void 0) {
            const e = this.activeFacet.outbound.get(h);
            if (e === void 0) return;
            this._retract(e);
        }
    }

    /**
     * Asserts the new value (when both `ref` and `assertion` are given) before
     * retracting the old handle `h`; returns the new handle, if any.
     */
    replace(ref: Ref | undefined, h: Handle | undefined, assertion: Assertion | undefined): Handle | undefined {
        const newHandle = (assertion === void 0 || ref === void 0)
            ? void 0
            : this.assert(ref, assertion);
        this.retract(h);
        return newHandle;
    }

    /**
     * Removes `e` from the active facet's outbound map and queues the
     * retraction; the target is only notified if the assertion had actually
     * been delivered (`established`).
     */
    _retract(e: OutboundAssertion): void {
        this.activeFacet.outbound.delete(e.handle);
        this.enqueue(e.peer.relay, t => {
            if (e.established) {
                e.established = false;
                e.peer.target.retract?.(t, e.handle);
            }
        });
    }

    /** Requests synchronisation with `ref`; the promise resolves with the turn in which the reply arrives. */
    sync(ref: Ref): Promise<Turn> {
        return new Promise(resolve => this._sync(ref, this.ref({ message: resolve })));
    }

    /** Queues a synchronisation request to `ref`, to be answered at `peer`. */
    _sync(ref: Ref, peer: Ref): void {
        this.enqueue(ref.relay, t => _sync_impl(t, ref.target, peer));
    }

    /**
     * Sends `assertion` as a message to `ref`. The ref's attenuation is
     * applied first: a null result drops the message, otherwise the rewritten
     * value is what the target receives.
     */
    message(ref: Ref, assertion: Assertion): void {
        const a = runRewrites(ref.attenuation, assertion);
        if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, a));
    }

    /**
     * Queues `a` to run with `relay` as the active facet, in a later turn of
     * `relay`'s actor.
     *
     * @throws Error if this turn has already been delivered
     */
    enqueue(relay: Facet, a: LocalAction): void {
        if (this.queues === null) {
            throw new Error("Attempt to reuse a committed Turn");
        }
        // The wrapper must have its own binding: reassigning `a` would make the
        // closure call itself instead of the caller's action.
        const wrapped: LocalAction = t => t._inFacet(relay, a);
        const pending = this.queues.get(relay.actor);
        if (pending === void 0) {
            this.queues.set(relay.actor, [wrapped]);
        } else {
            pending.push(wrapped);
        }
    }

    /**
     * Schedules, per target actor, one task that runs that actor's queued
     * actions in a new turn on its root facet; then marks this turn as
     * committed so that further `enqueue` calls throw.
     */
    deliver() {
        this.queues!.forEach((q, actor) =>
            queueTask(() => Turn.for(actor.root, t => q.forEach(f => f(t)))));
        this.queues = null;
    }
}

/**
 * Wraps `a` so that, after it has run, a check is queued on the active facet:
 * the facet is stopped if its parent is no longer live or if it is inert.
 */
function stopIfInertAfter(a: LocalAction): LocalAction {
    return t => {
        a(t);
        t.enqueue(t.activeFacet, t => {
            if ((t.activeFacet.parent && !t.activeFacet.parent.isLive) || t.activeFacet.isInert()) {
                t.stop();
            }
        });
    };
}