// syndicate-js/packages/core/src/runtime/space.ts
//
// 186 lines
// 5.7 KiB
// TypeScript

/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import { IdentityMap, IdentitySet, embeddedId, forEachEmbedded } from '@preserves/core';
import type { Actor, Assertion, ExitReason, Handle, Ref } from './actor.js';
import { Mirror } from './mirror.js';
import * as Refl from '../gen/mirror';
import type { StructuredTask, TaskDescription } from './task.js';
/**
 * Task-count threshold used by `ActorSpace.queueTask`: while the space is
 * running, the task that brings the counter up to this value arms a zero-delay
 * timer, and that task plus every later one is held back until the timer (or an
 * unpause) flushes the backlog and resets the counter.
 */
const LIMIT = 25_000;
/** Lifecycle states an `ActorSpace` can be in. */
export enum ActorSpaceState {
    /** Initial state of a space. */
    RUNNING = 0,
    /** Entered from RUNNING via `pause()`; left again via `unpause()`. */
    PAUSED = 1,
    /** Entered via `shutdown()`; there is no transition out of it in this file. */
    TERMINATED = 2,
}
/**
 * Bookkeeping record stored per handle by `ActorSpace.registerInbound`.
 *
 * - `target`: the ref the assertion was registered against.
 * - `pins`: the refs embedded in the assertion that belong to the registering
 *   space (as computed by `ActorSpace.extractPins`).
 */
export type InboundAssertion = {
    target: Ref;
    pins: Array<Ref>;
};
/**
 * A group of actors sharing one lifecycle state (`RUNNING` / `PAUSED` /
 * `TERMINATED`) and one task-queueing policy.
 *
 * Tasks submitted through `queueTask` are normally run as microtasks. Once
 * `LIMIT` tasks have been counted, further tasks are parked in `delayedTasks`
 * and released from a zero-delay timer instead (presumably so that a long run
 * of microtasks cannot starve the host event loop — confirm with the author).
 *
 * When a `Mirror` is attached via `setMirror`, the space publishes its state,
 * its task counter and one entry per registered actor through that mirror.
 */
export class ActorSpace {
    /** Actors currently registered with this space. */
    actors = new IdentitySet<Actor>();

    /** Current lifecycle state. */
    state = ActorSpaceState.RUNNING;

    /** Records created by `registerInbound`, keyed by handle. */
    inboundAssertions = new IdentityMap<Handle, InboundAssertion>();

    /** Number of tasks counted since the last flush of `delayedTasks`. */
    taskCounter = 0;

    /** Tasks held back (space paused, or `LIMIT` reached) until the next flush. */
    delayedTasks: Array<StructuredTask<TaskDescription>> = [];

    /** Timer that will flush `delayedTasks`, or `null` when no flush is pending. */
    taskFlushHandle: ReturnType<typeof setTimeout> | null = null;

    /** Reflection bookkeeping; present only while a mirror is attached. */
    _reflection?: {
        mirror: Mirror;
        stateHandle?: Handle;
        taskCountHandle?: Handle;
        actorHandles: { [key: string]: Handle };
    };

    /**
     * Adds `actor` to the space and, if a mirror is attached, publishes it.
     *
     * @returns `false` (and does nothing) if the space has terminated,
     *          `true` otherwise.
     */
    register(actor: Actor): boolean {
        if (this.state === ActorSpaceState.TERMINATED) return false;
        this.actors.add(actor);
        this._reflection?.mirror.setProp(
            this._reflection.actorHandles,
            '' + embeddedId(actor),
            Refl.SpaceActor(actor.asRef()));
        return true;
    }

    /**
     * Removes `actor` from the space and clears its mirror entry, if any.
     * When no actors remain afterwards, the space shuts itself down with
     * `{ ok: true }`.
     */
    deregister(actor: Actor) {
        this.actors.delete(actor);
        this._reflection?.mirror.setProp(
            this._reflection.actorHandles,
            '' + embeddedId(actor),
            void 0);
        if (this.actors.size === 0) {
            this.shutdown({ ok: true });
        }
    }

    /**
     * Collects the refs embedded in `assertion` whose relay's actor belongs to
     * this space.
     */
    extractPins(assertion: Assertion): Ref[] {
        const pins: Ref[] = [];
        forEachEmbedded(assertion, (r: Ref) => {
            if (r.relay.actor.space === this) {
                pins.push(r);
            }
        });
        return pins;
    }

    /** Records `target` and the local pins of `assertion` under `handle`. */
    registerInbound(handle: Handle, target: Ref, assertion: Assertion) {
        this.inboundAssertions.set(handle, { target, pins: this.extractPins(assertion) });
    }

    /** Forgets the record stored under `handle`, if any. */
    deregisterInbound(handle: Handle) {
        this.inboundAssertions.delete(handle);
    }

    /** @returns `true` exactly when the space is in the `RUNNING` state. */
    isRunning() {
        return this.state === ActorSpaceState.RUNNING;
    }

