Dataspace implementation; bug fixes & test cases for skeleton impl

This commit is contained in:
Tony Garnock-Jones 2018-10-27 20:32:12 +01:00
parent 825fdce8db
commit 51b8f425ab
8 changed files with 604 additions and 22 deletions

View File

@ -0,0 +1,67 @@
"use strict";
const Immutable = require('immutable');
const Syndicate = require('../src/main.js');
const Skeleton = Syndicate.Skeleton;
const Dataspace = Syndicate.Dataspace;
const Struct = Syndicate.Struct;
const __ = Syndicate.__;
const _$ = Syndicate._$;
const BoxState = Struct.makeConstructor('BoxState', ['value']);
const SetBox = Struct.makeConstructor('SetBox', ['newValue']);
const N = 100000;
console.time('box-and-client-' + N.toString());
let ds = new Dataspace(() => {
Dataspace.spawn('box', function () {
Dataspace.declareField(this, 'value', 0);
Dataspace.currentFacet().addEndpoint(() => {
return [BoxState(this.value), null];
});
Dataspace.currentFacet().addDataflow(() => {
if (this.value === N) {
Dataspace.currentFacet().stop(() => {
console.log('terminated box root facet');
});
}
});
Dataspace.currentFacet().addEndpoint(() => {
let handler = Skeleton.analyzeAssertion(SetBox(_$));
handler.callback = Dataspace.wrap((evt, vs) => {
if (evt === Skeleton.EVENT_MESSAGE) {
Dataspace.currentFacet().actor.scheduleScript(() => {
this.value = vs.get(0);
// console.log('box updated value', vs.get(0));
});
}
});
return [Syndicate.Observe(SetBox(_$)), handler];
});
});
Dataspace.spawn('client', () => {
Dataspace.currentFacet().addEndpoint(() => {
let handler = Skeleton.analyzeAssertion(BoxState(_$));
handler.callback = Dataspace.wrap((evt, vs) => {
if (evt === Skeleton.EVENT_ADDED) {
Dataspace.currentFacet().actor.scheduleScript(() => {
// console.log('client sending SetBox', vs.get(0) + 1);
Dataspace.send(SetBox(vs.get(0) + 1));
});
}
});
return [Syndicate.Observe(BoxState(_$)), handler];
});
});
});
// console.log('--- starting ---');
while (ds.runScripts()) {
// console.log('--- runScripts boundary ---');
}
// console.log('--- done ---');
console.timeEnd('box-and-client-' + N.toString());

View File

@ -48,6 +48,10 @@ MutableBag.prototype.entries = function () {
return this._items.entries();
};
MutableBag.prototype.snapshot = function () {
return this._items;
};
///////////////////////////////////////////////////////////////////////////
const Bag = Immutable.Map;

View File

