Task descriptions
This commit is contained in:
parent
d55e322a0e
commit
39fb9cb92e
|
@ -1,10 +1,11 @@
|
||||||
/// SPDX-License-Identifier: GPL-3.0-or-later
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
import { IdentitySet, Value, embeddedId, is, fromJS, stringify } from '@preserves/core';
|
import { IdentitySet, Value, embeddedId, is, fromJS, stringify, Dictionary } from '@preserves/core';
|
||||||
import { Cell, Field, Graph } from './dataflow.js';
|
import { Cell, Field, Graph } from './dataflow.js';
|
||||||
import { Caveat, runRewrites } from './rewrite.js';
|
import { Caveat, runRewrites } from './rewrite.js';
|
||||||
import { ActorSpace } from './space.js';
|
import { ActorSpace } from './space.js';
|
||||||
|
import type { ActionDescription, StructuredTask, TaskAction } from './task.js';
|
||||||
import { randomId } from './randomid.js';
|
import { randomId } from './randomid.js';
|
||||||
|
|
||||||
export type AnyValue = Value<Ref>;
|
export type AnyValue = Value<Ref>;
|
||||||
|
@ -19,6 +20,7 @@ export type Assertion = Value<Ref>;
|
||||||
export type Handle = number;
|
export type Handle = number;
|
||||||
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
|
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
|
||||||
export type LocalAction = () => void;
|
export type LocalAction = () => void;
|
||||||
|
export type DetailedAction<T = AnyValue> = LocalAction & { detail: T };
|
||||||
|
|
||||||
export type Assertable = Assertion | { __as_preserve__: <T>() => Value<T> } | { __as_preserve__: () => Assertion };
|
export type Assertable = Assertion | { __as_preserve__: <T>() => Value<T> } | { __as_preserve__: () => Assertion };
|
||||||
|
|
||||||
|
@ -273,7 +275,7 @@ export class Turn {
|
||||||
|
|
||||||
readonly id = nextTurnId++;
|
readonly id = nextTurnId++;
|
||||||
_activeFacet: Facet;
|
_activeFacet: Facet;
|
||||||
queues: Map<Actor, LocalAction[]> | null;
|
queues: Map<Actor, StructuredTask<TaskAction>[]> | null;
|
||||||
|
|
||||||
static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
|
static for(facet: Facet, f: LocalAction, zombieTurn = false): void {
|
||||||
if (!zombieTurn) {
|
if (!zombieTurn) {
|
||||||
|
@ -296,7 +298,7 @@ export class Turn {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private constructor(facet: Facet, queues = new Map<Actor, LocalAction[]>()) {
|
private constructor(facet: Facet, queues = new Map<Actor, StructuredTask<TaskAction>[]>()) {
|
||||||
this._activeFacet = facet;
|
this._activeFacet = facet;
|
||||||
this.queues = queues;
|
this.queues = queues;
|
||||||
}
|
}
|
||||||
|
@ -333,24 +335,31 @@ export class Turn {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Alias for syndicatec code generator to use
|
// Alias for syndicatec code generator to use
|
||||||
_spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
_spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
||||||
return this.spawn(bootProc, initialAssertions);
|
return this.spawn(bootProc, initialAssertions);
|
||||||
}
|
}
|
||||||
|
|
||||||
spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
||||||
return this.__spawn(bootProc, initialAssertions);
|
return this.__spawn(bootProc, initialAssertions);
|
||||||
}
|
}
|
||||||
|
|
||||||
__spawn(bootProc: LocalAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
__spawn(bootProc: LocalAction | DetailedAction, initialAssertions = new IdentitySet<Handle>()): Actor {
|
||||||
const newOutbound: OutboundMap = new Map();
|
const newOutbound: OutboundMap = new Map();
|
||||||
initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!));
|
initialAssertions.forEach(key => newOutbound.set(key, this.activeFacet.outbound.get(key)!));
|
||||||
// ^ we trust initialAssertions, so can use `!` safely
|
// ^ we trust initialAssertions, so can use `!` safely
|
||||||
|
|
||||||
const newActor = Actor.__unsafeNew(this.activeFacet.actor.space, newOutbound);
|
const newActor = Actor.__unsafeNew(this.activeFacet.actor.space, newOutbound);
|
||||||
this.enqueue(this.activeFacet, () => {
|
const detail = 'detail' in bootProc ? bootProc.detail : void 0;
|
||||||
initialAssertions.forEach(key => this.activeFacet.outbound.delete(key));
|
const spawningFacet = this.activeFacet;
|
||||||
newActor.space.queueTask(() => newActor._boot(bootProc));
|
this.enqueue(spawningFacet,
|
||||||
});
|
() => {
|
||||||
|
initialAssertions.forEach(key => spawningFacet.outbound.delete(key));
|
||||||
|
newActor.space.queueTask({
|
||||||
|
perform() { newActor._boot(bootProc); },
|
||||||
|
describe() { return { type: 'bootActor', detail }; },
|
||||||
|
});
|
||||||
|
},
|
||||||
|
{ type: 'spawnActor', detail, spawningFacet, initialAssertions });
|
||||||
return newActor;
|
return newActor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,11 +377,21 @@ export class Turn {
|
||||||
}
|
}
|
||||||
|
|
||||||
stopActor(): void {
|
stopActor(): void {
|
||||||
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: true }));
|
this.enqueue(this.activeFacet.actor.root,
|
||||||
|
() => this.activeFacet.actor._terminateWith({ ok: true }),
|
||||||
|
{ type: 'stopActor', err: void 0 });
|
||||||
}
|
}
|
||||||
|
|
||||||
crash(err: Error): void {
|
crash(err: Error): void {
|
||||||
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor._terminateWith({ ok: false, err }));
|
this.enqueue(this.activeFacet.actor.root,
|
||||||
|
() => this.activeFacet.actor._terminateWith({ ok: false, err }),
|
||||||
|
{
|
||||||
|
type: 'stopActor',
|
||||||
|
err: Dictionary.fromJS({
|
||||||
|
message: err.message,
|
||||||
|
stack: err.stack ? err.stack : false,
|
||||||
|
}),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
field<V>(initial: V, name?: string): Field<V> {
|
field<V>(initial: V, name?: string): Field<V> {
|
||||||
|
@ -421,10 +440,17 @@ export class Turn {
|
||||||
if (a !== null) {
|
if (a !== null) {
|
||||||
const e = { handle: h, peer: ref, established: false };
|
const e = { handle: h, peer: ref, established: false };
|
||||||
this.activeFacet.outbound.set(h, e);
|
this.activeFacet.outbound.set(h, e);
|
||||||
this.enqueue(ref.relay, () => {
|
this.enqueue(ref.relay,
|
||||||
e.established = true;
|
() => {
|
||||||
ref.target.assert?.(a, h);
|
e.established = true;
|
||||||
});
|
ref.target.assert?.(a, h);
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: 'assert',
|
||||||
|
target: ref,
|
||||||
|
handle: h,
|
||||||
|
assertion,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,12 +472,18 @@ export class Turn {
|
||||||
|
|
||||||
_retract(e: OutboundAssertion): void {
|
_retract(e: OutboundAssertion): void {
|
||||||
this.activeFacet.outbound.delete(e.handle);
|
this.activeFacet.outbound.delete(e.handle);
|
||||||
this.enqueue(e.peer.relay, () => {
|
this.enqueue(e.peer.relay,
|
||||||
if (e.established) {
|
() => {
|
||||||
e.established = false;
|
if (e.established) {
|
||||||
e.peer.target.retract?.(e.handle);
|
e.established = false;
|
||||||
}
|
e.peer.target.retract?.(e.handle);
|
||||||
});
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: 'retract',
|
||||||
|
target: e.peer,
|
||||||
|
handle: e.handle,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
sync(ref: Ref): Promise<void> {
|
sync(ref: Ref): Promise<void> {
|
||||||
|
@ -459,13 +491,27 @@ export class Turn {
|
||||||
}
|
}
|
||||||
|
|
||||||
_sync(ref: Ref, peer: Ref): void {
|
_sync(ref: Ref, peer: Ref): void {
|
||||||
this.enqueue(ref.relay, () => _sync_impl(ref.target, peer));
|
this.enqueue(ref.relay,
|
||||||
|
() => _sync_impl(ref.target, peer),
|
||||||
|
{
|
||||||
|
type: 'sync',
|
||||||
|
target: ref,
|
||||||
|
callback: peer,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
message(ref: Ref, assertable: Assertable): void {
|
message(ref: Ref, assertable: Assertable): void {
|
||||||
const assertion = assertionFrom(assertable);
|
const assertion = assertionFrom(assertable);
|
||||||
const a = runRewrites(ref.attenuation, assertion);
|
const a = runRewrites(ref.attenuation, assertion);
|
||||||
if (a !== null) this.enqueue(ref.relay, () => ref.target.message?.(assertion));
|
if (a !== null) {
|
||||||
|
this.enqueue(ref.relay,
|
||||||
|
() => ref.target.message?.(assertion),
|
||||||
|
{
|
||||||
|
type: 'message',
|
||||||
|
target: ref,
|
||||||
|
assertion,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
every(periodMilliseconds: number, a: LocalAction): any {
|
every(periodMilliseconds: number, a: LocalAction): any {
|
||||||
|
@ -492,17 +538,23 @@ export class Turn {
|
||||||
}, delayMilliseconds);
|
}, delayMilliseconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
enqueue(relay: Facet, a0: LocalAction): void {
|
enqueue(relay: Facet, a0: LocalAction, detail: ActionDescription): void {
|
||||||
if (this.queues === null) {
|
if (this.queues === null) {
|
||||||
throw new Error("Attempt to reuse a committed Turn");
|
throw new Error("Attempt to reuse a committed Turn");
|
||||||
}
|
}
|
||||||
const a: LocalAction = () => Turn.active._inFacet(relay, a0);
|
const a: StructuredTask<TaskAction> = {
|
||||||
|
perform() { Turn.active._inFacet(relay, a0); },
|
||||||
|
describe() { return { targetFacet: relay, action: detail }; },
|
||||||
|
};
|
||||||
this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]);
|
this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]);
|
||||||
}
|
}
|
||||||
|
|
||||||
deliver() {
|
deliver() {
|
||||||
this.queues!.forEach((q, actor) =>
|
this.queues!.forEach((q, actor) =>
|
||||||
actor.space.queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
|
actor.space.queueTask({
|
||||||
|
perform() { Turn.for(actor.root, () => q.forEach(f => f.perform())); },
|
||||||
|
describe() { return { type: 'turn', tasks: q.map(f => f.describe()) }; },
|
||||||
|
}));
|
||||||
this.queues = null;
|
this.queues = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -511,10 +563,12 @@ function stopIfInertAfter(a: LocalAction): LocalAction {
|
||||||
return () => {
|
return () => {
|
||||||
const facet = Turn.activeFacet;
|
const facet = Turn.activeFacet;
|
||||||
a();
|
a();
|
||||||
Turn.active.enqueue(facet, () => {
|
Turn.active.enqueue(facet,
|
||||||
if ((facet.parent && !facet.parent.isLive) || facet.isInert()) {
|
() => {
|
||||||
Turn.active.stop(facet);
|
if ((facet.parent && !facet.parent.isLive) || facet.isInert()) {
|
||||||
}
|
Turn.active.stop(facet);
|
||||||
});
|
}
|
||||||
|
},
|
||||||
|
{ type: 'inertCheck' });
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
import { IdentitySet } from '@preserves/core';
|
import { IdentitySet } from '@preserves/core';
|
||||||
import type { Actor, ExitReason } from './actor.js';
|
import type { Actor, ExitReason } from './actor.js';
|
||||||
|
import type { StructuredTask, TaskDescription } from './task.js';
|
||||||
|
|
||||||
const LIMIT = 25000;
|
const LIMIT = 25000;
|
||||||
|
|
||||||
|
@ -17,7 +18,7 @@ export class ActorSpace {
|
||||||
state = ActorSpaceState.RUNNING;
|
state = ActorSpaceState.RUNNING;
|
||||||
|
|
||||||
taskCounter = 0;
|
taskCounter = 0;
|
||||||
delayedTasks: Array<() => void> = [];
|
delayedTasks: Array<StructuredTask<TaskDescription>> = [];
|
||||||
taskFlushHandle: ReturnType<typeof setTimeout> | null = null;
|
taskFlushHandle: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
register(actor: Actor): boolean {
|
register(actor: Actor): boolean {
|
||||||
|
@ -39,13 +40,13 @@ export class ActorSpace {
|
||||||
Array.from(this.actors.values()).forEach(a => a._terminateWith(reason));
|
Array.from(this.actors.values()).forEach(a => a._terminateWith(reason));
|
||||||
}
|
}
|
||||||
|
|
||||||
queueTask(f: () => void) {
|
queueTask(t: StructuredTask<TaskDescription>) {
|
||||||
switch (this.state) {
|
switch (this.state) {
|
||||||
case ActorSpaceState.TERMINATED:
|
case ActorSpaceState.TERMINATED:
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ActorSpaceState.PAUSED:
|
case ActorSpaceState.PAUSED:
|
||||||
this.delayedTasks.push(f);
|
this.delayedTasks.push(t);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ActorSpaceState.RUNNING:
|
case ActorSpaceState.RUNNING:
|
||||||
|
@ -54,9 +55,9 @@ export class ActorSpace {
|
||||||
this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0);
|
this.taskFlushHandle = setTimeout(() => this._scheduleDelayedTasks(), 0);
|
||||||
}
|
}
|
||||||
if (this.taskCounter >= LIMIT) {
|
if (this.taskCounter >= LIMIT) {
|
||||||
this.delayedTasks.push(f);
|
this.delayedTasks.push(t);
|
||||||
} else {
|
} else {
|
||||||
queueMicrotask(f);
|
queueMicrotask(() => t.perform());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -64,7 +65,7 @@ export class ActorSpace {
|
||||||
|
|
||||||
_scheduleDelayedTasks() {
|
_scheduleDelayedTasks() {
|
||||||
this.taskCounter = 0;
|
this.taskCounter = 0;
|
||||||
this.delayedTasks.forEach(queueMicrotask);
|
this.delayedTasks.forEach(t => queueMicrotask(() => t.perform()));
|
||||||
this.delayedTasks = [];
|
this.delayedTasks = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
/// SPDX-FileCopyrightText: Copyright © 2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
|
// Each Turn executes one Task
|
||||||
|
|
||||||
|
import type { AnyValue, Handle, Ref, Facet } from './actor.js';
|
||||||
|
import type { IdentitySet } from '@preserves/core';
|
||||||
|
|
||||||
|
export type Task = StructuredTask<TaskDescription>;
|
||||||
|
|
||||||
|
export interface StructuredTask<T> {
|
||||||
|
perform(): void;
|
||||||
|
describe(): T;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type TaskDescription =
|
||||||
|
| { type: 'bootActor', detail?: AnyValue }
|
||||||
|
| { type: 'turn', tasks: TaskAction[] }
|
||||||
|
;
|
||||||
|
|
||||||
|
export type TaskAction = { targetFacet: Facet, action: ActionDescription };
|
||||||
|
|
||||||
|
export type ActionDescription =
|
||||||
|
| { type: 'spawnActor', detail?: AnyValue, spawningFacet: Facet, initialAssertions: IdentitySet<Handle> }
|
||||||
|
| { type: 'stopActor', err?: AnyValue }
|
||||||
|
| { type: 'inertCheck' }
|
||||||
|
|
||||||
|
| { type: 'assert', target: Ref, handle: Handle, assertion: AnyValue }
|
||||||
|
| { type: 'retract', target: Ref, handle: Handle }
|
||||||
|
| { type: 'message', target: Ref, assertion: AnyValue }
|
||||||
|
| { type: 'sync', target: Ref, callback: Ref }
|
||||||
|
;
|
|
@ -8,6 +8,8 @@ import { wireRefEmbeddedType } from './protocol.js';
|
||||||
import { attenuate } from '../runtime/rewrite.js';
|
import { attenuate } from '../runtime/rewrite.js';
|
||||||
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
||||||
|
|
||||||
|
const FLUSH = Symbol.for('flush');
|
||||||
|
|
||||||
export class WireSymbol {
|
export class WireSymbol {
|
||||||
count = 0;
|
count = 0;
|
||||||
|
|
||||||
|
@ -143,6 +145,7 @@ export interface RelayOptions {
|
||||||
|
|
||||||
export class Relay {
|
export class Relay {
|
||||||
readonly facet: Facet;
|
readonly facet: Facet;
|
||||||
|
readonly selfRef: Ref;
|
||||||
readonly w: PacketWriter;
|
readonly w: PacketWriter;
|
||||||
readonly inboundAssertions = new IdentityMap<Handle, {
|
readonly inboundAssertions = new IdentityMap<Handle, {
|
||||||
localHandle: Handle,
|
localHandle: Handle,
|
||||||
|
@ -164,6 +167,7 @@ export class Relay {
|
||||||
|
|
||||||
constructor(options: RelayOptions) {
|
constructor(options: RelayOptions) {
|
||||||
this.facet = Turn.activeFacet;
|
this.facet = Turn.activeFacet;
|
||||||
|
this.selfRef = Turn.ref(this);
|
||||||
this.w = options.packetWriter;
|
this.w = options.packetWriter;
|
||||||
this.debug = options.debug ?? false;
|
this.debug = options.debug ?? false;
|
||||||
this.trustPeer = options.trustPeer ?? true;
|
this.trustPeer = options.trustPeer ?? true;
|
||||||
|
@ -277,16 +281,20 @@ export class Relay {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message(body: Assertion) {
|
||||||
|
if (body === FLUSH) {
|
||||||
|
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
||||||
|
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
||||||
|
canonical: true,
|
||||||
|
embeddedEncode: wireRefEmbeddedType,
|
||||||
|
})));
|
||||||
|
this.pendingTurn = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
|
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
|
||||||
if (this.pendingTurn.length === 0) {
|
if (this.pendingTurn.length === 0) {
|
||||||
Turn.activeFacet.actor.space.queueTask(() => {
|
Turn.active.message(this.selfRef, FLUSH);
|
||||||
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
|
||||||
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
|
||||||
canonical: true,
|
|
||||||
embeddedEncode: wireRefEmbeddedType,
|
|
||||||
})));
|
|
||||||
this.pendingTurn = [];
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
|
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue