Major step toward eliminating ambient authority

This commit is contained in:
Tony Garnock-Jones 2021-01-14 14:42:30 +01:00
parent c5efff9a0f
commit b60551a52b
5 changed files with 221 additions and 245 deletions

View File

@ -28,58 +28,58 @@ const N = 100000;
console.time('box-and-client-' + N.toString()); console.time('box-and-client-' + N.toString());
new Ground(() => { new Ground(groundRoot => {
Dataspace.spawn('box', function () { groundRoot.spawn('box', function (boxRoot) {
Dataspace.declareField(this, 'value', 0); boxRoot.actor.dataspace.declareField(this, 'value', 0);
Dataspace.currentFacet.addEndpoint(() => { boxRoot.addEndpoint(() => {
// console.log('recomputing published BoxState', this.value); // console.log('recomputing published BoxState', this.value);
return { assertion: BoxState(this.value), analysis: null }; return { assertion: BoxState(this.value), analysis: null };
});
Dataspace.currentFacet.addDataflow(() => {
// console.log('dataflow saw new value', this.value);
if (this.value === N) {
Dataspace.currentFacet.stop(() => {
console.log('terminated box root facet');
});
}
});
Dataspace.currentFacet.addEndpoint(() => {
let analysis = Skeleton.analyzeAssertion(SetBox(_$));
analysis.callback = Dataspace.wrap((evt, vs) => {
if (evt === Skeleton.EventType.MESSAGE) {
Dataspace.currentFacet.actor.scheduleScript(() => {
this.value = vs[0];
// console.log('box updated value', vs[0]);
});
}
});
return { assertion: Observe(SetBox(_$)), analysis };
});
}); });
boxRoot.addDataflow(() => {
// console.log('dataflow saw new value', this.value);
if (this.value === N) {
boxRoot.stop(() => {
console.log('terminated box root facet');
});
}
});
boxRoot.addEndpoint(() => {
let analysis = Skeleton.analyzeAssertion(SetBox(_$));
analysis.callback = boxRoot.wrap((facet, evt, vs) => {
if (evt === Skeleton.EventType.MESSAGE) {
boxRoot.scheduleScript(() => {
this.value = vs[0];
// console.log('box updated value', vs[0]);
});
}
});
return { assertion: Observe(SetBox(_$)), analysis };
});
});
Dataspace.spawn('client', function () { groundRoot.spawn('client', function (clientRoot) {
Dataspace.currentFacet.addEndpoint(() => { clientRoot.addEndpoint(() => {
let analysis = Skeleton.analyzeAssertion(BoxState(_$)); let analysis = Skeleton.analyzeAssertion(BoxState(_$));
analysis.callback = Dataspace.wrap((evt, vs) => { analysis.callback = clientRoot.wrap((facet, evt, vs) => {
if (evt === Skeleton.EventType.ADDED) { if (evt === Skeleton.EventType.ADDED) {
Dataspace.currentFacet.actor.scheduleScript(() => { clientRoot.scheduleScript(() => {
// console.log('client sending SetBox', vs[0] + 1); // console.log('client sending SetBox', vs[0] + 1);
Dataspace.send(SetBox(vs[0] + 1)); clientRoot.send(SetBox(vs[0] + 1));
}); });
} }
}); });
return { assertion: Observe(BoxState(_$)), analysis }; return { assertion: Observe(BoxState(_$)), analysis };
});
Dataspace.currentFacet.addEndpoint(() => {
let analysis = Skeleton.analyzeAssertion(BoxState(__));
analysis.callback = Dataspace.wrap((evt, _vs) => {
if (evt === Skeleton.EventType.REMOVED) {
Dataspace.currentFacet.actor.scheduleScript(() => {
console.log('box gone');
});
}
});
return { assertion: Observe(BoxState(__)), analysis };
});
}); });
clientRoot.addEndpoint(() => {
let analysis = Skeleton.analyzeAssertion(BoxState(__));
analysis.callback = clientRoot.wrap((facet, evt, _vs) => {
if (evt === Skeleton.EventType.REMOVED) {
clientRoot.scheduleScript(() => {
console.log('box gone');
});
}
});
return { assertion: Observe(BoxState(__)), analysis };
});
});
}).addStopHandler(() => console.timeEnd('box-and-client-' + N.toString())).start(); }).addStopHandler(() => console.timeEnd('box-and-client-' + N.toString())).start();

View File

