var Route = require("./route.js"); var Util = require("./util.js"); /////////////////////////////////////////////////////////////////////////// // TODO: trigger-guards as per minimart /*---------------------------------------------------------------------------*/ /* Events and Actions */ var __ = Route.__; var _$ = Route._$; function sub(pattern, metaLevel, level) { return Route.simpleGestalt(false, pattern, metaLevel, level); } function pub(pattern, metaLevel, level) { return Route.simpleGestalt(true, pattern, metaLevel, level); } function spawn(behavior, initialGestalts) { return { type: "spawn", behavior: behavior, initialGestalt: Route.gestaltUnion(initialGestalts || []) }; } function updateRoutes(gestalts) { return { type: "routes", gestalt: Route.gestaltUnion(gestalts) }; } function pendingRoutingUpdate(aggregate, affectedSubgestalt, knownTarget) { return { type: "pendingRoutingUpdate", aggregate: aggregate, affectedSubgestalt: affectedSubgestalt, knownTarget: knownTarget }; } function sendMessage(m, metaLevel, isFeedback) { return { type: "message", metaLevel: (metaLevel === undefined) ? 0 : metaLevel, message: m, isFeedback: (isFeedback === undefined) ? false : isFeedback }; } function shutdownWorld() { return { type: "shutdownWorld" }; } /*---------------------------------------------------------------------------*/ /* Configurations */ function World(bootFn) { this.alive = true; this.eventQueue = []; this.runnablePids = {}; this.partialGestalt = Route.emptyGestalt; // Only gestalt from local processes this.fullGestalt = Route.emptyGestalt ;; // partialGestalt unioned with downwardGestalt this.processTable = {}; this.tombstones = {}; this.downwardGestalt = Route.emptyGestalt; this.processActions = []; this.asChild(-1, bootFn, true); } /* Class state / methods */ World.nextPid = 0; World.stack = []; World.current = function () { return World.stack[World.stack.length - 1][0]; }; World.activePid = function () { return World.stack[World.stack.length - 1][1]; }; World.send = function (m, metaLevel, isFeedback) { World.current().enqueueAction(World.activePid(), sendMessage(m, metaLevel, isFeedback)); }; World.updateRoutes = function (gestalts) { World.current().enqueueAction(World.activePid(), updateRoutes(gestalts)); }; World.spawn = function (behavior, initialGestalts) { World.current().enqueueAction(World.activePid(), spawn(behavior, initialGestalts)); }; World.exit = function (exn) { World.current().kill(World.activePid(), exn); }; World.shutdownWorld = function () { World.current().enqueueAction(World.activePid(), shutdownWorld()); }; World.withWorldStack = function (stack, f) { var oldStack = World.stack; World.stack = stack; var result = null; try { result = f(); } catch (e) { World.stack = oldStack; throw e; } World.stack = oldStack; return result; }; World.wrap = function (f) { var savedStack = World.stack.slice(); return function () { var actuals = arguments; return World.withWorldStack(savedStack, function () { var result = World.current().asChild(World.activePid(), function () { return f.apply(null, actuals); }); for (var i = World.stack.length - 1; i >= 0; i--) { World.stack[i][0].markPidRunnable(World.stack[i][1]); } return result; }); }; }; /* Instance methods */ World.prototype.enqueueAction = function (pid, action) { this.processActions.push([pid, action]); }; // The code is written to maintain the runnablePids set carefully, to // ensure we can locally decide whether we're inert or not without // having to search the whole deep process tree. World.prototype.isInert = function () { return this.eventQueue.length === 0 && this.processActions.length === 0 && Route.is_emptySet(this.runnablePids); }; World.prototype.markPidRunnable = function (pid) { this.runnablePids[pid] = [pid]; }; World.prototype.step = function () { this.dispatchEvents(); this.performActions(); this.stepChildren(); return this.alive && !this.isInert(); }; World.prototype.asChild = function (pid, f, omitLivenessCheck) { if (!(pid in this.processTable) && !omitLivenessCheck) { console.warn("World.asChild eliding invocation of dead process", pid); return; } World.stack.push([this, pid]); var result = null; try { result = f(); } catch (e) { this.kill(pid, e); } if (World.stack.pop()[0] !== this) { throw new Error("Internal error: World stack imbalance"); } return result; }; World.prototype.kill = function (pid, exn) { if (exn && exn.stack) { console.log("Process exited", pid, exn, exn.stack); } else { console.log("Process exited", pid, exn); } var p = this.processTable[pid]; if (p && p.behavior.trapexit) { this.asChild(pid, function () { return p.behavior.trapexit(exn); }); } delete this.processTable[pid]; if (p) { if (exn) { p.exitReason = exn; this.tombstones[pid] = p; } this.applyAndIssueRoutingUpdate(p.gestalt, Route.emptyGestalt); } }; World.prototype.stepChildren = function () { var pids = this.runnablePids; this.runnablePids = {}; for (var pid in pids) { var p = this.processTable[pid]; if (p && p.behavior.step /* exists, haven't called it yet */) { var childBusy = this.asChild(pid | 0, function () { return p.behavior.step() }); if (childBusy) this.markPidRunnable(pid); } } }; World.prototype.performActions = function () { var queue = this.processActions; this.processActions = []; var item; while ((item = queue.shift()) && this.alive) { this.performAction(item[0], item[1]); } }; World.prototype.dispatchEvents = function () { var queue = this.eventQueue; this.eventQueue = []; var item; while ((item = queue.shift())) { this.dispatchEvent(item); } }; World.prototype.performAction = function (pid, action) { switch (action.type) { case "spawn": var pid = World.nextPid++; var newGestalt = action.initialGestalt.label(pid); this.processTable[pid] = { gestalt: newGestalt, behavior: action.behavior }; if (action.behavior.boot) { this.asChild(pid, function () { action.behavior.boot() }); this.markPidRunnable(pid); } this.applyAndIssueRoutingUpdate(Route.emptyGestalt, newGestalt, pid); break; case "routes": if (pid in this.processTable) { // it may not be: this might be the routing update from a // kill of the process var oldGestalt = this.processTable[pid].gestalt; var newGestalt = action.gestalt.label(pid|0); // ^ pid|0: convert pid from string (table key!) to integer this.processTable[pid].gestalt = newGestalt; this.applyAndIssueRoutingUpdate(oldGestalt, newGestalt, pid); } break; case "message": if (action.metaLevel === 0) { this.eventQueue.push(action); } else { World.send(action.message, action.metaLevel - 1, action.isFeedback); } break; case "shutdownWorld": this.alive = false; // force us to stop doing things immediately World.exit(); break; default: var exn = new Error("Action type " + action.type + " not understood"); exn.action = action; throw exn; } }; World.prototype.updateFullGestalt = function () { this.fullGestalt = this.partialGestalt.union(this.downwardGestalt); }; World.prototype.issueLocalRoutingUpdate = function (affectedSubgestalt, knownTarget) { this.eventQueue.push(pendingRoutingUpdate(this.fullGestalt, affectedSubgestalt, knownTarget)); }; World.prototype.applyAndIssueRoutingUpdate = function (oldg, newg, knownTarget) { knownTarget = typeof knownTarget === 'undefined' ? null : knownTarget; this.partialGestalt = this.partialGestalt.erasePath(oldg).union(newg); this.updateFullGestalt(); this.issueLocalRoutingUpdate(oldg.union(newg), knownTarget); World.updateRoutes([this.partialGestalt.drop()]); }; World.prototype.dispatchEvent = function (e) { switch (e.type) { case "pendingRoutingUpdate": var pids = e.affectedSubgestalt.match(e.aggregate); if (e.knownTarget !== null) pids.unshift(e.knownTarget); for (var i = 0; i < pids.length; i++) { var pid = pids[i]; if (pid === "out") console.warn("Would have delivered a routing update to environment"); var p = this.processTable[pid]; if (p) { var g = e.aggregate.filter(p.gestalt); this.asChild(pid, function () { p.behavior.handleEvent(updateRoutes([g])) }); this.markPidRunnable(pid); } } break; case "message": var pids = this.partialGestalt.matchValue(e.message, e.metaLevel, e.isFeedback); for (var i = 0; i < pids.length; i++) { var pid = pids[i]; var p = this.processTable[pid]; this.asChild(pid, function () { p.behavior.handleEvent(e) }); this.markPidRunnable(pid); } break; default: var exn = new Error("Event type " + e.type + " not dispatchable"); exn.event = e; throw exn; } }; World.prototype.handleEvent = function (e) { switch (e.type) { case "routes": var oldDownward = this.downwardGestalt; this.downwardGestalt = e.gestalt.label("out").lift(); this.updateFullGestalt(); this.issueLocalRoutingUpdate(oldDownward.union(this.downwardGestalt), null); break; case "message": this.eventQueue.push(sendMessage(e.message, e.metaLevel + 1, e.isFeedback)); break; default: var exn = new Error("Event type " + e.type + " not understood"); exn.event = e; throw exn; } }; /* Debugging, management, and monitoring */ World.prototype.processTree = function () { var kids = []; for (var pid in this.processTable) { var p = this.processTable[pid]; if (p.behavior instanceof World) { kids.push([pid, p.behavior.processTree()]); } else { kids.push([pid, p]); } } for (var pid in this.tombstones) { kids.push([pid, this.tombstones[pid]]); } kids.sort(function (a, b) { return a[0] - b[0] }); return kids; }; World.prototype.textProcessTree = function (ownPid) { var lines = []; function dumpProcess(prefix, pid, p) { if (Array.isArray(p)) { lines.push(prefix + '--+ ' + pid); for (var i = 0; i < p.length; i++) { dumpProcess(prefix + ' |', p[i][0], p[i][1]); } lines.push(prefix); } else { var label = || || ''; var tombstoneString = p.exitReason ? ' (EXITED: ' + p.exitReason + ') ' : ''; var stringifiedState; try { var rawState = p.behavior.debugState ? p.behavior.debugState() : p.behavior; stringifiedState = JSON.stringify(rawState, function (k, v) { return (k === 'name') ? undefined : v; }); } catch (e) { stringifiedState = "(cannot convert process state to JSON)"; } lines.push(prefix + '-- ' + pid + ': ' + label + tombstoneString + stringifiedState); } } dumpProcess('', ownPid || '', this.processTree()); return lines.join('\n'); }; World.prototype.clearTombstones = function () { this.tombstones = {}; for (var pid in this.processTable) { var p = this.processTable[pid]; if (p.behavior instanceof World) { p.behavior.clearTombstones(); } } }; /*---------------------------------------------------------------------------*/ /* Utilities: matching demand for some service */ function DemandMatcher(projection, metaLevel, options) { options = Util.extend({ demandLevel: 0, supplyLevel: 0, demandSideIsSubscription: false }, options); this.pattern = Route.projectionToPattern(projection); this.projectionSpec = Route.compileProjection(projection); this.metaLevel = metaLevel | 0; this.demandLevel = options.demandLevel; this.supplyLevel = options.supplyLevel; this.demandSideIsSubscription = options.demandSideIsSubscription; this.onDemandIncrease = function (captures) { console.error("Unhandled increase in demand for route", captures); }; this.onSupplyDecrease = function (captures) { console.error("Unhandled decrease in supply for route", captures); }; this.currentDemand = {}; this.currentSupply = {}; } DemandMatcher.prototype.boot = function () { var observerLevel = 1 + Math.max(this.demandLevel, this.supplyLevel); World.updateRoutes([sub(this.pattern, this.metaLevel, observerLevel), pub(this.pattern, this.metaLevel, observerLevel)]); }; DemandMatcher.prototype.handleEvent = function (e) { if (e.type === "routes") { this.handleGestalt(e.gestalt); } }; DemandMatcher.prototype.handleGestalt = function (gestalt) { var newDemandMatcher = gestalt.project(this.projectionSpec, !this.demandSideIsSubscription, this.metaLevel, this.demandLevel); var newSupplyMatcher = gestalt.project(this.projectionSpec, this.demandSideIsSubscription, this.metaLevel, this.supplyLevel); var newDemand = Route.arrayToSet(Route.matcherKeys(newDemandMatcher)); var newSupply = Route.arrayToSet(Route.matcherKeys(newSupplyMatcher)); var demandDelta = Route.setSubtract(newDemand, this.currentDemand); var supplyDelta = Route.setSubtract(this.currentSupply, newSupply); var demandIncr = Route.setSubtract(demandDelta, newSupply); var supplyDecr = Route.setIntersect(supplyDelta, newDemand); this.currentDemand = newDemand; this.currentSupply = newSupply; for (var k in demandIncr) this.onDemandIncrease(demandIncr[k]); for (var k in supplyDecr) this.onSupplyDecrease(supplyDecr[k]); }; /*---------------------------------------------------------------------------*/ /* Utilities: deduplicator */ function Deduplicator(ttl_ms) { this.ttl_ms = ttl_ms || 10000; this.queue = []; = {}; this.timerId = null; } Deduplicator.prototype.accept = function (m) { var s = JSON.stringify(m); if (s in return false; var entry = [(+new Date()) + this.ttl_ms, s, m];[s] = entry; this.queue.push(entry); if (this.timerId === null) { var self = this; this.timerId = setInterval(function () { self.expireMessages(); }, this.ttl_ms > 1000 ? 1000 : this.ttl_ms); } return true; }; Deduplicator.prototype.expireMessages = function () { var now = +new Date(); while (this.queue.length > 0 && this.queue[0][0] <= now) { var entry = this.queue.shift(); delete[entry[1]]; } if (this.queue.length === 0) { clearInterval(this.timerId); this.timerId = null; } }; /////////////////////////////////////////////////////////////////////////// module.exports.__ = __; module.exports._$ = _$; module.exports.sub = sub; = pub; module.exports.spawn = spawn; module.exports.updateRoutes = updateRoutes; module.exports.sendMessage = sendMessage; module.exports.shutdownWorld = shutdownWorld; module.exports.World = World; module.exports.DemandMatcher = DemandMatcher; module.exports.Deduplicator = Deduplicator; module.exports.Route = Route;