novy-syndicate/actor.ts

150 lines
4.3 KiB
TypeScript
Raw Normal View History

2021-02-22 09:12:10 +00:00
import { IdentitySet, Value } from 'preserves';
2021-02-17 19:57:15 +00:00
/** Payload carried by assertions and messages: a `preserves` `Value` parameterized over `Ref`. */
export type Assertion = Value<Ref>;
2021-02-22 18:37:47 +00:00
/** Numeric identifier for a single assertion; `Turn.assert` allocates these from a module-level counter. */
export type Handle = number;
2021-02-22 09:12:10 +00:00
/**
 * Termination status of an actor: `null` until the actor has been terminated,
 * `{ ok: true }` for a requested quit, `{ ok: false, err }` when terminated
 * because of an error.
 */
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
2021-02-23 14:41:54 +00:00
/** A unit of work that is handed the `Turn` it runs in. */
export type LocalAction = (t: Turn) => void;
2021-02-17 19:57:15 +00:00
2021-02-22 09:12:10 +00:00
// Symbol keys naming the optional event-handler methods of `Entity`.
export const assert = Symbol('assert');
export const retract = Symbol('retract');
export const message = Symbol('message');
2021-02-23 14:38:57 +00:00
/** Symbol key naming the optional synchronisation handler of `Entity`. */
export const sync = Symbol('sync');
2021-02-17 19:57:15 +00:00
2021-02-22 09:12:10 +00:00
/**
 * Contract for anything that can receive events.
 *
 * Every handler is optional and is keyed by one of the module's exported
 * symbols; an entity implements only the events it cares about.
 */
export interface Entity {
    /** Invoked when `assertion` is asserted at this entity under `handle`. */
    [assert]?(turn: Turn, assertion: Assertion, handle: Handle): void;

    /** Invoked when the assertion previously made under `handle` is withdrawn. */
    [retract]?(turn: Turn, handle: Handle): void;

    /** Invoked when a message with the given `body` is delivered to this entity. */
    [message]?(turn: Turn, body: Assertion): void;

    /** Invoked for a synchronisation request on behalf of `peer`. */
    [sync]?(turn: Turn, peer: Entity): void;
}
2021-02-17 19:57:15 +00:00
2021-02-23 14:38:57 +00:00
/**
 * Pairs an entity (`target`) with an actor (`relay`).
 *
 * `Turn` queues every action aimed at a `Ref` under the ref's `relay` actor.
 * A `Ref` is itself an `Entity`: each event it receives is forwarded to the
 * matching handler on `target`, when the target defines one.
 */
export class Ref implements Entity {
    /**
     * @param relay  actor under which actions aimed at this ref are queued
     * @param target entity that ultimately handles the events
     */
    constructor(
        readonly relay: Actor,
        readonly target: Entity,
    ) {}

    /** Passes an assertion on to the target; ignored if the target has no `[assert]` handler. */
    [assert](turn: Turn, assertion: Assertion, handle: Handle): void {
        this.target[assert]?.(turn, assertion, handle);
    }

    /** Passes a retraction on to the target; ignored if the target has no `[retract]` handler. */
    [retract](turn: Turn, handle: Handle): void {
        this.target[retract]?.(turn, handle);
    }

    /** Passes a message on to the target; ignored if the target has no `[message]` handler. */
    [message](turn: Turn, body: Assertion): void {
        this.target[message]?.(turn, body);
    }

    /**
     * Handles a synchronisation request. A target with its own `[sync]`
     * handler gets the request; otherwise `peer` is answered directly with
     * the message `true`.
     */
    [sync](turn: Turn, peer: Ref) {
        if (this.target[sync]) {
            this.target[sync]!(turn, peer);
        } else {
            turn.message(peer, true);
        }
    }
}
2021-02-23 14:41:54 +00:00
/** Table of an actor's outbound assertions: handle -> the `Ref` the assertion was made at. */
type OutboundMap = Map<Handle, Ref>;
2021-02-17 19:57:15 +00:00
2021-02-22 18:51:19 +00:00
/**
 * An actor: owns a table of outbound assertions and a one-shot exit status.
 */
export class Actor {
    /** Assertions this actor currently has outstanding, keyed by handle. */
    readonly outbound: OutboundMap;

    /** `null` while the actor is live; set exactly once by `terminateWith`. */
    exitReason: ExitReason = null;

    /**
     * @param initialAssertions outbound table to start with; the map is
     *        adopted as-is (not copied). Defaults to an empty map.
     */
    constructor(initialAssertions: OutboundMap = new Map()) {
        this.outbound = initialAssertions;
    }

    /**
     * Records `reason` as the exit status and retracts every outbound
     * assertion via turn `t`. Calls after the first are ignored.
     */
    terminateWith(t: Turn, reason: Exclude<ExitReason, null>): void {
        if (this.exitReason !== null) return;
        this.exitReason = reason;
        this.outbound.forEach((peer, h) => t._retract(peer, h));
    }

