syndicate-js/packages/core/src/runtime/actor.ts

515 lines
16 KiB
TypeScript

/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core';
import { Cell, Field, Graph } from './dataflow.js';
import { Caveat, runRewrites } from './rewrite.js';
import { queueTask } from './task.js';
/** A Preserves `Value` whose embedded values are `Ref`s. */
export type AnyValue = Value<Ref>;
//---------------------------------------------------------------------------
// `stackTraceLimit` is not part of the ECMAScript standard, so it is only set on engines
// that expose it on `Error`. `Infinity` asks for untruncated stack traces.
if ('stackTraceLimit' in Error) {
Error.stackTraceLimit = Infinity;
}
/** A value that can be asserted at, or sent as a message to, a `Ref`. Same type as `AnyValue`. */
export type Assertion = Value<Ref>;
/** Numeric identifier of an outbound assertion; allocated from the module-level `nextHandle` counter. */
export type Handle = number;
/**
 * Why an actor stopped: `null` while it has not stopped yet, `{ ok: true }` for an orderly
 * stop, `{ ok: false, err }` when it crashed with `err`.
 */
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
/** A zero-argument callback whose return value is ignored. */
export type LocalAction = () => void;
/**
 * Anything accepted where an assertion is expected: either an `Assertion` already, or an
 * object carrying an `__as_preserve__` method (converted by `assertionFrom`).
 */
export type Assertable = Assertion | { __as_preserve__: <T>() => Value<T> } | { __as_preserve__: () => Assertion };
/**
 * The event-handling interface of an object that can be the `target` of a `Ref`.
 * `Ref.target` is typed `Partial<Entity>`, so implementations may omit any of the methods;
 * callers in this file invoke them with optional calls.
 */
export interface Entity {
/** Called when `assertion` is established at this entity under the given `handle`. */
assert(assertion: Assertion, handle: Handle): void;
/** Called when the assertion previously established under `handle` is withdrawn. */
retract(handle: Handle): void;
/** Called when a message with the given `body` is delivered to this entity. */
message(body: Assertion): void;
/** Called for a sync request; `peer` is the reference supplied by the requester. */
sync(peer: Ref): void;
/** Optional free-form slot; not read or written anywhere in this section of the file. */
data?: unknown;
}
/** Alias for `Ref`. */
export type Cap = Ref;
/** A reference to an entity, as used by `Turn.assert`, `Turn.message` and `Turn.sync`. */
export interface Ref {
/** The facet in whose context actions aimed at `target` are enqueued and later run. */
readonly relay: Facet;
/** The entity that receives the events; any handler may be absent. */
readonly target: Partial<Entity>;
/**
 * Optional caveats handed to `runRewrites` before an assertion or message is sent;
 * a `null` result from `runRewrites` causes the assertion/message to be dropped.
 */
readonly attenuation?: Caveat[];
}
/**
 * Plain class implementation of `Ref` with a human-readable `toString`.
 */
export class RefImpl implements Ref {
    /**
     * @param relay facet through which events for `target` are routed
     * @param target the entity (possibly implementing only some handlers)
     * @param attenuation optional caveats attached to this reference
     */
    constructor(
        readonly relay: Facet,
        readonly target: Partial<Entity>,
        readonly attenuation?: Caveat[],
    ) {}

    /**
     * Renders the reference as `<relay id chain><<handlers>><entity>`, where `<handlers>`
     * lists one letter per handler present on the target (A/R/M/S for
     * assert/retract/message/sync) and `<entity>` is the target's own string form, or
     * `#<embedded id>` when the target only has the default object rendering.
     */
    toString() {
        const rendered = '' + this.target;
        const entityRepr = rendered === '[object Object]'
            ? '#' + embeddedId(this.target)
            : rendered;
        const sig = (['assert', 'retract', 'message', 'sync'] as const)
            .filter(handler => handler in this.target)
            .map(handler => handler.charAt(0).toUpperCase())
            .join('');
        return `${this.relay.idChain()}<${sig}>${entityRepr}`;
    }
}
//---------------------------------------------------------------------------
/**
 * Type guard: true when `v` looks like a `Ref`, i.e. it has a `relay` property holding a
 * `Facet` instance and a `target` property.
 *
 * @param v any value, including primitives, `null` and `undefined`
 * @returns whether `v` can be treated as a `Ref`
 */
