From b60551a52b1b852789eac5325d45f83fadde277f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 14 Jan 2021 14:42:30 +0100 Subject: [PATCH] Major step toward eliminating ambient authority --- packages/core/examples/box-and-client.js | 102 +++---- packages/core/src/runtime/dataspace.ts | 328 +++++++++++------------ packages/core/src/runtime/ground.ts | 6 +- packages/core/src/runtime/relay.ts | 28 +- packages/core/src/runtime/skeleton.ts | 2 +- 5 files changed, 221 insertions(+), 245 deletions(-) diff --git a/packages/core/examples/box-and-client.js b/packages/core/examples/box-and-client.js index 2f54f44..b155ee5 100755 --- a/packages/core/examples/box-and-client.js +++ b/packages/core/examples/box-and-client.js @@ -28,58 +28,58 @@ const N = 100000; console.time('box-and-client-' + N.toString()); -new Ground(() => { - Dataspace.spawn('box', function () { - Dataspace.declareField(this, 'value', 0); - Dataspace.currentFacet.addEndpoint(() => { - // console.log('recomputing published BoxState', this.value); - return { assertion: BoxState(this.value), analysis: null }; - }); - Dataspace.currentFacet.addDataflow(() => { - // console.log('dataflow saw new value', this.value); - if (this.value === N) { - Dataspace.currentFacet.stop(() => { - console.log('terminated box root facet'); - }); - } - }); - Dataspace.currentFacet.addEndpoint(() => { - let analysis = Skeleton.analyzeAssertion(SetBox(_$)); - analysis.callback = Dataspace.wrap((evt, vs) => { - if (evt === Skeleton.EventType.MESSAGE) { - Dataspace.currentFacet.actor.scheduleScript(() => { - this.value = vs[0]; - // console.log('box updated value', vs[0]); - }); - } - }); - return { assertion: Observe(SetBox(_$)), analysis }; - }); +new Ground(groundRoot => { + groundRoot.spawn('box', function (boxRoot) { + boxRoot.actor.dataspace.declareField(this, 'value', 0); + boxRoot.addEndpoint(() => { + // console.log('recomputing published BoxState', this.value); + return { assertion: BoxState(this.value), analysis: null }; }); + boxRoot.addDataflow(() => { + // console.log('dataflow saw new value', this.value); + if (this.value === N) { + boxRoot.stop(() => { + console.log('terminated box root facet'); + }); + } + }); + boxRoot.addEndpoint(() => { + let analysis = Skeleton.analyzeAssertion(SetBox(_$)); + analysis.callback = boxRoot.wrap((facet, evt, vs) => { + if (evt === Skeleton.EventType.MESSAGE) { + boxRoot.scheduleScript(() => { + this.value = vs[0]; + // console.log('box updated value', vs[0]); + }); + } + }); + return { assertion: Observe(SetBox(_$)), analysis }; + }); + }); - Dataspace.spawn('client', function () { - Dataspace.currentFacet.addEndpoint(() => { - let analysis = Skeleton.analyzeAssertion(BoxState(_$)); - analysis.callback = Dataspace.wrap((evt, vs) => { - if (evt === Skeleton.EventType.ADDED) { - Dataspace.currentFacet.actor.scheduleScript(() => { - // console.log('client sending SetBox', vs[0] + 1); - Dataspace.send(SetBox(vs[0] + 1)); - }); - } - }); - return { assertion: Observe(BoxState(_$)), analysis }; - }); - Dataspace.currentFacet.addEndpoint(() => { - let analysis = Skeleton.analyzeAssertion(BoxState(__)); - analysis.callback = Dataspace.wrap((evt, _vs) => { - if (evt === Skeleton.EventType.REMOVED) { - Dataspace.currentFacet.actor.scheduleScript(() => { - console.log('box gone'); - }); - } - }); - return { assertion: Observe(BoxState(__)), analysis }; - }); + groundRoot.spawn('client', function (clientRoot) { + clientRoot.addEndpoint(() => { + let analysis = Skeleton.analyzeAssertion(BoxState(_$)); + analysis.callback = clientRoot.wrap((facet, evt, vs) => { + if (evt === Skeleton.EventType.ADDED) { + clientRoot.scheduleScript(() => { + // console.log('client sending SetBox', vs[0] + 1); + clientRoot.send(SetBox(vs[0] + 1)); + }); + } + }); + return { assertion: Observe(BoxState(_$)), analysis }; }); + clientRoot.addEndpoint(() => { + let analysis = Skeleton.analyzeAssertion(BoxState(__)); + analysis.callback = clientRoot.wrap((facet, evt, _vs) => { + if (evt === Skeleton.EventType.REMOVED) { + clientRoot.scheduleScript(() => { + console.log('box gone'); + }); + } + }); + return { assertion: Observe(BoxState(__)), analysis }; + }); + }); }).addStopHandler(() => console.timeEnd('box-and-client-' + N.toString())).start(); diff --git a/packages/core/src/runtime/dataspace.ts b/packages/core/src/runtime/dataspace.ts index 6d87cdb..ce8ea2d 100644 --- a/packages/core/src/runtime/dataspace.ts +++ b/packages/core/src/runtime/dataspace.ts @@ -39,12 +39,13 @@ export type ActorId = number; export type FacetId = ActorId; export type EndpointId = ActorId; -export type Script = () => void; +export type Task = () => T; +export type Script = (f: Facet) => T; export type MaybeValue = Value | undefined; export type EndpointSpec = { assertion: MaybeValue, analysis: Skeleton.Analysis | null }; -export type ObserverCallback = (bindings: Array) => void; +export type ObserverCallback = (facet: Facet, bindings: Array) => void; export type ObserverCallbacks = { add?: ObserverCallback; @@ -77,109 +78,62 @@ export abstract class Dataspace { pendingTurns: Array; actors: IdentityMap = new IdentityMap(); - constructor(bootProc: Script) { + constructor(bootProc: Script) { this.pendingTurns = [new Turn(null, [new Spawn(null, bootProc, new Set())])]; } - static _currentFacet: Facet | null = null; - static _inScript = true; + _inScript = true; - static get currentFacet(): Facet | null { - return Dataspace._currentFacet; - } - - static withNonScriptContext(thunk: () => T) { - let savedInScript = Dataspace._inScript; - Dataspace._inScript = false; + withNonScriptContext(task: Task): T { + let savedInScript = this._inScript; + this._inScript = false; try { - return thunk(); + return task(); } finally { - Dataspace._inScript = savedInScript; + this._inScript = savedInScript; } } - static withCurrentFacet(facet: Facet, thunk: () => T) { - let savedFacet = Dataspace._currentFacet; - Dataspace._currentFacet = facet; - try { - // console.group('Facet', facet && facet.toString()); - let result = thunk(); - Dataspace._currentFacet = savedFacet; - return result; - } catch (e) { - let a = facet.actor; - a.abandonQueuedWork(); - a._terminate(false); - Dataspace._currentFacet = savedFacet; - console.error('Actor ' + a.toString() + ' exited with exception:', e); - } finally { - // console.groupEnd(); - } - } - - static wrap, R>(f: (... args: T) => R): (... args: T) => R { - const savedFacet = Dataspace._currentFacet; - return (... actuals) => - Dataspace.withCurrentFacet(savedFacet, () => f.apply(savedFacet.fields, actuals)); - } - abstract start(): this; abstract ground(): Ground; - static wrapExternal>(f: (... args: T) => void): (... args: T) => void { - const savedFacet = Dataspace._currentFacet; - const ac = savedFacet.actor; - return (... actuals) => { - if (savedFacet.isLive) { - ac.dataspace.start(); - ac.pushScript(() => - Dataspace.withCurrentFacet(savedFacet, () => - f.apply(savedFacet.fields, actuals))); - } - }; + backgroundTask(): () => void { + return this.ground().backgroundTask(); } - static backgroundTask(): () => void { - return Dataspace._currentFacet.actor.dataspace.ground().backgroundTask(); - } - - static referenceField(obj: DataflowObservableObject, prop: string) { + referenceField(obj: DataflowObservableObject, prop: string) { if (!(prop in obj)) { - Dataspace._currentFacet.actor.dataspace.dataflow.recordObservation([obj, prop]); + this.dataflow.recordObservation([obj, prop]); } return obj[prop]; } - static declareField(obj: DataflowObservableObject, prop: string, init: any) { + declareField(obj: DataflowObservableObject, prop: string, init: any) { if (prop in obj) { obj[prop] = init; } else { - Dataspace._currentFacet.actor.dataspace.dataflow.defineObservableProperty( - obj, - prop, - init, - { - objectId: [obj, prop], - noopGuard: is - }); + this.dataflow.defineObservableProperty(obj, prop, init, { + objectId: [obj, prop], + noopGuard: is + }); } } - static deleteField(obj: DataflowObservableObject, prop: string) { - Dataspace._currentFacet.actor.dataspace.dataflow.recordDamage([obj, prop]); + deleteField(obj: DataflowObservableObject, prop: string) { + this.dataflow.recordDamage([obj, prop]); delete obj[prop]; } - runScripts() { // TODO: rename? - this.runPendingScripts(); + runTasks() { // TODO: rename? + this.runPendingTasks(); this.performPendingActions(); return this.runnable.length > 0 || this.pendingTurns.length > 0; } - runPendingScripts() { + runPendingTasks() { let runnable = this.runnable; this.runnable = []; - runnable.forEach((ac) => { ac.runPendingScripts(); /* TODO: rename? */ }); + runnable.forEach((ac) => { ac.runPendingTasks(); /* TODO: rename? */ }); } performPendingActions() { @@ -189,7 +143,7 @@ export abstract class Dataspace { turn.actions.forEach((action) => { // console.log('[DATASPACE]', group.actor && group.actor.toString(), action); action.perform(this, turn.actor); - this.runPendingScripts(); + this.runPendingTasks(); }); }); } @@ -199,24 +153,24 @@ export abstract class Dataspace { } refreshAssertions() { - Dataspace.withNonScriptContext(() => { + this.withNonScriptContext(() => { this.dataflow.repairDamage((ep) => { let facet = ep.facet; if (facet.isLive) { // TODO: necessary test, or tautological? - Dataspace.withCurrentFacet(facet, () => ep.refresh()); + facet.invokeScript(() => ep.refresh()); } }); }); } - addActor(name: any, bootProc: Script, initialAssertions: Set, parentActor: Actor | undefined) { + addActor(name: any, bootProc: Script, initialAssertions: Set, parentActor: Actor | undefined) { let ac = new Actor(this, name, initialAssertions, parentActor?.id); // debug('Spawn', ac && ac.toString()); this.applyPatch(ac, ac.adhocAssertions); - ac.addFacet(null, () => { + ac.addFacet(null, systemFacet => { // Root facet is a dummy "system" facet that exists to hold // one-or-more "user" "root" facets. - ac.addFacet(Dataspace._currentFacet, bootProc); + ac.addFacet(systemFacet, bootProc); // ^ The "true root", user-visible facet. initialAssertions.forEach((a) => { ac.adhocRetract(a); }); }); @@ -241,10 +195,10 @@ export abstract class Dataspace { // if (!delta.isEmpty()) debug('applyPatch END'); } - sendMessage(m: Value, _sendingActor: Actor) { - // debug('sendMessage', sendingActor && sendingActor.toString(), m.toString()); - this.index.sendMessage(m); - // this.index.sendMessage(m, (leaf, _m) => { + deliverMessage(m: Value, _sendingActor: Actor) { + // debug('deliverMessage', sendingActor && sendingActor.toString(), m.toString()); + this.index.deliverMessage(m); + // this.index.deliverMessage(m, (leaf, _m) => { // sendingActor.touchedTopics = sendingActor.touchedTopics.add(leaf); // }); } @@ -264,27 +218,6 @@ export abstract class Dataspace { endpointHook(_facet: Facet, _endpoint: Endpoint) { // Subclasses may override } - - static send(body: any) { - if (!Dataspace._inScript) { - throw new Error("Cannot `send` during facet setup; are you missing an `on start { ... }`?"); - } - Dataspace._currentFacet.enqueueScriptAction(new Message(body)); - } - - static spawn(name: any, bootProc: Script, initialAssertions?: Set) { - if (!Dataspace._inScript) { - throw new Error("Cannot `spawn` during facet setup; are you missing an `on start { ... }`?"); - } - Dataspace._currentFacet.enqueueScriptAction(new Spawn(name, bootProc, initialAssertions)); - } - - static deferTurn(continuation: Script) { - if (!Dataspace._inScript) { - throw new Error("Cannot defer turn during facet setup; are you missing an `on start { ... }`?"); - } - Dataspace._currentFacet.enqueueScriptAction(new DeferredTurn(Dataspace.wrap(continuation))); - } } export class Actor { @@ -293,7 +226,7 @@ export class Actor { readonly name: any; rootFacet: Facet | null = null; isRunnable: boolean = false; - readonly pendingScripts: Array>; + readonly pendingTasks: Array>>; pendingActions: Array; adhocAssertions: Bag; cleanupChanges = new Bag(); // negative counts allowed! @@ -308,19 +241,19 @@ export class Actor { this.dataspace = dataspace; this.name = name; this.isRunnable = false; - this.pendingScripts = []; - for (let i = 0; i < Priority._count; i++) { this.pendingScripts.push([]); } + this.pendingTasks = []; + for (let i = 0; i < Priority._count; i++) { this.pendingTasks.push([]); } this.pendingActions = []; this.adhocAssertions = new Bag(initialAssertions); // no negative counts allowed this.parentId = parentActorId; dataspace.actors.set(this.id, this); } - runPendingScripts() { + runPendingTasks() { while (true) { - let script = this.popNextScript(); - if (!script) break; - script(); + let task = this.popNextTask(); + if (!task) break; + task(); this.dataspace.refreshAssertions(); } @@ -332,10 +265,10 @@ export class Actor { } } - popNextScript(): Script | null { - let scripts = this.pendingScripts; + popNextTask(): Task | null { + let tasks = this.pendingTasks; for (let i = 0; i < Priority._count; i++) { - let q = scripts[i]; + let q = tasks[i]; if (q.length > 0) return q.shift(); } return null; @@ -343,34 +276,24 @@ export class Actor { abandonQueuedWork() { this.pendingActions = []; - for (let i = 0; i < Priority._count; i++) { this.pendingScripts[i] = []; } + for (let i = 0; i < Priority._count; i++) { this.pendingTasks[i] = []; } } - scheduleScript(unwrappedThunk: Script, priority?: Priority) { - this.pushScript(Dataspace.wrap(unwrappedThunk), priority); - } - - pushScript(wrappedThunk: Script, priority: Priority = Priority.NORMAL) { - // The wrappedThunk must already have code for ensuring - // _currentFacet is correct inside it. Compare with scheduleScript. + scheduleTask(task: Task, priority: Priority = Priority.NORMAL) { if (!this.isRunnable) { this.isRunnable = true; this.dataspace.runnable.push(this); } - this.pendingScripts[priority].push(wrappedThunk); + this.pendingTasks[priority].push(task); } - addFacet(parentFacet: Facet, bootProc: Script, checkInScript: boolean = false) { - if (checkInScript && !Dataspace._inScript) { + addFacet(parentFacet: Facet, bootProc: Script, checkInScript: boolean = false) { + if (checkInScript && !this.dataspace._inScript) { throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?"); } let f = new Facet(this, parentFacet); - Dataspace.withCurrentFacet(f, () => { - Dataspace.withNonScriptContext(() => { - bootProc.call(f.fields); - }); - }); - this.pushScript(() => { + f.invokeScript(f => this.dataspace.withNonScriptContext(() => bootProc.call(f.fields, f))); + this.scheduleTask(() => { if ((parentFacet && !parentFacet.isLive) || f.isInert()) { f._terminate(); } @@ -380,14 +303,14 @@ export class Actor { _terminate(emitPatches: boolean) { // Abruptly terminates an entire actor, without running stop-scripts etc. if (emitPatches) { - this.pushScript(() => { + this.scheduleTask(() => { this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); }); }); } if (this.rootFacet) { this.rootFacet._abort(emitPatches); } - this.pushScript(() => { this.enqueueScriptAction(new Quit()); }); + this.scheduleTask(() => { this.enqueueScriptAction(new Quit()); }); } enqueueScriptAction(action: Action) { @@ -458,16 +381,16 @@ class Message extends Action { } perform(ds: Dataspace, ac: Actor): void { - ds.sendMessage(this.body, ac); + ds.deliverMessage(this.body, ac); } } class Spawn extends Action { readonly name: any; - readonly bootProc: Script; + readonly bootProc: Script; readonly initialAssertions: Set; - constructor(name: any, bootProc: Script, initialAssertions: Set = new Set()) { + constructor(name: any, bootProc: Script, initialAssertions: Set = new Set()) { super(); this.name = name; this.bootProc = bootProc; @@ -490,16 +413,16 @@ class Quit extends Action { // TODO: rename? Perhaps to Cleanup? } class DeferredTurn extends Action { - readonly continuation: Script; + readonly continuation: Task; - constructor(continuation: Script) { + constructor(continuation: Task) { super(); this.continuation = continuation; } perform(_ds: Dataspace, ac: Actor): void { // debug('DeferredTurn', ac && ac.toString()); - ac.pushScript(this.continuation); + ac.scheduleTask(this.continuation); } } @@ -523,7 +446,7 @@ export class Facet { readonly actor: Actor; readonly parent: Facet | null; readonly endpoints = new IdentityMap(); - readonly stopScripts: Array