diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 28c4215..c586e52 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -16,8 +16,8 @@ export * as QuasiValue from './runtime/quasivalue.js'; export * from './runtime/randomid.js'; export * as Rewrite from './runtime/rewrite.js'; export * as Skeleton from './runtime/skeleton.js'; +export * from './runtime/space.js'; export * from './runtime/supervise.js'; -export * as Task from './runtime/task.js'; export * as Cryptography from './transport/cryptography.js'; export * as WireProtocol from './transport/protocol.js'; diff --git a/packages/core/src/runtime/actor.ts b/packages/core/src/runtime/actor.ts index 0bda200..8a27ff6 100644 --- a/packages/core/src/runtime/actor.ts +++ b/packages/core/src/runtime/actor.ts @@ -4,7 +4,8 @@ import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core'; import { Cell, Field, Graph } from './dataflow.js'; import { Caveat, runRewrites } from './rewrite.js'; -import { queueTask } from './task.js'; +import { ActorSpace } from './space.js'; +import { randomId } from './randomid.js'; export type AnyValue = Value; @@ -90,25 +91,29 @@ export type DataflowGraph = Graph; export type DataflowBlock = () => void; export class Actor { - readonly id = nextActorId++; - name: AnyValue = this.id; + name: AnyValue = Symbol.for('A-' + randomId(16)); + readonly space: ActorSpace; readonly root: Facet; _dataflowGraph: DataflowGraph | null = null; exitReason: ExitReason = null; readonly exitHooks: Array = []; static boot(bootProc: LocalAction, initialAssertions: OutboundMap = new Map()): Actor { - const newActor = new Actor(initialAssertions); + const newActor = new Actor(new ActorSpace(), initialAssertions); newActor._boot(bootProc); return newActor; } - static __unsafeNew(initialAssertions: OutboundMap = new Map()) { - return new Actor(initialAssertions); + static __unsafeNew(space: ActorSpace, initialAssertions: OutboundMap = new Map()) { + return new Actor(space, initialAssertions); } - private constructor(initialAssertions: OutboundMap = new Map()) { + private constructor(space: ActorSpace, initialAssertions: OutboundMap = new Map()) { + this.space = space; this.root = new Facet(this, null, initialAssertions); + if (!space.register(this)) { + this._terminateWith({ ok: false, err: 'Spawned into shutdown ActorSpace' }); + } } _boot(bootProc: LocalAction) { @@ -135,6 +140,7 @@ export class Actor { } this.exitHooks.forEach(hook => hook()); this.root._terminate(reason.ok); + this.space.deregister(this); } repairDataflowGraph() { @@ -340,10 +346,10 @@ export class Turn { initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!)); // ^ we trust initialAssertions, so can use `!` safely - const newActor = Actor.__unsafeNew(newOutbound); + const newActor = Actor.__unsafeNew(this.activeFacet.actor.space, newOutbound); this.enqueue(this.activeFacet, () => { initialAssertions.forEach(key => this.activeFacet.outbound.delete(key)); - queueTask(() => newActor._boot(bootProc)); + newActor.space.queueTask(() => newActor._boot(bootProc)); }); return newActor; } @@ -496,7 +502,7 @@ export class Turn { deliver() { this.queues!.forEach((q, actor) => - queueTask(() => Turn.for(actor.root, () => q.forEach(f => f())))); + actor.space.queueTask(() => Turn.for(actor.root, () => q.forEach(f => f())))); this.queues = null; } } diff --git a/packages/core/src/runtime/space.ts b/packages/core/src/runtime/space.ts new file mode 100644 index 0000000..681de9c --- /dev/null +++ b/packages/core/src/runtime/space.ts @@ -0,0 +1,105 @@ +/// SPDX-License-Identifier: GPL-3.0-or-later +/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones + +import { IdentitySet } from '@preserves/core'; +import type { Actor, ExitReason } from './actor.js'; + +const LIMIT = 25000; + +export enum ActorSpaceState { + RUNNING, + PAUSED, + TERMINATED, +} + +export class ActorSpace { + actors = new IdentitySet(); + state = ActorSpaceState.RUNNING; + + taskCounter = 0; + delayedTasks: Array<() => void> = []; + taskFlushHandle: ReturnType | null = null; + + register(actor: Actor): boolean { + if (this.state === ActorSpaceState.TERMINATED) return false; + this.actors.add(actor); + return true; + } + + deregister(actor: Actor) { + this.actors.delete(actor); + if (this.actors.size === 0) { + this.shutdown({ ok: true }); + } + } + + shutdown(reason: Exclude) { + if (this.state === ActorSpaceState.TERMINATED) return; + this.state = ActorSpaceState.TERMINATED; + Array.from(this.actors.values()).forEach(a => a._terminateWith(reason)); + } + + queueTask(f: () => void) { + switch (this.state) { + case ActorSpaceState.TERMINATED: + break; + + case ActorSpaceState.PAUSED: + this.delayedTasks.push(f); + break; + + case ActorSpaceState.RUNNING: + this.taskCounter++; + if (this.taskCounter === LIMIT) { + this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0); + } + if (this.taskCounter >= LIMIT) { + this.delayedTasks.push(f); + } else { + queueMicrotask(f); + } + break; + } + } + + _scheduleDelayedTasks() { + this.taskCounter = 0; + this.delayedTasks.forEach(queueMicrotask); + this.delayedTasks = []; + } + + pause(cb: () => void): boolean { + switch (this.state) { + case ActorSpaceState.TERMINATED: + return false; + + case ActorSpaceState.PAUSED: + queueMicrotask(cb); + return true; + + case ActorSpaceState.RUNNING: + this.state = ActorSpaceState.PAUSED; + if (this.taskFlushHandle !== null) { + clearTimeout(this.taskFlushHandle); + this.taskFlushHandle = null; + } + queueMicrotask(cb); + return true; + } + } + + unpause(): boolean { + switch (this.state) { + case ActorSpaceState.TERMINATED: + return false; + + case ActorSpaceState.PAUSED: + this.state = ActorSpaceState.RUNNING; + this._scheduleDelayedTasks(); + return true; + + case ActorSpaceState.RUNNING: + return true; + } + } +} diff --git a/packages/core/src/runtime/task.ts b/packages/core/src/runtime/task.ts deleted file mode 100644 index d3f4de5..0000000 --- a/packages/core/src/runtime/task.ts +++ /dev/null @@ -1,22 +0,0 @@ -/// SPDX-License-Identifier: GPL-3.0-or-later -/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones - -const LIMIT = 25000; - -let taskCounter = 0; -let delayedTasks: Array<() => void> = []; -export function queueTask(f: () => void) { - taskCounter++; - if (taskCounter === LIMIT) { - setTimeout(() => { - taskCounter = 0; - delayedTasks.forEach(queueMicrotask); - delayedTasks = []; - }, 0); - } - if (taskCounter >= LIMIT) { - delayedTasks.push(f); - } else { - queueMicrotask(f); - } -} diff --git a/packages/core/src/transport/relay.ts b/packages/core/src/transport/relay.ts index 9f6bc64..105f42f 100644 --- a/packages/core/src/transport/relay.ts +++ b/packages/core/src/transport/relay.ts @@ -5,7 +5,6 @@ import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/a import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, stringify, underlying, Value } from '@preserves/core'; import * as IO from '../gen/protocol.js'; import { wireRefEmbeddedType } from './protocol.js'; -import { queueTask } from '../runtime/task.js'; import { attenuate } from '../runtime/rewrite.js'; import { fromCaveat, WireRef } from '../gen/sturdy.js'; @@ -280,7 +279,7 @@ export class Relay { send(remoteOid: IO.Oid, m: IO.Event): void { if (this.pendingTurn.length === 0) { - queueTask(() => { + Turn.activeFacet.actor.space.queueTask(() => { if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn))); this.w(underlying(encode(IO.fromTurn(this.pendingTurn), { canonical: true,