export function isRef(v: any): v is Ref {
    // The `in` operator throws a TypeError when its right-hand operand is not an object,
    // so primitives, `null` and `undefined` must be rejected before probing properties.
    if (v === null || (typeof v !== 'object' && typeof v !== 'function')) return false;
    return 'relay' in v && v.relay instanceof Facet && 'target' in v;
}
/**
 * Returns `_v` itself when `isRef` accepts it, and `undefined` otherwise.
 */
export function toRef(_v: any): Ref | undefined {
    if (isRef(_v)) {
        return _v;
    }
    return undefined;
}
/**
 * Normalises an `Assertable` into an `Assertion`: objects carrying an `__as_preserve__`
 * member are passed through `fromJS`; every other value is returned unchanged.
 */
export function assertionFrom(a: Assertable): Assertion {
    return (typeof a === 'object' && '__as_preserve__' in a)
        ? fromJS(a)
        : a;
}
/**
 * Bookkeeping for one assertion a facet has made: its `handle`, the `peer` it was made at,
 * and `established`, which is set to `true` once the assert has actually been delivered to
 * the peer and back to `false` once the retraction has been delivered.
 */
type OutboundAssertion = { handle: Handle, peer: Ref, established: boolean };
/** A facet's outbound assertions, keyed by handle. */
type OutboundMap = Map<Handle, OutboundAssertion>;
// Counter from which both `Actor.id` and `Facet.id` are drawn.
let nextActorId = 0;
/**
 * Overwrites the id counter and returns the value just assigned.
 * NOTE(review): presumably intended for tests / deterministic ids — confirm with callers.
 */
export const __setNextActorId = (v: number) => nextActorId = v;
/** The dataflow graph type used by actors: subjects are `DataflowBlock`s, objects are `Cell`s. */
export type DataflowGraph = Graph<DataflowBlock, Cell>;
/** A block of code re-run by the dataflow graph. */
export type DataflowBlock = () => void;
/**
 * An actor: owns a root facet, a lazily created dataflow graph, and the exit bookkeeping
 * (`exitReason`, `exitHooks`) shared by all of its facets.
 */
export class Actor {
    /** Identifier taken from the module-level `nextActorId` counter. */
    readonly id = nextActorId++;
    /** Display name used by `toString`; defaults to the numeric id. */
    name: AnyValue = this.id;
    /** Root of this actor's facet tree. */
    readonly root: Facet;
    /** Backing store for `dataflowGraph`; stays `null` until first requested. */
    _dataflowGraph: DataflowGraph | null = null;
    /** `null` until `_terminateWith` records why the actor stopped. */
    exitReason: ExitReason = null;
    /** Callbacks run by `_terminateWith`, whether the exit is orderly or a crash. */
    readonly exitHooks: Array<LocalAction> = [];

    /** Creates an actor and immediately runs `bootProc` in a fresh turn. */
    static boot(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()): Actor {
        const actor = new Actor(initialAssertions);
        actor._boot(bootProc);
        return actor;
    }

    /** Creates an actor without booting it; the caller is responsible for `_boot`. */
    static __unsafeNew(initialAssertions: OutboundMap = new Map()) {
        return new Actor(initialAssertions);
    }

    private constructor(initialAssertions: OutboundMap = new Map()) {
        this.root = new Facet(this, null, initialAssertions);
    }

    /** Runs `bootProc` in a turn of a new child facet of the root, followed by an inertness check. */
    _boot(bootProc: LocalAction) {
        const bootFacet = new Facet(this, this.root);
        Turn.for(bootFacet, stopIfInertAfter(bootProc));
    }

