diff --git a/index.html b/index.html
index 48018e0..d3388d6 100644
--- a/index.html
+++ b/index.html
@@ -20,10 +20,30 @@
Hello, world.
-
-
-
-
+
+
+
+
diff --git a/index.js b/index.js
index a55c5c6..df07551 100644
--- a/index.js
+++ b/index.js
@@ -1,30 +1,55 @@
+Spy///////////////////////////////////////////////////////////////////////////
+// Wire protocol representation of events and actions
+
+function encodeEvent(e) {
+ switch (e.type) {
+ case "routes":
+ var rs = [];
+ for (var i = 0; i < e.routes.length; i++) {
+ rs.push(e.routes[i].toJSON());
+ }
+ return ["routes", rs];
+ case "message":
+ return ["message", e.message, e.metaLevel, e.isFeedback];
+ }
+}
+
+function decodeAction(j) {
+ switch (j[0]) {
+ case "routes":
+ var rs = [];
+ for (var i = 0; i < j[1].length; i++) {
+ rs.push(Route.fromJSON(j[1][i]));
+ }
+ return updateRoutes(rs);
+ case "message":
+ return sendMessage(j[1], j[2], j[3]);
+ default:
+ throw { message: "Invalid JSON-encoded action: " + JSON.stringify(j) };
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Generic Spy
+
function Spy() {
}
Spy.prototype.boot = function () {
- World.updateRoutes([sub(__, 0, 1000), pub(__, 0, 1000)]);
+ World.updateRoutes([sub(__, 0, Infinity), pub(__, 0, Infinity)]);
};
Spy.prototype.handleEvent = function (e) {
- console.log("SPY", e);
-};
-
-function JQueryEventRouter(selector, eventName) {
- var self = this;
- this.selector = selector;
- this.eventName = eventName;
- this.handler =
- World.wrap(function (e) { World.send(["jQuery", self.selector, self.eventName, e]); });
- $(this.selector).on(this.eventName, this.handler);
-}
-
-JQueryEventRouter.prototype.handleEvent = function (e) {
- if (e.type === "routes" && e.routes.length === 0) {
- $(this.selector).off(this.eventName, this.handler);
- World.exit();
+ switch (e.type) {
+ case "routes": console.log("SPY", "routes", e.routes); break;
+ case "message": console.log("SPY", "message", e.message, e.metaLevel, e.isFeedback); break;
+ default: console.log("SPY", "unknown", e); break;
}
};
+///////////////////////////////////////////////////////////////////////////
+// JQuery event driver
+
function spawnJQueryDriver() {
var d = new DemandMatcher(["jQuery", __, __, __]);
d.onDemandIncrease = function (r) {
@@ -37,50 +62,218 @@ function spawnJQueryDriver() {
World.spawn(d);
}
-var g = new Ground(function () {
- console.log('starting ground boot');
- World.spawn(new Spy());
- spawnJQueryDriver();
- World.spawn({
- // step: function () { console.log('dummy step'); },
- boot: function () {
- console.log('dummy boot');
- World.updateRoutes([sub(["jQuery", "#testButton", "click", __]),
- sub(["jQuery", "#testButton2", "click", __]),
- sub(__, 1)]);
- World.send({msg: 'hello outer world'}, 1);
- World.send({msg: 'hello inner world'}, 0);
- // World.spawn(new World(function () {
- // World.spawn({
- // boot: function () {
- // console.log('w1p1');
- // World.updateRoutes([sub('w1p1')]);
- // },
- // handleEvent: function (e) { console.log('w1p1', e); }
- // })
- // }))
- // World.spawn(new World(function () {
- // World.spawn({
- // boot: function () {
- // console.log('w2p2');
- // World.updateRoutes([sub('w2p2')]);
- // },
- // handleEvent: function (e) { console.log('w2p2', e); }
- // })
- // }));
- },
- handleEvent: function (e) {
- if (e.type === "message" && e.message[0] === "jQuery") {
- if (e.message[1] === "#testButton") {
- console.log("got a click");
- World.updateRoutes([sub(["jQuery", "#testButton2", "click", __])]);
- } else {
- console.log("got a click 2");
- // World.exit();
+function JQueryEventRouter(selector, eventName) {
+ var self = this;
+ this.selector = selector;
+ this.eventName = eventName;
+ this.handler =
+ World.wrap(function (e) {
+ World.send(["jQuery", self.selector, self.eventName, e]);
+ e.preventDefault();
+ return false;
+ });
+ $(this.selector).on(this.eventName, this.handler);
+}
+
+JQueryEventRouter.prototype.handleEvent = function (e) {
+ if (e.type === "routes" && e.routes.length === 0) {
+ $(this.selector).off(this.eventName, this.handler);
+ World.exit();
+ }
+};
+
+///////////////////////////////////////////////////////////////////////////
+// WebSocket client driver
+
+var DEFAULT_RECONNECT_DELAY = 100;
+var MAX_RECONNECT_DELAY = 30000;
+
+function WebSocketConnection(label, wsurl, shouldReconnect) {
+ this.label = label;
+ this.wsurl = wsurl;
+ this.shouldReconnect = shouldReconnect ? true : false;
+ this.reconnectDelay = DEFAULT_RECONNECT_DELAY;
+ this.localRoutes = [];
+ this.peerRoutes = [];
+ this.prevPeerRoutesMessage = null;
+ this.sock = null;
+ this.deduplicator = new Deduplicator();
+}
+
+WebSocketConnection.prototype.relayRoutes = function () {
+ // fresh copy each time, suitable for in-place extension/mutation
+ return [pub([this.label, __, __], 0, 1000), sub([this.label, __, __], 0, 1000)];
+};
+
+WebSocketConnection.prototype.aggregateRoutes = function () {
+ var rs = this.relayRoutes();
+ for (var i = 0; i < this.peerRoutes.length; i++) {
+ var r = this.peerRoutes[i];
+ rs.push(new Route(r.isSubscription,
+ [this.label, __, r.pattern],
+ r.metaLevel,
+ r.level));
+ }
+ console.log("WebSocketConnection", this.label, rs);
+ return rs;
+};
+
+WebSocketConnection.prototype.boot = function () {
+ World.updateRoutes(this.aggregateRoutes());
+ this.reconnect();
+};
+
+WebSocketConnection.prototype.isConnected = function () {
+ return this.sock && this.sock.readyState === this.sock.OPEN;
+}
+
+WebSocketConnection.prototype.sendLocalRoutes = function () {
+ if (this.isConnected()) {
+ this.sock.send(JSON.stringify(encodeEvent(updateRoutes(this.localRoutes))));
+ }
+};
+
+WebSocketConnection.prototype.handleEvent = function (e) {
+ console.log("WebSocketConnection.handleEvent", e);
+ switch (e.type) {
+ case "routes":
+ this.localRoutes = [];
+ for (var i = 0; i < e.routes.length; i++) {
+ var r = e.routes[i];
+ if (r.pattern.length && r.pattern.length === 3
+ && r.pattern[0] === this.label
+ && typeof(r.pattern[1]) === "number")
+ {
+ this.localRoutes.push(new Route(r.isSubscription,
+ r.pattern[2],
+ r.pattern[1],
+ r.level));
+ }
+ }
+ this.sendLocalRoutes();
+ break;
+ case "message":
+ var m = e.message;
+ if (m.length && m.length === 3
+ && m[0] === this.label
+ && typeof(m[1]) === "number")
+ {
+ if (this.isConnected()) {
+ var encoded = JSON.stringify(encodeEvent(sendMessage(m[2], m[1], e.isFeedback)));
+ if (this.deduplicator.accept(encoded)) {
+ this.sock.send(encoded);
}
}
}
- });
-});
+ break;
+ }
+};
-g.startStepping();
+WebSocketConnection.prototype.reconnect = function () {
+ var self = this;
+ if (this.sock) {
+ this.sock.close();
+ this.sock = null;
+ }
+ this.sock = new WebSocket(this.wsurl);
+ this.sock.onopen = World.wrap(function (e) { return self.onopen(e); });
+ this.sock.onmessage = World.wrap(function (e) { return self.onmessage(e); });
+ this.sock.onclose = World.wrap(function (e) { return self.onclose(e); });
+};
+
+WebSocketConnection.prototype.onopen = function (e) {
+ console.log("onopen", e);
+ this.reconnectDelay = DEFAULT_RECONNECT_DELAY;
+ this.sendLocalRoutes();
+};
+
+WebSocketConnection.prototype.onmessage = function (wse) {
+ console.log("onmessage", wse);
+ var j = JSON.parse(wse.data);
+ var e = decodeAction(j);
+ switch (e.type) {
+ case "routes":
+ if (this.prevPeerRoutesMessage !== wse.data) {
+ this.prevPeerRoutesMessage = wse.data;
+ this.peerRoutes = e.routes;
+ World.updateRoutes(this.aggregateRoutes());
+ }
+ break;
+ case "message":
+ if (this.deduplicator.accept(wse.data)) {
+ World.send([this.label, e.metaLevel, e.message], 0, e.isFeedback);
+ }
+ break;
+ }
+};
+
+WebSocketConnection.prototype.onclose = function (e) {
+ var self = this;
+ console.log("onclose", e);
+ if (this.shouldReconnect) {
+ console.log("reconnecting to " + this.wsurl + " in " + this.reconnectDelay + "ms");
+ setTimeout(World.wrap(function () { self.reconnect(); }), this.reconnectDelay);
+ this.reconnectDelay = this.reconnectDelay * 1.618 + (Math.random() * 1000);
+ this.reconnectDelay =
+ this.reconnectDelay > MAX_RECONNECT_DELAY
+ ? MAX_RECONNECT_DELAY + (Math.random() * 1000)
+ : this.reconnectDelay;
+ }
+};
+
+///////////////////////////////////////////////////////////////////////////
+// Main
+
+$(document).ready(function () {
+ $("#chat_form").submit(function (e) { e.preventDefault(); return false; });
+
+ var g = new Ground(function () {
+ console.log('starting ground boot');
+ World.spawn(new Spy());
+ spawnJQueryDriver();
+ World.spawn(new WebSocketConnection("broker", "ws://localhost:8000/", true));
+ World.spawn({
+ // step: function () { console.log('dummy step'); },
+ boot: function () {
+ World.updateRoutes(this.subscriptions());
+ },
+ nym: function () {
+ return $("#nym").val();
+ },
+ subscriptions: function () {
+ return [sub(["jQuery", "#send_chat", "click", __]),
+ sub(["jQuery", "#nym", "change", __]),
+ pub(["broker", 0, [this.nym(), "says", __]]),
+ sub(["broker", 0, [__, "says", __]], 0, 1)];
+ },
+ handleEvent: function (e) {
+ if (e.type === "message") {
+ switch (e.message[0]) {
+ case "jQuery":
+ switch (e.message[1]) {
+ case "#send_chat":
+ var inp = $("#chat_input");
+ var utterance = inp.val();
+ inp.val("");
+ World.send(["broker", 0, [this.nym(), "says", utterance]]);
+ break;
+ case "#nym":
+ World.updateRoutes(this.subscriptions());
+ break;
+ default:
+ console.log("Got jquery event from as-yet-unhandled subscription",
+ e.message[2], e.message[3]);
+ }
+ break;
+ case "broker":
+ console.log("Broker message:", e.message[2]);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ });
+ });
+ g.startStepping();
+});
diff --git a/marketplace.js b/marketplace.js
index 08b7965..7f1832d 100644
--- a/marketplace.js
+++ b/marketplace.js
@@ -85,6 +85,14 @@ Route.prototype.toJSON = function () {
return [this.isSubscription ? "sub" : "pub", this.pattern, this.metaLevel, this.level];
};
+Route.fromJSON = function (j) {
+ switch (j[0]) {
+ case "sub": return new Route(true, j[1], j[2], j[3]);
+ case "pub": return new Route(false, j[1], j[2], j[3]);
+ default: throw { message: "Invalid JSON-encoded route: " + JSON.stringify(j) };
+ }
+};
+
function sub(pattern, metaLevel, level) {
return new Route(true, pattern, metaLevel, level);
}
@@ -534,6 +542,43 @@ DemandMatcher.prototype.incorporateChanges = function (isArrivals, routeList) {
}
};
+/*---------------------------------------------------------------------------*/
+/* Utilities: deduplicator */
+
+function Deduplicator(ttl_ms) {
+ this.ttl_ms = ttl_ms || 10000;
+ this.queue = [];
+ this.map = {};
+ this.timerId = null;
+}
+
+Deduplicator.prototype.accept = function (m) {
+ var s = JSON.stringify(m);
+ if (s in this.map) return false;
+ var entry = [(+new Date()) + this.ttl_ms, s, m];
+ this.map[s] = entry;
+ this.queue.push(entry);
+
+ if (this.timerId === null) {
+ var self = this;
+ this.timerId = setInterval(function () { self.expireMessages(); },
+ this.ttl_ms > 1000 ? 1000 : this.ttl_ms);
+ }
+ return true;
+};
+
+Deduplicator.prototype.expireMessages = function () {
+ var now = +new Date();
+ while (this.queue.length > 0 && this.queue[0][0] <= now) {
+ var entry = this.queue.shift();
+ delete this.map[entry[1]];
+ }
+ if (this.queue.length === 0) {
+ clearInterval(this.timerId);
+ this.timerId = null;
+ }
+};
+
/*---------------------------------------------------------------------------*/
/* Ground interface */