Use a global variable instead of threading thisTurn through everywhere. Should improve ergonomics a little.

This commit is contained in:
Tony Garnock-Jones 2021-12-03 01:46:41 +01:00
parent 33948be6b1
commit fb420855e9
16 changed files with 241 additions and 285 deletions

View File

@ -45,14 +45,9 @@ export interface CompilerOutput {
sourceToTargetMap: SpanIndex<number>;
}
function receiverFor(s: TurnAction): Substitution {
return (s.implicitTurn) ? 'thisTurn.' : '.';
}
export class ExpansionContext {
readonly parser: SyndicateParser;
readonly moduleType: ModuleType;
hasBootableBootProc: boolean = false;
readonly typescript: boolean;
nextIdNumber = 0;
@ -71,10 +66,6 @@ export class ExpansionContext {
argDecl(t: TemplateFunction, name: Substitution, type: Substitution): Items {
return (this.typescript) ? t`${name}: ${type}` : t`${name}`;
}
turnDecl(t: TemplateFunction): Items {
return this.argDecl(t, 'thisTurn', '__SYNDICATE__.Turn');
}
}
function stringifyId(i: Identifier): Items {
@ -114,7 +105,7 @@ export function expand(tree: Items, ctx: ExpansionContext): Items {
function terminalWrap(t: TemplateFunction, isTerminal: boolean, body: Statement): Statement {
if (isTerminal) {
return t`thisTurn._stop(thisTurn.activeFacet, (${ctx.turnDecl(t)}) => {${body}})`
return t`__SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {${body}})`
} else {
return body;
}
@ -125,7 +116,7 @@ export function expand(tree: Items, ctx: ExpansionContext): Items {
}
function xf<T extends TurnAction>(p: Pattern<T>, f: (v: T, t: TemplateFunction) => Items) {
x(p, (v, t) => t`${receiverFor(v)}${f(v, t)}`);
x(p, (v, t) => t`__SYNDICATE__.Turn.active.${f(v, t)}`);
}
const walk = (tree: Items): Items => expand(tree, ctx);
@ -134,12 +125,12 @@ export function expand(tree: Items, ctx: ExpansionContext): Items {
xf(ctx.parser.duringStatement, (s, t) => {
// TODO: untyped template
const sa = compilePattern(s.pattern);
return t`assertDataflow((${ctx.turnDecl(t)}) => ({
return t`assertDataflow(() => ({
target: currentSyndicateTarget,
assertion: __SYNDICATE__.fromObserve(__SYNDICATE__.Observe({
pattern: ${sa.skeleton},
observer: thisTurn.ref(__SYNDICATE__.assertionFacetObserver(
(${ctx.turnDecl(t)}, ${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) => {
observer: __SYNDICATE__.Turn.ref(__SYNDICATE__.assertionFacetObserver(
(${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) => {
if (Array.isArray(__vs)) {
${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
${walk(s.body)}
@ -158,13 +149,13 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
? t`, new __SYNDICATE__.Set([${commaJoin(s.initialAssertions.map(walk))}])`
: ``;
*/
const n = s.name === void 0 ? '' : t` thisTurn.activeFacet.actor.name = ${walk(s.name)};`;
return t`_spawn${s.isLink ? 'Link': ''}((${ctx.turnDecl(t)}) => {${n} ${body} });`;
const n = s.name === void 0 ? '' : t` __SYNDICATE__.Turn.activeFacet.actor.name = ${walk(s.name)};`;
return t`_spawn${s.isLink ? 'Link': ''}(() => {${n} ${body} });`;
});
x(ctx.parser.fieldDeclarationStatement, (s, t) => {
const ft = ctx.typescript ? t`<${s.field.type ?? '__SYNDICATE__.AnyValue'}>` : '';
return t`const ${[s.field.id]} = thisTurn.field${ft}(${maybeWalk(s.init) ?? 'void 0'}, ${stringifyId(s.field.id)});`;
return t`const ${[s.field.id]} = __SYNDICATE__.Turn.active.field${ft}(${maybeWalk(s.init) ?? 'void 0'}, ${stringifyId(s.field.id)});`;
});
x(ctx.parser.atStatement, (s, t) => {
@ -172,15 +163,15 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
});
x(ctx.parser.createExpression, (s, t) => {
return t`thisTurn.ref(${walk(s.entity)})`;
return t`__SYNDICATE__.Turn.ref(${walk(s.entity)})`;
});
xf(ctx.parser.assertionEndpointStatement, (s, t) => {
if (s.isDynamic) {
if (s.test == void 0) {
return t`assertDataflow((${ctx.turnDecl(t)}) => ({ target: currentSyndicateTarget, assertion: ${walk(s.template)} }));`;
return t`assertDataflow(() => ({ target: currentSyndicateTarget, assertion: ${walk(s.template)} }));`;
} else {
return t`assertDataflow((${ctx.turnDecl(t)}) => (${walk(s.test)})
return t`assertDataflow(() => (${walk(s.test)})
? ({ target: currentSyndicateTarget, assertion: ${walk(s.template)} })
: ({ target: void 0, assertion: void 0 }));`;
}
@ -194,15 +185,15 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
});
xf(ctx.parser.dataflowStatement, (s, t) =>
t`_dataflow((${ctx.turnDecl(t)}) => {${walk(s.body)}});`);
t`_dataflow(() => {${walk(s.body)}});`);
xf(ctx.parser.eventHandlerEndpointStatement, (s, t) => {
x(ctx.parser.eventHandlerEndpointStatement, (s, t) => {
if (s.triggerType === 'dataflow') {
return t`withSelfDo((${ctx.turnDecl(t)}) => { dataflow { if (${walk(s.predicate)}) { ${terminalWrap(t, s.terminal, walk(s.body))} } } });`;
return t`dataflow { if (${walk(s.predicate)}) { ${terminalWrap(t, s.terminal, walk(s.body))} } }`;
}
if (s.triggerType === 'stop') {
return t`activeFacet.onStop((${ctx.turnDecl(t)}) => {${walk(s.body)}});`;
return t`__SYNDICATE__.Turn.activeFacet.onStop(() => {${walk(s.body)}});`;
}
const sa = compilePattern(s.pattern);
@ -215,19 +206,19 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
switch (s.triggerType) {
case 'asserted':
entity = t`{
assert(${ctx.turnDecl(t)}, ${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}, ${ctx.argDecl(t, '__handle', '__SYNDICATE__.Handle')}) {
assert(${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}, ${ctx.argDecl(t, '__handle', '__SYNDICATE__.Handle')}) {
${guardBody(terminalWrap(t, s.terminal, walk(s.body)))}
}
}`;
break;
case 'retracted':
entity = t`__SYNDICATE__.assertionObserver((${ctx.turnDecl(t)}, ${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) => {
${guardBody(t`return (${ctx.turnDecl(t)}) => { ${terminalWrap(t, s.terminal, walk(s.body))} };`)}
entity = t`__SYNDICATE__.assertionObserver((${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) => {
${guardBody(t`return () => { ${terminalWrap(t, s.terminal, walk(s.body))} };`)}
})`;
break;
case 'message':
entity = t`{
message(${ctx.turnDecl(t)}, ${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) {
message(${ctx.argDecl(t, '__vs', '__SYNDICATE__.AnyValue')}) {
${guardBody(terminalWrap(t, s.terminal, walk(s.body)))}
}
}`;
@ -236,16 +227,16 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
const assertion = t`__SYNDICATE__.fromObserve(__SYNDICATE__.Observe({
pattern: ${sa.skeleton},
observer: thisTurn.ref(${entity}),
observer: __SYNDICATE__.Turn.ref(${entity}),
}))`;
if (s.isDynamic) {
return t`assertDataflow((${ctx.turnDecl(t)}) => ({
return t`__SYNDICATE__.Turn.active.assertDataflow(() => ({
target: currentSyndicateTarget,
assertion: ${assertion},
}));`;
} else {
return t`replace(currentSyndicateTarget, void 0, ${assertion});`;
return t`__SYNDICATE__.Turn.active.replace(currentSyndicateTarget, void 0, ${assertion});`;
}
});
@ -262,26 +253,11 @@ ${joinItems(sa.captureBinders.map(binderTypeGuard(t)), '\n')}
xf(ctx.parser.messageSendStatement, (s, t) => t`message(currentSyndicateTarget, ${walk(s.expr)});`);
xf(ctx.parser.reactStatement, (s, t) => {
return t`facet((${ctx.turnDecl(t)}) => {${s.body}});`;
return t`facet(() => {${s.body}});`;
});
x(ctx.parser.bootStatement, (s, t) => {
ctx.hasBootableBootProc = s.formals.length == 0;
const body = t`{${walk(s.body)}}`;
const bootFormals = commaJoin([ctx.turnDecl(t), ... s.formals.map(
b => ctx.argDecl(t, [b.id], b.type ?? '__SYNDICATE__.AnyValue'))]);
switch (ctx.moduleType) {
case 'es6':
return t`export function boot(${bootFormals}) ${body}`;
case 'require':
return t`module.exports.boot = (${bootFormals}) => ${body};`;
case 'global':
return t`function boot(${bootFormals}) ${body}`;
}
});
xf(ctx.parser.stopStatement, (s, t) =>
t`withSelfDo((${ctx.turnDecl(t)}) => thisTurn._stop(thisTurn.activeFacet, (${ctx.turnDecl(t)}) => {${walk(s.body)}});`)
x(ctx.parser.stopStatement, (s, t) =>
t`__SYNDICATE__.Turn.active._stop(__SYNDICATE__.Turn.activeFacet, () => {${walk(s.body)}});`)
return tree;
}
@ -297,7 +273,7 @@ export function compile(options: CompileOptions): CompilerOutput {
const start = startPos(inputFilename);
let tree = stripShebang(laxRead(source, { start, extraDelimiters: ':' }));
const end = tree.length > 0 ? tree[tree.length - 1].end : start;
// const end = tree.length > 0 ? tree[tree.length - 1].end : start;
const macro = new Templates(undefined, { extraDelimiters: ':' });
@ -306,21 +282,7 @@ export function compile(options: CompileOptions): CompilerOutput {
tree = expand(tree, ctx);
const ts = macro.template(fixPos(start));
const te = macro.template(fixPos(end));
if (ctx.hasBootableBootProc) {
let bp;
switch (moduleType) {
case 'es6':
case 'global':
bp = te`boot`;
break;
case 'require':
bp = te`module.exports.boot`;
break;
}
tree = te`${tree}\nif (typeof module !== 'undefined' && ((typeof require === 'undefined' ? {main: void 0} : require).main === module)) __SYNDICATE__.Actor.boot(${bp});`;
}
// const te = macro.template(fixPos(end));
{
const runtime = options.runtime ?? '@syndicate-lang/core';

View File

@ -22,7 +22,6 @@ export type Type = Items;
export type Binder = { id: Identifier, type?: Type };
export interface TurnAction {
implicitTurn: boolean;
}
export interface FacetSetupAction extends TurnAction {
@ -91,11 +90,6 @@ export interface DuringStatement extends FacetSetupAction {
export interface ReactStatement extends FacetSetupAction {
}
export interface BootStatement {
formals: Binder[];
body: Statement;
}
export interface AtStatement {
target: Expr;
body: Statement;
@ -187,8 +181,7 @@ export class SyndicateParser {
turnAction<T extends TurnAction>(pattern: (scope: T) => Pattern<any>): Pattern<T> {
return i => {
const scope = Object.create(null);
scope.implicitTurn = true;
const p = seq(option(map(atom('.'), _ => scope.implicitTurn = false)), pattern(scope));
const p = pattern(scope);
const r = p(i);
if (r === null) return null;
return [scope, r[1]];
@ -318,16 +311,6 @@ export class SyndicateParser {
return seq(atom('react'), this.block(o.body));
});
// Principal: none
readonly bootStatement: Pattern<BootStatement> =
scope(o => {
o.body = [];
return seq(
atom('boot'),
group('(', bind(o, 'formals', repeat(this.binder, { separator: atom(',') }))),
this.block(o.body));
});
// Principal: Turn
readonly stopStatement = this.blockTurnAction(atom('stop'));

View File

@ -6,7 +6,7 @@ import {
Pattern as P,
assertObserve,
Record,
Actor, Dataspace,
Actor, Dataspace, Turn,
} from '..';
const BoxState = Record.makeConstructor()(Symbol.for('BoxState'), ['value']);
@ -16,15 +16,15 @@ const N = 100000;
console.time('box-and-client-' + N.toString());
Actor.boot(t => {
t.activeFacet.preventInertCheck();
const ds = t.ref(new Dataspace());
Actor.boot(() => {
Turn.activeFacet.preventInertCheck();
const ds = Turn.ref(new Dataspace());
t.spawn(t => {
t.activeFacet.actor.name = 'box';
const boxValue = t.field(0, 'value');
Turn.active.spawn(() => {
Turn.activeFacet.actor.name = 'box';
const boxValue = Turn.active.field(0, 'value');
t.assertDataflow(_t => {
Turn.active.assertDataflow(() => {
// console.log('recomputing published BoxState', boxValue.value);
return {
target: ds,
@ -32,35 +32,35 @@ Actor.boot(t => {
};
});
t.dataflow(t => {
Turn.active.dataflow(() => {
// console.log('dataflow saw new value', boxValue.value);
if (boxValue.value === N) {
t.stop(t.activeFacet, _t => {
Turn.active.stop(Turn.activeFacet, () => {
console.log('terminated box root facet');
});
}
});
assertObserve(t, ds, P.rec(SetBox.constructorInfo.label, P.bind()), {
message(_t, [v]) {
assertObserve(ds, P.rec(SetBox.constructorInfo.label, P.bind()), {
message([v]) {
boxValue.value = v;
// console.log('box updated value', v);
}
});
});
t.spawn(t => {
t.activeFacet.actor.name = 'client';
Turn.active.spawn(() => {
Turn.activeFacet.actor.name = 'client';
assertObserve(t, ds, P.rec(BoxState.constructorInfo.label, P.bind()), {
assert(t, [v], _handle) {
assertObserve(ds, P.rec(BoxState.constructorInfo.label, P.bind()), {
assert([v], _handle) {
// console.log('client sending SetBox', v + 1);
t.message(ds, SetBox(v + 1));
Turn.active.message(ds, SetBox(v + 1));
}
});
assertObserve(t, ds, P.rec(BoxState.constructorInfo.label, P._), {
retract(_t) {
assertObserve(ds, P.rec(BoxState.constructorInfo.label, P._), {
retract() {
console.log('box gone');
console.timeEnd('box-and-client-' + N.toString());
}

View File

@ -6,7 +6,7 @@ import {
Pattern as P,
assertObserve,
Record,
Actor, Dataspace,
Actor, Dataspace, Turn,
} from '..';
const BoxState = Record.makeConstructor<{value: number}>()(Symbol.for('BoxState'), ['value']);
@ -16,15 +16,15 @@ const N = 100000;
console.time('box-and-client-' + N.toString());
Actor.boot(t => {
t.activeFacet.preventInertCheck();
const ds = t.ref(new Dataspace());
Actor.boot(() => {
Turn.activeFacet.preventInertCheck();
const ds = Turn.ref(new Dataspace());
t.spawn(t => {
t.activeFacet.actor.name = 'box';
const boxValue = t.field<number>(0, 'value');
Turn.active.spawn(() => {
Turn.activeFacet.actor.name = 'box';
const boxValue = Turn.active.field<number>(0, 'value');
t.assertDataflow(_t => {
Turn.active.assertDataflow(() => {
// console.log('recomputing published BoxState', boxValue.value);
return {
target: ds,
@ -32,35 +32,35 @@ Actor.boot(t => {
};
});
t.dataflow(t => {
Turn.active.dataflow(() => {
// console.log('dataflow saw new value', boxValue.value);
if (boxValue.value === N) {
t.stop(t.activeFacet, _t => {
Turn.active.stop(Turn.activeFacet, () => {
console.log('terminated box root facet');
});
}
});
assertObserve(t, ds, P.rec(SetBox.constructorInfo.label, P.bind()), {
message(_t, [v]: [number]) {
assertObserve(ds, P.rec(SetBox.constructorInfo.label, P.bind()), {
message([v]: [number]) {
boxValue.value = v;
// console.log('box updated value', v);
}
});
});
t.spawn(t => {
t.activeFacet.actor.name = 'client';
Turn.active.spawn(() => {
Turn.activeFacet.actor.name = 'client';
assertObserve(t, ds, P.rec(BoxState.constructorInfo.label, P.bind()), {
assert(t, [v]: [number], _handle) {
assertObserve(ds, P.rec(BoxState.constructorInfo.label, P.bind()), {
assert([v]: [number], _handle) {
// console.log('client sending SetBox', v + 1);
t.message(ds, SetBox(v + 1));
Turn.active.message(ds, SetBox(v + 1));
}
});
assertObserve(t, ds, P.rec(BoxState.constructorInfo.label, P._), {
retract(_t) {
assertObserve(ds, P.rec(BoxState.constructorInfo.label, P._), {
retract() {
console.log('box gone');
console.timeEnd('box-and-client-' + N.toString());
}

View File

@ -17,13 +17,13 @@ if ('stackTraceLimit' in Error) {
export type Assertion = Value<Ref>;
export type Handle = number;
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
export type LocalAction = (t: Turn) => void;
export type LocalAction = () => void;
export interface Entity {
assert(turn: Turn, assertion: Assertion, handle: Handle): void;
retract(turn: Turn, handle: Handle): void;
message(turn: Turn, body: Assertion): void;
sync(turn: Turn, peer: Ref): void;
assert(assertion: Assertion, handle: Handle): void;
retract(handle: Handle): void;
message(body: Assertion): void;
sync(peer: Ref): void;
}
export type Cap = Ref;
@ -51,7 +51,7 @@ let nextActorId = 0;
export const __setNextActorId = (v: number) => nextActorId = v;
export type DataflowGraph = Graph<DataflowBlock, Cell>;
export type DataflowBlock = (t: Turn) => void;
export type DataflowBlock = () => void;
export class Actor {
readonly id = nextActorId++;
@ -91,19 +91,19 @@ export class Actor {
this.exitHooks.push(a);
}
terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
terminateWith(reason: Exclude<ExitReason, null>) {
if (this.exitReason !== null) return;
this.exitReason = reason;
if (!reason.ok) {
console.error(`${this} crashed:`, reason.err);
}
this.exitHooks.forEach(hook => hook(t));
queueTask(() => Turn.for(this.root, t => this.root._terminate(t, reason.ok), true));
this.exitHooks.forEach(hook => hook());
queueTask(() => Turn.for(this.root, () => this.root._terminate(reason.ok), true));
}
repairDataflowGraph(t: Turn) {
repairDataflowGraph() {
if (this._dataflowGraph === null) return;
this._dataflowGraph.repairDamage(block => block(t));
this._dataflowGraph.repairDamage(block => block());
}
toString(): string {
@ -157,26 +157,26 @@ export class Facet {
this.outbound.set(h, e);
}
_terminate(t: Turn, orderly: boolean): void {
_terminate(orderly: boolean): void {
if (!this.isLive) return;
this.isLive = false;
const parent = this.parent;
if (parent) parent.children.delete(this);
t._inFacet(this, t => {
this.children.forEach(child => child._terminate(t, orderly));
if (orderly) this.shutdownActions.forEach(a => a(t));
this.outbound.forEach(e => t._retract(e));
Turn.active._inFacet(this, () => {
this.children.forEach(child => child._terminate(orderly));
if (orderly) this.shutdownActions.forEach(a => a());
this.outbound.forEach(e => Turn.active._retract(e));
if (orderly) {
queueTask(() => {
if (parent) {
if (parent.isInert()) {
Turn.for(parent, t => parent._terminate(t, true));
Turn.for(parent, () => parent._terminate(true));
}
} else {
Turn.for(this.actor.root, t => this.actor.terminateWith(t, { ok: true }), true);
Turn.for(this.actor.root, () => this.actor.terminateWith({ ok: true }), true);
}
});
}
@ -197,19 +197,29 @@ export class Facet {
}
export class StopOnRetract implements Partial<Entity> {
retract(turn: Turn, _handle: Handle): void {
turn.stop();
retract(_handle: Handle): void {
Turn.active.stop();
}
}
export function _sync_impl(turn: Turn, e: Partial<Entity>, peer: Ref): void {
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
export function _sync_impl(e: Partial<Entity>, peer: Ref): void {
e.sync ? e.sync!(peer) : Turn.active.message(peer, true);
}
let nextHandle = 0;
let nextTurnId = 0;
export class Turn {
static active: Turn = void 0 as unknown as Turn;
static get activeFacet(): Facet {
return Turn.active.activeFacet;
}
static ref<T extends Partial<Entity>>(e: T): Ref {
return Turn.active.ref(e);
}
readonly id = nextTurnId++;
_activeFacet: Facet;
queues: Map<Actor, LocalAction[]> | null;
@ -221,11 +231,17 @@ export class Turn {
}
const t = new Turn(facet);
try {
f(t);
facet.actor.repairDataflowGraph(t);
const saved = Turn.active;
Turn.active = t;
try {
f();
facet.actor.repairDataflowGraph();
} finally {
Turn.active = saved;
}
t.deliver();
} catch (err) {
Turn.for(facet.actor.root, t => facet.actor.terminateWith(t, { ok: false, err }));
Turn.for(facet.actor.root, () => facet.actor.terminateWith({ ok: false, err }));
}
}
@ -241,7 +257,7 @@ export class Turn {
_inFacet(facet: Facet, f: LocalAction): void {
const saved = this._activeFacet;
this._activeFacet = facet;
f(this);
f();
this._activeFacet = saved;
}
@ -261,9 +277,9 @@ export class Turn {
}
stop(facet: Facet = this.activeFacet, continuation?: LocalAction) {
this.enqueue(facet.parent!, t => {
facet._terminate(t, true);
if (continuation) continuation(t);
this.enqueue(facet.parent!, () => {
facet._terminate(true);
if (continuation) continuation();
});
}
@ -302,11 +318,11 @@ export class Turn {
}
stopActor(): void {
this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: true }));
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor.terminateWith({ ok: true }));
}
crash(err: Error): void {
this.enqueue(this.activeFacet.actor.root, t => this.activeFacet.actor.terminateWith(t, { ok: false, err }));
this.enqueue(this.activeFacet.actor.root, () => this.activeFacet.actor.terminateWith({ ok: false, err }));
}
field<V extends Value<T>, T = Ref>(initial: V, name?: string): Field<V, T> {
@ -320,21 +336,21 @@ export class Turn {
dataflow(a: LocalAction) {
const f = this.activeFacet;
const b = (t: Turn) => f.isLive && t._inFacet(f, a);
f.onStop(_t => f.actor.dataflowGraph.forgetSubject(b));
f.actor.dataflowGraph.withSubject(b, () => b(this));
const b = () => f.isLive && Turn.active._inFacet(f, a);
f.onStop(() => f.actor.dataflowGraph.forgetSubject(b));
f.actor.dataflowGraph.withSubject(b, b);
}
assertDataflow(assertionFunction: (t: Turn) => {target: Ref, assertion: Assertion}) {
assertDataflow(assertionFunction: () => {target: Ref, assertion: Assertion}) {
let handle: Handle | undefined = void 0;
let target: Ref | undefined = void 0;
let assertion: Assertion | undefined = void 0;
this.dataflow(t => {
let {target: nextTarget, assertion: nextAssertion} = assertionFunction(t);
this.dataflow(() => {
let {target: nextTarget, assertion: nextAssertion} = assertionFunction();
if (target !== nextTarget || !is(assertion, nextAssertion)) {
target = nextTarget;
assertion = nextAssertion;
handle = t.replace(target, handle, assertion);
handle = Turn.active.replace(target, handle, assertion);
}
});
}
@ -350,9 +366,9 @@ export class Turn {
if (a !== null) {
const e = { handle: h, peer: ref, established: false };
this.activeFacet.outbound.set(h, e);
this.enqueue(ref.relay, t => {
this.enqueue(ref.relay, () => {
e.established = true;
ref.target.assert?.(t, a, h);
ref.target.assert?.(a, h);
});
}
}
@ -375,52 +391,49 @@ export class Turn {
_retract(e: OutboundAssertion): void {
this.activeFacet.outbound.delete(e.handle);
this.enqueue(e.peer.relay, t => {
this.enqueue(e.peer.relay, () => {
if (e.established) {
e.established = false;
e.peer.target.retract?.(t, e.handle);
e.peer.target.retract?.(e.handle);
}
});
}
sync(ref: Ref): Promise<Turn> {
return new Promise(resolve => this._sync(ref, this.ref({ message: resolve })));
sync(ref: Ref): Promise<void> {
return new Promise(resolve => this._sync(ref, this.ref({ message() { resolve() } })));
}
_sync(ref: Ref, peer: Ref): void {
this.enqueue(ref.relay, t => _sync_impl(t, ref.target, peer));
this.enqueue(ref.relay, () => _sync_impl(ref.target, peer));
}
message(ref: Ref, assertion: Assertion): void {
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) this.enqueue(ref.relay, t => ref.target.message?.(t, assertion));
if (a !== null) this.enqueue(ref.relay, () => ref.target.message?.(assertion));
}
enqueue(relay: Facet, a0: LocalAction): void {
if (this.queues === null) {
throw new Error("Attempt to reuse a committed Turn");
}
const a: LocalAction = t => t._inFacet(relay, a0);
const a: LocalAction = () => Turn.active._inFacet(relay, a0);
this.queues.get(relay.actor)?.push(a) ?? this.queues.set(relay.actor, [a]);
}
deliver() {
this.queues!.forEach((q, actor) =>
queueTask(() => Turn.for(actor.root, t => q.forEach(f => f(t)))));
queueTask(() => Turn.for(actor.root, () => q.forEach(f => f()))));
this.queues = null;
}
withSelfDo(a: LocalAction) {
a(this);
}
}
function stopIfInertAfter(a: LocalAction): LocalAction {
return t => {
a(t);
t.enqueue(t.activeFacet, t => {
if ((t.activeFacet.parent && !t.activeFacet.parent.isLive) || t.activeFacet.isInert()) {
t.stop();
return () => {
const facet = Turn.activeFacet;
a();
Turn.active.enqueue(facet, () => {
if ((facet.parent && !facet.parent.isLive) || facet.isInert()) {
Turn.active.stop(facet);
}
});
};

View File

@ -11,63 +11,63 @@ export class Dataspace implements Partial<Entity> {
readonly index = new Index();
readonly handleMap = new IdentityMap<Handle, [Assertion, Observe | undefined]>();
assert(turn: Turn, v: Assertion, handle: Handle): void {
this.index.addAssertion(turn, v);
assert(v: Assertion, handle: Handle): void {
this.index.addAssertion(v);
const o = toObserve(v);
if (o !== void 0) {
this.index.addObserver(turn, o.pattern, o.observer);
this.index.addObserver(o.pattern, o.observer);
}
this.handleMap.set(handle, [v, o]);
}
retract(turn: Turn, handle: Handle): void {
retract(handle: Handle): void {
const entry = this.handleMap.get(handle);
if (entry === void 0) return;
const [v, o] = entry;
if (o !== void 0) {
this.index.removeObserver(turn, o.pattern, o.observer);
this.index.removeObserver(o.pattern, o.observer);
}
this.index.removeAssertion(turn, v);
this.index.removeAssertion(v);
}
message(turn: Turn, v: Assertion): void {
this.index.deliverMessage(turn, v);
message(v: Assertion): void {
this.index.deliverMessage(v);
}
}
export function assertionObserver(f: (t: Turn, a: Assertion) => LocalAction | undefined): Partial<Entity> {
export function assertionObserver(f: (a: Assertion) => LocalAction | undefined): Partial<Entity> {
const assertionMap = new IdentityMap<Handle, LocalAction>();
return {
assert(t: Turn, a: Assertion, h: Handle): void {
const g = f(t, a) ?? null;
assert(a: Assertion, h: Handle): void {
const g = f(a) ?? null;
if (g !== null) {
assertionMap.set(h, g);
}
},
retract(t: Turn, h: Handle): void {
assertionMap.get(h)?.(t);
retract(h: Handle): void {
assertionMap.get(h)?.();
assertionMap.delete(h);
},
};
}
export function assertionFacetObserver(f: (t: Turn, a: Assertion) => void, inertOk: boolean = true): Partial<Entity> {
export function assertionFacetObserver(f: (a: Assertion) => void, inertOk: boolean = true): Partial<Entity> {
const facetMap = new IdentityMap<Handle, Facet>();
return {
assert(t: Turn, a: Assertion, h: Handle): void {
facetMap.set(h, t.facet(t => {
if (inertOk) t.activeFacet.preventInertCheck();
f(t, a);
assert(a: Assertion, h: Handle): void {
facetMap.set(h, Turn.active.facet(() => {
if (inertOk) Turn.activeFacet.preventInertCheck();
f(a);
}));
},
retract(t: Turn, h: Handle): void {
retract(h: Handle): void {
const facet = facetMap.get(h);
if (facet) t.stop(facet);
if (facet) Turn.active.stop(facet);
facetMap.delete(h);
},
};
}
export function assertObserve(t: Turn, ds: Ref, pattern: P.Pattern, e: Partial<Entity>): Handle {
return t.assert(ds, fromObserve(Observe({ pattern, observer: t.ref(e) })));
export function assertObserve(ds: Ref, pattern: P.Pattern, e: Partial<Entity>): Handle {
return Turn.active.assert(ds, fromObserve(Observe({ pattern, observer: Turn.ref(e) })));
}

View File

@ -1,7 +1,8 @@
/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import type { Assertion, Handle, Ref, Turn } from "./actor.js";
import type { Assertion, Handle, Ref } from "./actor.js";
import { Turn } from "./actor.js";
import { Bytes, Dictionary, DoubleFloat, embed, IdentityMap, is, isEmbedded, Record, SingleFloat, Tuple } from "@preserves/core";
import { SturdyValue } from "../transport/sturdy.js";
@ -209,32 +210,32 @@ export function attenuate(ref: Ref, ... a: Attenuation): Ref {
return { ... ref, attenuation: [... a, ... (ref.attenuation ?? [])] };
}
export function forwarder(t: Turn, ref: Ref): { proxy: Ref, revoker: Ref } {
export function forwarder(ref: Ref): { proxy: Ref, revoker: Ref } {
let underlying: Ref | null = ref;
let handleMap = new IdentityMap<Handle, Handle>();
let proxy = t.ref({
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
let proxy = Turn.ref({
assert(assertion: Assertion, handle: Handle): void {
if (underlying === null) return;
handleMap.set(handle, turn.assert(underlying, assertion));
handleMap.set(handle, Turn.active.assert(underlying, assertion));
},
retract(turn: Turn, handle: Handle): void {
retract(handle: Handle): void {
if (underlying === null) return;
turn.retract(handleMap.get(handle));
Turn.active.retract(handleMap.get(handle));
handleMap.delete(handle);
},
message(turn: Turn, body: Assertion): void {
message(body: Assertion): void {
if (underlying === null) return;
turn.message(underlying, body);
Turn.active.message(underlying, body);
},
sync(turn: Turn, peer: Ref): void {
sync(peer: Ref): void {
if (underlying === null) return;
turn._sync(underlying, peer);
Turn.active._sync(underlying, peer);
},
});
let revoker = t.ref({
message(turn: Turn, _body: Assertion): void {
let revoker = Turn.ref({
message(_body: Assertion): void {
underlying = null;
handleMap.forEach(h => turn.retract(h));
handleMap.forEach(h => Turn.active.retract(h));
},
});
return { proxy, revoker };

View File

@ -20,7 +20,7 @@ export class Index {
readonly allAssertions: Bag<Ref> = new Bag();
readonly root: Node = new Node(new Continuation(new Set()));
addObserver(t: Turn, pattern: P.Pattern, observer: Ref) {
addObserver(pattern: P.Pattern, observer: Ref) {
let {constPaths, constValues, capturePaths} = analysePattern(pattern);
const continuation = this.root.extend(pattern);
let constValMap = continuation.leafMap.get(constPaths);
@ -53,10 +53,10 @@ export class Index {
const captureMap: KeyedDictionary<Array<AnyValue>, Handle, Ref> = new KeyedDictionary();
observerGroup.observers.set(observer, captureMap);
observerGroup.cachedCaptures.forEach((_count, captures) =>
captureMap.set(captures, t.assert(observer, captures)));
captureMap.set(captures, Turn.active.assert(observer, captures)));
}
removeObserver(t: Turn, pattern: P.Pattern, observer: Ref) {
removeObserver(pattern: P.Pattern, observer: Ref) {
let {constPaths, constValues, capturePaths} = analysePattern(pattern);
const continuation = this.root.extend(pattern);
let constValMap = continuation.leafMap.get(constPaths);
@ -67,7 +67,7 @@ export class Index {
if (!observerGroup) return;
const captureMap = observerGroup.observers.get(observer);
if (captureMap) {
captureMap.forEach((handle, _captures) => t.retract(handle));
captureMap.forEach((handle, _captures) => Turn.active.retract(handle));
observerGroup.observers.delete(observer);
}
if (observerGroup.observers.size === 0) {
@ -81,7 +81,7 @@ export class Index {
}
}
adjustAssertion(t: Turn, outerValue: Assertion, delta: number): ChangeDescription {
adjustAssertion(outerValue: Assertion, delta: number): ChangeDescription {
let net = this.allAssertions.change(outerValue, delta);
switch (net) {
case ChangeDescription.ABSENT_TO_PRESENT:
@ -93,7 +93,7 @@ export class Index {
(h, vs) => {
if (h.cachedCaptures.change(vs, +1) === ChangeDescription.ABSENT_TO_PRESENT)
h.observers.forEach((captureMap, observer) =>
captureMap.set(vs, t.assert(observer, vs)));
captureMap.set(vs, Turn.active.assert(observer, vs)));
});
break;
@ -106,7 +106,7 @@ export class Index {
(h, vs) => {
if (h.cachedCaptures.change(vs, -1) === ChangeDescription.PRESENT_TO_ABSENT)
h.observers.forEach((captureMap, _observer) => {
t.retract(captureMap.get(vs));
Turn.active.retract(captureMap.get(vs));
captureMap.delete(vs);
});
});
@ -115,17 +115,17 @@ export class Index {
return net;
}
addAssertion(t: Turn, v: Assertion) {
this.adjustAssertion(t, v, +1);
addAssertion(v: Assertion) {
this.adjustAssertion(v, +1);
}
removeAssertion(t: Turn, v: Assertion) {
this.adjustAssertion(t, v, -1);
removeAssertion(v: Assertion) {
this.adjustAssertion(v, -1);
}
deliverMessage(t: Turn, v: Assertion, leafCallback: (l: Leaf, v: Assertion) => void = _nop) {
deliverMessage(v: Assertion, leafCallback: (l: Leaf, v: Assertion) => void = _nop) {
this.root.modify(EventType.MESSAGE, v, _nop, leafCallback, (h, vs) =>
h.observers.forEach((_captureMap, observer) => t.message(observer, vs)));
h.observers.forEach((_captureMap, observer) => Turn.active.message(observer, vs)));
}
}

View File

@ -20,23 +20,23 @@ export class SyncPeerEntity implements Entity {
this.peer = peer;
}
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
this.handleMap.set(handle, turn.assert(this.peer, assertion));
assert(assertion: Assertion, handle: Handle): void {
this.handleMap.set(handle, Turn.active.assert(this.peer, assertion));
}
retract(turn: Turn, handle: Handle): void {
turn.retract(this.handleMap.get(handle)!);
retract(handle: Handle): void {
Turn.active.retract(this.handleMap.get(handle)!);
this.handleMap.delete(handle);
}
message(turn: Turn, body: Assertion): void {
message(body: Assertion): void {
// We get to vanish from the indexes now
this.relay.releaseRefOut(this.e!);
turn.message(this.peer, body);
Turn.active.message(this.peer, body);
}
sync(turn: Turn, peer: Ref): void {
turn._sync(this.peer, peer);
sync(peer: Ref): void {
Turn.active._sync(this.peer, peer);
}
}
@ -53,26 +53,26 @@ export class RelayEntity implements Entity {
this.relay.send(this.oid, m);
}
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
assert(assertion: Assertion, handle: Handle): void {
this.send(IO.Event.Assert(IO.Assert({
assertion: this.relay.register(assertion, handle),
handle
})))
}
retract(_turn: Turn, handle: Handle): void {
retract(handle: Handle): void {
this.relay.deregister(handle);
this.send(IO.Event.Retract(IO.Retract(handle)));
}
message(_turn: Turn, body: Assertion): void {
message(body: Assertion): void {
this.send(IO.Event.Message(IO.Message(this.relay.register(body, null))));
}
sync(turn: Turn, peer: Ref): void {
sync(peer: Ref): void {
const peerEntity = new SyncPeerEntity(this.relay, peer);
const exported: Array<WireSymbol> = [];
const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported);
const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, exported);
peerEntity.e = exported[0];
this.send(IO.Event.Sync(IO.Sync(ior)));
}
@ -107,7 +107,7 @@ export class Membrane {
}
export const INERT_REF: Ref = {
relay: Actor.boot(t => t.stop()).root,
relay: Actor.boot(() => Turn.active.stop()).root,
target: {},
};
@ -115,7 +115,7 @@ export type PacketWriter = (bs: Uint8Array) => void;
export interface RelayOptions {
packetWriter: PacketWriter;
setup(t: Turn, r: Relay): void;
setup(r: Relay): void;
debug?: boolean;
trustPeer?: boolean;
}
@ -140,14 +140,14 @@ export class Relay {
embeddedDecode: wireRefEmbeddedType,
});
constructor(t: Turn, options: RelayOptions) {
this.facet = t.activeFacet;
constructor(options: RelayOptions) {
this.facet = Turn.activeFacet;
this.w = options.packetWriter;
this.debug = options.debug ?? false;
this.trustPeer = options.trustPeer ?? true;
this.facet.preventInertCheck();
options.setup(t, this);
options.setup(this);
}
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
@ -157,10 +157,10 @@ export class Relay {
return [rewritten, exported];
}
rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>]
rewriteIn(a: Value<WireRef>): [Assertion, Array<WireSymbol>]
{
const imported: Array<WireSymbol> = [];
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(t, r, imported)));
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(r, imported)));
return [rewritten, imported];
}
@ -204,7 +204,7 @@ export class Relay {
this.exported.drop(e);
}
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol>): Ref {
rewriteRefIn(n: WireRef, imported: Array<WireSymbol>): Ref {
switch (n._variant) {
case 'yours': {
const r = this.lookupLocal(n.oid);
@ -222,7 +222,7 @@ export class Relay {
}
case 'mine': {
const e = this.imported.grab("byOid", n.oid, false, () =>
({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 }));
({ oid: n.oid, ref: Turn.ref(new RelayEntity(this, n.oid)), count: 0 }));
imported.push(e);
return e.ref;
}
@ -248,7 +248,7 @@ export class Relay {
}
accept(bs: BytesLike): void {
Turn.for(this.facet, t => {
Turn.for(this.facet, () => {
this.decoder.write(bs);
while (true) {
const rawTurn = this.decoder.try_next();
@ -258,18 +258,18 @@ export class Relay {
if (this.debug) console.log('IN', rawTurn.asPreservesText());
wireTurn.forEach(v => {
const { oid: localOid, event: m } = v;
this.handle(t, this.lookupLocal(localOid), m);
this.handle(this.lookupLocal(localOid), m);
});
}
});
}
handle(t: Turn, r: Ref, m: IO.Event<WireRef>) {
handle(r: Ref, m: IO.Event<WireRef>) {
switch (m._variant) {
case 'Assert': {
const [a, imported] = this.rewriteIn(t, m.value.assertion);
const [a, imported] = this.rewriteIn(m.value.assertion);
this.inboundAssertions.set(m.value.handle, {
localHandle: t.assert(r, a),
localHandle: Turn.active.assert(r, a),
imported,
});
break;
@ -280,20 +280,20 @@ export class Relay {
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
this.inboundAssertions.delete(remoteHandle);
h.imported.forEach(e => this.imported.drop(e));
t.retract(h.localHandle);
Turn.active.retract(h.localHandle);
break;
}
case 'Message': {
const [a, imported] = this.rewriteIn(t, m.value.body);
const [a, imported] = this.rewriteIn(m.value.body);
if (imported.length > 0) throw new Error("Cannot receive transient reference");
t.message(r, a);
Turn.active.message(r, a);
break;
}
case 'Sync': {
const imported: Array<WireSymbol> = [];
const k = this.rewriteRefIn(t, m.value.peer, imported);
t.sync(r).then(t => {
t.message(k, true);
const k = this.rewriteRefIn(m.value.peer, imported);
Turn.active.sync(r).then(() => {
Turn.active.message(k, true);
imported.forEach(e => this.imported.drop(e));
});
break;
@ -308,18 +308,18 @@ export interface RelayActorOptions extends RelayOptions {
nextLocalOid?: IO.Oid;
}
export function spawnRelay(t: Turn, options: RelayActorOptions & {initialOid: IO.Oid}): Promise<Ref>;
export function spawnRelay(t: Turn, options: Omit<RelayActorOptions, 'initialOid'>): Promise<null>;
export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | null>
export function spawnRelay(options: RelayActorOptions & {initialOid: IO.Oid}): Promise<Ref>;
export function spawnRelay(options: Omit<RelayActorOptions, 'initialOid'>): Promise<null>;
export function spawnRelay(options: RelayActorOptions): Promise<Ref | null>
{
return new Promise(resolve => {
t.spawn(t => {
const relay = new Relay(t, options);
Turn.active.spawn(() => {
const relay = new Relay(options);
if (options.initialRef !== void 0) {
relay.rewriteRefOut(options.initialRef, false, []);
}
if (options.initialOid !== void 0) {
resolve(relay.rewriteRefIn(t, WireRef.mine(options.initialOid), []));
resolve(relay.rewriteRefIn(WireRef.mine(options.initialOid), []));
} else {
resolve(null);
}

View File

@ -3,7 +3,7 @@
import { BoxState, SetBox, N } from './protocol.js';
boot(ds) {
export function boot(ds) {
spawn named 'box' {
field boxValue = 0;
at ds {

View File

@ -3,7 +3,7 @@
import { BoxState, SetBox } from './protocol.js';
boot(ds, doneCallback) {
export function boot(ds, doneCallback) {
spawn named 'client' {
at ds {
on asserted BoxState($v) => send message SetBox(v + 1);

View File

@ -4,12 +4,12 @@
import { N } from './protocol.js';
import * as Box from './box.js';
import * as Client from './client.js';
import { Dataspace } from '@syndicate-lang/core';
import { Actor, Dataspace, Turn } from '@syndicate-lang/core';
console.time('box-and-client-' + N.toString());
boot() {
thisTurn.activeFacet.preventInertCheck();
Actor.boot(() => {
Turn.activeFacet.preventInertCheck();
const ds = create new Dataspace();
Box.boot(thisTurn, ds);
Client.boot(thisTurn, ds, () => console.timeEnd('box-and-client-' + N.toString()));
}
Box.boot(ds);
Client.boot(ds, () => console.timeEnd('box-and-client-' + N.toString()));
});

View File

@ -7,7 +7,4 @@
<h1>Look in the JavaScript console for output.</h1>
<main id="main">
</main>
<script>
Syndicate.Actor.boot(Main.boot);
</script>
</html>

View File

@ -4,7 +4,7 @@
import { BoxState, SetBox, N } from './protocol.js';
import { Ref } from '@syndicate-lang/core';
boot(ds: Ref) {
export function boot(ds: Ref) {
spawn named 'box' {
field boxValue: number = 0;
at ds {

View File

@ -4,7 +4,7 @@
import { BoxState, SetBox } from './protocol.js';
import { Ref } from '@syndicate-lang/core';
boot(ds: Ref, doneCallback: () => void) {
export function boot(ds: Ref, doneCallback: () => void) {
spawn named 'client' {
at ds {
on asserted BoxState($v: number) => send message SetBox(v + 1);

View File

@ -4,12 +4,12 @@
import { N } from './protocol.js';
import * as Box from './box.js';
import * as Client from './client.js';
import { Dataspace } from '@syndicate-lang/core';
import { Actor, Dataspace, Turn } from '@syndicate-lang/core';
console.time('box-and-client-' + N.toString());
boot() {
thisTurn.activeFacet.preventInertCheck();
Actor.boot(() => {
Turn.activeFacet.preventInertCheck();
const ds = create new Dataspace();
Box.boot(thisTurn, ds);
Client.boot(thisTurn, ds, () => console.timeEnd('box-and-client-' + N.toString()));
}
Box.boot(ds);
Client.boot(ds, () => console.timeEnd('box-and-client-' + N.toString()));
});