    /** The actor's dataflow graph, created on first access. */
    get dataflowGraph(): DataflowGraph {
        const existing = this._dataflowGraph;
        if (existing !== null) {
            return existing;
        }
        const created: DataflowGraph =
            new Graph((b: DataflowBlock) => '' + embeddedId(b), Cell.canonicalizer);
        this._dataflowGraph = created;
        return created;
    }

    /** Registers a callback to run when the actor terminates. */
    atExit(a: LocalAction): void {
        this.exitHooks.push(a);
    }

    /**
     * Records `reason` as the exit reason (first caller wins), logs crashes, runs the exit
     * hooks and finally terminates the root facet.
     */
    _terminateWith(reason: Exclude<ExitReason, null>) {
        const alreadyExited = this.exitReason !== null;
        if (alreadyExited) return;

        this.exitReason = reason;
        if (!reason.ok) {
            console.error(`${this} crashed:`, reason.err);
        }
        this.exitHooks.forEach(runHook => runHook());
        this.root._terminate(reason.ok);
    }

    /** Re-runs damaged dataflow blocks, if a graph has been created at all. */
    repairDataflowGraph() {
        const graph = this._dataflowGraph;
        if (graph !== null) {
            graph.repairDamage(rerun => rerun());
        }
    }

    toString(): string {
        return `Actor(${stringify(this.name)})`;
    }
}
/**
 * A node in an actor's tree of facets. A facet tracks its child facets, the assertions it
 * has made (`outbound`), and the actions to run when it is shut down in an orderly way.
 * Terminating a facet terminates its children and retracts its outbound assertions.
 */
export class Facet {
// Drawn from the same `nextActorId` counter that `Actor.id` uses.
readonly id = nextActorId++;
readonly actor: Actor;
// `null` for the root facet of an actor.
readonly parent: Facet | null;
readonly children = new Set<Facet>();
// Assertions currently made by this facet, keyed by handle; all are retracted by `_terminate`.
readonly outbound: OutboundMap;
readonly shutdownActions: Array<LocalAction> = [];
// ^ shutdownActions are not exitHooks - those run even on error. These are for clean shutdown
// Flipped to false exactly once, at the start of `_terminate`.
isLive = true;
// Number of outstanding `preventInertCheck()` holds; non-zero means "never inert".
inertCheckPreventers = 0;
/**
 * @param actor the owning actor
 * @param parent the parent facet, or `null` for a root facet; a non-null parent gets this
 *        facet added to its `children`
 * @param initialAssertions map used (not copied) as this facet's `outbound` table
 */
constructor(actor: Actor, parent: Facet | null, initialAssertions: OutboundMap = new Map()) {
this.actor = actor;
this.parent = parent;
if (parent) parent.children.add(this);
this.outbound = initialAssertions;
}
/** Runs `a` in a new `Turn` whose active facet is this facet. */
turn(a: LocalAction) {
Turn.for(this, a);
}
/** Registers an action to run when this facet is terminated in an orderly fashion. */
onStop(a: LocalAction): void {
this.shutdownActions.push(a);
}
/**
 * Whether this facet has nothing left that would keep it alive: no children, no outbound
 * assertions (ignored for root facets, see below), and no inert-check preventers.
 */
isInert(): boolean {
const noKids = this.children.size === 0;
const noOutboundHandles = this.outbound.size === 0;
// The only outbound handle the root facet of an actor may have is a link
// assertion, from _halfLink(). This is not to be considered a "real"
// assertion for purposes of keeping the facet alive!
const isRootFacet = this.parent === null;
const noInertCheckPreventers = this.inertCheckPreventers === 0;
return noKids && (noOutboundHandles || isRootFacet) && noInertCheckPreventers;
}
/**
 * Marks this facet as not eligible for inertness-based stopping.
 * @returns a release function that undoes the hold; calling it more than once has no
 *          further effect
 */
preventInertCheck(): () => void {
let armed = true;
this.inertCheckPreventers++;
return () => {
if (!armed) return;
armed = false;
this.inertCheckPreventers--;
};
}
/**
 * Records an already-established outbound entry whose peer is a `StopOnRetract` entity
 * relayed through `other`. When this facet terminates and retracts its outbound entries,
 * that entity's `retract` handler runs with `other` as the active facet and stops it.
 */
_halfLink(other: Facet): void {
const h = nextHandle++;
const e = { handle: h, peer: { relay: other, target: new StopOnRetract() }, established: true };
this.outbound.set(h, e);
}
/**
 * Terminates this facet (idempotent). Must be called while a turn is active, since it
 * works through `Turn.active`.
 *
 * @param orderly `true` for a clean stop: shutdown actions run and the stop may propagate
 *        to an inert parent or, for a root facet, to the actor. `false` (crash path)
 *        skips both.
 */
_terminate(orderly: boolean): void {
if (!this.isLive) return;
this.isLive = false;
const parent = this.parent;
// Detach from the parent first so the parent's `isInert()` check below no longer counts us.
if (parent) parent.children.delete(this);
Turn.active._inFacet(this, () => {
// Children go first, with the same orderliness.
this.children.forEach(child => child._terminate(orderly));
if (orderly) {
// Shutdown actions run with the parent as active facet (or this facet, if it is a root).
Turn.active._inFacet(parent ?? this, () => {
this.shutdownActions.forEach(a => a());
});
}
// The active facet is `this` here, so `_retract` removes each entry from our own table.
this.outbound.forEach(e => Turn.active._retract(e));
if (orderly) {
if (parent) {
// A parent left with nothing to do is stopped as well.
if (parent.isInert()) {
parent._terminate(true);
}
} else {
// Orderly stop of the root facet ends the whole actor.
this.actor._terminateWith({ ok: true });
}
}
});
}
/** Colon-separated ids from the root facet down to this one, followed by the actor's name. */
idChain(): string {
let facetIds = [];
for (let f: Facet | null = this; f !== null; f = f.parent) {
facetIds.push(f.id);
}
return facetIds.reverse().join(':') + ':' + stringify(this.actor.name);
}
toString(): string {
return `Facet(${this.idChain()})`;
}
}
/**
 * Entity whose only handler is `retract`: any retraction stops the facet that is active
 * in the current turn.
 */