@ -39,12 +39,13 @@ export type ActorId = number;
export type FacetId = ActorId; export type FacetId = ActorId;
export type EndpointId = ActorId; export type EndpointId = ActorId;
export type Script = () => void; export type Task<T> = () => T;
export type Script<T> = (f: Facet) => T;
export type MaybeValue = Value | undefined; export type MaybeValue = Value | undefined;
export type EndpointSpec = { assertion: MaybeValue, analysis: Skeleton.Analysis | null }; export type EndpointSpec = { assertion: MaybeValue, analysis: Skeleton.Analysis | null };
export type ObserverCallback = (bindings: Array<Value>) => void; export type ObserverCallback = (facet: Facet, bindings: Array<Value>) => void;
export type ObserverCallbacks = { export type ObserverCallbacks = {
add?: ObserverCallback; add?: ObserverCallback;
@ -77,109 +78,62 @@ export abstract class Dataspace {
pendingTurns: Array<Turn>; pendingTurns: Array<Turn>;
actors: IdentityMap<number, Actor> = new IdentityMap(); actors: IdentityMap<number, Actor> = new IdentityMap();
constructor(bootProc: Script) { constructor(bootProc: Script<void>) {
this.pendingTurns = [new Turn(null, [new Spawn(null, bootProc, new Set())])]; this.pendingTurns = [new Turn(null, [new Spawn(null, bootProc, new Set())])];
} }
static _currentFacet: Facet | null = null; _inScript = true;
static _inScript = true;
static get currentFacet(): Facet | null { withNonScriptContext<T>(task: Task<T>): T {
return Dataspace._currentFacet; let savedInScript = this._inScript;
} this._inScript = false;
static withNonScriptContext<T>(thunk: () => T) {
let savedInScript = Dataspace._inScript;
Dataspace._inScript = false;
try { try {
return thunk(); return task();
} finally { } finally {
Dataspace._inScript = savedInScript; this._inScript = savedInScript;
} }
} }
static withCurrentFacet<T>(facet: Facet, thunk: () => T) {
let savedFacet = Dataspace._currentFacet;
Dataspace._currentFacet = facet;
try {
// console.group('Facet', facet && facet.toString());
let result = thunk();
Dataspace._currentFacet = savedFacet;
return result;
} catch (e) {
let a = facet.actor;
a.abandonQueuedWork();
a._terminate(false);
Dataspace._currentFacet = savedFacet;
console.error('Actor ' + a.toString() + ' exited with exception:', e);
} finally {
// console.groupEnd();
}
}
static wrap<T extends Array<any>, R>(f: (... args: T) => R): (... args: T) => R {
const savedFacet = Dataspace._currentFacet;
return (... actuals) =>
Dataspace.withCurrentFacet(savedFacet, () => f.apply(savedFacet.fields, actuals));
}
abstract start(): this; abstract start(): this;
abstract ground(): Ground; abstract ground(): Ground;
static wrapExternal<T extends Array<any>>(f: (... args: T) => void): (... args: T) => void { backgroundTask(): () => void {
const savedFacet = Dataspace._currentFacet; return this.ground().backgroundTask();
const ac = savedFacet.actor;
return (... actuals) => {
if (savedFacet.isLive) {
ac.dataspace.start();
ac.pushScript(() =>
Dataspace.withCurrentFacet(savedFacet, () =>
f.apply(savedFacet.fields, actuals)));
}
};
} }
static backgroundTask(): () => void { referenceField(obj: DataflowObservableObject, prop: string) {
return Dataspace._currentFacet.actor.dataspace.ground().backgroundTask();
}
static referenceField(obj: DataflowObservableObject, prop: string) {
if (!(prop in obj)) { if (!(prop in obj)) {
Dataspace._currentFacet.actor.dataspace.dataflow.recordObservation([obj, prop]); this.dataflow.recordObservation([obj, prop]);
} }
return obj[prop]; return obj[prop];
} }
static declareField(obj: DataflowObservableObject, prop: string, init: any) { declareField(obj: DataflowObservableObject, prop: string, init: any) {
if (prop in obj) { if (prop in obj) {
obj[prop] = init; obj[prop] = init;
} else { } else {
Dataspace._currentFacet.actor.dataspace.dataflow.defineObservableProperty( this.dataflow.defineObservableProperty(obj, prop, init, {
obj, objectId: [obj, prop],
prop, noopGuard: is
init, });
{
objectId: [obj, prop],
noopGuard: is
});
} }
} }
static deleteField(obj: DataflowObservableObject, prop: string) { deleteField(obj: DataflowObservableObject, prop: string) {
Dataspace._currentFacet.actor.dataspace.dataflow.recordDamage([obj, prop]); this.dataflow.recordDamage([obj, prop]);
delete obj[prop]; delete obj[prop];
} }
runScripts() { // TODO: rename? runTasks() { // TODO: rename?
this.runPendingScripts(); this.runPendingTasks();
this.performPendingActions(); this.performPendingActions();
return this.runnable.length > 0 || this.pendingTurns.length > 0; return this.runnable.length > 0 || this.pendingTurns.length > 0;
} }
runPendingScripts() { runPendingTasks() {
let runnable = this.runnable; let runnable = this.runnable;
this.runnable = []; this.runnable = [];
runnable.forEach((ac) => { ac.runPendingScripts(); /* TODO: rename? */ }); runnable.forEach((ac) => { ac.runPendingTasks(); /* TODO: rename? */ });
} }
performPendingActions() { performPendingActions() {
@ -189,7 +143,7 @@ export abstract class Dataspace {
turn.actions.forEach((action) => { turn.actions.forEach((action) => {
// console.log('[DATASPACE]', group.actor && group.actor.toString(), action); // console.log('[DATASPACE]', group.actor && group.actor.toString(), action);
action.perform(this, turn.actor); action.perform(this, turn.actor);
this.runPendingScripts(); this.runPendingTasks();
}); });
}); });
} }
@ -199,24 +153,24 @@ export abstract class Dataspace {
} }
refreshAssertions() { refreshAssertions() {
Dataspace.withNonScriptContext(() => { this.withNonScriptContext(() => {
this.dataflow.repairDamage((ep) => { this.dataflow.repairDamage((ep) => {
let facet = ep.facet; let facet = ep.facet;
if (facet.isLive) { // TODO: necessary test, or tautological? if (facet.isLive) { // TODO: necessary test, or tautological?
Dataspace.withCurrentFacet(facet, () => ep.refresh()); facet.invokeScript(() => ep.refresh());
} }
}); });
}); });
} }
addActor(name: any, bootProc: Script, initialAssertions: Set, parentActor: Actor | undefined) { addActor(name: any, bootProc: Script<void>, initialAssertions: Set, parentActor: Actor | undefined) {
let ac = new Actor(this, name, initialAssertions, parentActor?.id); let ac = new Actor(this, name, initialAssertions, parentActor?.id);
// debug('Spawn', ac && ac.toString()); // debug('Spawn', ac && ac.toString());
this.applyPatch(ac, ac.adhocAssertions); this.applyPatch(ac, ac.adhocAssertions);
ac.addFacet(null, () => { ac.addFacet(null, systemFacet => {
// Root facet is a dummy "system" facet that exists to hold // Root facet is a dummy "system" facet that exists to hold
// one-or-more "user" "root" facets. // one-or-more "user" "root" facets.
ac.addFacet(Dataspace._currentFacet, bootProc); ac.addFacet(systemFacet, bootProc);
// ^ The "true root", user-visible facet. // ^ The "true root", user-visible facet.
initialAssertions.forEach((a) => { ac.adhocRetract(a); }); initialAssertions.forEach((a) => { ac.adhocRetract(a); });
}); });
@ -241,10 +195,10 @@ export abstract class Dataspace {
// if (!delta.isEmpty()) debug('applyPatch END'); // if (!delta.isEmpty()) debug('applyPatch END');
} }
sendMessage(m: Value, _sendingActor: Actor) { deliverMessage(m: Value, _sendingActor: Actor) {
// debug('sendMessage', sendingActor && sendingActor.toString(), m.toString()); // debug('deliverMessage', sendingActor && sendingActor.toString(), m.toString());
this.index.sendMessage(m); this.index.deliverMessage(m);
// this.index.sendMessage(m, (leaf, _m) => { // this.index.deliverMessage(m, (leaf, _m) => {
// sendingActor.touchedTopics = sendingActor.touchedTopics.add(leaf); // sendingActor.touchedTopics = sendingActor.touchedTopics.add(leaf);
// }); // });
} }
@ -264,27 +218,6 @@ export abstract class Dataspace {
endpointHook(_facet: Facet, _endpoint: Endpoint) { endpointHook(_facet: Facet, _endpoint: Endpoint) {
// Subclasses may override // Subclasses may override
} }
static send(body: any) {
if (!Dataspace._inScript) {
throw new Error("Cannot `send` during facet setup; are you missing an `on start { ... }`?");
}
Dataspace._currentFacet.enqueueScriptAction(new Message(body));
}
static spawn(name: any, bootProc: Script, initialAssertions?: Set) {
if (!Dataspace._inScript) {
throw new Error("Cannot `spawn` during facet setup; are you missing an `on start { ... }`?");
}
Dataspace._currentFacet.enqueueScriptAction(new Spawn(name, bootProc, initialAssertions));
}
static deferTurn(continuation: Script) {
if (!Dataspace._inScript) {
throw new Error("Cannot defer turn during facet setup; are you missing an `on start { ... }`?");
}
Dataspace._currentFacet.enqueueScriptAction(new DeferredTurn(Dataspace.wrap(continuation)));
}
} }
export class Actor { export class Actor {
@ -293,7 +226,7 @@ export class Actor {
readonly name: any; readonly name: any;
rootFacet: Facet | null = null; rootFacet: Facet | null = null;
isRunnable: boolean = false; isRunnable: boolean = false;
readonly pendingScripts: Array<Array<Script>>; readonly pendingTasks: Array<Array<Task<void>>>;
pendingActions: Array<Action>; pendingActions: Array<Action>;
adhocAssertions: Bag; adhocAssertions: Bag;
cleanupChanges = new Bag(); // negative counts allowed! cleanupChanges = new Bag(); // negative counts allowed!
@ -308,19 +241,19 @@ export class Actor {
this.dataspace = dataspace; this.dataspace = dataspace;
this.name = name; this.name = name;
this.isRunnable = false; this.isRunnable = false;
this.pendingScripts = []; this.pendingTasks = [];
for (let i = 0; i < Priority._count; i++) { this.pendingScripts.push([]); } for (let i = 0; i < Priority._count; i++) { this.pendingTasks.push([]); }
this.pendingActions = []; this.pendingActions = [];
this.adhocAssertions = new Bag(initialAssertions); // no negative counts allowed this.adhocAssertions = new Bag(initialAssertions); // no negative counts allowed
this.parentId = parentActorId; this.parentId = parentActorId;
dataspace.actors.set(this.id, this); dataspace.actors.set(this.id, this);
} }
runPendingScripts() { runPendingTasks() {
while (true) { while (true) {
let script = this.popNextScript(); let task = this.popNextTask();
if (!script) break; if (!task) break;
script(); task();
this.dataspace.refreshAssertions(); this.dataspace.refreshAssertions();
} }
@ -332,10 +265,10 @@ export class Actor {
} }
} }
popNextScript(): Script | null { popNextTask(): Task<void> | null {
let scripts = this.pendingScripts; let tasks = this.pendingTasks;
for (let i = 0; i < Priority._count; i++) { for (let i = 0; i < Priority._count; i++) {
let q = scripts[i]; let q = tasks[i];
if (q.length > 0) return q.shift(); if (q.length > 0) return q.shift();
} }
return null; return null;
@ -343,34 +276,24 @@ export class Actor {
abandonQueuedWork() { abandonQueuedWork() {
this.pendingActions = []; this.pendingActions = [];
for (let i = 0; i < Priority._count; i++) { this.pendingScripts[i] = []; } for (let i = 0; i < Priority._count; i++) { this.pendingTasks[i] = []; }
} }
scheduleScript(unwrappedThunk: Script, priority?: Priority) { scheduleTask(task: Task<void>, priority: Priority = Priority.NORMAL) {
this.pushScript(Dataspace.wrap(unwrappedThunk), priority);
}
pushScript(wrappedThunk: Script, priority: Priority = Priority.NORMAL) {
// The wrappedThunk must already have code for ensuring
// _currentFacet is correct inside it. Compare with scheduleScript.
if (!this.isRunnable) { if (!this.isRunnable) {
this.isRunnable = true; this.isRunnable = true;
this.dataspace.runnable.push(this); this.dataspace.runnable.push(this);
} }
this.pendingScripts[priority].push(wrappedThunk); this.pendingTasks[priority].push(task);
} }
addFacet(parentFacet: Facet, bootProc: Script, checkInScript: boolean = false) { addFacet(parentFacet: Facet, bootProc: Script<void>, checkInScript: boolean = false) {
if (checkInScript && !Dataspace._inScript) { if (checkInScript && !this.dataspace._inScript) {
throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?"); throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?");
} }
let f = new Facet(this, parentFacet); let f = new Facet(this, parentFacet);
Dataspace.withCurrentFacet(f, () => { f.invokeScript(f => this.dataspace.withNonScriptContext(() => bootProc.call(f.fields, f)));
Dataspace.withNonScriptContext(() => { this.scheduleTask(() => {
bootProc.call(f.fields);
});
});
this.pushScript(() => {
if ((parentFacet && !parentFacet.isLive) || f.isInert()) { if ((parentFacet && !parentFacet.isLive) || f.isInert()) {
f._terminate(); f._terminate();
} }
@ -380,14 +303,14 @@ export class Actor {
_terminate(emitPatches: boolean) { _terminate(emitPatches: boolean) {
// Abruptly terminates an entire actor, without running stop-scripts etc. // Abruptly terminates an entire actor, without running stop-scripts etc.
if (emitPatches) { if (emitPatches) {
this.pushScript(() => { this.scheduleTask(() => {
this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); }); this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); });
}); });
} }
if (this.rootFacet) { if (this.rootFacet) {
this.rootFacet._abort(emitPatches); this.rootFacet._abort(emitPatches);
} }
this.pushScript(() => { this.enqueueScriptAction(new Quit()); }); this.scheduleTask(() => { this.enqueueScriptAction(new Quit()); });
} }
enqueueScriptAction(action: Action) { enqueueScriptAction(action: Action) {
@ -458,16 +381,16 @@ class Message extends Action {
} }
perform(ds: Dataspace, ac: Actor): void { perform(ds: Dataspace, ac: Actor): void {
ds.sendMessage(this.body, ac); ds.deliverMessage(this.body, ac);
} }
} }
class Spawn extends Action { class Spawn extends Action {
readonly name: any; readonly name: any;
readonly bootProc: Script; readonly bootProc: Script<void>;
readonly initialAssertions: Set; readonly initialAssertions: Set;
constructor(name: any, bootProc: Script, initialAssertions: Set = new Set()) { constructor(name: any, bootProc: Script<void>, initialAssertions: Set = new Set()) {
super(); super();
this.name = name; this.name = name;
this.bootProc = bootProc; this.bootProc = bootProc;
@ -490,16 +413,16 @@ class Quit extends Action { // TODO: rename? Perhaps to Cleanup?
} }
class DeferredTurn extends Action { class DeferredTurn extends Action {
readonly continuation: Script; readonly continuation: Task<void>;
constructor(continuation: Script) { constructor(continuation: Task<void>) {
super(); super();
this.continuation = continuation; this.continuation = continuation;
} }
perform(_ds: Dataspace, ac: Actor): void { perform(_ds: Dataspace, ac: Actor): void {
// debug('DeferredTurn', ac && ac.toString()); // debug('DeferredTurn', ac && ac.toString());
ac.pushScript(this.continuation); ac.scheduleTask(this.continuation);
} }
} }
@ -523,7 +446,7 @@ export class Facet {
readonly actor: Actor; readonly actor: Actor;
readonly parent: Facet | null; readonly parent: Facet | null;
readonly endpoints = new IdentityMap<EndpointId, Endpoint>(); readonly endpoints = new IdentityMap<EndpointId, Endpoint>();
readonly stopScripts: Array<Script> = []; readonly stopScripts: Array<Script<void>> = [];
readonly children = new IdentitySet<Facet>(); readonly children = new IdentitySet<Facet>();
readonly fields: any; readonly fields: any;
@ -551,7 +474,7 @@ export class Facet {
} }
retractAssertionsAndSubscriptions(emitPatches: boolean) { retractAssertionsAndSubscriptions(emitPatches: boolean) {
this.actor.pushScript(() => { this.actor.scheduleTask(() => {
this.endpoints.forEach((ep) => ep.destroy(emitPatches)); this.endpoints.forEach((ep) => ep.destroy(emitPatches));
this.endpoints.clear(); this.endpoints.clear();
}); });
@ -577,14 +500,13 @@ export class Facet {
// Run stop-scripts after terminating children. This means // Run stop-scripts after terminating children. This means
// that children's stop-scripts run before ours. // that children's stop-scripts run before ours.
ac.pushScript(() => { ac.scheduleTask(() =>
Dataspace.withCurrentFacet(this, () => { this.invokeScript(() =>
this.stopScripts.forEach((s) => { s.call(this.fields); }); this.stopScripts.forEach(s =>
}); s.call(this.fields, this))));
});
this.retractAssertionsAndSubscriptions(true); this.retractAssertionsAndSubscriptions(true);
ac.pushScript(() => { ac.scheduleTask(() => {
if (parent) { if (parent) {
if (parent.isInert()) { if (parent.isInert()) {
parent._terminate(); parent._terminate();
@ -595,29 +517,25 @@ export class Facet {
}, Priority.GC); }, Priority.GC);
} }
stop(continuation?: Script) { stop(continuation?: Script<void>) {
Dataspace.withCurrentFacet(this.parent, () => { this.parent.invokeScript(() => {
this.actor.scheduleScript(() => { this.actor.scheduleTask(() => {
this._terminate(); this._terminate();
if (continuation) { if (continuation) {
this.actor.scheduleScript(() => continuation.call(this.fields)); this.parent.scheduleScript(parent => continuation.call(this.fields, parent));
// ^ TODO: is this the correct scope to use?? // ^ TODO: is this the correct scope to use??
} }
}); });
}); });
} }
addStartScript(s: Script) { addStartScript(s: Script<void>) {
if (Dataspace._inScript) { this.ensureFacetSetup('`on start`');
throw new Error("Cannot `on start` outside facet setup"); this.scheduleScript(s);
}
this.actor.scheduleScript(s);
} }
addStopScript(s: Script) { addStopScript(s: Script<void>) {
if (Dataspace._inScript) { this.ensureFacetSetup('`on stop`');
throw new Error("Cannot `on stop` outside facet setup");
}
this.stopScripts.push(s); this.stopScripts.push(s);
} }
@ -627,19 +545,19 @@ export class Facet {
return ep; return ep;
} }
_addRawObserverEndpoint(specThunk: () => MaybeValue, callbacks: ObserverCallbacks): Endpoint _addRawObserverEndpoint(specScript: Script<MaybeValue>, callbacks: ObserverCallbacks): Endpoint
{ {
return this.addEndpoint(() => { return this.addEndpoint(() => {
const spec = specThunk(); const spec = specScript(this);
if (spec === void 0) { if (spec === void 0) {
return { assertion: void 0, analysis: null }; return { assertion: void 0, analysis: null };
} else { } else {
const analysis = Skeleton.analyzeAssertion(spec); const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => { analysis.callback = this.wrap((facet, evt, vs) => {
switch (evt) { switch (evt) {
case Skeleton.EventType.ADDED: callbacks.add?.(vs); break; case Skeleton.EventType.ADDED: callbacks.add?.(facet, vs); break;
case Skeleton.EventType.REMOVED: callbacks.del?.(vs); break; case Skeleton.EventType.REMOVED: callbacks.del?.(facet, vs); break;
case Skeleton.EventType.MESSAGE: callbacks.msg?.(vs); break; case Skeleton.EventType.MESSAGE: callbacks.msg?.(facet, vs); break;
} }
}); });
return { assertion: Observe(spec), analysis }; return { assertion: Observe(spec), analysis };
@ -647,9 +565,10 @@ export class Facet {
}); });
} }
addObserverEndpoint(specThunk: () => MaybeValue, callbacks: ObserverCallbacks): Endpoint { addObserverEndpoint(specThunk: (facet: Facet) => MaybeValue, callbacks: ObserverCallbacks): Endpoint {
const scriptify = (f?: ObserverCallback) => const scriptify = (f?: ObserverCallback) =>
f && ((vs: Array<Value>) => this.actor.scheduleScript(() => f(vs))); f && ((facet: Facet, vs: Array<Value>) =>
facet.scheduleScript(() => f.call(facet.fields, facet, vs)));
return this._addRawObserverEndpoint(specThunk, { return this._addRawObserverEndpoint(specThunk, {
add: scriptify(callbacks.add), add: scriptify(callbacks.add),
del: scriptify(callbacks.del), del: scriptify(callbacks.del),
@ -657,10 +576,10 @@ export class Facet {
}); });
} }
addDataflow(subjectFun: Script, priority?: Priority): Endpoint { addDataflow(subjectFun: Script<void>, priority?: Priority): Endpoint {
return this.addEndpoint(() => { return this.addEndpoint(() => {
let subjectId = this.actor.dataspace.dataflow.currentSubjectId; let subjectId = this.actor.dataspace.dataflow.currentSubjectId;
this.actor.scheduleScript(() => { this.scheduleScript(() => {
if (this.isLive) { if (this.isLive) {
this.actor.dataspace.dataflow.withSubject(subjectId, () => this.actor.dataspace.dataflow.withSubject(subjectId, () =>
subjectFun.call(this.fields)); subjectFun.call(this.fields));
@ -687,25 +606,84 @@ export class Facet {
} }
return s + ')'; return s + ')';
} }
invokeScript<T>(script: Script<T>, propagateErrors = false): T | undefined {
try {
// console.group('Facet', facet && facet.toString());
return script.call(this.fields, this);
} catch (e) {
let a = this.actor;
a.abandonQueuedWork();
a._terminate(false);
console.error('Actor ' + a.toString() + ' exited with exception:', e);
if (propagateErrors) throw e;
return undefined;
} finally {
// console.groupEnd();
}
}
wrap<T extends Array<any>, R>(fn: (f: Facet, ... args: T) => R): (... args: T) => R {
return (... actuals) => this.invokeScript(f => fn.call(f.fields, f, ... actuals), true);
}
wrapExternal<T extends Array<any>>(fn: (f: Facet, ... args: T) => void): (... args: T) => void {
const ac = this.actor;
return (... actuals) => {
if (this.isLive) {
ac.dataspace.start();
ac.scheduleTask(() => this.invokeScript(f => fn.call(f.fields, f, ... actuals)));
}
};
}
ensureFacetSetup(what: string) {
if (this.actor.dataspace._inScript) {
throw new Error(`Cannot ${what} outside facet setup; are you missing \`react { ... }\`?`);
}
}
ensureNonFacetSetup(what: string, keyword: string) {
if (!this.actor.dataspace._inScript) {
throw new Error(`Cannot ${what} during facet setup; are you missing \`${keyword} { ... }\`?`);
}
}
send(body: any) {
this.ensureNonFacetSetup('`send`', 'on start');
this.enqueueScriptAction(new Message(body));
}
spawn(name: any, bootProc: Script<void>, initialAssertions?: Set) {
this.ensureNonFacetSetup('`spawn`', 'on start');
this.enqueueScriptAction(new Spawn(name, bootProc, initialAssertions));
}
deferTurn(continuation: Script<void>) {
this.ensureNonFacetSetup('`deferTurn`', 'on start');
this.enqueueScriptAction(new DeferredTurn(this.wrap(continuation)));
}
scheduleScript(script: Script<void>, priority?: Priority) {
this.actor.scheduleTask(this.wrap(script), priority);
}
} }
export class Endpoint { export class Endpoint {
readonly id: EndpointId; readonly id: EndpointId;
readonly facet: Facet; readonly facet: Facet;
readonly updateFun: () => EndpointSpec; readonly updateFun: Script<EndpointSpec>;
spec: EndpointSpec; spec: EndpointSpec;
constructor(facet: Facet, isDynamic: boolean, updateFun: () => EndpointSpec) { constructor(facet: Facet, isDynamic: boolean, updateFun: Script<EndpointSpec>) {
if (Dataspace._inScript) { facet.ensureFacetSetup('add endpoint');
throw new Error("Cannot add endpoint in script; are you missing a `react { ... }`?");
}
let ac = facet.actor; let ac = facet.actor;
let ds = ac.dataspace; let ds = ac.dataspace;
this.id = ds.nextId++; this.id = ds.nextId++;
this.facet = facet; this.facet = facet;
this.updateFun = updateFun; this.updateFun = updateFun;
let initialSpec = ds.dataflow.withSubject(isDynamic ? this : undefined, let initialSpec = ds.dataflow.withSubject(isDynamic ? this : undefined,
() => updateFun.call(facet.fields)); () => updateFun.call(facet.fields, facet));
this._install(initialSpec); this._install(initialSpec);
facet.endpoints.set(this.id, this); facet.endpoints.set(this.id, this);
} }
@ -729,7 +707,7 @@ export class Endpoint {
} }
refresh() { refresh() {
let newSpec = this.updateFun.call(this.facet.fields); let newSpec = this.updateFun.call(this.facet.fields, this.facet);
if (newSpec.assertion !== void 0) newSpec.assertion = fromJS(newSpec.assertion); if (newSpec.assertion !== void 0) newSpec.assertion = fromJS(newSpec.assertion);
if (!is(newSpec.assertion, this.spec.assertion)) { if (!is(newSpec.assertion, this.spec.assertion)) {
this._uninstall(true); this._uninstall(true);

View File

@ -35,8 +35,8 @@ export class Ground extends Dataspace {
stopHandlers: Array<StopHandler<this>> = []; stopHandlers: Array<StopHandler<this>> = [];
backgroundTaskCount = 0; backgroundTaskCount = 0;
constructor(bootProc: Script) { constructor(bootProc: Script<void>) {
super(function () { Dataspace.currentFacet.addStartScript(bootProc); }); super(function (rootFacet) { rootFacet.addStartScript(bootProc); });
if (typeof window !== 'undefined') { if (typeof window !== 'undefined') {
window._ground = this; window._ground = this;
} }
@ -84,7 +84,7 @@ export class Ground extends Dataspace {
try { try {
let stillBusy = false; let stillBusy = false;
for (var fuel = this.startingFuel; fuel > 0; fuel--) { for (var fuel = this.startingFuel; fuel > 0; fuel--) {
stillBusy = this.runScripts(); stillBusy = this.runTasks();
if (!stillBusy) break; if (!stillBusy) break;
} }
if (stillBusy) { if (stillBusy) {

View File

@ -30,13 +30,13 @@ export const $QuitDataspace = new $Special("quit-dataspace");
export class NestedDataspace extends Dataspace { export class NestedDataspace extends Dataspace {
readonly outerFacet: Facet; readonly outerFacet: Facet;
constructor(outerFacet: Facet, bootProc: Script) { constructor(outerFacet: Facet, bootProc: Script<void>) {
super(bootProc); super(bootProc);
this.outerFacet = outerFacet; this.outerFacet = outerFacet;
} }
sendMessage(m: any, _sendingActor: Actor) { deliverMessage(m: any, _sendingActor: Actor) {
super.sendMessage(m, _sendingActor); super.deliverMessage(m, _sendingActor);
if (m === $QuitDataspace) { if (m === $QuitDataspace) {
this.outerFacet.stop(); this.outerFacet.stop();
} }
@ -88,13 +88,13 @@ export class NestedDataspace extends Dataspace {
if (Outbound.isClassOf(a)) { if (Outbound.isClassOf(a)) {
switch (net) { switch (net) {
case ChangeDescription.ABSENT_TO_PRESENT: case ChangeDescription.ABSENT_TO_PRESENT:
this.outerFacet.actor.pushScript(() => { this.outerFacet.actor.scheduleTask(() => {
this.outerFacet.actor.adhocAssert(a.get(0)); this.outerFacet.actor.adhocAssert(a.get(0));
}); });
this.outerFacet.actor.dataspace.start(); this.outerFacet.actor.dataspace.start();
break; break;
case ChangeDescription.PRESENT_TO_ABSENT: case ChangeDescription.PRESENT_TO_ABSENT:
this.outerFacet.actor.pushScript(() => { this.outerFacet.actor.scheduleTask(() => {
this.outerFacet.actor.adhocRetract(a.get(0)); this.outerFacet.actor.adhocRetract(a.get(0));
}); });
this.outerFacet.actor.dataspace.start(); this.outerFacet.actor.dataspace.start();
@ -120,11 +120,11 @@ export class NestedDataspace extends Dataspace {
start(): this { start(): this {
this.outerFacet.actor.dataspace.start(); this.outerFacet.actor.dataspace.start();
this.outerFacet.actor.pushScript(() => { this.outerFacet.scheduleScript(outerFacet => {
Dataspace.withCurrentFacet(this.outerFacet, () => { outerFacet.invokeScript(() => {
if (this.outerFacet.isLive) { if (this.outerFacet.isLive) {
Dataspace.deferTurn(() => { this.outerFacet.deferTurn(() => {
const stillBusy = this.runScripts(); const stillBusy = this.runTasks();
if (stillBusy) this.start(); if (stillBusy) this.start();
}); });
} }
@ -138,14 +138,12 @@ export class NestedDataspace extends Dataspace {
} }
} }
export function inNestedDataspace(bootProc: Script): Script { export function inNestedDataspace(bootProc: Script<void>): Script<void> {
return () => { return outerFacet => {
const outerFacet = Dataspace.currentFacet;
outerFacet.addDataflow(function () {}); outerFacet.addDataflow(function () {});
// ^ eww! Dummy endpoint to keep the root facet of the relay alive. // ^ eww! Dummy endpoint to keep the root facet of the relay alive.
const innerDs = new NestedDataspace(outerFacet, function () { const innerDs = new NestedDataspace(outerFacet, innerFacet =>
Dataspace.currentFacet.addStartScript(() => bootProc.call(this)); innerFacet.addStartScript(f => bootProc.call(f.fields, f)));
});
innerDs.start(); innerDs.start();
}; };
} }

View File

@ -144,7 +144,7 @@ export class Index {
this.adjustAssertion(v, -1); this.adjustAssertion(v, -1);
} }
sendMessage(v: Value, leafCallback: (l: Leaf, v: Value) => void = _nop) { deliverMessage(v: Value, leafCallback: (l: Leaf, v: Value) => void = _nop) {
this.root.modify(EventType.MESSAGE, v, _nop, leafCallback, (h, vs) => this.root.modify(EventType.MESSAGE, v, _nop, leafCallback, (h, vs) =>
h.callbacks.forEach(cb => cb(EventType.MESSAGE, vs))); h.callbacks.forEach(cb => cb(EventType.MESSAGE, vs)));
} }