@ -76,7 +76,7 @@ Graph.prototype.repairDamage = function (repairNode) {
Graph.prototype.defineObservableProperty = function (obj, prop, value, maybeOptions) {
var graph = this;
var options = typeof maybeOptions === 'undefined' ? {} : maybeOptions;
var options = maybeOptions === void 0 ? {} : maybeOptions;
var objectId = options.objectId || '__' + prop;
Object.defineProperty(obj, prop, {
configurable: true,

View File

@ -31,10 +31,144 @@ function Dataspace(bootProc) {
}
// Parameters
Dataspace.currentFacet = null;
Dataspace.inScript = true;
Dataspace._currentFacet = null;
Dataspace._inScript = true;
function Actor(dataspace, name) {
Dataspace.currentFacet = function () { return Dataspace._currentFacet; };
Dataspace.withNonScriptContext = function (thunk) {
let savedInScript = Dataspace._inScript;
Dataspace._inScript = false;
try {
return thunk();
} finally {
Dataspace._inScript = savedInScript;
}
};
Dataspace.withCurrentFacet = function (facet, thunk) {
let savedFacet = Dataspace._currentFacet;
Dataspace._currentFacet = facet;
try {
let result = thunk();
Dataspace._currentFacet = savedFacet;
return result;
} catch (e) {
let a = facet.actor;
a.abandonQueuedWork();
a._terminate();
Dataspace._currentFacet = savedFacet;
throw e;
}
};
Dataspace.wrap = function (f) {
let savedFacet = Dataspace._currentFacet;
return function () {
let actuals = arguments;
return Dataspace.withCurrentFacet(savedFacet, function () {
return f.apply(f.fields, actuals);
});
};
};
Dataspace.referenceField = function (obj, prop) {
if (!(prop in obj)) {
Dataspace._currentFacet.actor.dataspace.dataflow.recordObservation(
Immutable.List.of(obj, prop));
}
return obj[prop];
};
Dataspace.declareField = function (obj, prop, init) {
if (prop in obj) {
obj[prop] = init;
} else {
Dataspace._currentFacet.actor.dataspace.dataflow.defineObservableProperty(
obj,
prop,
init,
{ objectId: Immutable.List.of(obj, prop) });
}
};
Dataspace.deleteField = function (obj, prop) {
Dataspace._currentFacet.actor.dataspace.dataflow.recordDamage(Immutable.List.of(obj, prop));
return delete obj[prop];
};
Dataspace.prototype.runScripts = function () { // TODO: rename?
this.runPendingScripts();
this.performPendingActions();
return !this.runnable.isEmpty() || !this.pendingActions.isEmpty();
};
Dataspace.prototype.runPendingScripts = function () {
let runnable = this.runnable;
this.runnable = Immutable.List();
runnable.forEach((ac) => { ac.runPendingScripts(); /* TODO: rename? */ });
};
Dataspace.prototype.performPendingActions = function () {
let groups = this.pendingActions;
this.pendingActions = Immutable.List();
groups.forEach((group) => {
group.actions.forEach((action) => {
action.perform(this, group.actor);
this.runPendingScripts();
});
});
};
Dataspace.prototype.commitActions = function (ac, pending) {
this.pendingActions = this.pendingActions.push(new ActionGroup(ac, pending));
};
Dataspace.prototype.refreshAssertions = function () {
Dataspace.withNonScriptContext(() => {
this.dataflow.repairDamage((subjectId) => {
let [facet, eid] = subjectId;
if (facet.isLive) { // TODO: necessary test, or tautological?
let ac = facet.actor;
Dataspace.withCurrentFacet(facet, () => {
facet.endpoints.get(eid).refresh(this, ac, facet);
});
}
});
});
};
Dataspace.prototype.addActor = function (name, bootProc, initialAssertions) {
let ac = new Actor(this, name, initialAssertions);
this.applyPatch(ac, ac.adhocAssertions.snapshot());
ac.addFacet(null, () => {
// Root facet is a dummy "system" facet that exists to hold
// one-or-more "user" "root" facets.
ac.addFacet(Dataspace.currentFacet(), bootProc);
// ^ The "true root", user-visible facet.
initialAssertions.forEach((a) => { ac.adhocRetract(a); });
});
};
Dataspace.prototype.applyPatch = function (ac, delta) {
delta.forEach((count, a) => {
if (a !== void 0) {
this.index.adjustAssertion(a, count);
ac.cleanupChanges.change(a, -count);
}
});
};
Dataspace.prototype.subscribe = function (handler) {
this.index.addHandler(handler, handler.callback);
};
Dataspace.prototype.unsubscribe = function (handler) {
this.index.removeHandler(handler, handler.callback);
};
function Actor(dataspace, name, initialAssertions) {
this.id = dataspace.nextId++;
this.dataspace = dataspace;
this.name = name;
@ -43,13 +177,111 @@ function Actor(dataspace, name) {
this.pendingScripts = [];
for (let i = 0; i < PRIORITY._count; i++) { this.pendingScripts.push(Immutable.List()); }
this.pendingActions = Immutable.List();
this.adhocAssertions = new Bag.MutableBag(); // no negative counts allowed
this.adhocAssertions = new Bag.MutableBag(initialAssertions); // no negative counts allowed
this.cleanupChanges = new Bag.MutableBag(); // negative counts allowed!
}
Actor.prototype.runPendingScripts = function () {
while (true) {
let script = this.popNextScript();
if (!script) break;
script();
this.dataspace.refreshAssertions();
}
this.isRunnable = false;
let pending = this.pendingActions;
if (!pending.isEmpty()) {
this.pendingActions = Immutable.List();
this.dataspace.commitActions(this, pending);
}
};
Actor.prototype.popNextScript = function () {
let scripts = this.pendingScripts;
for (let i = 0; i < PRIORITY._count; i++) {
let q = scripts[i];
if (!q.isEmpty()) {
scripts[i] = q.shift();
return q.first();
}
}
return null;
};
Actor.prototype.abandonQueuedWork = function () {
this.pendingActions = Immutable.List();
for (let i = 0; i < PRIORITY._count; i++) { this.pendingScripts[i] = Immutable.List(); }
};
Actor.prototype.scheduleScript = function (unwrappedThunk, priority) {
this.pushScript(Dataspace.wrap(unwrappedThunk), priority);
};
Actor.prototype.pushScript = function (wrappedThunk, priority) {
// The wrappedThunk must already have code for ensuring
// _currentFacet is correct inside it. Compare with scheduleScript.
if (priority === void 0) {
priority = PRIORITY.NORMAL;
}
if (!this.isRunnable) {
this.isRunnable = true;
this.dataspace.runnable = this.dataspace.runnable.push(this);
}
this.pendingScripts[priority] = this.pendingScripts[priority].push(wrappedThunk);
};
Actor.prototype.addFacet = function (parentFacet, bootProc, checkInScript) {
if (checkInScript === true && !Dataspace._inScript) {
throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?");
}
let f = new Facet(this, parentFacet);
Dataspace.withCurrentFacet(f, () => {
Dataspace.withNonScriptContext(() => {
bootProc.call(f.fields);
});
});
this.pushScript(() => {
if ((parentFacet && !parentFacet.isLive) || f.isInert()) {
f._terminate();
}
});
};
Actor.prototype._terminate = function () {
// Abruptly terminates an entire actor, without running stop-scripts etc.
this.pushScript(() => {
this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); });
});
if (this.rootFacet) {
this.rootFacet._abort();
}
this.pushScript(() => { this.enqueueScriptAction(new Quit()); });
};
Actor.prototype.enqueueScriptAction = function (action) {
this.pendingActions = this.pendingActions.push(action);
};
Actor.prototype.pendingPatch = function () {
if (!this.pendingActions.isEmpty()) {
let p = this.pendingActions.last();
if (p instanceof Patch) {
return p;
}
}
let p = new Patch(Bag.Bag());
this.enqueueScriptAction(p);
return p;
};
Actor.prototype.assert = function (a) { this.pendingPatch().adjust(a, +1); };
Actor.prototype.retract = function (a) { this.pendingPatch().adjust(a, -1); };
Actor.prototype.toString = function () {
let s = 'Actor(' + this.id;
if (typeof this.name !== 'undefined') s = s + ',' + JSON.stringify(this.name);
if (this.name !== void 0) s = s + ',' + JSON.stringify(this.name);
return s + ')';
};
@ -57,23 +289,64 @@ function Patch(changes) {
this.changes = changes;
}
Patch.prototype.perform = function (ds, ac) {
ds.applyPatch(ac, this.changes);
};
Patch.prototype.adjust = function (a, count) {
var _net;
({bag: this.changes, net: _net} = Bag.change(this.changes, a, count));
};
function Message(body) {
this.body = body;
}
Message.prototype.perform = function (ds, ac) {
if (this.body !== void 0) {
ds.index.sendMessage(this.body);
}
};
Dataspace.send = function (body) {
Dataspace._currentFacet.actor.enqueueScriptAction(new Message(body));
};
function Spawn(name, bootProc, initialAssertions) {
this.name = name;
this.bootProc = bootProc;
this.initialAssertions = initialAssertions;
this.initialAssertions = initialAssertions || Immutable.Set();
}
function Quit() {
Spawn.prototype.perform = function (ds, ac) {
ds.addActor(this.name, this.bootProc, this.initialAssertions);
};
Dataspace.spawn = function (name, bootProc, initialAssertions) {
let a = new Spawn(name, bootProc, initialAssertions);
Dataspace._currentFacet.actor.enqueueScriptAction(a);
};
function Quit() { // TODO: rename? Perhaps to Cleanup?
// Pseudo-action - not for userland use.
}
Quit.prototype.perform = function (ds, ac) {
ds.applyPatch(ac, ac.cleanupChanges.snapshot());
};
function DeferredTurn(continuation) {
this.continuation = continuation;
}
DeferredTurn.prototype.perform = function (ds, ac) {
ac.pushScript(this.continuation);
};
Dataspace.deferTurn = function (continuation) {
Dataspace._currentFacet.actor.enqueueScriptAction(new DeferredTurn(Dataspace.wrap(continuation)));
};
function ActionGroup(actor, actions) {
this.actor = actor;
this.actions = actions;
@ -84,14 +357,108 @@ function Facet(actor, parent) {
this.isLive = true;
this.actor = actor;
this.parent = parent;
this.endpoints = {};
this.endpoints = Immutable.Map();
this.stopScripts = Immutable.List();
this.children = Immutable.Set();
if (parent) {
parent.children = parent.children.add(this);
this.fields = Dataflow.Graph.newScope(parent.fields);
} else {
if (actor.rootFacet) {
throw new Error("INVARIANT VIOLATED: Attempt to add second root facet");
}
actor.rootFacet = this;
this.fields = Dataflow.Graph.newScope({});
}
}
Facet.prototype._abort = function () {
this.isLive = false;
this.children.forEach((child) => { child._abort(); });
this.retractAssertionsAndSubscriptions();
};
Facet.prototype.retractAssertionsAndSubscriptions = function () {
let ac = this.actor;
let ds = ac.dataspace;
ac.pushScript(() => {
this.endpoints.forEach((ep) => {
ep.destroy(ds, ac, this);
});
this.endpoints = Immutable.Map();
});
};
Facet.prototype.isInert = function () {
return this.endpoints.isEmpty() && this.children.isEmpty();
};
Facet.prototype._terminate = function () {
if (this.isLive) {
let ac = this.actor;
let parent = this.parent;
if (parent) {
parent.children = parent.children.remove(this);
} else {
ac.rootFacet = null;
}
this.isLive = false;
this.children.forEach((child) => { child._terminate(); });
// Run stop-scripts after terminating children. This means that
// children's stop-scripts run before ours.
ac.pushScript(() => {
Dataspace.withCurrentFacet(this, () => {
this.stopScripts.forEach((s) => { s.call(this.fields); });
});
});
this.retractAssertionsAndSubscriptions();
ac.pushScript(() => {
if (parent) {
if (parent.isInert()) {
parent._terminate();
}
} else {
ac._terminate();
}
}, PRIORITY.GC);
}
};
Facet.prototype.stop = function (continuation) {
Dataspace.withCurrentFacet(this.parent, () => {
this.actor.scheduleScript(() => {
this._terminate();
this.actor.scheduleScript(() => {
continuation.call(this.fields); // TODO: is this the correct scope to use??
});
});
});
};
Facet.prototype.addStopScript = function (s) {
this.stopScripts = this.stopScripts.push(s);
};
Facet.prototype.addEndpoint = function (updateFun, isDynamic) {
return new Endpoint(this, isDynamic === void 0 ? true : isDynamic, updateFun);
};
Facet.prototype.addDataflow = function (subjectFun, priority) {
return this.addEndpoint(() => {
let subjectId = this.actor.dataspace.dataflow.currentSubjectId;
this.actor.scheduleScript(() => {
this.actor.dataspace.dataflow.withSubject(subjectId, () => subjectFun.call(this.fields));
}, priority);
return [void 0, null];
});
};
Facet.prototype.toString = function () {
let s = 'Facet(' + this.actor.id;
if (typeof this.actor.name !== 'undefined') s = s + ',' + JSON.stringify(this.actor.name);
if (this.actor.name !== void 0) s = s + ',' + JSON.stringify(this.actor.name);
s = s + ',' + this.id;
let f = this.parent;
while (f != null) {
@ -101,21 +468,54 @@ Facet.prototype.toString = function () {
return s + ')';
};
function Endpoint(id, assertion, handler, updateFun) {
this.id = id;
function Endpoint(facet, isDynamic, updateFun) {
if (Dataspace._inScript) {
throw new Error("Cannot add endpoint in script; are you missing a `react { ... }`?");
}
let ac = facet.actor;
let ds = ac.dataspace;
this.id = ds.nextId++;
this.updateFun = updateFun;
let [initialAssertion, initialHandler] = ds.dataflow.withSubject(
isDynamic ? [facet, this.id] : false,
() => updateFun.call(facet.fields));
this._install(ds, ac, initialAssertion, initialHandler);
facet.endpoints = facet.endpoints.set(this.id, this);
}
Endpoint.prototype._install = function (ds, ac, assertion, handler) {
this.assertion = assertion;
this.handler = handler;
this.updateFun = updateFun;
}
ac.assert(this.assertion);
if (this.handler) { ds.subscribe(this.handler); }
};
Endpoint.prototype._uninstall = function (ds, ac) {
ac.retract(this.assertion);
if (this.handler) { ds.unsubscribe(this.handler); }
};
Endpoint.prototype.refresh = function (ds, ac, facet) {
let [newAssertion, newHandler] = this.updateFun.call(facet.fields);
newAssertion = Immutable.fromJS(newAssertion);
if (!Immutable.is(newAssertion, this.assertion)) {
this._uninstall(ds, ac);
this._install(ds, ac, newAssertion, newHandler);
}
};
Endpoint.prototype.destroy = function (ds, ac, facet) {
ds.dataflow.forgetSubject([facet, this.id]);
// ^ TODO: this won't work because of object identity problems! Why
// does the Racket implementation do this, when the old JS
// implementation doesn't?
this._uninstall(ds, ac);
};
Endpoint.prototype.toString = function () {
return 'Endpoint(' + this.id + ')';
};
function Handler(staticInfo, callback) {
this.staticInfo = staticInfo;
this.callback = callback;
}
///////////////////////////////////////////////////////////////////////////
module.exports.Dataspace = Dataspace;

View File

@ -2,9 +2,20 @@
const Struct = require('./struct.js');
const Skeleton = require('./skeleton.js');
const Dataspace = require('./dataspace.js');
const Assertions = require('./assertions.js');
module.exports.Bag = require("./bag.js");
module.exports.Struct = Struct;
module.exports.Skeleton = Skeleton;
module.exports.__ = Struct.__;
module.exports._$ = Skeleton._$;
module.exports._Dataspace = Dataspace;
module.exports.Dataspace = Dataspace.Dataspace;
module.exports._Assertions = Assertions;
module.exports.Observe = Assertions.Observe;
module.exports.Seal = Assertions.Seal;
module.exports.Inbound = Assertions.Inbound;
module.exports.Outbound = Assertions.Outbound;

View File

@ -27,6 +27,14 @@ function Selector(popCount, index) {
this.index = index;
}
Selector.prototype.equals = function (other) {
return (this.popCount === other.popCount) && (this.index === other.index);
};
Selector.prototype.hashCode = function () {
return (this.popCount * 5) + this.index;
};
function Continuation(cachedAssertions) {
this.cachedAssertions = cachedAssertions;
this.leafMap = Immutable.Map();
@ -81,7 +89,7 @@ Node.prototype.extend = function(skeleton) {
if (!nextNode) {
nextNode = new Node(new Continuation(
node.continuation.cachedAssertions.filter(
(a) => classOf(project(a, path)) === cls)));
(a) => classOf(projectPath(a, path)) === cls)));
table = table.set(cls, nextNode);
node.edges = node.edges.set(selector, table);
}
@ -130,7 +138,7 @@ Index.prototype.addHandler = function(analysisResults, callback) {
leaf.handlerMap = leaf.handlerMap.set(capturePaths, handler);
}
handler.callbacks = handler.callbacks.add(callback);
handler.cachedCaptures.forEach((captures) => {
handler.cachedCaptures.forEach((_count, captures) => {
callback(EVENT_ADDED, captures);
return true;
});
@ -254,7 +262,7 @@ Index.prototype.sendMessage = function(v) {
// The pattern argument defaults to wildcard, __.
function Capture(name, pattern) {
this.name = name;
this.pattern = (typeof pattern === 'undefined' ? __ : pattern);
this.pattern = (pattern === void 0 ? __ : pattern);
}
// Abbreviation: _$(...) <==> new Capture(...)

48
test/test-dataspace.js Normal file
View File

@ -0,0 +1,48 @@
"use strict";
const chai = require('chai');
const expect = chai.expect;
chai.use(require('chai-immutable'));
const Immutable = require('immutable');
const Syndicate = require('../src/main.js');
const Skeleton = Syndicate.Skeleton;
const Dataspace = Syndicate.Dataspace;
const Struct = Syndicate.Struct;
const __ = Syndicate.__;
const _$ = Syndicate._$;
describe('dataspace', () => {
it('should boot and run', () => {
// TODO: convert this into even a rudimentary somewhat-real test case
// (change console.log into gathering a trace)
let ds = new Dataspace(() => {
// console.log('boot');
Dataspace.currentFacet().addEndpoint(() => {
let handler = Skeleton.analyzeAssertion(_$);
handler.callback = (evt, vs) => {
if (Syndicate.Observe.isClassOf(vs.get(0))) {
// console.log('OBSERVATION EVENT', evt, vs, vs.get(0).get(0) === _$);
} else {
// console.log('EVENT', evt, vs);
}
};
return [Syndicate.Observe(_$), handler];
});
Dataspace.deferTurn(() => {
// console.log('after defer');
Dataspace.send(1234);
// console.log('after send');
Dataspace.spawn('secondproc', () => {
// console.log('secondproc boot');
});
});
});
// console.log('--- starting ---');
while (ds.runScripts()) {
// console.log('--- runScripts boundary ---');
}
// console.log('--- done ---');
});
});

View File

@ -145,6 +145,50 @@ describe('skeleton', () => {
});
});
describe('handler added after assertion (1)', () => {
let trace = skeletonTrace((i, traceHolder) => {
i.addAssertion(["hi", 123, 234]);
i.addHandler(Skeleton.analyzeAssertion(["hi", _$, _$]), eventCallback(traceHolder, "X"));
i.removeAssertion(["hi", 123, 234]);
});
it('should get two events', () => {
expect(trace).to.equal(Immutable.List([
Event("X", Skeleton.EVENT_ADDED, [123, 234]),
Event("X", Skeleton.EVENT_REMOVED, [123, 234])]));
});
});
describe('handler added after assertion (2)', () => {
let trace = skeletonTrace((i, traceHolder) => {
i.addAssertion(["hi", 123, 234]);
i.addHandler(Skeleton.analyzeAssertion(_$), eventCallback(traceHolder, "X"));
i.removeAssertion(["hi", 123, 234]);
});
it('should get two events', () => {
expect(trace).to.equal(Immutable.List([
Event("X", Skeleton.EVENT_ADDED, [["hi", 123, 234]]),
Event("X", Skeleton.EVENT_REMOVED, [["hi", 123, 234]])]));
});
});
describe('handler removed before assertion removed', () => {
let trace = skeletonTrace((i, traceHolder) => {
i.addAssertion(["hi", 123, 234]);
let h = Skeleton.analyzeAssertion(["hi", _$, _$]);
h.callback = eventCallback(traceHolder, "X")
i.addHandler(h, h.callback);
i.removeHandler(h, h.callback);
i.removeAssertion(["hi", 123, 234]);
});
it('should get one event', () => {
expect(trace).to.equal(Immutable.List([
Event("X", Skeleton.EVENT_ADDED, [123, 234])]));
});
});
describe('simple list assertions trace', () => {
let trace = skeletonTrace((i, traceHolder) => {
i.addHandler(Skeleton.analyzeAssertion(["hi", _$, _$]), eventCallback(traceHolder, "3-EVENT"));