syndicate-2017/js/src/demand-matcher.js

215 lines
7.5 KiB
JavaScript

var Immutable = require('immutable');
var Trie = require('./trie.js');
var Patch = require('./patch.js');
var Util = require('./util.js');
///////////////////////////////////////////////////////////////////////////
// Protocol between DemandMatcher and taskSupervisor functions
// Bits:
var IS_CHANGING = 1;
var IS_PRESENT = 2;
// Bit combinations:
var LOW = 0 ;
var RISING = IS_CHANGING ;
var HIGH = IS_PRESENT ;
var FALLING = IS_PRESENT | IS_CHANGING ;
///////////////////////////////////////////////////////////////////////////
// Default task supervision strategy. See syndicate/doc/demand-matcher.md.
function defaultTaskSupervisor(demandState, supplyState, supervisionState, taskFn, onTaskExit) {
var oldESI = supervisionState ? supervisionState.expectSupplyIncrease : false;
var oldESD = supervisionState ? supervisionState.expectSupplyDecrease : false;
var newESI = oldESI;
var newESD = oldESD;
if ((demandState === FALLING) && ((supplyState === RISING) ||
(supplyState === HIGH) ||
oldESI)) {
newESD = true;
}
if (!oldESI && ((demandState === RISING) ||
(demandState === HIGH)) && ((supplyState === LOW) ||
(supplyState === FALLING))) {
if ((demandState === HIGH) && !oldESD) {
onTaskExit();
} else {
taskFn();
newESI = true;
}
}
if (supplyState === FALLING) newESD = false;
if (supplyState === RISING) newESI = false;
if (newESI || newESD) {
return { expectSupplyIncrease: newESI, expectSupplyDecrease: newESD };
} else {
return null;
}
}
function defaultOnTaskExit(captures) {
console.error("Syndicate: DemandMatcher detected unexpected drop in supply", captures);
}
///////////////////////////////////////////////////////////////////////////
// DemandMatcher itself
function DemandMatcher(demandSpecs, supplySpecs, startTask, options) {
options = Util.extend({
metaLevel: 0,
demandMetaLevel: null,
supplyMetaLevel: null,
taskSupervisor: defaultTaskSupervisor,
onTaskExit: defaultOnTaskExit
}, options);
if (typeof startTask !== 'function') {
throw new Error("Syndicate: DemandMatcher expects 'startTask' function as third argument");
}
this.demandProjectionNames = ensureMatchingProjectionNames(demandSpecs);
this.supplyProjectionNames = ensureMatchingProjectionNames(supplySpecs);
ensureMatchingProjectionNames([demandSpecs[0], supplySpecs[0]]);
this.demandSpecs = demandSpecs;
this.supplySpecs = supplySpecs;
this.demandPatterns = demandSpecs.map(function (s) { return Trie.projectionToPattern(s); });
this.supplyPatterns = supplySpecs.map(function (s) { return Trie.projectionToPattern(s); });
this.demandMetaLevel =
(options.demandMetaLevel === null) ? options.metaLevel : options.demandMetaLevel;
this.supplyMetaLevel =
(options.supplyMetaLevel === null) ? options.metaLevel : options.supplyMetaLevel;
function metaWrap(n) {
return function (s) { return Patch.prependAtMeta(s, n); };
}
this.demandProjections = demandSpecs.map(metaWrap(this.demandMetaLevel));
this.supplyProjections = supplySpecs.map(metaWrap(this.supplyMetaLevel));
this.taskSupervisor = options.taskSupervisor;
this.startTask = startTask;
this.onTaskExit = options.onTaskExit;
this.currentDemand = Immutable.Set();
this.currentSupply = Immutable.Set();
this.supervisionStates = Immutable.Map();
if ('name' in options) {
this.name = options.name;
}
}
function ensureMatchingProjectionNames(specs) {
if (!(specs.length > 0)) {
throw new Error("Syndicate: DemandMatcher needs at least one spec");
}
var names = null;
specs.forEach(function (spec) {
if (names === null) {
names = Trie.projectionNames(spec);
} else {
if (JSON.stringify(names) !== JSON.stringify(Trie.projectionNames(spec))) {
throw new Error("Syndicate: DemandMatcher needs identical capture names");
}
}
});
return names;
}
DemandMatcher.prototype.boot = function () {
var p = Patch.emptyPatch;
function extend(ml) {
return function (pat) { p = p.andThen(Patch.sub(pat, ml)); };
}
this.demandPatterns.forEach(extend(this.demandMetaLevel));
this.supplyPatterns.forEach(extend(this.supplyMetaLevel));
return p;
};
DemandMatcher.prototype.handleEvent = function (e) {
if (e.type === "stateChange") {
this.handlePatch(e.patch);
}
};
DemandMatcher.prototype.handlePatch = function (p) {
var self = this;
var dN = self.demandProjectionNames.length;
var sN = self.supplyProjectionNames.length;
var addedDemand = self.extractKeys(p.added, self.demandProjections, dN, 'demand');
var removedDemand = self.extractKeys(p.removed, self.demandProjections, dN, 'demand');
var addedSupply = self.extractKeys(p.added, self.supplyProjections, sN, 'supply');
var removedSupply = self.extractKeys(p.removed, self.supplyProjections, sN, 'supply');
// Though the added and removed sets of patches are always disjoint,
// *after projection* this may not hold. Cancel out any overlaps.
var demandOverlap = addedDemand.intersect(removedDemand);
var supplyOverlap = addedSupply.intersect(removedSupply);
addedDemand = addedDemand.subtract(demandOverlap);
removedDemand = removedDemand.subtract(demandOverlap);
addedSupply = addedSupply.subtract(supplyOverlap);
removedSupply = removedSupply.subtract(supplyOverlap);
var allTasks = addedDemand.union(addedSupply).union(removedDemand).union(removedSupply);
allTasks.forEach(function (captures) {
function taskFn() {
self.startTask(Trie.captureToObject(captures, self.demandProjectionNames));
}
function onTaskExit() {
self.onTaskExit(Trie.captureToObject(captures, self.demandProjectionNames));
}
var demandState = computeState(self.currentDemand, addedDemand, removedDemand, captures);
var supplyState = computeState(self.currentSupply, addedSupply, removedSupply, captures);
var oldSupervisionState = self.supervisionStates.get(captures, null);
var newSupervisionState = self.taskSupervisor(demandState,
supplyState,
oldSupervisionState,
taskFn,
onTaskExit);
if (newSupervisionState === null) {
self.supervisionStates = self.supervisionStates.remove(captures);
} else {
self.supervisionStates = self.supervisionStates.set(captures, newSupervisionState);
}
});
self.currentSupply = self.currentSupply.union(addedSupply).subtract(removedSupply);
self.currentDemand = self.currentDemand.union(addedDemand).subtract(removedDemand);
};
DemandMatcher.prototype.extractKeys = function (trie, projections, keyCount, whichSide) {
var ks = Immutable.Set();
projections.forEach(function (proj) {
var moreKs = Trie.trieKeys(Trie.project(trie, proj), keyCount);
if (!moreKs) {
throw new Error("Syndicate: wildcard "+whichSide+" detected:\n" +
JSON.stringify(proj) + "\n" +
Trie.prettyTrie(trie));
}
ks = ks.union(moreKs);
});
return ks;
};
function computeState(current, added, removed, captures) {
var isPresent = current.has(captures);
var isChanging = added.has(captures) || removed.has(captures);
return (isPresent ? IS_PRESENT : 0) | (isChanging ? IS_CHANGING : 0);
}
///////////////////////////////////////////////////////////////////////////
module.exports.DemandMatcher = DemandMatcher;