export class StopOnRetract implements Partial<Entity> {
    retract(_handle: Handle): void {
        const currentTurn = Turn.active;
        currentTurn.stop();
    }
}
/**
 * Performs a sync against entity `e`: delegates to the entity's own `sync` handler when it
 * has one, otherwise answers directly by sending the message `true` to `peer`.
 */
export function _sync_impl(e: Partial<Entity>, peer: Ref): void {
    if (e.sync) {
        e.sync(peer);
    } else {
        Turn.active.message(peer, true);
    }
}
// Source of fresh assertion handles (used by `Turn.assert` and `Facet._halfLink`).
let nextHandle = 0;
// Source of `Turn.id` values.
let nextTurnId = 0;
/**
 * A Turn is one transactional unit of activity performed on behalf of a facet.
 *
 * While a turn's callback runs, `Turn.active` refers to it. Effects requested during the
 * turn (asserts, retracts, messages, syncs, spawns, ...) are not performed immediately:
 * `enqueue` files them per destination actor, and `deliver` hands them to `queueTask` only
 * if the callback finished without throwing. If it throws, the queued actions are dropped
 * and the actor is terminated with the error.
 */
export class Turn {
    /** The turn currently executing; only meaningful inside `Turn.for`'s callback. */
    static active: Turn = void 0 as unknown as Turn;

    /** Shorthand for `Turn.active.activeFacet`. */
    static get activeFacet(): Facet {
        return Turn.active.activeFacet;
    }

    /** Shorthand for `Turn.active.ref(e)`. */
    static ref<T extends Partial<Entity>>(e: T): Ref {
        return Turn.active.ref(e);
    }

    readonly id = nextTurnId++;
    _activeFacet: Facet;
    /** Pending actions per destination actor; `null` once the turn has been delivered. */
    queues: Map<Actor, LocalAction[]> | null;

