js-marketplace-2014/src/minimart.js

558 lines
16 KiB
JavaScript

var Route = require("./route.js");
///////////////////////////////////////////////////////////////////////////
// TODO: trigger-guards as per minimart
/*---------------------------------------------------------------------------*/
/* Events and Actions */
var __ = Route.__;
var _$ = Route._$;
function sub(pattern, metaLevel, level) {
return Route.simpleGestalt(false, pattern, metaLevel, level);
}
function pub(pattern, metaLevel, level) {
return Route.simpleGestalt(true, pattern, metaLevel, level);
}
function spawn(behavior, initialGestalts) {
return { type: "spawn",
behavior: behavior,
initialGestalt: Route.gestaltUnion(initialGestalts || []) };
}
function updateRoutes(gestalts) {
return { type: "routes", gestalt: Route.gestaltUnion(gestalts) };
}
function pendingRoutingUpdate(aggregate, affectedSubgestalt, knownTarget) {
return { type: "pendingRoutingUpdate",
aggregate: aggregate,
affectedSubgestalt: affectedSubgestalt,
knownTarget: knownTarget };
}
function sendMessage(m, metaLevel, isFeedback) {
return { type: "message",
metaLevel: (metaLevel === undefined) ? 0 : metaLevel,
message: m,
isFeedback: (isFeedback === undefined) ? false : isFeedback };
}
function shutdownWorld() {
return { type: "shutdownWorld" };
}
/*---------------------------------------------------------------------------*/
/* Configurations */
function World(bootFn) {
this.alive = true;
this.eventQueue = [];
this.runnablePids = {};
this.partialGestalt = Route.emptyGestalt; // Only gestalt from local processes
this.fullGestalt = Route.emptyGestalt ;; // partialGestalt unioned with downwardGestalt
this.processTable = {};
this.tombstones = {};
this.downwardGestalt = Route.emptyGestalt;
this.processActions = [];
this.asChild(-1, bootFn, true);
}
/* Class state / methods */
World.nextPid = 0;
World.stack = [];
World.current = function () {
return World.stack[World.stack.length - 1][0];
};
World.activePid = function () {
return World.stack[World.stack.length - 1][1];
};
World.send = function (m, metaLevel, isFeedback) {
World.current().enqueueAction(World.activePid(), sendMessage(m, metaLevel, isFeedback));
};
World.updateRoutes = function (gestalts) {
World.current().enqueueAction(World.activePid(), updateRoutes(gestalts));
};
World.spawn = function (behavior, initialGestalts) {
World.current().enqueueAction(World.activePid(), spawn(behavior, initialGestalts));
};
World.exit = function (exn) {
World.current().kill(World.activePid(), exn);
};
World.shutdownWorld = function () {
World.current().enqueueAction(World.activePid(), shutdownWorld());
};
World.withWorldStack = function (stack, f) {
var oldStack = World.stack;
World.stack = stack;
var result = null;
try {
result = f();
} catch (e) {
World.stack = oldStack;
throw e;
}
World.stack = oldStack;
return result;
};
World.wrap = function (f) {
var savedStack = World.stack.slice();
return function () {
var actuals = arguments;
return World.withWorldStack(savedStack, function () {
var result = World.current().asChild(World.activePid(), function () {
return f.apply(null, actuals);
});
for (var i = World.stack.length - 1; i >= 0; i--) {
World.stack[i][0].markPidRunnable(World.stack[i][1]);
}
return result;
});
};
};
/* Instance methods */
World.prototype.enqueueAction = function (pid, action) {
this.processActions.push([pid, action]);
};
// The code is written to maintain the runnablePids set carefully, to
// ensure we can locally decide whether we're inert or not without
// having to search the whole deep process tree.
World.prototype.isInert = function () {
return this.eventQueue.length === 0
&& this.processActions.length === 0
&& Route.is_emptySet(this.runnablePids);
};
World.prototype.markPidRunnable = function (pid) {
this.runnablePids[pid] = [pid];
};
World.prototype.step = function () {
this.dispatchEvents();
this.performActions();
this.stepChildren();
return this.alive && !this.isInert();
};
World.prototype.asChild = function (pid, f, omitLivenessCheck) {
if (!(pid in this.processTable) && !omitLivenessCheck) {
console.warn("World.asChild eliding invocation of dead process", pid);
return;
}
World.stack.push([this, pid]);
var result = null;
try {
result = f();
} catch (e) {
this.kill(pid, e);
}
if (World.stack.pop()[0] !== this) {
throw new Error("Internal error: World stack imbalance");
}
return result;
};
World.prototype.kill = function (pid, exn) {
if (exn && exn.stack) {
console.log("Process exited", pid, exn, exn.stack);
} else {
console.log("Process exited", pid, exn);
}
var p = this.processTable[pid];
if (p && p.behavior.trapexit) {
this.asChild(pid, function () { return p.behavior.trapexit(exn); });
}
delete this.processTable[pid];
if (p) {
if (exn) {
p.exitReason = exn;
this.tombstones[pid] = p;
}
this.applyAndIssueRoutingUpdate(p.gestalt, Route.emptyGestalt);
}
};
World.prototype.stepChildren = function () {
var pids = this.runnablePids;
this.runnablePids = {};
for (var pid in pids) {
var p = this.processTable[pid];
if (p && p.behavior.step /* exists, haven't called it yet */) {
var childBusy = this.asChild(pid | 0, function () { return p.behavior.step() });
if (childBusy) this.markPidRunnable(pid);
}
}
};
World.prototype.performActions = function () {
var queue = this.processActions;
this.processActions = [];
var item;
while ((item = queue.shift()) && this.alive) {
this.performAction(item[0], item[1]);
}
};
World.prototype.dispatchEvents = function () {
var queue = this.eventQueue;
this.eventQueue = [];
var item;
while ((item = queue.shift())) {
this.dispatchEvent(item);
}
};
World.prototype.performAction = function (pid, action) {
switch (action.type) {
case "spawn":
var pid = World.nextPid++;
var newGestalt = action.initialGestalt.label(pid);
this.processTable[pid] = { gestalt: newGestalt, behavior: action.behavior };
if (action.behavior.boot) {
this.asChild(pid, function () { action.behavior.boot() });
this.markPidRunnable(pid);
}
this.applyAndIssueRoutingUpdate(Route.emptyGestalt, newGestalt, pid);
break;
case "routes":
if (pid in this.processTable) {
// it may not be: this might be the routing update from a
// kill of the process
var oldGestalt = this.processTable[pid].gestalt;
var newGestalt = action.gestalt.label(pid|0);
// ^ pid|0: convert pid from string (table key!) to integer
this.processTable[pid].gestalt = newGestalt;
this.applyAndIssueRoutingUpdate(oldGestalt, newGestalt, pid);
}
break;
case "message":
if (action.metaLevel === 0) {
this.eventQueue.push(action);
} else {
World.send(action.message, action.metaLevel - 1, action.isFeedback);
}
break;
case "shutdownWorld":
this.alive = false; // force us to stop doing things immediately
World.exit();
break;
default:
var exn = new Error("Action type " + action.type + " not understood");
exn.action = action;
throw exn;
}
};
World.prototype.updateFullGestalt = function () {
this.fullGestalt = this.partialGestalt.union(this.downwardGestalt);
};
World.prototype.issueLocalRoutingUpdate = function (affectedSubgestalt, knownTarget) {
this.eventQueue.push(pendingRoutingUpdate(this.fullGestalt,
affectedSubgestalt,
knownTarget));
};
World.prototype.applyAndIssueRoutingUpdate = function (oldg, newg, knownTarget) {
knownTarget = typeof knownTarget === 'undefined' ? null : knownTarget;
this.partialGestalt = this.partialGestalt.erasePath(oldg).union(newg);
this.updateFullGestalt();
this.issueLocalRoutingUpdate(oldg.union(newg), knownTarget);
World.updateRoutes([this.partialGestalt.drop()]);
};
World.prototype.dispatchEvent = function (e) {
switch (e.type) {
case "pendingRoutingUpdate":
var pids = e.affectedSubgestalt.match(e.aggregate);
if (e.knownTarget !== null) pids.unshift(e.knownTarget);
for (var i = 0; i < pids.length; i++) {
var pid = pids[i];
if (pid === "out") console.warn("Would have delivered a routing update to environment");
var p = this.processTable[pid];
if (p) {
var g = e.aggregate.filter(p.gestalt);
this.asChild(pid, function () { p.behavior.handleEvent(updateRoutes([g])) });
this.markPidRunnable(pid);
}
}
break;
case "message":
var pids = this.partialGestalt.matchValue(e.message, e.metaLevel, e.isFeedback);
for (var i = 0; i < pids.length; i++) {
var pid = pids[i];
var p = this.processTable[pid];
this.asChild(pid, function () { p.behavior.handleEvent(e) });
this.markPidRunnable(pid);
}
break;
default:
var exn = new Error("Event type " + e.type + " not dispatchable");
exn.event = e;
throw exn;
}
};
World.prototype.handleEvent = function (e) {
switch (e.type) {
case "routes":
var oldDownward = this.downwardGestalt;
this.downwardGestalt = e.gestalt.label("out").lift();
this.updateFullGestalt();
this.issueLocalRoutingUpdate(oldDownward.union(this.downwardGestalt), null);
break;
case "message":
this.eventQueue.push(sendMessage(e.message, e.metaLevel + 1, e.isFeedback));
break;
default:
var exn = new Error("Event type " + e.type + " not understood");
exn.event = e;
throw exn;
}
};
/* Debugging, management, and monitoring */
World.prototype.processTree = function () {
var kids = [];
for (var pid in this.processTable) {
var p = this.processTable[pid];
if (p.behavior instanceof World) {
kids.push([pid, p.behavior.processTree()]);
} else {
kids.push([pid, p]);
}
}
for (var pid in this.tombstones) {
kids.push([pid, this.tombstones[pid]]);
}
kids.sort();
return kids;
};
World.prototype.textProcessTree = function (ownPid) {
var lines = [];
function dumpProcess(prefix, pid, p) {
if (p instanceof Array) {
lines.push(prefix + '--+ ' + pid);
for (var i = 0; i < p.length; i++) {
dumpProcess(prefix + ' |', p[i][0], p[i][1]);
}
lines.push(prefix);
} else {
var label = p.behavior.name || p.behavior.constructor.name || '';
var tombstoneString = p.exitReason ? ' (EXITED: ' + p.exitReason + ') ' : '';
lines.push(prefix + '-- ' + pid + ': ' + label +
tombstoneString +
JSON.stringify(p.behavior, function (k, v) {
return k === 'name' ? undefined : v;
}));
}
}
dumpProcess('', ownPid || '', this.processTree());
return lines.join('\n');
};
World.prototype.clearTombstones = function () {
this.tombstones = {};
for (var pid in this.processTable) {
var p = this.processTable[pid];
if (p.behavior instanceof World) {
p.behavior.clearTombstones();
}
}
};
/*---------------------------------------------------------------------------*/
/* Utilities: matching demand for some service */
function DemandMatcher(projection, metaLevel, options) {
options = $.extend({
demandLevel: 0,
supplyLevel: 0,
demandSideIsSubscription: false
}, options);
this.pattern = Route.projectionToPattern(projection);
this.projectionSpec = Route.compileProjection(projection);
this.metaLevel = metaLevel | 0;
this.demandLevel = options.demandLevel;
this.supplyLevel = options.supplyLevel;
this.demandSideIsSubscription = options.demandSideIsSubscription;
this.onDemandIncrease = function (captures) {
console.error("Unhandled increase in demand for route", captures);
};
this.onSupplyDecrease = function (captures) {
console.error("Unhandled decrease in supply for route", captures);
};
this.currentDemand = {};
this.currentSupply = {};
}
DemandMatcher.prototype.boot = function () {
var observerLevel = 1 + Math.max(this.demandLevel, this.supplyLevel);
World.updateRoutes([sub(this.pattern, this.metaLevel, observerLevel),
pub(this.pattern, this.metaLevel, observerLevel)]);
};
DemandMatcher.prototype.handleEvent = function (e) {
if (e.type === "routes") {
this.handleGestalt(e.gestalt);
}
};
DemandMatcher.prototype.handleGestalt = function (gestalt) {
var newDemandMatcher = gestalt.project(this.projectionSpec,
!this.demandSideIsSubscription,
this.metaLevel,
this.demandLevel);
var newSupplyMatcher = gestalt.project(this.projectionSpec,
this.demandSideIsSubscription,
this.metaLevel,
this.supplyLevel);
var newDemand = Route.arrayToSet(Route.matcherKeys(newDemandMatcher));
var newSupply = Route.arrayToSet(Route.matcherKeys(newSupplyMatcher));
var demandDelta = Route.setSubtract(newDemand, this.currentDemand);
var supplyDelta = Route.setSubtract(this.currentSupply, newSupply);
var demandIncr = Route.setSubtract(demandDelta, newSupply);
var supplyDecr = Route.setIntersect(supplyDelta, newDemand);
this.currentDemand = newDemand;
this.currentSupply = newSupply;
for (var k in demandIncr) this.onDemandIncrease(demandIncr[k]);
for (var k in supplyDecr) this.onSupplyDecrease(supplyDecr[k]);
};
/*---------------------------------------------------------------------------*/
/* Utilities: deduplicator */
function Deduplicator(ttl_ms) {
this.ttl_ms = ttl_ms || 10000;
this.queue = [];
this.map = {};
this.timerId = null;
}
Deduplicator.prototype.accept = function (m) {
var s = JSON.stringify(m);
if (s in this.map) return false;
var entry = [(+new Date()) + this.ttl_ms, s, m];
this.map[s] = entry;
this.queue.push(entry);
if (this.timerId === null) {
var self = this;
this.timerId = setInterval(function () { self.expireMessages(); },
this.ttl_ms > 1000 ? 1000 : this.ttl_ms);
}
return true;
};
Deduplicator.prototype.expireMessages = function () {
var now = +new Date();
while (this.queue.length > 0 && this.queue[0][0] <= now) {
var entry = this.queue.shift();
delete this.map[entry[1]];
}
if (this.queue.length === 0) {
clearInterval(this.timerId);
this.timerId = null;
}
};
/*---------------------------------------------------------------------------*/
/* Ground interface */
function Ground(bootFn) {
var self = this;
this.stepperId = null;
World.withWorldStack([[this, -1]], function () {
self.world = new World(bootFn);
});
}
Ground.prototype.step = function () {
var self = this;
return World.withWorldStack([[this, -1]], function () {
return self.world.step();
});
};
Ground.prototype.checkPid = function (pid) {
if (pid !== -1) console.error("Weird pid in Ground markPidRunnable", pid);
};
Ground.prototype.markPidRunnable = function (pid) {
this.checkPid(pid);
this.startStepping();
};
Ground.prototype.startStepping = function () {
var self = this;
if (this.stepperId) return;
if (this.step()) {
this.stepperId = setTimeout(function () {
self.stepperId = null;
self.startStepping();
}, 0);
}
};
Ground.prototype.stopStepping = function () {
if (this.stepperId) {
clearTimeout(this.stepperId);
this.stepperId = null;
}
};
Ground.prototype.enqueueAction = function (pid, action) {
this.checkPid(pid);
if (action.type === 'routes') {
if (!action.gestalt.isEmpty()) {
console.error("You have subscribed to a nonexistent event source.",
action.gestalt.pretty());
}
} else {
console.error("You have sent a message into the outer void.", action);
}
};
///////////////////////////////////////////////////////////////////////////
module.exports.__ = __;
module.exports._$ = _$;
module.exports.sub = sub;
module.exports.pub = pub;
module.exports.spawn = spawn;
module.exports.updateRoutes = updateRoutes;
module.exports.sendMessage = sendMessage;
module.exports.shutdownWorld = shutdownWorld;
module.exports.World = World;
module.exports.DemandMatcher = DemandMatcher;
module.exports.Deduplicator = Deduplicator;
module.exports.Ground = Ground;
module.exports.Route = Route;