
634 lines
17 KiB

/* Unification */
var __ = new Object(); /* wildcard marker */
__.__ = "__";
function unificationFailed() {
throw {unificationFailed: true};
function unify1(a, b) {
var i;
if (a === __) return b;
if (b === __) return a;
if (a === b) return a;
if (Array.isArray(a) && Array.isArray(b)) {
if (a.length !== b.length) unificationFailed();
var result = new Array(a.length);
for (i = 0; i < a.length; i++) {
result[i] = unify1(a[i], b[i]);
return result;
if (typeof a === "object" && typeof b === "object") {
/* TODO: consider other kinds of matching. I've chosen to
require any field mentioned by either side to be present in
both. Does that make sense? */
var result = ({});
for (i in a) { if (a.hasOwnProperty(i)) result[i] = true; }
for (i in b) { if (b.hasOwnProperty(i)) result[i] = true; }
for (i in result) {
if (result.hasOwnProperty(i)) {
result[i] = unify1(a[i], b[i]);
return result;
function unify(a, b) {
try {
// console.log("unify", JSON.stringify(a), JSON.stringify(b));
return {result: unify1(a, b)};
} catch (e) {
if (e.unificationFailed) return undefined;
throw e;
function anyUnify(aa, bb) {
for (var i = 0; i < aa.length; i++) {
for (var j = 0; j < bb.length; j++) {
if (unify(aa[i], bb[j])) return true;
return false;
/* Events and Actions */
function Route(isSubscription, pattern, metaLevel, level) {
this.isSubscription = isSubscription;
this.pattern = pattern;
this.metaLevel = (metaLevel === undefined) ? 0 : metaLevel;
this.level = (level === undefined) ? 0 : level;
Route.prototype.drop = function () {
if (this.metaLevel === 0) { return null; }
return new Route(this.isSubscription, this.pattern, this.metaLevel - 1, this.level);
Route.prototype.lift = function () {
return new Route(this.isSubscription, this.pattern, this.metaLevel + 1, this.level);
Route.prototype.toJSON = function () {
return [this.isSubscription ? "sub" : "pub", this.pattern, this.metaLevel, this.level];
Route.prototype.visibilityToRoute = function (other, overrideOtherLevel) {
if (!this.isSubscription !== other.isSubscription) return undefined;
if (this.metaLevel !== other.metaLevel) return undefined;
if (this.level >= (overrideOtherLevel || other.level)) return undefined;
return unify(this.pattern, other.pattern); // returns undefined if unification fails
Route.fromJSON = function (j) {
switch (j[0]) {
case "sub": return new Route(true, j[1], j[2], j[3]);
case "pub": return new Route(false, j[1], j[2], j[3]);
default: throw { message: "Invalid JSON-encoded route: " + JSON.stringify(j) };
function sub(pattern, metaLevel, level) {
return new Route(true, pattern, metaLevel, level);
function pub(pattern, metaLevel, level) {
return new Route(false, pattern, metaLevel, level);
function spawn(behavior, initialRoutes) {
return { type: "spawn",
behavior: behavior,
initialRoutes: (initialRoutes === undefined) ? [] : initialRoutes };
function updateRoutes(routes) {
return { type: "routes", routes: routes };
function sendMessage(m, metaLevel, isFeedback) {
return { type: "message",
metaLevel: (metaLevel === undefined) ? 0 : metaLevel,
message: m,
isFeedback: (isFeedback === undefined) ? false : isFeedback };
function shutdownWorld() {
return { type: "shutdownWorld" };
/* Metafunctions */
function dropRoutes(routes) {
var result = [];
for (var i = 0; i < routes.length; i++) {
var r = routes[i].drop();
if (r) { result.push(r); }
return result;
function liftRoutes(routes) {
var result = [];
for (var i = 0; i < routes.length; i++) {
return result;
function intersectRoutes(rs1, rs2) {
var result = [];
for (var i = 0; i < rs1.length; i++) {
for (var j = 0; j < rs2.length; j++) {
var ri = rs1[i];
var rj = rs2[j];
var u = ri.visibilityToRoute(rj);
if (u) {
var rk = new Route(ri.isSubscription, u.result, ri.metaLevel, ri.level);
return result;
function filterEvent(e, routes) {
switch (e.type) {
case "routes":
return updateRoutes(intersectRoutes(e.routes, routes));
case "message":
for (var i = 0; i < routes.length; i++) {
var r = routes[i];
if (e.metaLevel === r.metaLevel
&& e.isFeedback === !r.isSubscription
&& unify(e.message, r.pattern))
return e;
return null;
throw { message: "Event type " + e.type + " not filterable",
event: e };
/* Configurations */
function World(bootFn) {
this.nextPid = 0;
this.eventQueue = [];
this.processTable = {};
this.downwardRoutes = [];
this.processActions = [];
this.activePid = null;
this.stepperId = null;
this.asChild(-1, bootFn, true);
/* Class state / methods */
World.stack = [];
World.current = function () {
return World.stack[World.stack.length - 1];
World.send = function (m, metaLevel, isFeedback) {
World.current().enqueueAction(sendMessage(m, metaLevel, isFeedback));
World.updateRoutes = function (routes) {
World.spawn = function (behavior, initialRoutes) {
World.current().enqueueAction(spawn(behavior, initialRoutes));
World.exit = function (exn) {
World.shutdownWorld = function () {
World.withWorldStack = function (stack, f) {
var oldStack = World.stack;
World.stack = stack;
var result = null;
try {
result = f();
} catch (e) {
World.stack = oldStack;
throw e;
World.stack = oldStack;
return result;
World.wrap = function (f) {
var savedStack = World.stack.slice();
var savedPid = World.current().activePid;
return function () {
var actuals = arguments;
return World.withWorldStack(savedStack, function () {
var result = World.current().asChild(savedPid, function () {
return f.apply(null, actuals);
return result;
/* Instance methods */
World.prototype.killActive = function (exn) {
this.kill(this.activePid, exn);
World.prototype.enqueueAction = function (action) {
this.processActions.push([this.activePid, action]);
World.prototype.isQuiescent = function () {
return this.eventQueue.length === 0 && this.processActions.length === 0;
World.prototype.step = function () {
return this.stepChildren() || !this.isQuiescent();
World.prototype.startStepping = function () {
var self = this;
if (this.stepperId) return;
if (this.step()) {
this.stepperId = setTimeout(function () {
self.stepperId = null;
}, 0);
World.prototype.stopStepping = function () {
if (this.stepperId) {
this.stepperId = null;
World.prototype.asChild = function (pid, f, omitLivenessCheck) {
if (!(pid in this.processTable) && !omitLivenessCheck) {
console.warn("World.asChild eliding invocation of dead process", pid);
var result = null;
this.activePid = pid;
try {
result = f();
} catch (e) {
this.kill(pid, e);
this.activePid = null;
if (World.stack.pop() !== this) {
throw { message: "Internal error: World stack imbalance" };
return result;
World.prototype.kill = function (pid, exn) {
if (exn && exn.stack) {
console.log("Process exited", pid, exn, exn.stack);
} else {
console.log("Process exited", pid, exn);
var p = this.processTable[pid];
if (p && p.behavior.trapexit) {
this.asChild(pid, function () { return p.behavior.trapexit(exn); });
delete this.processTable[pid];
World.prototype.stepChildren = function () {
var someChildBusy = false;
for (var pid in this.processTable) {
var p = this.processTable[pid];
if (p.behavior.step /* exists, haven't called it yet */) {
var childBusy = this.asChild(pid, function () { return p.behavior.step() });
someChildBusy = someChildBusy || childBusy;
return someChildBusy;
World.prototype.performActions = function () {
var queue = this.processActions;
this.processActions = [];
var item;
while ((item = queue.shift())) {
this.performAction(item[0], item[1]);
World.prototype.dispatchEvents = function () {
var queue = this.eventQueue;
this.eventQueue = [];
var item;
while ((item = queue.shift())) {
World.prototype.performAction = function (pid, action) {
switch (action.type) {
case "spawn":
var pid = this.nextPid++;
this.processTable[pid] = { routes: action.initialRoutes, behavior: action.behavior };
if (action.behavior.boot) { this.asChild(pid, function () { action.behavior.boot() }); }
case "routes":
if (pid in this.processTable) {
// it may not be: this might be the routing update from a
// kill of the process
this.processTable[pid].routes = action.routes;
case "message":
if (action.metaLevel === 0) {
} else {
World.send(action.message, action.metaLevel - 1, action.isFeedback);
case "shutdownWorld":
throw { message: "Action type " + action.type + " not understood",
action: action };
World.prototype.aggregateRoutes = function (base) {
var acc = base.slice();
for (var pid in this.processTable) {
var p = this.processTable[pid];
for (var i = 0; i < p.routes.length; i++) {
return acc;
World.prototype.issueLocalRoutingUpdate = function () {
World.prototype.issueRoutingUpdate = function () {
World.prototype.dispatchEvent = function (e) {
for (var pid in this.processTable) {
var p = this.processTable[pid];
var e1 = filterEvent(e, p.routes);
// console.log("filtering", e, p.routes, e1);
if (e1) { this.asChild(pid, function () { p.behavior.handleEvent(e1) }); }
World.prototype.handleEvent = function (e) {
switch (e.type) {
case "routes":
this.downwardRoutes = liftRoutes(e.routes);
case "message":
this.eventQueue.push(sendMessage(e.message, e.metaLevel + 1, e.isFeedback));
throw { message: "Event type " + e.type + " not understood",
event: e };
/* Utilities: detecting presence/absence events via routing events */
function PresenceDetector(initialRoutes) {
this.state = this._digestRoutes(initialRoutes === undefined ? [] : initialRoutes);
PresenceDetector.prototype._digestRoutes = function (routes) {
var newState = {};
for (var i = 0; i < routes.length; i++) {
newState[JSON.stringify(routes[i].toJSON())] = routes[i];
return newState;
PresenceDetector.prototype.getRouteList = function () {
var rs = [];
for (var k in this.state) { rs.push(this.state[k]); }
return rs;
PresenceDetector.prototype.handleRoutes = function (routes) {
var added = [];
var removed = [];
var newState = this._digestRoutes(routes);
for (var k in newState) {
if (!(k in this.state)) {
} else {
delete this.state[k];
for (var k in this.state) {
this.state = newState;
return { added: added, removed: removed };
PresenceDetector.prototype.presenceExistsFor = function (probeRoute) {
for (var k in this.state) {
var existingRoute = this.state[k];
if (existingRoute.visibleToRoute(probeRoute, Infinity) &&
(existingRoute.level === probeRoute.level))
return true;
return false;
/* Utilities: matching demand for some service */
function DemandMatcher(pattern, metaLevel, options) {
options = $.extend({
demandLevel: 0,
supplyLevel: 0,
demandSideIsSubscription: true
}, options);
this.pattern = pattern;
this.metaLevel = metaLevel;
this.demandLevel = options.demandLevel;
this.supplyLevel = options.supplyLevel;
this.demandSideIsSubscription = options.demandSideIsSubscription;
this.onDemandIncrease = function (r) {
console.error("Unhandled increase in demand for route", r);
this.onSupplyDecrease = function (r) {
console.error("Unhandled decrease in supply for route", r);
this.state = new PresenceDetector();
DemandMatcher.prototype.boot = function () {
DemandMatcher.prototype.handleEvent = function (e) {
if (e.type === "routes") {
DemandMatcher.prototype.computeDetector = function (demandSide) {
var maxLevel = (this.demandLevel > this.supplyLevel ? this.demandLevel : this.supplyLevel);
return new Route(this.demandSideIsSubscription ? !demandSide : demandSide,
maxLevel + 1);
DemandMatcher.prototype.handleRoutes = function (routes) {
var changes = this.state.handleRoutes(routes);
this.incorporateChanges(true, changes.added);
this.incorporateChanges(false, changes.removed);
DemandMatcher.prototype.incorporateChanges = function (isArrivals, routeList) {
var relevantChangeDetector = this.computeDetector(isArrivals);
var expectedChangeLevel = isArrivals ? this.demandLevel : this.supplyLevel;
var expectedPeerLevel = isArrivals ? this.supplyLevel : this.demandLevel;
for (var i = 0; i < routeList.length; i++) {
var changed = routeList[i];
if (changed.level != expectedChangeLevel) continue;
var relevantChangedN = intersectRoutes([changed], [relevantChangeDetector]);
if (relevantChangedN.length === 0) continue;
var relevantChanged = relevantChangedN[0]; /* there can be only one */
var peerDetector = new Route(relevantChanged.isSubscription,
expectedPeerLevel + 1);
var peerRoutes = intersectRoutes(this.state.getRouteList(), [peerDetector]);
var peerExists = false;
for (var j = 0; j < peerRoutes.length; j++) {
if (peerRoutes[j].level == expectedPeerLevel) {
peerExists = true;
if (isArrivals && !peerExists) { this.onDemandIncrease(relevantChanged); }
if (!isArrivals && peerExists) { this.onSupplyDecrease(relevantChanged); }
/* Utilities: deduplicator */
function Deduplicator(ttl_ms) {
this.ttl_ms = ttl_ms || 10000;
this.queue = [];
this.map = {};
this.timerId = null;
Deduplicator.prototype.accept = function (m) {
var s = JSON.stringify(m);
if (s in this.map) return false;
var entry = [(+new Date()) + this.ttl_ms, s, m];
this.map[s] = entry;
if (this.timerId === null) {
var self = this;
this.timerId = setInterval(function () { self.expireMessages(); },
this.ttl_ms > 1000 ? 1000 : this.ttl_ms);
return true;
Deduplicator.prototype.expireMessages = function () {
var now = +new Date();
while (this.queue.length > 0 && this.queue[0][0] <= now) {
var entry = this.queue.shift();
delete this.map[entry[1]];
if (this.queue.length === 0) {
this.timerId = null;
/* Ground interface */
function Ground(bootFn) {
var self = this;
this.stepperId = null;
this.state = new PresenceDetector();
World.withWorldStack([this], function () {
self.world = new World(bootFn);
Ground.prototype.step = function () {
var self = this;
return World.withWorldStack([this], function () {
return self.world.step();
Ground.prototype.startStepping = World.prototype.startStepping;
Ground.prototype.stopStepping = World.prototype.stopStepping;
Ground.prototype.enqueueAction = function (action) {
if (action.type === 'routes') {
var added = this.state.handleRoutes(action.routes).added;
if (added.length > 0) {
console.error("You have subscribed to a nonexistent event source.", added);
} else {
console.error("You have sent a message into the outer void.", action);