// syndicate-js/packages/core/src/runtime/space.ts
//
// 128 lines
// 3.8 KiB
// TypeScript

/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import { IdentityMap, IdentitySet, forEachEmbedded } from '@preserves/core';
import type { Actor, Assertion, ExitReason, Handle, Ref } from './actor.js';
import type { StructuredTask, TaskDescription } from './task.js';
/**
 * Number of tasks an `ActorSpace` will queue while RUNNING before it starts
 * parking further tasks until a zero-delay timer flushes them.
 */
const LIMIT = 25_000;
/** Lifecycle states of an `ActorSpace`. */
export enum ActorSpaceState {
    /** Initial state: queued tasks are dispatched (subject to the task limit). */
    RUNNING = 0,
    /** Queued tasks are held back until the space is unpaused. */
    PAUSED = 1,
    /** Final state: new actors are refused and queued tasks are discarded. */
    TERMINATED = 2,
}
/** Record kept per handle in `ActorSpace.inboundAssertions`. */
export type InboundAssertion = {
    /** The ref supplied as `target` when the assertion was registered. */
    target: Ref;
    /** Refs embedded in the assertion, as collected by `ActorSpace.extractPins`. */
    pins: Array<Ref>;
};
/**
 * A group of actors that share one lifecycle state and one task queue.
 *
 * While RUNNING, tasks are dispatched as microtasks until `LIMIT` tasks have
 * been queued; from then on they are parked in `delayedTasks` until a
 * zero-delay timer flushes them. While PAUSED, every task is parked. Once
 * TERMINATED, tasks are discarded and no further actors may register.
 */
export class ActorSpace {
    /** Actors currently registered with this space. */
    actors = new IdentitySet<Actor>();

    /** Current lifecycle state; a fresh space starts out RUNNING. */
    state = ActorSpaceState.RUNNING;

    /** Records created by `registerInbound`, keyed by handle. */
    inboundAssertions = new IdentityMap<Handle, InboundAssertion>();

    /** Number of tasks queued while RUNNING since the last flush. */
    taskCounter = 0;

    /** Tasks parked while PAUSED or after `LIMIT` was reached, in arrival order. */
    delayedTasks: Array<StructuredTask<TaskDescription>> = [];

    /** Handle of the pending flush timer, or `null` when no flush is pending. */
    taskFlushHandle: ReturnType<typeof setTimeout> | null = null;

    /**
     * Adds `actor` to this space.
     *
     * @returns `false` (and does nothing) if the space is TERMINATED, else `true`.
     */
    register(actor: Actor): boolean {
        if (this.state === ActorSpaceState.TERMINATED) return false;
        this.actors.add(actor);
        return true;
    }

    /**
     * Removes `actor` from this space. When no actors remain, the space shuts
     * itself down with the exit reason `{ ok: true }`.
     */
    deregister(actor: Actor) {
        this.actors.delete(actor);
        if (this.actors.size === 0) {
            this.shutdown({ ok: true });
        }
    }

    /**
     * Collects the refs embedded in `assertion` whose `relay.actor.space` is
     * this space.
     *
     * @returns the matching refs, in the order `forEachEmbedded` visits them.
     */
    extractPins(assertion: Assertion): Ref[] {
        const pins: Ref[] = [];
        forEachEmbedded(assertion, (r: Ref) => {
            if (r.relay.actor.space === this) {
                pins.push(r);
            }
        });
        return pins;
    }

    /**
     * Records `target` and the pins extracted from `assertion` under `handle`,
     * replacing any record previously stored for that handle.
     */
    registerInbound(handle: Handle, target: Ref, assertion: Assertion) {
        this.inboundAssertions.set(handle, { target, pins: this.extractPins(assertion) });
    }

    /** Forgets the record stored under `handle`, if any. */
    deregisterInbound(handle: Handle) {
        this.inboundAssertions.delete(handle);
    }

    /**
     * Moves the space to TERMINATED and terminates every registered actor with
     * `reason`. Calling it again once TERMINATED has no effect.
     */
    shutdown(reason: Exclude<ExitReason, null>) {
        if (this.state === ActorSpaceState.TERMINATED) return;
        // Flip the state first so that a re-entrant call (for example via
        // `deregister` once the last actor is gone) returns immediately.
        this.state = ActorSpaceState.TERMINATED;
        // Iterate over a snapshot of the set.
        // NOTE(review): presumably `_terminateWith` deregisters the actor,
        // mutating `this.actors` during the loop - confirm in actor.ts.
        Array.from(this.actors.values()).forEach(a => a._terminateWith(reason));
    }

    /**
     * Submits `t` for execution according to the current state:
     *
     * - TERMINATED: the task is discarded.
     * - PAUSED: the task is parked in `delayedTasks`.
     * - RUNNING: the task runs as a microtask, unless `LIMIT` tasks have been
     *   queued since the last flush, in which case it is parked until the
     *   flush timer fires.
     */
    queueTask(t: StructuredTask<TaskDescription>) {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                break;

            case ActorSpaceState.PAUSED:
                this.delayedTasks.push(t);
                break;

            case ActorSpaceState.RUNNING:
                this.taskCounter++;
                if (this.taskCounter === LIMIT) {
                    // Arm the flush exactly once per cycle, on the task that
                    // reaches the limit. Presumably this lets the host event
                    // loop run timers and I/O between bursts of microtasks.
                    this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0);
                }
                if (this.taskCounter >= LIMIT) {
                    this.delayedTasks.push(t);
                } else {
                    queueMicrotask(() => t.perform());
                }
                break;
        }
    }

    /**
     * Resets the task counter and dispatches every parked task as a microtask,
     * in arrival order. Called by the flush timer and by `unpause`.
     */
    _scheduleDelayedTasks() {
        this.taskCounter = 0;
        // Whatever timer led here has fired (or was already cancelled by
        // `pause`), so the stored handle no longer refers to a pending timer.
        this.taskFlushHandle = null;
        this.delayedTasks.forEach(t => queueMicrotask(() => t.perform()));
        this.delayedTasks = [];
    }

    /**
     * Pauses the space and schedules `cb` as a microtask.
     *
     * From RUNNING, the state becomes PAUSED and any pending flush timer is
     * cancelled; parked tasks stay in `delayedTasks`. If the space is already
     * PAUSED, only `cb` is scheduled.
     *
     * @returns `false` (without scheduling `cb`) if TERMINATED, else `true`.
     */
    pause(cb: () => void): boolean {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                return false;

            case ActorSpaceState.PAUSED:
                queueMicrotask(cb);
                return true;

            case ActorSpaceState.RUNNING:
                this.state = ActorSpaceState.PAUSED;
                if (this.taskFlushHandle !== null) {
                    clearTimeout(this.taskFlushHandle);
                    this.taskFlushHandle = null;
                }
                queueMicrotask(cb);
                return true;
        }
    }

    /**
     * Resumes a PAUSED space: the state returns to RUNNING and all parked
     * tasks are dispatched. A RUNNING space is left untouched.
     *
     * @returns `false` if the space is TERMINATED, else `true`.
     */
    unpause(): boolean {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                return false;

            case ActorSpaceState.PAUSED:
                this.state = ActorSpaceState.RUNNING;
                this._scheduleDelayedTasks();
                return true;

            case ActorSpaceState.RUNNING:
                return true;
        }
    }
}