syndicate-js/packages/core/src/runtime/actor.ts

515 lines
16 KiB
TypeScript
Raw Normal View History

2021-12-01 16:24:29 +00:00
/// SPDX-License-Identifier: GPL-3.0-or-later
2023-01-17 10:43:15 +00:00
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
2021-12-01 15:27:06 +00:00
2022-01-26 13:44:35 +00:00
import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core';
2021-12-02 13:40:24 +00:00
import { Cell, Field, Graph } from './dataflow.js';
2021-12-01 15:27:06 +00:00
import { Attenuation, runRewrites } from './rewrite.js';
import { queueTask } from './task.js';
export type AnyValue = Value<Ref>;
//---------------------------------------------------------------------------
if ('stackTraceLimit' in Error) {
Error.stackTraceLimit = Infinity;
}
export type Assertion = Value<Ref>;
export type Handle = number;
2021-12-01 16:13:00 +00:00
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
export type LocalAction = () => void;
2021-12-01 15:27:06 +00:00
export type Assertable = Assertion | { __as_preserve__: <T>() => Value<T> } | { __as_preserve__: () => Assertion };
2021-12-01 15:27:06 +00:00
export interface Entity {
assert(assertion: Assertion, handle: Handle): void;
retract(handle: Handle): void;
message(body: Assertion): void;
sync(peer: Ref): void;
data?: unknown;
2021-12-01 15:27:06 +00:00
}
export type Cap = Ref;
export interface Ref {
readonly relay: Facet;
readonly target: Partial<Entity>;
readonly attenuation?: Attenuation;
}
2021-12-09 17:51:41 +00:00
export class RefImpl implements Ref {
readonly relay: Facet;
readonly target: Partial<Entity>;
readonly attenuation?: Attenuation;
constructor(relay: Facet, target: Partial<Entity>, attenuation?: Attenuation) {
this.relay = relay;
this.target = target;
this.attenuation = attenuation;
}
toString() {
let entityRepr = '' + this.target;
if (entityRepr === '[object Object]') {
entityRepr = '#' + embeddedId(this.target);
}
let sig = '';
if ('assert' in this.target) sig = sig + 'A';
if ('retract' in this.target) sig = sig + 'R';
if ('message' in this.target) sig = sig + 'M';
if ('sync' in this.target) sig = sig + 'S';
return `${this.relay.idChain()}<${sig}>${entityRepr}`;
}
}
2021-12-01 15:27:06 +00:00
//---------------------------------------------------------------------------
export function isRef(v: any): v is Ref {
return 'relay' in v && v.relay instanceof Facet && 'target' in v;
}
export function toRef(_v: any): Ref | undefined {
return isRef(_v) ? _v : void 0;
}
export function assertionFrom(a: Assertable): Assertion {
if (typeof a === 'object' && '__as_preserve__' in a) {
return fromJS(a);
} else {
return a;
}
}
2021-12-01 15:27:06 +00:00
type OutboundAssertion = { handle: Handle, peer: Ref, established: boolean };
type OutboundMap = Map<Handle, OutboundAssertion>;
let nextActorId = 0;
export const __setNextActorId = (v: number) => nextActorId = v;
2021-12-02 13:40:24 +00:00
export type DataflowGraph = Graph<DataflowBlock, Cell>;
export type DataflowBlock = () => void;
2021-12-02 13:40:24 +00:00
2021-12-01 15:27:06 +00:00
export class Actor {
readonly id = nextActorId++;
2021-12-02 15:04:07 +00:00
name: AnyValue = this.id;
2021-12-01 15:27:06 +00:00
readonly root: Facet;
2021-12-02 13:40:24 +00:00
_dataflowGraph: DataflowGraph | null = null;
2021-12-01 15:27:06 +00:00
exitReason: ExitReason = null;
readonly exitHooks: Array<LocalAction> = [];
2021-12-02 15:04:07 +00:00
static boot(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()): Actor {
const newActor = new Actor(initialAssertions);
newActor._boot(bootProc);
return newActor;
}
static __unsafeNew(initialAssertions: OutboundMap = new Map()) {
return new Actor(initialAssertions);
}
private constructor(initialAssertions: OutboundMap = new Map()) {
2021-12-01 15:27:06 +00:00
this.root = new Facet(this, null, initialAssertions);
2021-12-02 15:04:07 +00:00
}
_boot(bootProc: LocalAction) {
2021-12-01 15:27:06 +00:00
Turn.for(new Facet(this, this.root), stopIfInertAfter(bootProc));
}
2021-12-02 13:40:24 +00:00
get dataflowGraph(): DataflowGraph {
if (this._dataflowGraph === null) {
this._dataflowGraph =
new Graph((b: DataflowBlock) => '' + embeddedId(b), Cell.canonicalizer);
}
return this._dataflowGraph;
}
2021-12-01 15:27:06 +00:00
atExit(a: LocalAction): void {
this.exitHooks.push(a);
}
_terminateWith(reason: Exclude<ExitReason, null>) {
2021-12-01 15:27:06 +00:00
if (this.exitReason !== null) return;
this.exitReason = reason;
if (!reason.ok) {
2021-12-02 15:04:07 +00:00
console.error(`${this} crashed:`, reason.err);
2021-12-01 15:27:06 +00:00
}
this.exitHooks.forEach(hook => hook());
this.root._terminate(reason.ok);
2021-12-01 15:27:06 +00:00
}
2021-12-02 13:40:24 +00:00
repairDataflowGraph() {
2021-12-02 13:40:24 +00:00
if (this._dataflowGraph === null) return;
this._dataflowGraph.repairDamage(block => block());
2021-12-02 13:40:24 +00:00
}
2021-12-02 15:04:07 +00:00
toString(): string {
2022-01-26 13:44:35 +00:00
return `Actor(${stringify(this.name)})`;
2021-12-02 15:04:07 +00:00
}
2021-12-01 15:27:06 +00:00
}
export class Facet {
readonly id = nextActorId++;
readonly actor: Actor;
readonly parent: Facet | null;
readonly children = new Set<Facet>();
readonly outbound: OutboundMap;
readonly shutdownActions: Array<LocalAction> = [];
// ^ shutdownActions are not exitHooks - those run even on error. These are for clean shutdown
isLive = true;
inertCheckPreventers = 0;
constructor(actor: Actor, parent: Facet | null, initialAssertions: OutboundMap = new Map()) {
this.actor = actor;
this.parent = parent;
if (parent) parent.children.add(this);
this.outbound = initialAssertions;
}
2021-12-02 23:55:42 +00:00
turn(a: LocalAction) {
Turn.for(this, a);
}
2021-12-01 15:27:06 +00:00
onStop(a: LocalAction): void {
this.shutdownActions.push(a);
}
isInert(): boolean {
const noKids = this.children.size === 0;
const noOutboundHandles = this.outbound.size === 0;
// The only outbound handle the root facet of an actor may have is a link
// assertion, from _halfLink(). This is not to be considered a "real"
// assertion for purposes of keeping the facet alive!
const isRootFacet = this.parent === null;
const noInertCheckPreventers = this.inertCheckPreventers === 0;
return noKids && (noOutboundHandles || isRootFacet) && noInertCheckPreventers;
2021-12-01 15:27:06 +00:00
}
preventInertCheck(): () => void {
let armed = true;
this.inertCheckPreventers++;
return () => {
if (!armed) return;
armed = false;
this.inertCheckPreventers--;
};
}
2021-12-02 15:04:07 +00:00
_halfLink(other: Facet): void {
const h = nextHandle++;
const e = { handle: h, peer: { relay: other, target: new StopOnRetract() }, established: true };
this.outbound.set(h, e);
}
_terminate(orderly: boolean): void {
2021-12-01 15:27:06 +00:00
if (!this.isLive) return;
this.isLive = false;
const parent = this.parent;
if (parent) parent.children.delete(this);
Turn.active._inFacet(this, () => {
this.children.forEach(child => child._terminate(orderly));
if (orderly) {
Turn.active._inFacet(parent ?? this, () => {
this.shutdownActions.forEach(a => a());
});
}
this.outbound.forEach(e => Turn.active._retract(e));
2021-12-01 15:27:06 +00:00
if (orderly) {
if (parent) {
if (parent.isInert()) {
parent._terminate(true);
2021-12-01 15:27:06 +00:00
}
} else {
this.actor._terminateWith({ ok: true });
}
2021-12-01 15:27:06 +00:00
}
});
}
2021-12-02 15:04:07 +00:00
idChain(): string {
let facetIds = [];
for (let f: Facet | null = this; f !== null; f = f.parent) {
facetIds.push(f.id);
}
2022-01-26 13:44:35 +00:00
return facetIds.reverse().join(':') + ':' + stringify(this.actor.name);
2021-12-02 15:04:07 +00:00
}
toString(): string {
2021-12-09 17:51:57 +00:00
return `Facet(${this.idChain()})`;
2021-12-02 15:04:07 +00:00
}
}
export class StopOnRetract implements Partial<Entity> {
retract(_handle: Handle): void {
Turn.active.stop();
2021-12-02 15:04:07 +00:00
}
2021-12-01 15:27:06 +00:00
}
export function _sync_impl(e: Partial<Entity>, peer: Ref): void {
e.sync ? e.sync!(peer) : Turn.active.message(peer, true);
2021-12-01 15:27:06 +00:00
}
let nextHandle = 0;
let nextTurnId = 0;
export class Turn {
static active: Turn = void 0 as unknown as Turn;
static get activeFacet(): Facet {
return Turn.active.activeFacet;
}
static ref<T extends Partial<Entity>>(e: T): Ref {
return Turn.active.ref(e);
}
2021-12-01 15:27:06 +00:00
readonly id = nextTurnId++;
2021-12-02 13:40:24 +00:00
_activeFacet: Facet;
queues: Map<Actor, LocalAction[]> | null;
2021-12-01 15:27:06 +00:00
static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
if (!zombieTurn) {
if (facet.actor.exitReason !== null) return;
if (!facet.isLive) return;
}
const t = new Turn(facet);
try {
const saved = Turn.active;
Turn.active = t;
try {
f();
facet.actor.repairDataflowGraph();
} finally {
Turn.active = saved;
}
2021-12-02 13:40:24 +00:00
t.deliver();
2021-12-01 15:27:06 +00:00
} catch (err) {
Turn.for(facet.actor.root, () => facet.actor._terminateWith({ ok: false, err }));
2021-12-01 15:27:06 +00:00
}
}
2021-12-02 13:40:24 +00:00
private constructor(facet: Facet, queues = new Map<Actor, LocalAction[]>()) {
this._activeFacet = facet;
2021-12-01 15:27:06 +00:00
this.queues = queues;
}
2021-12-02 13:40:24 +00:00
get activeFacet(): Facet {
return this._activeFacet;
}
2021-12-01 15:27:06 +00:00
_inFacet(facet: Facet, f: LocalAction): void {
2021-12-02 13:40:24 +00:00
const saved = this._activeFacet;
this._activeFacet = facet;
f();
2021-12-02 13:40:24 +00:00
this._activeFacet = saved;
2021-12-01 15:27:06 +00:00
}
ref<T extends Partial<Entity>>(e: T): Ref {
2021-12-09 17:51:41 +00:00
return new RefImpl(this.activeFacet, e);
2021-12-01 15:27:06 +00:00
}
facet(bootProc: LocalAction): Facet {
const newFacet = new Facet(this.activeFacet.actor, this.activeFacet);
this._inFacet(newFacet, stopIfInertAfter(bootProc));
return newFacet;
}
2021-12-02 23:55:42 +00:00
// Alias for syndicatec code generator to use
_stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
this.stop(facet, continuation);
}
2021-12-01 15:27:06 +00:00
stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
if (continuation) facet.onStop(continuation);
facet._terminate(true);
2021-12-01 15:27:06 +00:00
}
2021-12-02 23:55:42 +00:00
// Alias for syndicatec code generator to use
_spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
return this.spawn(bootProc, initialAssertions);
2021-12-02 23:55:42 +00:00
}
spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
return this.__spawn(bootProc, initialAssertions);
2021-12-02 15:04:07 +00:00
}
2021-12-02 23:55:42 +00:00
__spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
2021-12-02 15:04:07 +00:00
const newOutbound: OutboundMap = new Map();
initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!));
// ^ we trust initialAssertions, so can use `!` safely
const newActor = Actor.__unsafeNew(newOutbound);
2021-12-01 15:27:06 +00:00
this.enqueue(this.activeFacet, () => {
2021-12-02 15:04:07 +00:00
initialAssertions.forEach(key => this.activeFacet.outbound.delete(key));
queueTask(() => newActor._boot(bootProc));
2021-12-01 15:27:06 +00:00
});
2021-12-02 15:04:07 +00:00
return newActor;
}
2021-12-02 23:55:42 +00:00
// Alias for syndicatec code generator to use
_spawnLink(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor | null {
return this.spawnLink(bootProc, initialAssertions);
2021-12-02 23:55:42 +00:00
}
spawnLink(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor | null {
if (!this.activeFacet.isLive) return null;
2021-12-02 23:55:42 +00:00
const newActor = this.__spawn(bootProc, initialAssertions);
2021-12-02 15:04:07 +00:00
this.activeFacet._halfLink(newActor.root);
newActor.root._halfLink(this.activeFacet);
return newActor;
2021-12-01 15:27:06 +00:00
}
stopActor(): void {
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: true }));
2021-12-01 15:27:06 +00:00
}
crash(err: Error): void {
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: false, err }));
2021-12-01 15:27:06 +00:00
}
2022-04-28 20:03:30 +00:00
field<V>(initial: V, name?: string): Field<V> {
2021-12-02 13:40:24 +00:00
return new Field(this.activeFacet.actor.dataflowGraph, initial, name);
}
2021-12-02 23:55:42 +00:00
// Alias for syndicatec code generator to use
_dataflow(a: LocalAction) {
this.dataflow(a);
}
2021-12-02 13:40:24 +00:00
dataflow(a: LocalAction) {
const f = this.activeFacet;
f.preventInertCheck();
const b = () => f.isLive && Turn.active._inFacet(f, a);
f.onStop(() => f.actor.dataflowGraph.forgetSubject(b));
f.actor.dataflowGraph.withSubject(b, b);
2021-12-02 13:40:24 +00:00
}
assertDataflow(assertionFunction: () => {
target: Ref | undefined,
assertion: Assertable | undefined
}) {
2021-12-02 13:40:24 +00:00
let handle: Handle | undefined = void 0;
let target: Ref | undefined = void 0;
let assertion: Assertable | undefined = void 0;
this.dataflow(() => {
let {target: nextTarget, assertion: nextAssertion} = assertionFunction();
2021-12-02 13:40:24 +00:00
if (target !== nextTarget || !is(assertion, nextAssertion)) {
target = nextTarget;
assertion = nextAssertion;
handle = Turn.active.replace(target, handle, assertion);
2021-12-02 13:40:24 +00:00
}
});
}
assert(ref: Ref, assertion: Assertable): Handle {
2021-12-01 15:27:06 +00:00
const h = nextHandle++;
this._assert(ref, assertion, h);
return h;
}
_assert(ref: Ref, assertable: Assertable, h: Handle) {
const assertion = assertionFrom(assertable);
2021-12-01 15:27:06 +00:00
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) {
const e = { handle: h, peer: ref, established: false };
this.activeFacet.outbound.set(h, e);
this.enqueue(ref.relay, () => {
2021-12-01 15:27:06 +00:00
e.established = true;
ref.target.assert?.(a, h);
2021-12-01 15:27:06 +00:00
});
}
}
retract(h: Handle | undefined): void {
if (h !== void 0) {
const e = this.activeFacet.outbound.get(h);
if (e === void 0) return;
this._retract(e);
}
}
replace(ref: Ref | undefined, h: Handle | undefined, assertion: Assertable | undefined): Handle | undefined {
2021-12-02 13:40:24 +00:00
const newHandle = (assertion === void 0 || ref === void 0)
? void 0
: this.assert(ref, assertion);
2021-12-01 15:27:06 +00:00
this.retract(h);
return newHandle;
}
_retract(e: OutboundAssertion): void {
this.activeFacet.outbound.delete(e.handle);
this.enqueue(e.peer.relay, () => {
2021-12-01 15:27:06 +00:00
if (e.established) {
e.established = false;
e.peer.target.retract?.(e.handle);
2021-12-01 15:27:06 +00:00
}
});
}
sync(ref: Ref): Promise<void> {
return new Promise(resolve => this._sync(ref, this.ref({ message() { resolve() } })));
2021-12-01 15:27:06 +00:00
}
_sync(ref: Ref, peer: Ref): void {
this.enqueue(ref.relay, () => _sync_impl(ref.target, peer));
2021-12-01 15:27:06 +00:00
}
message(ref: Ref, assertable: Assertable): void {
const assertion = assertionFrom(assertable);
2021-12-01 15:27:06 +00:00
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) this.enqueue(ref.relay, () => ref.target.message?.(assertion));
2021-12-01 15:27:06 +00:00
}
2022-01-20 19:48:30 +00:00
every(periodMilliseconds: number, a: LocalAction): any {
const facet = this.activeFacet;
facet.preventInertCheck();
let handle: any = setInterval(() => {
facet.turn(a);
}, periodMilliseconds);
facet.onStop(() => {
if (handle !== null) {
clearInterval(handle);
handle = null;
}
});
return handle;
}
2021-12-12 22:02:58 +00:00
after(delayMilliseconds: number, a: LocalAction): any {
const facet = this.activeFacet;
const release = facet.preventInertCheck();
return setTimeout(() => {
release();
facet.turn(a);
}, delayMilliseconds);
}
2021-12-02 15:04:07 +00:00
enqueue(relay: Facet, a0: LocalAction): void {
2021-12-01 15:27:06 +00:00
if (this.queues === null) {
throw new Error("Attempt to reuse a committed Turn");
}
const a: LocalAction = () => Turn.active._inFacet(relay, a0);
2021-12-02 13:40:24 +00:00
this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]);
2021-12-01 15:27:06 +00:00
}
2021-12-02 13:40:24 +00:00
deliver() {
this.queues!.forEach((q, actor) =>
queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
2021-12-02 13:40:24 +00:00
this.queues = null;
2021-12-01 15:27:06 +00:00
}
}
function stopIfInertAfter(a: LocalAction): LocalAction {
return () => {
const facet = Turn.activeFacet;
a();
Turn.active.enqueue(facet, () => {
if ((facet.parent && !facet.parent.isLive) || facet.isInert()) {
Turn.active.stop(facet);
2021-12-01 15:27:06 +00:00
}
});
};
}