diff --git a/marketplace.js b/marketplace.js index f6c8d91..7e3f634 100644 --- a/marketplace.js +++ b/marketplace.js @@ -62,77 +62,227 @@ function anyUnify(aa, bb) { } /*---------------------------------------------------------------------------*/ -/* Relaying */ +/* Events and Actions */ -function Route(isSubscription, pattern, isMeta, level) { +function Route(isSubscription, pattern, metaLevel, level) { this.isSubscription = isSubscription; this.pattern = pattern; - this.isMeta = isMeta ? true : false; + this.metaLevel = (metaLevel === undefined) ? 0 : metaLevel; this.level = (level === undefined) ? 0 : level; } -function sub(pattern, isMeta, level) { - return new Route(true, pattern, isMeta, level); +Route.prototype.drop = function () { + if (this.metaLevel === 0) { return null; } + return new Route(this.isSubscription, this.pattern, this.metaLevel - 1, this.level); +}; + +Route.prototype.lift = function () { + return new Route(this.isSubscription, this.pattern, this.metaLevel + 1, this.level); +}; + +function sub(pattern, metaLevel, level) { + return new Route(true, pattern, metaLevel, level); } -function pub(pattern, isMeta, level) { - return new Route(false, pattern, isMeta, level); +function pub(pattern, metaLevel, level) { + return new Route(false, pattern, metaLevel, level); } -function spawn(actor) { /* TODO: initialRoutes */ - return { type: "spawn", actor: actor }; +function spawn(behavior, initialRoutes) { + return { type: "spawn", + behavior: behavior, + initialRoutes: (initialRoutes === undefined) ? [] : initialRoutes }; } function updateRoutes(routes) { return { type: "routes", routes: routes }; } -function sendMessage(m) { - return { type: "send", isMeta: false, message: m }; +function sendMessage(m, metaLevel, isFeedback) { + return { type: "send", + metaLevel: (metaLevel === undefined) ? 0 : metaLevel, + message: m, + isFeedback: (isFeedback === undefined) ? false : isFeedback }; } -function metaMessage(m) { - return { type: "send", isMeta: true, message: m }; +/*---------------------------------------------------------------------------*/ +/* Metafunctions */ + +function dropRoutes(routes) { + var result = []; + for (var i = 0; i < routes.length; i++) { + var r = routes[i].drop(); + if (r) { result.push(r); } + } + return result; } +function liftRoutes(routes) { + var result = []; + for (var i = 0; i < routes.length; i++) { + result.push(routes[i].lift()); + } + return result; +} + +function filterEvent(e, routes) { + switch (e.type) { + case "routes": + var result = []; + for (var i = 0; i < e.routes.length; i++) { + for (var j = 0; j < routes.length; j++) { + var ri = e.routes[i]; + var rj = routes[j]; + if (ri.isSubscription === !rj.isSubscription + && ri.metaLevel === rj.metaLevel + && ri.level < rj.level) + { + var u = unify(ri.pattern, rj.pattern); + if (u) { + var rk = new Route(ri.isSubscription, u.result, ri.metaLevel, ri.level); + result.push(rk); + } + } + } + } + return result.length ? result : null; + case "send": + for (var i = 0; i < routes.length; i++) { + var r = routes[i]; + if (e.metaLevel === r.metaLevel + && e.isFeedback === !r.isSubscription + && unify(e.message, r.pattern)) + { + return e; + } + } + return null; + default: + throw { message: "Event type " + e.type + " not filterable", + event: e }; + } +} + +/*---------------------------------------------------------------------------*/ +/* Configurations */ + function World(bootActions) { this.nextPid = 0; this.eventQueue = []; this.processTable = {}; this.downwardRoutes = []; - this.pendingActions = []; + this.processActions = []; + this.activePid = null; this.enqueueActions(-1, bootActions); } +/* Class state / methods */ + +World.current = null; // parameter + +World.send = function (m, metaLevel, isFeedback) { + World.current.enqueueAction(sendMessage(m, metaLevel, isFeedback)); +}; + +World.updateRoutes = function (routes) { + World.current.enqueueAction(updateRoutes(routes)); +}; + +World.spawn = function (behavior, initialRoutes) { + World.current.enqueueAction(spawn(behavior, initialRoutes)); +}; + +/* Instance methods */ + +World.prototype.enqueueAction = function (action) { + this.processActions.push([this.activePid, action]); +}; + World.prototype.enqueueActions = function (pid, actions) { for (var i = 0; i < actions.length; i++) { - this.pendingActions.push([pid, actions[i]]); + this.processActions.push([pid, actions[i]]); } }; -World.prototype.boot = function () { - this.performPendingActions(); +World.prototype.isQuiescent = function () { + return this.eventQueue.length === 0 && this.processActions.length === 0; }; -World.prototype.performPendingActions = function () { - var queue = this.pendingActions; - this.pendingActions = []; +World.prototype.step = function () { + this.dispatchEvents(); + this.performActions(); + return this.stepChildren() || !this.isQuiescent(); +}; + +World.prototype.asChild = function (pid, f) { + var oldWorld = World.current; + var result = null; + World.current = this; + this.activePid = pid; + try { + result = f(); + } catch (e) { + this.kill(pid, e); + } + this.activePid = null; + World.current = oldWorld; + return result; +}; + +World.prototype.kill = function (pid, exn) { + console.log("Killed process " + pid + (exn ? " with reason " + exn.message : "")); + delete this.processTable[pid]; + this.issueRoutingUpdate(); +}; + +World.prototype.stepChildren = function () { + var someChildBusy = false; + for (var pid in this.processTable) { + var p = this.processTable[pid]; + if (p.behavior.step /* exists, haven't called it yet */) { + var childBusy = this.asChild(pid, function () { p.behavior.step() }); + someChildBusy = someChildBusy || childBusy; + } + } + return someChildBusy; +}; + +World.prototype.performActions = function () { + var queue = this.processActions; + this.processActions = []; var item; while ((item = queue.shift())) { this.performAction(item[0], item[1]); } }; +World.prototype.dispatchEvents = function () { + var queue = this.eventQueue; + this.eventQueue = []; + var item; + while ((item = queue.shift())) { + this.dispatchEvent(item); + } +}; + World.prototype.performAction = function (pid, action) { switch (action.type) { case "spawn": - this.doSpawn(action.actor); + var pid = this.nextPid++; + this.processTable[pid] = { routes: action.initialRoutes, behavior: action.behavior }; + if (behavior.boot) { this.asChild(pid, function () { behavior.boot() }); } + this.issueRoutingUpdate(); break; case "routes": - this.doRoutes(pid, action.routes); + this.processTable[pid].routes = action.routes; + this.issueRoutingUpdate(); break; case "send": - this.doSend(pid, action.isMeta, action.message); + if (action.metaLevel === 0) { + this.eventQueue.push(action); + } else { + World.send(action.message, action.metaLevel - 1, action.isFeedback); + } break; default: throw { message: "Action type " + action.type + " not understood", @@ -140,22 +290,40 @@ World.prototype.performAction = function (pid, action) { } }; -World.prototype.doSpawn = function (actor) { - this.processTable[this.nextPid++] = actor; +World.prototype.aggregateRoutes = function (base) { + var acc = base.slice(); + for (var pid in this.processTable) { + var p = this.processTable[pid]; + for (var i = 0; i < p.routes.length; i++) { + acc.push(p.routes[i]); + } + } }; -World.prototype.handleEvent = function (e) { - this.dispatchEvent(e); - this.performPendingActions(); +World.prototype.issueRoutingUpdate = function () { + this.eventQueue.push(updateRoutes(this.aggregateRoutes(this.downwardRoutes))); + World.updateRoutes(dropRoutes(this.aggregateRoutes([]))); }; World.prototype.dispatchEvent = function (e) { - switch (e.type) { - case "routes": - break; - case "send": - break; - default: - break; + for (var pid in this.processTable) { + var p = this.processTable[pid]; + var e1 = filterEvent(e, p.routes); + if (e1) { this.asChild(pid, function () { p.behavior.handleEvent(e1) }); } + } +}; + +World.prototype.handleEvent = function (e) { + switch (e.type) { + case "routes": + this.downwardRoutes = liftRoutes(e.routes); + this.issueRoutingUpdate(); + break; + case "send": + this.eventQueue.push(sendMessage(e.message, e.metaLevel + 1, e.isFeedback)); + break; + default: + throw { message: "Event type " + e.type + " not understood", + event: e }; } };