diff --git a/packages/core/src/runtime/actor.ts b/packages/core/src/runtime/actor.ts index 8a27ff6..14062b1 100644 --- a/packages/core/src/runtime/actor.ts +++ b/packages/core/src/runtime/actor.ts @@ -1,10 +1,11 @@ /// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones -import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core'; +import { IdentitySet, Value, embeddedId, is, fromJS, stringify, Dictionary } from '@preserves/core'; import { Cell, Field, Graph } from './dataflow.js'; import { Caveat, runRewrites } from './rewrite.js'; import { ActorSpace } from './space.js'; +import type { ActionDescription, StructuredTask, TaskAction } from './task.js'; import { randomId } from './randomid.js'; export type AnyValue = Value; @@ -19,6 +20,7 @@ export type Assertion = Value; export type Handle = number; export type ExitReason = null | { ok: true } | { ok: false, err: unknown }; export type LocalAction = () => void; +export type DetailedAction = LocalAction & { detail: T }; export type Assertable = Assertion | { __as_preserve__: () => Value } | { __as_preserve__: () => Assertion }; @@ -273,7 +275,7 @@ export class Turn { readonly id = nextTurnId++; _activeFacet: Facet; - queues: Map | null; + queues: Map[]> | null; static for(facet: Facet, f: LocalAction, zombieTurn = false): void { if (!zombieTurn) { @@ -296,7 +298,7 @@ export class Turn { } } - private constructor(facet: Facet, queues = new Map()) { + private constructor(facet: Facet, queues = new Map[]>()) { this._activeFacet = facet; this.queues = queues; } @@ -333,24 +335,31 @@ export class Turn { } // Alias for syndicatec code generator to use - _spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): Actor { + _spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet()): Actor { return this.spawn(bootProc, initialAssertions); } - spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): Actor { + spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet()): Actor { return this.__spawn(bootProc, initialAssertions); } - __spawn(bootProc: LocalAction, initialAssertions = new IdentitySet()): Actor { + __spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet()): Actor { const newOutbound: OutboundMap = new Map(); initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!)); // ^ we trust initialAssertions, so can use `!` safely const newActor = Actor.__unsafeNew(this.activeFacet.actor.space, newOutbound); - this.enqueue(this.activeFacet, () => { - initialAssertions.forEach(key => this.activeFacet.outbound.delete(key)); - newActor.space.queueTask(() => newActor._boot(bootProc)); - }); + const detail = 'detail' in bootProc ? bootProc.detail : void 0; + const spawningFacet = this.activeFacet; + this.enqueue(spawningFacet, + () => { + initialAssertions.forEach(key => spawningFacet.outbound.delete(key)); + newActor.space.queueTask({ + perform() { newActor._boot(bootProc); }, + describe() { return { type: 'bootActor', detail }; }, + }); + }, + { type: 'spawnActor', detail, spawningFacet, initialAssertions }); return newActor; } @@ -368,11 +377,21 @@ export class Turn { } stopActor(): void { - this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: true })); + this.enqueue(this.activeFacet.actor.root, + () => this.activeFacet.actor._terminateWith({ ok: true }), + { type: 'stopActor', err: void 0 }); } crash(err: Error): void { - this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: false, err })); + this.enqueue(this.activeFacet.actor.root, + () => this.activeFacet.actor._terminateWith({ ok: false, err }), + { + type: 'stopActor', + err: Dictionary.fromJS({ + message: err.message, + stack: err.stack ? err.stack : false, + }), + }); } field(initial: V, name?: string): Field { @@ -421,10 +440,17 @@ export class Turn { if (a !== null) { const e = { handle: h, peer: ref, established: false }; this.activeFacet.outbound.set(h, e); - this.enqueue(ref.relay, () => { - e.established = true; - ref.target.assert?.(a, h); - }); + this.enqueue(ref.relay, + () => { + e.established = true; + ref.target.assert?.(a, h); + }, + { + type: 'assert', + target: ref, + handle: h, + assertion, + }); } } @@ -446,12 +472,18 @@ export class Turn { _retract(e: OutboundAssertion): void { this.activeFacet.outbound.delete(e.handle); - this.enqueue(e.peer.relay, () => { - if (e.established) { - e.established = false; - e.peer.target.retract?.(e.handle); - } - }); + this.enqueue(e.peer.relay, + () => { + if (e.established) { + e.established = false; + e.peer.target.retract?.(e.handle); + } + }, + { + type: 'retract', + target: e.peer, + handle: e.handle, + }); } sync(ref: Ref): Promise { @@ -459,13 +491,27 @@ export class Turn { } _sync(ref: Ref, peer: Ref): void { - this.enqueue(ref.relay, () => _sync_impl(ref.target, peer)); + this.enqueue(ref.relay, + () => _sync_impl(ref.target, peer), + { + type: 'sync', + target: ref, + callback: peer, + }); } message(ref: Ref, assertable: Assertable): void { const assertion = assertionFrom(assertable); const a = runRewrites(ref.attenuation, assertion); - if (a !== null) this.enqueue(ref.relay, () => ref.target.message?.(assertion)); + if (a !== null) { + this.enqueue(ref.relay, + () => ref.target.message?.(assertion), + { + type: 'message', + target: ref, + assertion, + }); + } } every(periodMilliseconds: number, a: LocalAction): any { @@ -492,17 +538,23 @@ export class Turn { }, delayMilliseconds); } - enqueue(relay: Facet, a0: LocalAction): void { + enqueue(relay: Facet, a0: LocalAction, detail: ActionDescription): void { if (this.queues === null) { throw new Error("Attempt to reuse a committed Turn"); } - const a: LocalAction = () => Turn.active._inFacet(relay, a0); + const a: StructuredTask = { + perform() { Turn.active._inFacet(relay, a0); }, + describe() { return { targetFacet: relay, action: detail }; }, + }; this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]); } deliver() { this.queues!.forEach((q, actor) => - actor.space.queueTask(() => Turn.for(actor.root, () => q.forEach(f => f())))); + actor.space.queueTask({ + perform() { Turn.for(actor.root, () => q.forEach(f => f.perform())); }, + describe() { return { type: 'turn', tasks: q.map(f => f.describe()) }; }, + })); this.queues = null; } } @@ -511,10 +563,12 @@ function stopIfInertAfter(a: LocalAction): LocalAction { return () => { const facet = Turn.activeFacet; a(); - Turn.active.enqueue(facet, () => { - if ((facet.parent && !facet.parent.isLive) || facet.isInert()) { - Turn.active.stop(facet); - } - }); + Turn.active.enqueue(facet, + () => { + if ((facet.parent && !facet.parent.isLive) || facet.isInert()) { + Turn.active.stop(facet); + } + }, + { type: 'inertCheck' }); }; } diff --git a/packages/core/src/runtime/space.ts b/packages/core/src/runtime/space.ts index 681de9c..ac627ce 100644 --- a/packages/core/src/runtime/space.ts +++ b/packages/core/src/runtime/space.ts @@ -3,6 +3,7 @@ import { IdentitySet } from '@preserves/core'; import type { Actor, ExitReason } from './actor.js'; +import type { StructuredTask, TaskDescription } from './task.js'; const LIMIT = 25000; @@ -17,7 +18,7 @@ export class ActorSpace { state = ActorSpaceState.RUNNING; taskCounter = 0; - delayedTasks: Array<() => void> = []; + delayedTasks: Array> = []; taskFlushHandle: ReturnType | null = null; register(actor: Actor): boolean { @@ -39,13 +40,13 @@ export class ActorSpace { Array.from(this.actors.values()).forEach(a => a._terminateWith(reason)); } - queueTask(f: () => void) { + queueTask(t: StructuredTask) { switch (this.state) { case ActorSpaceState.TERMINATED: break; case ActorSpaceState.PAUSED: - this.delayedTasks.push(f); + this.delayedTasks.push(t); break; case ActorSpaceState.RUNNING: @@ -54,9 +55,9 @@ export class ActorSpace { this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0); } if (this.taskCounter >= LIMIT) { - this.delayedTasks.push(f); + this.delayedTasks.push(t); } else { - queueMicrotask(f); + queueMicrotask(() => t.perform()); } break; } @@ -64,7 +65,7 @@ export class ActorSpace { _scheduleDelayedTasks() { this.taskCounter = 0; - this.delayedTasks.forEach(queueMicrotask); + this.delayedTasks.forEach(t => queueMicrotask(() => t.perform())); this.delayedTasks = []; } diff --git a/packages/core/src/runtime/task.ts b/packages/core/src/runtime/task.ts new file mode 100644 index 0000000..5fd1cfb --- /dev/null +++ b/packages/core/src/runtime/task.ts @@ -0,0 +1,32 @@ +/// SPDX-License-Identifier: GPL-3.0-or-later +/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones + +// Each Turn executes one Task + +import type { AnyValue, Handle, Ref, Facet } from './actor.js'; +import type { IdentitySet } from '@preserves/core'; + +export type Task = StructuredTask; + +export interface StructuredTask { + perform(): void; + describe(): T; +} + +export type TaskDescription = + | { type: 'bootActor', detail?: AnyValue } + | { type: 'turn', tasks: TaskAction[] } +; + +export type TaskAction = { targetFacet: Facet, action: ActionDescription }; + +export type ActionDescription = + | { type: 'spawnActor', detail?: AnyValue, spawningFacet: Facet, initialAssertions: IdentitySet } + | { type: 'stopActor', err?: AnyValue } + | { type: 'inertCheck' } + + | { type: 'assert', target: Ref, handle: Handle, assertion: AnyValue } + | { type: 'retract', target: Ref, handle: Handle } + | { type: 'message', target: Ref, assertion: AnyValue } + | { type: 'sync', target: Ref, callback: Ref } +; diff --git a/packages/core/src/transport/relay.ts b/packages/core/src/transport/relay.ts index 105f42f..dbc32e1 100644 --- a/packages/core/src/transport/relay.ts +++ b/packages/core/src/transport/relay.ts @@ -8,6 +8,8 @@ import { wireRefEmbeddedType } from './protocol.js'; import { attenuate } from '../runtime/rewrite.js'; import { fromCaveat, WireRef } from '../gen/sturdy.js'; +const FLUSH = Symbol.for('flush'); + export class WireSymbol { count = 0; @@ -143,6 +145,7 @@ export interface RelayOptions { export class Relay { readonly facet: Facet; + readonly selfRef: Ref; readonly w: PacketWriter; readonly inboundAssertions = new IdentityMap): void { if (this.pendingTurn.length === 0) { - Turn.activeFacet.actor.space.queueTask(() => { - if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn))); - this.w(underlying(encode(IO.fromTurn(this.pendingTurn), { - canonical: true, - embeddedEncode: wireRefEmbeddedType, - }))); - this.pendingTurn = []; - }); + Turn.active.message(this.selfRef, FLUSH); } this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m })); }