    /**
     * Runs `f` as a new turn with `facet` as the active facet, repairs the actor's dataflow
     * graph, and then delivers the queued actions.
     *
     * @param facet the facet the turn runs in
     * @param f the body of the turn
     * @param zombieTurn when `true`, run even if the actor has exited or the facet is dead
     */
    static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
        if (!zombieTurn) {
            if (facet.actor.exitReason !== null) return;
            if (!facet.isLive) return;
        }
        const t = new Turn(facet);
        try {
            const saved = Turn.active;
            Turn.active = t;
            try {
                f();
                facet.actor.repairDataflowGraph();
            } finally {
                Turn.active = saved;
            }
            // Only reached when `f` and the graph repair succeeded: commit the turn.
            t.deliver();
        } catch (err) {
            // The failed turn's queued actions are abandoned; crash the actor in a fresh turn.
            Turn.for(facet.actor.root, () => facet.actor._terminateWith({ ok: false, err }));
        }
    }

    private constructor(facet: Facet, queues = new Map<Actor, LocalAction[]>()) {
        this._activeFacet = facet;
        this.queues = queues;
    }

    get activeFacet(): Facet {
        return this._activeFacet;
    }

    /** Runs `f` with `facet` temporarily installed as the active facet of this turn. */
    _inFacet(facet: Facet, f: LocalAction): void {
        const saved = this._activeFacet;
        this._activeFacet = facet;
        f();
        this._activeFacet = saved;
    }

    /** Wraps entity `e` in a `Ref` relayed through the currently active facet. */
    ref<T extends Partial<Entity>>(e: T): Ref {
        return new RefImpl(this.activeFacet, e);
    }

    /**
     * Creates a child facet of the active facet and runs `bootProc` inside it, followed by
     * an (enqueued) inertness check.
     */
    facet(bootProc: LocalAction): Facet {
        const newFacet = new Facet(this.activeFacet.actor, this.activeFacet);
        this._inFacet(newFacet, stopIfInertAfter(bootProc));
        return newFacet;
    }

    // Alias for syndicatec code generator to use
    _stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
        this.stop(facet, continuation);
    }

    /**
     * Stops `facet` in an orderly way. `continuation`, when given, is registered as a
     * shutdown action of the facet first, so it runs as part of that shutdown.
     */
    stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
        if (continuation) facet.onStop(continuation);
        facet._terminate(true);
    }

    // Alias for syndicatec code generator to use
    _spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
        return this.spawn(bootProc, initialAssertions);
    }

    /** Spawns a new, unlinked actor; see `__spawn`. */
    spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
        return this.__spawn(bootProc, initialAssertions);
    }

    /**
     * Creates a new actor whose root facet starts out owning the outbound entries named by
     * `initialAssertions`. When this turn commits, those handles are removed from the
     * spawning facet and the new actor's boot is scheduled with `queueTask`.
     *
     * @param bootProc code the new actor runs at boot
     * @param initialAssertions handles of the spawning facet's assertions to hand over
     * @returns the (not yet booted) new actor
     */
    __spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
        // Capture the facet now. The queued closure below runs after this turn has been
        // committed, at which point `this.activeFacet` has been restored to whatever facet
        // the turn started with and may no longer be the facet that called spawn.
        const spawningFacet = this.activeFacet;
        const newOutbound: OutboundMap = new Map();
        initialAssertions.forEach(key => newOutbound.set(key, spawningFacet.outbound.get(key)!));
        // ^ we trust initialAssertions, so can use `!` safely
        const newActor = Actor.__unsafeNew(newOutbound);
        this.enqueue(spawningFacet, () => {
            initialAssertions.forEach(key => spawningFacet.outbound.delete(key));
            queueTask(() => newActor._boot(bootProc));
        });
        return newActor;
    }

    // Alias for syndicatec code generator to use
    _spawnLink(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor | null {
        return this.spawnLink(bootProc, initialAssertions);
    }

    /**
     * Like `spawn`, but additionally links the active facet and the new actor's root facet
     * to each other via `_halfLink`, so that termination of either stops the other.
     *
     * @returns the new actor, or `null` when the active facet is no longer live
     */
    spawnLink(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor | null {
        if (!this.activeFacet.isLive) return null;
        const newActor = this.__spawn(bootProc, initialAssertions);
        this.activeFacet._halfLink(newActor.root);
        newActor.root._halfLink(this.activeFacet);
        return newActor;
    }

    /** Requests an orderly stop of the active facet's actor once this turn commits. */
    stopActor(): void {
        const actor = this.activeFacet.actor;
        this.enqueue(actor.root, () => actor._terminateWith({ ok: true }));
    }

    /** Requests termination of the active facet's actor with `err` once this turn commits. */
    crash(err: Error): void {
        const actor = this.activeFacet.actor;
        this.enqueue(actor.root, () => actor._terminateWith({ ok: false, err }));
    }

    /** Creates a dataflow `Field` on the active facet's actor's dataflow graph. */
    field<V>(initial: V, name?: string): Field<V> {
        return new Field(this.activeFacet.actor.dataflowGraph, initial, name);
    }

    // Alias for syndicatec code generator to use
    _dataflow(a: LocalAction) {
        this.dataflow(a);
    }

    /**
     * Registers `a` as a dataflow block of the active facet and runs it once via the graph's
     * `withSubject`. The block only executes while the facet is live, always runs with that
     * facet active, and is forgotten by the graph when the facet stops.
     */
    dataflow(a: LocalAction) {
        const f = this.activeFacet;
        // The hold is never released: a facet with a dataflow block is never considered inert.
        f.preventInertCheck();
        const b = () => f.isLive && Turn.active._inFacet(f, a);
        f.onStop(() => f.actor.dataflowGraph.forgetSubject(b));
        f.actor.dataflowGraph.withSubject(b, b);
    }

    /**
     * Maintains a single assertion computed by `assertionFunction` inside a dataflow block:
     * whenever the computed target or assertion changes, the previous assertion is replaced
     * (or just retracted, when either part is `undefined`).
     */
    assertDataflow(assertionFunction: () => {
        target: Ref | undefined,
        assertion: Assertable | undefined
    }) {
        let handle: Handle | undefined = void 0;
        let target: Ref | undefined = void 0;
        let assertion: Assertable | undefined = void 0;
        this.dataflow(() => {
            let {target: nextTarget, assertion: nextAssertion} = assertionFunction();
            if (target !== nextTarget || !is(assertion, nextAssertion)) {
                target = nextTarget;
                assertion = nextAssertion;
                handle = Turn.active.replace(target, handle, assertion);
            }
        });
    }

    /**
     * Asserts `assertion` at `ref` under a freshly allocated handle.
     * @returns the new handle (returned even if attenuation dropped the assertion)
     */
    assert(ref: Ref, assertion: Assertable): Handle {
        const h = nextHandle++;
        this._assert(ref, assertion, h);
        return h;
    }

    /**
     * Asserts under a caller-chosen handle. The assertion is first passed through the
     * ref's attenuation; a `null` result drops it silently. Otherwise it is recorded in the
     * active facet's outbound table right away and delivered when the turn commits.
     */
    _assert(ref: Ref, assertable: Assertable, h: Handle) {
        const assertion = assertionFrom(assertable);
        const a = runRewrites(ref.attenuation, assertion);
        if (a !== null) {
            const e = { handle: h, peer: ref, established: false };
            this.activeFacet.outbound.set(h, e);
            this.enqueue(ref.relay, () => {
                e.established = true;
                ref.target.assert?.(a, h);
            });
        }
    }

    /** Retracts the active facet's assertion `h`; `undefined` and unknown handles are ignored. */
    retract(h: Handle | undefined): void {
        if (h !== void 0) {
            const e = this.activeFacet.outbound.get(h);
            if (e === void 0) return;
            this._retract(e);
        }
    }

    /**
     * Asserts the new value (if both `ref` and `assertion` are defined) and then retracts
     * the old handle `h`, in that order.
     * @returns the handle of the new assertion, or `undefined` if none was made
     */
    replace(ref: Ref | undefined, h: Handle | undefined, assertion: Assertable | undefined): Handle | undefined {
        const newHandle = (assertion === void 0 || ref === void 0)
            ? void 0
            : this.assert(ref, assertion);
        this.retract(h);
        return newHandle;
    }

    /**
     * Removes `e` from the active facet's outbound table and queues the retraction for the
     * peer. The peer is only notified if the matching assert was actually delivered.
     */
    _retract(e: OutboundAssertion): void {
        this.activeFacet.outbound.delete(e.handle);
        this.enqueue(e.peer.relay, () => {
            if (e.established) {
                e.established = false;
                e.peer.target.retract?.(e.handle);
            }
        });
    }

    /** Queues a sync against `ref`; the promise resolves when the reply message arrives. */
    sync(ref: Ref): Promise<void> {
        return new Promise(resolve => this._sync(ref, this.ref({ message() { resolve() } })));
    }

    /** Queues a sync against `ref`, with replies directed at `peer`. */
    _sync(ref: Ref, peer: Ref): void {
        this.enqueue(ref.relay, () => _sync_impl(ref.target, peer));
    }

    /**
     * Sends a message to `ref`. The body is passed through the ref's attenuation first: a
     * `null` result drops the message, otherwise the *rewritten* value is what the target
     * receives (mirroring `_assert`).
     */
    message(ref: Ref, assertable: Assertable): void {
        const assertion = assertionFrom(assertable);
        const a = runRewrites(ref.attenuation, assertion);
        if (a !== null) this.enqueue(ref.relay, () => ref.target.message?.(a));
    }

    /**
     * Runs `a` in a turn of the active facet every `periodMilliseconds`, until the facet
     * stops. The facet is held non-inert for as long as it lives.
     * @returns the timer handle from `setInterval`
     */
    every(periodMilliseconds: number, a: LocalAction): any {
        const facet = this.activeFacet;
        facet.preventInertCheck();
        let handle: any = setInterval(() => {
            facet.turn(a);
        }, periodMilliseconds);
        facet.onStop(() => {
            if (handle !== null) {
                clearInterval(handle);
                handle = null;
            }
        });
        return handle;
    }

    /**
     * Runs `a` in a turn of the active facet after `delayMilliseconds`. The facet is held
     * non-inert until the timer fires.
     * @returns the timer handle from `setTimeout`
     */
    after(delayMilliseconds: number, a: LocalAction): any {
        const facet = this.activeFacet;
        const release = facet.preventInertCheck();
        return setTimeout(() => {
            release();
            facet.turn(a);
        }, delayMilliseconds);
    }

    /**
     * Queues `a0` to run, with `relay` as active facet, in a later turn of `relay`'s actor.
     * @throws Error if this turn has already been delivered
     */
    enqueue(relay: Facet, a0: LocalAction): void {
        if (this.queues === null) {
            throw new Error("Attempt to reuse a committed Turn");
        }
        const a: LocalAction = () => Turn.active._inFacet(relay, a0);
        // `push` returns the new length (never nullish), so `set` only runs for a new actor.
        this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]);
    }

    /**
     * Commits the turn: for each destination actor, schedules one task that runs all of
     * its queued actions in a single turn of that actor's root facet. The turn cannot be
     * used for further `enqueue` calls afterwards.
     */
    deliver() {
        this.queues!.forEach((q, actor) =>
            queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
        this.queues = null;
    }
}
/**
 * Wraps `a` so that, after it has run, a check is enqueued (to run when the current turn's
 * actions are delivered) which stops the facet that was active when the wrapper started if
 * that facet's parent is no longer live or the facet itself has become inert.
 */
function stopIfInertAfter(a: LocalAction): LocalAction {
    return () => {
        const subject = Turn.activeFacet;
        a();
        const stopIfPointless: LocalAction = () => {
            const parent = subject.parent;
            const orphaned = parent ? !parent.isLive : false;
            if (orphaned || subject.isInert()) {
                Turn.active.stop(subject);
            }
        };
        Turn.active.enqueue(subject, stopIfPointless);
    };
}