Introduce ActorSpace
This commit is contained in:
parent
70f1289057
commit
818f35c471
|
@ -16,8 +16,8 @@ export * as QuasiValue from './runtime/quasivalue.js';
|
||||||
export * from './runtime/randomid.js';
|
export * from './runtime/randomid.js';
|
||||||
export * as Rewrite from './runtime/rewrite.js';
|
export * as Rewrite from './runtime/rewrite.js';
|
||||||
export * as Skeleton from './runtime/skeleton.js';
|
export * as Skeleton from './runtime/skeleton.js';
|
||||||
|
export * from './runtime/space.js';
|
||||||
export * from './runtime/supervise.js';
|
export * from './runtime/supervise.js';
|
||||||
export * as Task from './runtime/task.js';
|
|
||||||
|
|
||||||
export * as Cryptography from './transport/cryptography.js';
|
export * as Cryptography from './transport/cryptography.js';
|
||||||
export * as WireProtocol from './transport/protocol.js';
|
export * as WireProtocol from './transport/protocol.js';
|
||||||
|
|
|
@ -4,7 +4,8 @@
|
||||||
import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core';
|
import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core';
|
||||||
import { Cell, Field, Graph } from './dataflow.js';
|
import { Cell, Field, Graph } from './dataflow.js';
|
||||||
import { Caveat, runRewrites } from './rewrite.js';
|
import { Caveat, runRewrites } from './rewrite.js';
|
||||||
import { queueTask } from './task.js';
|
import { ActorSpace } from './space.js';
|
||||||
|
import { randomId } from './randomid.js';
|
||||||
|
|
||||||
export type AnyValue = Value<Ref>;
|
export type AnyValue = Value<Ref>;
|
||||||
|
|
||||||
|
@ -90,25 +91,29 @@ export type DataflowGraph = Graph<DataflowBlock, Cell>;
|
||||||
export type DataflowBlock = () => void;
|
export type DataflowBlock = () => void;
|
||||||
|
|
||||||
export class Actor {
|
export class Actor {
|
||||||
readonly id = nextActorId++;
|
name: AnyValue = Symbol.for('A-' + randomId(16));
|
||||||
name: AnyValue = this.id;
|
readonly space: ActorSpace;
|
||||||
readonly root: Facet;
|
readonly root: Facet;
|
||||||
_dataflowGraph: DataflowGraph | null = null;
|
_dataflowGraph: DataflowGraph | null = null;
|
||||||
exitReason: ExitReason = null;
|
exitReason: ExitReason = null;
|
||||||
readonly exitHooks: Array<LocalAction> = [];
|
readonly exitHooks: Array<LocalAction> = [];
|
||||||
|
|
||||||
static boot(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()): Actor {
|
static boot(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()): Actor {
|
||||||
const newActor = new Actor(initialAssertions);
|
const newActor = new Actor(new ActorSpace(), initialAssertions);
|
||||||
newActor._boot(bootProc);
|
newActor._boot(bootProc);
|
||||||
return newActor;
|
return newActor;
|
||||||
}
|
}
|
||||||
|
|
||||||
static __unsafeNew(initialAssertions: OutboundMap = new Map()) {
|
static __unsafeNew(space: ActorSpace, initialAssertions: OutboundMap = new Map()) {
|
||||||
return new Actor(initialAssertions);
|
return new Actor(space, initialAssertions);
|
||||||
}
|
}
|
||||||
|
|
||||||
private constructor(initialAssertions: OutboundMap = new Map()) {
|
private constructor(space: ActorSpace, initialAssertions: OutboundMap = new Map()) {
|
||||||
|
this.space = space;
|
||||||
this.root = new Facet(this, null, initialAssertions);
|
this.root = new Facet(this, null, initialAssertions);
|
||||||
|
if (!space.register(this)) {
|
||||||
|
this._terminateWith({ ok: false, err: 'Spawned into shutdown ActorSpace' });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_boot(bootProc: LocalAction) {
|
_boot(bootProc: LocalAction) {
|
||||||
|
@ -135,6 +140,7 @@ export class Actor {
|
||||||
}
|
}
|
||||||
this.exitHooks.forEach(hook => hook());
|
this.exitHooks.forEach(hook => hook());
|
||||||
this.root._terminate(reason.ok);
|
this.root._terminate(reason.ok);
|
||||||
|
this.space.deregister(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
repairDataflowGraph() {
|
repairDataflowGraph() {
|
||||||
|
@ -340,10 +346,10 @@ export class Turn {
|
||||||
initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!));
|
initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!));
|
||||||
// ^ we trust initialAssertions, so can use `!` safely
|
// ^ we trust initialAssertions, so can use `!` safely
|
||||||
|
|
||||||
const newActor = Actor.__unsafeNew(newOutbound);
|
const newActor = Actor.__unsafeNew(this.activeFacet.actor.space, newOutbound);
|
||||||
this.enqueue(this.activeFacet, () => {
|
this.enqueue(this.activeFacet, () => {
|
||||||
initialAssertions.forEach(key => this.activeFacet.outbound.delete(key));
|
initialAssertions.forEach(key => this.activeFacet.outbound.delete(key));
|
||||||
queueTask(() => newActor._boot(bootProc));
|
newActor.space.queueTask(() => newActor._boot(bootProc));
|
||||||
});
|
});
|
||||||
return newActor;
|
return newActor;
|
||||||
}
|
}
|
||||||
|
@ -496,7 +502,7 @@ export class Turn {
|
||||||
|
|
||||||
deliver() {
|
deliver() {
|
||||||
this.queues!.forEach((q, actor) =>
|
this.queues!.forEach((q, actor) =>
|
||||||
queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
|
actor.space.queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
|
||||||
this.queues = null;
|
this.queues = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
|
import { IdentitySet } from '@preserves/core';
|
||||||
|
import type { Actor, ExitReason } from './actor.js';
|
||||||
|
|
||||||
|
const LIMIT = 25000;
|
||||||
|
|
||||||
|
export enum ActorSpaceState {
|
||||||
|
RUNNING,
|
||||||
|
PAUSED,
|
||||||
|
TERMINATED,
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ActorSpace {
|
||||||
|
actors = new IdentitySet<Actor>();
|
||||||
|
state = ActorSpaceState.RUNNING;
|
||||||
|
|
||||||
|
taskCounter = 0;
|
||||||
|
delayedTasks: Array<() => void> = [];
|
||||||
|
taskFlushHandle: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
register(actor: Actor): boolean {
|
||||||
|
if (this.state === ActorSpaceState.TERMINATED) return false;
|
||||||
|
this.actors.add(actor);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
deregister(actor: Actor) {
|
||||||
|
this.actors.delete(actor);
|
||||||
|
if (this.actors.size === 0) {
|
||||||
|
this.shutdown({ ok: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdown(reason: Exclude<ExitReason, null>) {
|
||||||
|
if (this.state === ActorSpaceState.TERMINATED) return;
|
||||||
|
this.state = ActorSpaceState.TERMINATED;
|
||||||
|
Array.from(this.actors.values()).forEach(a => a._terminateWith(reason));
|
||||||
|
}
|
||||||
|
|
||||||
|
queueTask(f: () => void) {
|
||||||
|
switch (this.state) {
|
||||||
|
case ActorSpaceState.TERMINATED:
|
||||||
|
break;
|
||||||
|
|
||||||
|
case ActorSpaceState.PAUSED:
|
||||||
|
this.delayedTasks.push(f);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case ActorSpaceState.RUNNING:
|
||||||
|
this.taskCounter++;
|
||||||
|
if (this.taskCounter === LIMIT) {
|
||||||
|
this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0);
|
||||||
|
}
|
||||||
|
if (this.taskCounter >= LIMIT) {
|
||||||
|
this.delayedTasks.push(f);
|
||||||
|
} else {
|
||||||
|
queueMicrotask(f);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_scheduleDelayedTasks() {
|
||||||
|
this.taskCounter = 0;
|
||||||
|
this.delayedTasks.forEach(queueMicrotask);
|
||||||
|
this.delayedTasks = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
pause(cb: () => void): boolean {
|
||||||
|
switch (this.state) {
|
||||||
|
case ActorSpaceState.TERMINATED:
|
||||||
|
return false;
|
||||||
|
|
||||||
|
case ActorSpaceState.PAUSED:
|
||||||
|
queueMicrotask(cb);
|
||||||
|
return true;
|
||||||
|
|
||||||
|
case ActorSpaceState.RUNNING:
|
||||||
|
this.state = ActorSpaceState.PAUSED;
|
||||||
|
if (this.taskFlushHandle !== null) {
|
||||||
|
clearTimeout(this.taskFlushHandle);
|
||||||
|
this.taskFlushHandle = null;
|
||||||
|
}
|
||||||
|
queueMicrotask(cb);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unpause(): boolean {
|
||||||
|
switch (this.state) {
|
||||||
|
case ActorSpaceState.TERMINATED:
|
||||||
|
return false;
|
||||||
|
|
||||||
|
case ActorSpaceState.PAUSED:
|
||||||
|
this.state = ActorSpaceState.RUNNING;
|
||||||
|
this._scheduleDelayedTasks();
|
||||||
|
return true;
|
||||||
|
|
||||||
|
case ActorSpaceState.RUNNING:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,22 +0,0 @@
|
||||||
/// SPDX-License-Identifier: GPL-3.0-or-later
|
|
||||||
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
||||||
|
|
||||||
const LIMIT = 25000;
|
|
||||||
|
|
||||||
let taskCounter = 0;
|
|
||||||
let delayedTasks: Array<() => void> = [];
|
|
||||||
export function queueTask(f: () => void) {
|
|
||||||
taskCounter++;
|
|
||||||
if (taskCounter === LIMIT) {
|
|
||||||
setTimeout(() => {
|
|
||||||
taskCounter = 0;
|
|
||||||
delayedTasks.forEach(queueMicrotask);
|
|
||||||
delayedTasks = [];
|
|
||||||
}, 0);
|
|
||||||
}
|
|
||||||
if (taskCounter >= LIMIT) {
|
|
||||||
delayedTasks.push(f);
|
|
||||||
} else {
|
|
||||||
queueMicrotask(f);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -5,7 +5,6 @@ import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/a
|
||||||
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, stringify, underlying, Value } from '@preserves/core';
|
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, stringify, underlying, Value } from '@preserves/core';
|
||||||
import * as IO from '../gen/protocol.js';
|
import * as IO from '../gen/protocol.js';
|
||||||
import { wireRefEmbeddedType } from './protocol.js';
|
import { wireRefEmbeddedType } from './protocol.js';
|
||||||
import { queueTask } from '../runtime/task.js';
|
|
||||||
import { attenuate } from '../runtime/rewrite.js';
|
import { attenuate } from '../runtime/rewrite.js';
|
||||||
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
||||||
|
|
||||||
|
@ -280,7 +279,7 @@ export class Relay {
|
||||||
|
|
||||||
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
|
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
|
||||||
if (this.pendingTurn.length === 0) {
|
if (this.pendingTurn.length === 0) {
|
||||||
queueTask(() => {
|
Turn.activeFacet.actor.space.queueTask(() => {
|
||||||
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
||||||
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
||||||
canonical: true,
|
canonical: true,
|
||||||
|
|
Loading…
Reference in New Issue