    /**
     * Schedules `proc` on the microtask queue. The procedure is skipped if
     * the actor has already terminated by the time the microtask runs. If
     * `proc` throws, the error is logged and the actor is terminated with
     * `{ ok: false, err }`.
     */
    execute(proc: () => void): void {
        queueMicrotask(() => {
            if (this.exitReason !== null) return;
            try {
                proc();
            } catch (thrown) {
                // NOTE(review): this logs the `Actor` constructor, not the instance — confirm intent.
                console.error(Actor, thrown);
                // Anything can be thrown in JavaScript; `ExitReason` promises an `Error`,
                // so wrap non-Error values instead of storing them raw.
                const err = thrown instanceof Error ? thrown : new Error(String(thrown));
                Turn.for(this, t => this.terminateWith(t, { ok: false, err }));
            }
        });
    }
}
2021-02-22 09:12:10 +00:00
// Module-wide counter from which `Turn.assert` draws fresh handles (post-incremented on each use).
let nextHandle = 0;
2021-02-17 19:57:15 +00:00
2021-02-22 09:12:10 +00:00
/**
 * A turn: a batch of work performed on behalf of one actor.
 *
 * Operations requested during a turn are not performed immediately. They are
 * queued per destination actor, and the queues are handed to those actors
 * once the turn's own action has returned (see `Turn.for`).
 */
export class Turn {
    /** The actor on whose behalf this turn runs. */
    readonly actor: Actor;

    /** Pending actions, grouped by the actor that must execute them. */
    readonly queues: Map<Actor, LocalAction[]> = new Map();

    /**
     * Runs `f` in a fresh turn for `actor`, then dispatches everything `f`
     * queued: each destination actor is asked to `execute` its batch, and
     * every queued action runs in a new turn of its own for that actor.
     */
    static for(actor: Actor, f: LocalAction): void {
        const t = new Turn(actor);
        f(t);
        t.queues.forEach((q, a) => a.execute(() => q.forEach(action => Turn.for(a, action))));
    }

    private constructor(actor: Actor) {
        this.actor = actor;
    }

    /** Returns `e` unchanged if it is already a `Ref`; otherwise wraps it in a `Ref` relayed by this turn's actor. */
    ref(e: Entity): Ref {
        return (e instanceof Ref) ? e : new Ref(this.actor, e);
    }

    /**
     * Queues the creation of a new actor that will run `bootProc` in its
     * first turn.
     *
     * @param bootProc          action run in the new actor's initial turn
     * @param initialAssertions handles whose outbound entries are moved from
     *        this turn's actor to the new one; each handle is trusted to be
     *        present in this actor's outbound table
     */
    spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): void {
        this.enqueue(this.actor, () => {
            const newOutbound: OutboundMap = new Map();
            initialAssertions.forEach(key => {
                newOutbound.set(key, this.actor.outbound.get(key)!); // we trust initialAssertions
                this.actor.outbound.delete(key);
            });
            const child = new Actor(newOutbound);
            child.execute(() => Turn.for(child, bootProc));
        });
    }

    /** Queues termination of this turn's actor with `{ ok: true }`. */
    quit(): void {
        this.enqueue(this.actor, t => this.actor.terminateWith(t, { ok: true }));
    }

    /**
     * Queues an assertion of `assertion` at `ref` and returns its handle.
     *
     * The handle is allocated immediately, but it is only recorded in the
     * actor's outbound table when the queued action runs.
     */
    assert(ref: Ref, assertion: Assertion): Handle {
        const h = nextHandle++;
        this.enqueue(ref.relay, t => {
            this.actor.outbound.set(h, ref);
            ref[assert]?.(t, assertion, h);
        });
        return h;
    }

    /**
     * Queues retraction of the assertion made under `h`.
     *
     * A handle with no entry in the outbound table (never recorded, already
     * retracted, or asserted earlier in this same turn and therefore not yet
     * recorded) is ignored rather than causing a crash.
     */
    retract(h: Handle): void {
        const ref = this.actor.outbound.get(h);
        if (ref === void 0) return;
        this._retract(ref, h);
    }

    /**
     * Queues a new assertion of `assertion` at `ref`, then queues retraction
     * of the previous handle `h` (if one is given). Returns the new handle.
     */
    replace(ref: Ref, h: Handle | undefined, assertion: Assertion): Handle {
        const newHandle = this.assert(ref, assertion);
        if (h !== void 0) this.retract(h);
        return newHandle;
    }

    /**
     * Queues removal of `handle` from the outbound table followed by delivery
     * of the retraction to `ref`. Also used by `Actor.terminateWith`.
     */
    _retract(ref: Ref, handle: Handle): void {
        this.enqueue(ref.relay, t => {
            this.actor.outbound.delete(handle);
            ref[retract]?.(t, handle);
        });
    }

    /**
     * Queues a synchronisation request at `ref`.
     *
     * @returns a promise resolved with the turn in which the reply message
     *          is delivered to the temporary peer created here
     */
    sync(ref: Ref): Promise<Turn> {
        return new Promise(resolve =>
            this.enqueue(ref.relay, t => ref[sync]?.(t, this.ref({ [message]: resolve }))));
    }

    /** Queues delivery of `assertion` as a message to `ref`. */
    message(ref: Ref, assertion: Assertion): void {
        this.enqueue(ref.relay, t => ref[message]?.(t, assertion));
    }

    /** Appends `a` to the queue for `relay`, creating that queue on first use. */
    enqueue(relay: Actor, a: LocalAction): void {
        const pending = this.queues.get(relay);
        if (pending === void 0) {
            this.queues.set(relay, [a]);
        } else {
            pending.push(a);
        }
    }
}