novy-syndicate/src/actor.ts

186 lines
5.6 KiB
TypeScript
Raw Normal View History

2021-03-23 11:18:57 +00:00
import { DecodeError, IdentitySet, Value } from '@preserves/core';
2021-03-04 18:54:12 +00:00
import { Attenuation, runRewrites } from './rewrite.js';
import { queueTask } from './task.js';
2021-02-24 20:48:55 +00:00
//---------------------------------------------------------------------------
2021-02-17 19:57:15 +00:00
2021-03-02 08:50:23 +00:00
if ('stackTraceLimit' in Error) {
Error.stackTraceLimit = Infinity;
}
export type Assertion = Value<Ref>;
2021-02-22 18:37:47 +00:00
export type Handle = number;
2021-02-22 09:12:10 +00:00
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
2021-02-23 14:41:54 +00:00
export type LocalAction = (t: Turn) => void;
2021-02-17 19:57:15 +00:00
2021-02-22 09:12:10 +00:00
export interface Entity {
2021-02-23 15:16:15 +00:00
assert(turn: Turn, assertion: Assertion, handle: Handle): void;
retract(turn: Turn, handle: Handle): void;
message(turn: Turn, body: Assertion): void;
sync(turn: Turn, peer: Ref): void;
2021-02-17 19:57:15 +00:00
}
2021-02-24 20:48:55 +00:00
export interface Ref {
readonly relay: Actor;
readonly target: Partial<Entity>;
readonly attenuation?: Attenuation;
}
//---------------------------------------------------------------------------
export function isRef(v: any): v is Ref {
return 'relay' in v && v.relay instanceof Actor && 'target' in v;
}
2021-03-23 11:18:57 +00:00
export function toRef(_v: any): Ref | undefined {
return isRef(_v) ? _v : void 0;
2021-03-12 19:49:18 +00:00
}
2021-03-02 08:50:23 +00:00
let nextActorId = 0;
2021-03-02 15:42:53 +00:00
export const __setNextActorId = (v: number) => nextActorId = v;
2021-03-02 08:50:23 +00:00
2021-02-22 18:51:19 +00:00
export class Actor {
2021-03-02 08:50:23 +00:00
readonly id = nextActorId++;
2021-02-23 14:53:42 +00:00
readonly outbound: Map<Handle, Ref>;
2021-02-17 19:57:15 +00:00
exitReason: ExitReason = null;
2021-03-03 15:21:47 +00:00
readonly exitHooks: Array<LocalAction> = [];
2021-02-17 19:57:15 +00:00
2021-02-23 14:53:42 +00:00
constructor(initialAssertions = new Map<Handle, Ref>()) {
2021-02-22 09:12:10 +00:00
this.outbound = initialAssertions;
2021-02-17 19:57:15 +00:00
}
2021-03-03 15:21:47 +00:00
atExit(a: LocalAction): void {
this.exitHooks.push(a);
}
2021-03-02 08:50:23 +00:00
2021-02-22 18:37:47 +00:00
terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
2021-02-23 10:09:41 +00:00
if (this.exitReason !== null) return;
2021-02-22 21:30:36 +00:00
this.exitReason = reason;
2021-03-02 08:50:23 +00:00
if (!this.exitReason.ok) {
console.error(`Actor ${this.id} crashed:`, this.exitReason.err);
}
2021-03-03 15:21:47 +00:00
this.exitHooks.forEach(hook => hook(t));
queueTask(() => Turn.for(
t.actor,
t => this.outbound.forEach((peer, h) => t._retract(peer, h)),
true));
2021-02-22 09:12:10 +00:00
}
2021-02-17 19:57:15 +00:00
}
2021-02-22 09:12:10 +00:00
let nextHandle = 0;
2021-02-17 19:57:15 +00:00
2021-03-02 08:50:23 +00:00
export function _sync_impl(turn: Turn, e: Partial<Entity>, peer: Ref): void {
2021-02-23 15:16:15 +00:00
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
}
2021-03-02 08:50:23 +00:00
let nextTurnId = 0;
2021-02-22 09:12:10 +00:00
export class Turn {
2021-03-02 08:50:23 +00:00
readonly id = nextTurnId++;
2021-02-23 09:56:12 +00:00
readonly actor: Actor;
2021-03-02 08:50:23 +00:00
queues: Map<Actor, LocalAction[]> | null = new Map();
2021-02-22 09:12:10 +00:00
static for(actor: Actor, f: LocalAction, zombieTurn = false): void {
if ((actor.exitReason === null) === zombieTurn) return;
2021-02-22 09:12:10 +00:00
const t = new Turn(actor);
try {
f(t);
t.queues!.forEach((q, a) => queueTask(() => q.forEach(f => Turn.for(a, f))));
t.queues = null;
} catch (err) {
Turn.for(actor, t => actor.terminateWith(t, { ok: false, err }));
}
2021-02-22 09:12:10 +00:00
}
2021-02-23 09:56:12 +00:00
private constructor(actor: Actor) {
2021-02-22 09:12:10 +00:00
this.actor = actor;
2021-02-17 19:57:15 +00:00
}
2021-03-02 08:50:23 +00:00
ref<T extends Partial<Entity>>(e: T): Ref {
2021-02-23 14:53:42 +00:00
return { relay: this.actor, target: e };
2021-02-22 18:37:47 +00:00
}
2021-02-23 10:17:38 +00:00
spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): void {
2021-02-23 14:40:43 +00:00
this.enqueue(this.actor, () => {
2021-02-23 14:53:42 +00:00
const newOutbound = new Map<Handle, Ref>();
2021-02-23 10:13:41 +00:00
initialAssertions.forEach(key => {
newOutbound.set(key, this.actor.outbound.get(key)!); // we trust initialAssertions
this.actor.outbound.delete(key);
});
queueTask(() => Turn.for(new Actor(newOutbound), bootProc));
2021-02-22 09:12:10 +00:00
});
}
2021-02-22 18:37:47 +00:00
quit(): void {
2021-02-23 14:40:43 +00:00
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true }));
2021-02-22 18:37:47 +00:00
}
2021-03-02 08:50:23 +00:00
crash(err: Error): void {
this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: false, err }));
}
2021-02-23 14:38:57 +00:00
assert(ref: Ref, assertion: Assertion): Handle {
2021-03-04 18:54:12 +00:00
const h = nextHandle++;
2021-03-03 10:45:01 +00:00
this._assert(ref, assertion, h);
return h;
}
_assert(ref: Ref, assertion: Assertion, h: Handle) {
2021-02-24 21:21:14 +00:00
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) {
this.enqueue(ref.relay, t => {
2021-02-24 20:48:55 +00:00
this.actor.outbound.set(h, ref);
ref.target.assert?.(t, a, h);
2021-02-24 21:21:14 +00:00
});
}
2021-02-22 09:12:10 +00:00
}
retract(h: Handle | undefined): void {
if (h !== void 0) {
const peer = this.actor.outbound.get(h);
if (peer === void 0) return;
this._retract(peer, h);
}
2021-02-22 09:12:10 +00:00
}
2021-02-23 14:38:57 +00:00
replace(ref: Ref, h: Handle | undefined, assertion: Assertion): Handle {
const newHandle = this.assert(ref, assertion);
this.retract(h);
2021-02-22 18:37:47 +00:00
return newHandle;
}
2021-02-23 14:38:57 +00:00
_retract(ref: Ref, handle: Handle): void {
this.enqueue(ref.relay, t => {
2021-02-23 09:56:12 +00:00
this.actor.outbound.delete(handle);
2021-02-23 14:53:42 +00:00
ref.target.retract?.(t, handle);
2021-02-22 09:12:10 +00:00
});
2021-02-17 19:57:15 +00:00
}
2021-02-23 14:38:57 +00:00
sync(ref: Ref): Promise<Turn> {
2021-03-02 08:50:23 +00:00
return new Promise(resolve => this._sync(ref, this.ref({ message: resolve })));
}
_sync(ref: Ref, peer: Ref): void {
this.enqueue(ref.relay, t => _sync_impl(t, ref.target, peer));
2021-02-22 09:12:10 +00:00
}
2021-02-23 14:38:57 +00:00
message(ref: Ref, assertion: Assertion): void {
2021-02-24 21:19:37 +00:00
const a = runRewrites(ref.attenuation, assertion);
2021-02-24 20:48:55 +00:00
if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, assertion));
2021-02-22 09:12:10 +00:00
}
2021-02-23 14:40:43 +00:00
enqueue(relay: Actor, a: LocalAction): void {
2021-03-02 08:50:23 +00:00
if (this.queues === null) {
throw new Error("Attempt to reuse a committed Turn");
}
2021-02-23 14:40:43 +00:00
this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]);
2021-02-22 09:12:10 +00:00
}
2021-03-02 08:50:23 +00:00
freshen(a: LocalAction): void {
if (this.queues !== null) {
throw new Error("Attempt to freshen a non-stale Turn");
}
Turn.for(this.actor, a);
}
2021-02-22 09:12:10 +00:00
}