From 77cd357aaa8b034ae0afa14208488cc0102a375d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 31 Oct 2013 12:10:53 +0000 Subject: [PATCH] Websocket client driver + rudimentary chat support --- index.html | 28 ++++- index.js | 313 +++++++++++++++++++++++++++++++++++++++---------- marketplace.js | 45 +++++++ 3 files changed, 322 insertions(+), 64 deletions(-) diff --git a/index.html b/index.html index 48018e0..d3388d6 100644 --- a/index.html +++ b/index.html @@ -20,10 +20,30 @@

Hello, world.

-

- - -

+ + +
+
+
+
+
+
+ + +
+
+
+ +
+ +
+
+
+
+
+
+
+
diff --git a/index.js b/index.js index a55c5c6..df07551 100644 --- a/index.js +++ b/index.js @@ -1,30 +1,55 @@ +Spy/////////////////////////////////////////////////////////////////////////// +// Wire protocol representation of events and actions + +function encodeEvent(e) { + switch (e.type) { + case "routes": + var rs = []; + for (var i = 0; i < e.routes.length; i++) { + rs.push(e.routes[i].toJSON()); + } + return ["routes", rs]; + case "message": + return ["message", e.message, e.metaLevel, e.isFeedback]; + } +} + +function decodeAction(j) { + switch (j[0]) { + case "routes": + var rs = []; + for (var i = 0; i < j[1].length; i++) { + rs.push(Route.fromJSON(j[1][i])); + } + return updateRoutes(rs); + case "message": + return sendMessage(j[1], j[2], j[3]); + default: + throw { message: "Invalid JSON-encoded action: " + JSON.stringify(j) }; + } +} + +/////////////////////////////////////////////////////////////////////////// +// Generic Spy + function Spy() { } Spy.prototype.boot = function () { - World.updateRoutes([sub(__, 0, 1000), pub(__, 0, 1000)]); + World.updateRoutes([sub(__, 0, Infinity), pub(__, 0, Infinity)]); }; Spy.prototype.handleEvent = function (e) { - console.log("SPY", e); -}; - -function JQueryEventRouter(selector, eventName) { - var self = this; - this.selector = selector; - this.eventName = eventName; - this.handler = - World.wrap(function (e) { World.send(["jQuery", self.selector, self.eventName, e]); }); - $(this.selector).on(this.eventName, this.handler); -} - -JQueryEventRouter.prototype.handleEvent = function (e) { - if (e.type === "routes" && e.routes.length === 0) { - $(this.selector).off(this.eventName, this.handler); - World.exit(); + switch (e.type) { + case "routes": console.log("SPY", "routes", e.routes); break; + case "message": console.log("SPY", "message", e.message, e.metaLevel, e.isFeedback); break; + default: console.log("SPY", "unknown", e); break; } }; +/////////////////////////////////////////////////////////////////////////// +// JQuery event driver + function spawnJQueryDriver() { var d = new DemandMatcher(["jQuery", __, __, __]); d.onDemandIncrease = function (r) { @@ -37,50 +62,218 @@ function spawnJQueryDriver() { World.spawn(d); } -var g = new Ground(function () { - console.log('starting ground boot'); - World.spawn(new Spy()); - spawnJQueryDriver(); - World.spawn({ - // step: function () { console.log('dummy step'); }, - boot: function () { - console.log('dummy boot'); - World.updateRoutes([sub(["jQuery", "#testButton", "click", __]), - sub(["jQuery", "#testButton2", "click", __]), - sub(__, 1)]); - World.send({msg: 'hello outer world'}, 1); - World.send({msg: 'hello inner world'}, 0); - // World.spawn(new World(function () { - // World.spawn({ - // boot: function () { - // console.log('w1p1'); - // World.updateRoutes([sub('w1p1')]); - // }, - // handleEvent: function (e) { console.log('w1p1', e); } - // }) - // })) - // World.spawn(new World(function () { - // World.spawn({ - // boot: function () { - // console.log('w2p2'); - // World.updateRoutes([sub('w2p2')]); - // }, - // handleEvent: function (e) { console.log('w2p2', e); } - // }) - // })); - }, - handleEvent: function (e) { - if (e.type === "message" && e.message[0] === "jQuery") { - if (e.message[1] === "#testButton") { - console.log("got a click"); - World.updateRoutes([sub(["jQuery", "#testButton2", "click", __])]); - } else { - console.log("got a click 2"); - // World.exit(); +function JQueryEventRouter(selector, eventName) { + var self = this; + this.selector = selector; + this.eventName = eventName; + this.handler = + World.wrap(function (e) { + World.send(["jQuery", self.selector, self.eventName, e]); + e.preventDefault(); + return false; + }); + $(this.selector).on(this.eventName, this.handler); +} + +JQueryEventRouter.prototype.handleEvent = function (e) { + if (e.type === "routes" && e.routes.length === 0) { + $(this.selector).off(this.eventName, this.handler); + World.exit(); + } +}; + +/////////////////////////////////////////////////////////////////////////// +// WebSocket client driver + +var DEFAULT_RECONNECT_DELAY = 100; +var MAX_RECONNECT_DELAY = 30000; + +function WebSocketConnection(label, wsurl, shouldReconnect) { + this.label = label; + this.wsurl = wsurl; + this.shouldReconnect = shouldReconnect ? true : false; + this.reconnectDelay = DEFAULT_RECONNECT_DELAY; + this.localRoutes = []; + this.peerRoutes = []; + this.prevPeerRoutesMessage = null; + this.sock = null; + this.deduplicator = new Deduplicator(); +} + +WebSocketConnection.prototype.relayRoutes = function () { + // fresh copy each time, suitable for in-place extension/mutation + return [pub([this.label, __, __], 0, 1000), sub([this.label, __, __], 0, 1000)]; +}; + +WebSocketConnection.prototype.aggregateRoutes = function () { + var rs = this.relayRoutes(); + for (var i = 0; i < this.peerRoutes.length; i++) { + var r = this.peerRoutes[i]; + rs.push(new Route(r.isSubscription, + [this.label, __, r.pattern], + r.metaLevel, + r.level)); + } + console.log("WebSocketConnection", this.label, rs); + return rs; +}; + +WebSocketConnection.prototype.boot = function () { + World.updateRoutes(this.aggregateRoutes()); + this.reconnect(); +}; + +WebSocketConnection.prototype.isConnected = function () { + return this.sock && this.sock.readyState === this.sock.OPEN; +} + +WebSocketConnection.prototype.sendLocalRoutes = function () { + if (this.isConnected()) { + this.sock.send(JSON.stringify(encodeEvent(updateRoutes(this.localRoutes)))); + } +}; + +WebSocketConnection.prototype.handleEvent = function (e) { + console.log("WebSocketConnection.handleEvent", e); + switch (e.type) { + case "routes": + this.localRoutes = []; + for (var i = 0; i < e.routes.length; i++) { + var r = e.routes[i]; + if (r.pattern.length && r.pattern.length === 3 + && r.pattern[0] === this.label + && typeof(r.pattern[1]) === "number") + { + this.localRoutes.push(new Route(r.isSubscription, + r.pattern[2], + r.pattern[1], + r.level)); + } + } + this.sendLocalRoutes(); + break; + case "message": + var m = e.message; + if (m.length && m.length === 3 + && m[0] === this.label + && typeof(m[1]) === "number") + { + if (this.isConnected()) { + var encoded = JSON.stringify(encodeEvent(sendMessage(m[2], m[1], e.isFeedback))); + if (this.deduplicator.accept(encoded)) { + this.sock.send(encoded); } } } - }); -}); + break; + } +}; -g.startStepping(); +WebSocketConnection.prototype.reconnect = function () { + var self = this; + if (this.sock) { + this.sock.close(); + this.sock = null; + } + this.sock = new WebSocket(this.wsurl); + this.sock.onopen = World.wrap(function (e) { return self.onopen(e); }); + this.sock.onmessage = World.wrap(function (e) { return self.onmessage(e); }); + this.sock.onclose = World.wrap(function (e) { return self.onclose(e); }); +}; + +WebSocketConnection.prototype.onopen = function (e) { + console.log("onopen", e); + this.reconnectDelay = DEFAULT_RECONNECT_DELAY; + this.sendLocalRoutes(); +}; + +WebSocketConnection.prototype.onmessage = function (wse) { + console.log("onmessage", wse); + var j = JSON.parse(wse.data); + var e = decodeAction(j); + switch (e.type) { + case "routes": + if (this.prevPeerRoutesMessage !== wse.data) { + this.prevPeerRoutesMessage = wse.data; + this.peerRoutes = e.routes; + World.updateRoutes(this.aggregateRoutes()); + } + break; + case "message": + if (this.deduplicator.accept(wse.data)) { + World.send([this.label, e.metaLevel, e.message], 0, e.isFeedback); + } + break; + } +}; + +WebSocketConnection.prototype.onclose = function (e) { + var self = this; + console.log("onclose", e); + if (this.shouldReconnect) { + console.log("reconnecting to " + this.wsurl + " in " + this.reconnectDelay + "ms"); + setTimeout(World.wrap(function () { self.reconnect(); }), this.reconnectDelay); + this.reconnectDelay = this.reconnectDelay * 1.618 + (Math.random() * 1000); + this.reconnectDelay = + this.reconnectDelay > MAX_RECONNECT_DELAY + ? MAX_RECONNECT_DELAY + (Math.random() * 1000) + : this.reconnectDelay; + } +}; + +/////////////////////////////////////////////////////////////////////////// +// Main + +$(document).ready(function () { + $("#chat_form").submit(function (e) { e.preventDefault(); return false; }); + + var g = new Ground(function () { + console.log('starting ground boot'); + World.spawn(new Spy()); + spawnJQueryDriver(); + World.spawn(new WebSocketConnection("broker", "ws://localhost:8000/", true)); + World.spawn({ + // step: function () { console.log('dummy step'); }, + boot: function () { + World.updateRoutes(this.subscriptions()); + }, + nym: function () { + return $("#nym").val(); + }, + subscriptions: function () { + return [sub(["jQuery", "#send_chat", "click", __]), + sub(["jQuery", "#nym", "change", __]), + pub(["broker", 0, [this.nym(), "says", __]]), + sub(["broker", 0, [__, "says", __]], 0, 1)]; + }, + handleEvent: function (e) { + if (e.type === "message") { + switch (e.message[0]) { + case "jQuery": + switch (e.message[1]) { + case "#send_chat": + var inp = $("#chat_input"); + var utterance = inp.val(); + inp.val(""); + World.send(["broker", 0, [this.nym(), "says", utterance]]); + break; + case "#nym": + World.updateRoutes(this.subscriptions()); + break; + default: + console.log("Got jquery event from as-yet-unhandled subscription", + e.message[2], e.message[3]); + } + break; + case "broker": + console.log("Broker message:", e.message[2]); + break; + default: + break; + } + } + } + }); + }); + g.startStepping(); +}); diff --git a/marketplace.js b/marketplace.js index 08b7965..7f1832d 100644 --- a/marketplace.js +++ b/marketplace.js @@ -85,6 +85,14 @@ Route.prototype.toJSON = function () { return [this.isSubscription ? "sub" : "pub", this.pattern, this.metaLevel, this.level]; }; +Route.fromJSON = function (j) { + switch (j[0]) { + case "sub": return new Route(true, j[1], j[2], j[3]); + case "pub": return new Route(false, j[1], j[2], j[3]); + default: throw { message: "Invalid JSON-encoded route: " + JSON.stringify(j) }; + } +}; + function sub(pattern, metaLevel, level) { return new Route(true, pattern, metaLevel, level); } @@ -534,6 +542,43 @@ DemandMatcher.prototype.incorporateChanges = function (isArrivals, routeList) { } }; +/*---------------------------------------------------------------------------*/ +/* Utilities: deduplicator */ + +function Deduplicator(ttl_ms) { + this.ttl_ms = ttl_ms || 10000; + this.queue = []; + this.map = {}; + this.timerId = null; +} + +Deduplicator.prototype.accept = function (m) { + var s = JSON.stringify(m); + if (s in this.map) return false; + var entry = [(+new Date()) + this.ttl_ms, s, m]; + this.map[s] = entry; + this.queue.push(entry); + + if (this.timerId === null) { + var self = this; + this.timerId = setInterval(function () { self.expireMessages(); }, + this.ttl_ms > 1000 ? 1000 : this.ttl_ms); + } + return true; +}; + +Deduplicator.prototype.expireMessages = function () { + var now = +new Date(); + while (this.queue.length > 0 && this.queue[0][0] <= now) { + var entry = this.queue.shift(); + delete this.map[entry[1]]; + } + if (this.queue.length === 0) { + clearInterval(this.timerId); + this.timerId = null; + } +}; + /*---------------------------------------------------------------------------*/ /* Ground interface */