    /**
     * Terminates the space and every actor registered in it with `reason`.
     * Calling it again after termination is a no-op.
     */
    shutdown(reason: Exclude<ExitReason, null>) {
        if (this.state === ActorSpaceState.TERMINATED) return;
        // The state flips first, so re-entrant shutdown() calls return
        // immediately and queueTask() drops anything submitted from here on.
        this.state = ActorSpaceState.TERMINATED;
        this._reflectState();
        // Iterate over a snapshot; presumably terminating actors deregister
        // themselves, which would mutate `this.actors` mid-iteration.
        Array.from(this.actors.values()).forEach(a => a._terminateWith(reason));
    }

    /**
     * Submits a task to the space.
     *
     * - `TERMINATED`: the task is dropped.
     * - `PAUSED`: the task is counted and parked until `unpause()`.
     * - `RUNNING`: the task is counted. Below `LIMIT` it is run as a
     *   microtask; the task that reaches `LIMIT` arms a zero-delay flush timer,
     *   and that task and all later ones are parked until the flush.
     */
    queueTask(t: StructuredTask<TaskDescription>) {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                break;

            case ActorSpaceState.PAUSED:
                this.taskCounter++;
                this._reflectTaskCount();
                this.delayedTasks.push(t);
                break;

            case ActorSpaceState.RUNNING:
                this.taskCounter++;
                if (this.taskCounter === LIMIT) {
                    this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0);
                }
                if (this.taskCounter >= LIMIT) {
                    this._reflectTaskCount();
                    this.delayedTasks.push(t);
                } else {
                    queueMicrotask(() => t.perform());
                }
                break;
        }
    }

    /**
     * Flushes the backlog: resets the task counter and queues every parked
     * task as a microtask, in submission order. Called from the flush timer
     * and from `unpause()`.
     */
    _scheduleDelayedTasks() {
        // Whichever path led here, no flush remains pending afterwards. Clear
        // the handle so it never refers to an already-fired timer, and cancel
        // a still-pending timer so it cannot trigger a redundant second flush.
        // (clearTimeout on the timer that is currently firing is a no-op.)
        if (this.taskFlushHandle !== null) {
            clearTimeout(this.taskFlushHandle);
            this.taskFlushHandle = null;
        }
        this.taskCounter = 0;
        this._reflectTaskCount();
        this.delayedTasks.forEach(t => queueMicrotask(() => t.perform()));
        this.delayedTasks = [];
    }

    /**
     * Pauses the space and invokes `cb` from a microtask.
     *
     * If the space is already paused, `cb` is still queued. When pausing a
     * running space, any pending flush timer is cancelled; parked tasks stay
     * in `delayedTasks` until `unpause()`.
     *
     * @returns `false` if the space has terminated (in which case `cb` is not
     *          queued), `true` otherwise.
     */
    pause(cb: () => void): boolean {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                return false;

            case ActorSpaceState.PAUSED:
                queueMicrotask(cb);
                return true;

            case ActorSpaceState.RUNNING:
                this.state = ActorSpaceState.PAUSED;
                this._reflectState();
                if (this.taskFlushHandle !== null) {
                    clearTimeout(this.taskFlushHandle);
                    this.taskFlushHandle = null;
                }
                queueMicrotask(cb);
                return true;
        }
    }

    /**
     * Resumes a paused space and flushes the tasks parked in the meantime.
     *
     * @returns `false` if the space has terminated, `true` otherwise
     *          (including when it was already running).
     */
    unpause(): boolean {
        switch (this.state) {
            case ActorSpaceState.TERMINATED:
                return false;

            case ActorSpaceState.PAUSED:
                this.state = ActorSpaceState.RUNNING;
                this._reflectState();
                this._scheduleDelayedTasks();
                return true;

            case ActorSpaceState.RUNNING:
                return true;
        }
    }

    /**
     * Attaches `mirror` to the space, or detaches the current one when passed
     * `null`. On attach, the mirror's focus type is set and the current state,
     * task count and all registered actors are published through it.
     */
    setMirror(mirror: Mirror | null): void {
        if (mirror === null) {
            delete this._reflection;
        } else {
            // Typed local, so the callback below needs no non-null assertion.
            const reflection: NonNullable<ActorSpace['_reflection']> =
                { mirror, actorHandles: {} };
            this._reflection = reflection;
            mirror.setFocusType(Refl.TypeName.space());
            this._reflectState();
            this._reflectTaskCount();
            this.actors.forEach(a => mirror.setProp(
                reflection.actorHandles,
                '' + embeddedId(a),
                Refl.SpaceActor(a.asRef())));
        }
    }

    /** Publishes the current lifecycle state through the mirror, if attached. */
    _reflectState() {
        this._reflection?.mirror.setProp(
            this._reflection,
            'stateHandle',
            (this.state === ActorSpaceState.RUNNING ? Refl.SpaceStatus.running() :
             this.state === ActorSpaceState.PAUSED ? Refl.SpaceStatus.paused() :
             this.state === ActorSpaceState.TERMINATED ? Refl.SpaceStatus.terminated() :
             void 0));
    }

    /** Publishes the current task counter through the mirror, if attached. */
    _reflectTaskCount() {
        this._reflection?.mirror.setProp(
            this._reflection, 'taskCountHandle', Refl.SpaceTaskCount(this.taskCounter));
    }
}