import { Dictionary, IdentitySet, Record, Tuple, Value, is, IdentityMap } from 'preserves'; import { Attenuation, runRewrites } from './rewrite.js'; import { queueTask } from './task.js'; //--------------------------------------------------------------------------- if ('stackTraceLimit' in Error) { Error.stackTraceLimit = Infinity; } export type Assertion = Value; export type Handle = number; export type ExitReason = null | { ok: true } | { ok: false, err: Error }; export type LocalAction = (t: Turn) => void; export interface Entity { assert(turn: Turn, assertion: Assertion, handle: Handle): void; retract(turn: Turn, handle: Handle): void; message(turn: Turn, body: Assertion): void; sync(turn: Turn, peer: Ref): void; } export interface Ref { readonly relay: Actor; readonly target: Partial; readonly attenuation?: Attenuation; } export type Bindings = { [name: string]: Assertion }; //--------------------------------------------------------------------------- export function isRef(v: any): v is Ref { return 'relay' in v && v.relay instanceof Actor && 'target' in v; } let nextActorId = 0; export const __setNextActorId = (v: number) => nextActorId = v; export class Actor { readonly id = nextActorId++; readonly outbound: Map; exitReason: ExitReason = null; readonly exitHooks: Array = []; constructor(initialAssertions = new Map()) { this.outbound = initialAssertions; } atExit(a: LocalAction): void { this.exitHooks.push(a); } terminateWith(t: Turn, reason: Exclude) { if (this.exitReason !== null) return; this.exitReason = reason; if (!this.exitReason.ok) { console.error(`Actor ${this.id} crashed:`, this.exitReason.err); } this.exitHooks.forEach(hook => hook(t)); queueTask(() => Turn.for( t.actor, t => this.outbound.forEach((peer, h) => t._retract(peer, h)), true)); } } let nextHandle = 0; export function _sync_impl(turn: Turn, e: Partial, peer: Ref): void { e.sync ? e.sync!(turn, peer) : turn.message(peer, true); } let nextTurnId = 0; export class Turn { readonly id = nextTurnId++; readonly actor: Actor; queues: Map | null = new Map(); static for(actor: Actor, f: LocalAction, zombieTurn = false): void { if ((actor.exitReason === null) === zombieTurn) return; const t = new Turn(actor); try { f(t); t.queues!.forEach((q, a) => queueTask(() => q.forEach(f => Turn.for(a, f)))); t.queues = null; } catch (err) { Turn.for(actor, t => actor.terminateWith(t, { ok: false, err })); } } private constructor(actor: Actor) { this.actor = actor; } ref>(e: T): Ref { return { relay: this.actor, target: e }; } spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): void { this.enqueue(this.actor, () => { const newOutbound = new Map(); initialAssertions.forEach(key => { newOutbound.set(key, this.actor.outbound.get(key)!); // we trust initialAssertions this.actor.outbound.delete(key); }); queueTask(() => Turn.for(new Actor(newOutbound), bootProc)); }); } quit(): void { this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true })); } crash(err: Error): void { this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: false, err })); } assert(ref: Ref, assertion: Assertion): Handle { const h = nextHandle++; this._assert(ref, assertion, h); return h; } _assert(ref: Ref, assertion: Assertion, h: Handle) { const a = runRewrites(ref.attenuation, assertion); if (a !== null) { this.enqueue(ref.relay, t => { this.actor.outbound.set(h, ref); ref.target.assert?.(t, a, h); }); } } retract(h: Handle | undefined): void { if (h !== void 0) { const peer = this.actor.outbound.get(h); if (peer === void 0) return; this._retract(peer, h); } } replace(ref: Ref, h: Handle | undefined, assertion: Assertion): Handle { const newHandle = this.assert(ref, assertion); this.retract(h); return newHandle; } _retract(ref: Ref, handle: Handle): void { this.enqueue(ref.relay, t => { this.actor.outbound.delete(handle); ref.target.retract?.(t, handle); }); } sync(ref: Ref): Promise { return new Promise(resolve => this._sync(ref, this.ref({ message: resolve }))); } _sync(ref: Ref, peer: Ref): void { this.enqueue(ref.relay, t => _sync_impl(t, ref.target, peer)); } message(ref: Ref, assertion: Assertion): void { const a = runRewrites(ref.attenuation, assertion); if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, assertion)); } enqueue(relay: Actor, a: LocalAction): void { if (this.queues === null) { throw new Error("Attempt to reuse a committed Turn"); } this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]); } freshen(a: LocalAction): void { if (this.queues !== null) { throw new Error("Attempt to freshen a non-stale Turn"); } Turn.for(this.actor, a); } }