Update
This commit is contained in:
parent
8abc6b6975
commit
98e447c7f7
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -1,237 +0,0 @@
|
|||
"use strict";
|
||||
// WebSocket-based Syndicate broker client
|
||||
|
||||
var Trie = Syndicate.Trie;
|
||||
var Patch = Syndicate.Patch;
|
||||
var Dataspace = Syndicate.Dataspace;
|
||||
var Struct = Syndicate.Struct;
|
||||
var DemandMatcher = Syndicate.DemandMatcher;
|
||||
var __ = Syndicate.__;
|
||||
var _$ = Syndicate._$;
|
||||
|
||||
var DEFAULT_RECONNECT_DELAY = 100; // ms
|
||||
var MAX_RECONNECT_DELAY = 30000; // ms
|
||||
var DEFAULT_IDLE_TIMEOUT = 300000; // ms; i.e., 5 minutes
|
||||
var DEFAULT_PING_INTERVAL = DEFAULT_IDLE_TIMEOUT - 10000; // ms
|
||||
|
||||
var toBroker = Struct.makeConstructor('toBroker', ['url', 'assertion']);
|
||||
var fromBroker = Struct.makeConstructor('fromBroker', ['url', 'assertion']);
|
||||
var brokerConnection = Struct.makeConstructor('brokerConnection', ['url']);
|
||||
var brokerConnected = Struct.makeConstructor('brokerConnected', ['url']);
|
||||
var forceBrokerDisconnect = Struct.makeConstructor('forceBrokerDisconnect', ['url']);
|
||||
|
||||
function spawnBrokerClientDriver() {
|
||||
var URL = _$('url'); // capture used to extract URL
|
||||
Dataspace.spawn(
|
||||
new Dataspace(function () {
|
||||
Dataspace.spawn(
|
||||
new DemandMatcher([brokerConnection(URL)],
|
||||
[brokerConnection(URL)],
|
||||
{
|
||||
demandMetaLevel: 1,
|
||||
supplyMetaLevel: 0,
|
||||
onDemandIncrease: function (c) {
|
||||
Dataspace.spawn(new BrokerClientConnection(c.url));
|
||||
}
|
||||
}));
|
||||
}));
|
||||
}
|
||||
|
||||
function BrokerClientConnection(wsurl) {
|
||||
this.wsurl = wsurl;
|
||||
this.sock = null;
|
||||
|
||||
this.sendsAttempted = 0;
|
||||
this.sendsTransmitted = 0;
|
||||
this.receiveCount = 0;
|
||||
this.connectionCount = 0;
|
||||
|
||||
this.reconnectDelay = DEFAULT_RECONNECT_DELAY;
|
||||
this.idleTimeout = DEFAULT_IDLE_TIMEOUT;
|
||||
this.pingInterval = DEFAULT_PING_INTERVAL;
|
||||
|
||||
this.localAssertions = Trie.emptyTrie;
|
||||
this.remoteAssertions = Trie.emptyTrie;
|
||||
|
||||
this.activityTimestamp = 0;
|
||||
this.idleTimer = null;
|
||||
this.pingTimer = null;
|
||||
}
|
||||
|
||||
BrokerClientConnection.prototype.clearHeartbeatTimers = function () {
|
||||
if (this.idleTimer) { clearTimeout(this.idleTimer); this.idleTimer = null; }
|
||||
if (this.pingTimer) { clearTimeout(this.pingTimer); this.pingTimer = null; }
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.recordActivity = function () {
|
||||
var self = this;
|
||||
this.activityTimestamp = +(new Date());
|
||||
this.clearHeartbeatTimers();
|
||||
this.idleTimer = setTimeout(function () { self.forceclose(); }, this.idleTimeout);
|
||||
this.pingTimer = setTimeout(function () { self.safeSend(JSON.stringify("ping")) },
|
||||
this.pingInterval);
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.boot = function () {
|
||||
this.reconnect();
|
||||
var initialAssertions =
|
||||
Patch.sub(toBroker(this.wsurl, __), 1) // read assertions to go out
|
||||
.andThen(Patch.sub(Patch.observe(fromBroker(this.wsurl, __)), 1)) // and monitor interests
|
||||
.andThen(Patch.assert(brokerConnection(this.wsurl))) // signal to DemandMatcher that we exist
|
||||
.andThen(Patch.sub(brokerConnection(this.wsurl), 1)) // track demand
|
||||
.andThen(Patch.sub(forceBrokerDisconnect(this.wsurl), 1))
|
||||
;
|
||||
return initialAssertions;
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.trapexit = function () {
|
||||
this.forceclose();
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.isConnected = function () {
|
||||
return this.sock && this.sock.readyState === this.sock.OPEN;
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.safeSend = function (m) {
|
||||
// console.log('safeSend', m);
|
||||
try {
|
||||
this.sendsAttempted++;
|
||||
if (this.isConnected()) {
|
||||
this.sock.send(m);
|
||||
this.sendsTransmitted++;
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn("Trapped exn while sending", e);
|
||||
}
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.sendPatch = function (p) {
|
||||
var j = JSON.stringify(Codec.encodeEvent(Syndicate.stateChange(p)));
|
||||
this.safeSend(j);
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.handleEvent = function (e) {
|
||||
// console.log("BrokerClientConnection.handleEvent", e);
|
||||
switch (e.type) {
|
||||
case "stateChange":
|
||||
if (e.patch.project(Patch.atMeta(brokerConnection(_$))).hasRemoved()) {
|
||||
// console.log("Client is no longer interested in this connection", this.wsurl);
|
||||
Dataspace.exit();
|
||||
}
|
||||
|
||||
var pTo = e.patch.project(Patch.atMeta(toBroker(__, _$)));
|
||||
|
||||
var pObsFrom = e.patch.project(Patch.atMeta(Patch.observe(fromBroker(__, _$))));
|
||||
pObsFrom = new Patch.Patch(
|
||||
Trie.compilePattern(true, Patch.observe(Trie.embeddedTrie(pObsFrom.added))),
|
||||
Trie.compilePattern(true, Patch.observe(Trie.embeddedTrie(pObsFrom.removed))));
|
||||
|
||||
var newLocalAssertions = this.localAssertions;
|
||||
newLocalAssertions = pTo.label(Immutable.Set.of("to")).applyTo(newLocalAssertions);
|
||||
newLocalAssertions = pObsFrom.label(Immutable.Set.of("obsFrom")).applyTo(newLocalAssertions);
|
||||
|
||||
var trueSet = Immutable.Set.of(true);
|
||||
var alwaysTrueSet = function (v) { return trueSet; };
|
||||
var p = Patch.computePatch(Trie.relabel(this.localAssertions, alwaysTrueSet),
|
||||
Trie.relabel(newLocalAssertions, alwaysTrueSet));
|
||||
|
||||
this.localAssertions = newLocalAssertions;
|
||||
// console.log("localAssertions");
|
||||
// console.log(Trie.prettyTrie(this.localAssertions));
|
||||
// console.log(p.pretty());
|
||||
this.sendPatch(p);
|
||||
break;
|
||||
|
||||
case "message":
|
||||
var m = e.message;
|
||||
if (Patch.atMeta.isClassOf(m)) {
|
||||
m = m[0];
|
||||
if (toBroker.isClassOf(m)) {
|
||||
var j = JSON.stringify(Codec.encodeEvent(Syndicate.message(m[1])));
|
||||
this.safeSend(j);
|
||||
} else if (forceBrokerDisconnect.isClassOf(m)) {
|
||||
this.forceclose();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.forceclose = function (keepReconnectDelay) {
|
||||
if (!keepReconnectDelay) {
|
||||
this.reconnectDelay = DEFAULT_RECONNECT_DELAY;
|
||||
}
|
||||
this.clearHeartbeatTimers();
|
||||
if (this.sock) {
|
||||
console.log("BrokerClientConnection.forceclose called");
|
||||
this.sock.close();
|
||||
this.sock = null;
|
||||
}
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.reconnect = function () {
|
||||
var self = this;
|
||||
this.forceclose(true);
|
||||
this.connectionCount++;
|
||||
this.sock = new WebSocket(this.wsurl);
|
||||
this.sock.onopen = Dataspace.wrap(function (e) { return self.onopen(e); });
|
||||
this.sock.onmessage = Dataspace.wrap(function (e) {
|
||||
self.receiveCount++;
|
||||
return self.onmessage(e);
|
||||
});
|
||||
this.sock.onclose = Dataspace.wrap(function (e) { return self.onclose(e); });
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.onopen = function (e) {
|
||||
console.log("connected to " + this.sock.url);
|
||||
this.recordActivity();
|
||||
Dataspace.stateChange(Patch.assert(brokerConnected(this.wsurl), 1));
|
||||
this.reconnectDelay = DEFAULT_RECONNECT_DELAY;
|
||||
this.sendPatch((new Patch.Patch(this.localAssertions, Trie.emptyTrie)).strip());
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.onmessage = function (wse) {
|
||||
// console.log("onmessage", wse);
|
||||
this.recordActivity();
|
||||
|
||||
var j = JSON.parse(wse.data, Struct.reviver);
|
||||
if (j === "ping") {
|
||||
this.safeSend(JSON.stringify("pong"));
|
||||
return;
|
||||
} else if (j === "pong") {
|
||||
return; // recordActivity already took care of our timers
|
||||
}
|
||||
|
||||
var e = Codec.decodeAction(j);
|
||||
switch (e.type) {
|
||||
case "stateChange": {
|
||||
var added = fromBroker(this.wsurl, Trie.embeddedTrie(e.patch.added));
|
||||
var removed = fromBroker(this.wsurl, Trie.embeddedTrie(e.patch.removed));
|
||||
var p = Patch.assert(added, 1).andThen(Patch.retract(removed, 1));
|
||||
// console.log('incoming stateChange');
|
||||
// console.log(p.pretty());
|
||||
Dataspace.stateChange(p);
|
||||
break;
|
||||
}
|
||||
case "message": {
|
||||
Dataspace.send(fromBroker(this.wsurl, e.message), 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
BrokerClientConnection.prototype.onclose = function (e) {
|
||||
var self = this;
|
||||
|
||||
// console.log("onclose", e);
|
||||
Dataspace.stateChange(Patch.retract(brokerConnected(this.wsurl), 1));
|
||||
|
||||
console.log("reconnecting to " + this.wsurl + " in " + this.reconnectDelay + "ms");
|
||||
setTimeout(Dataspace.wrap(function () { self.reconnect(); }), this.reconnectDelay);
|
||||
|
||||
this.reconnectDelay = this.reconnectDelay * 1.618 + (Math.random() * 1000);
|
||||
this.reconnectDelay =
|
||||
this.reconnectDelay > MAX_RECONNECT_DELAY
|
||||
? MAX_RECONNECT_DELAY + (Math.random() * 1000)
|
||||
: this.reconnectDelay;
|
||||
};
|
|
@ -1,41 +0,0 @@
|
|||
"use strict";
|
||||
// Wire protocol representation of events and actions
|
||||
|
||||
var Trie = Syndicate.Trie;
|
||||
var Struct = Syndicate.Struct;
|
||||
|
||||
function _encode(e) {
|
||||
switch (e.type) {
|
||||
case "stateChange":
|
||||
return ["patch", e.patch.toJSON()];
|
||||
case "message":
|
||||
return ["message", e.message];
|
||||
}
|
||||
}
|
||||
|
||||
function _decode(what) {
|
||||
return function (j) {
|
||||
switch (j[0]) {
|
||||
case "patch":
|
||||
return Syndicate.stateChange(Patch.fromJSON(j[1]));
|
||||
case "message":
|
||||
return Syndicate.message(j[1]);
|
||||
default:
|
||||
throw new Error("Invalid JSON-encoded " + what + ": " + JSON.stringify(j));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// module.exports.encodeEvent = _encode;
|
||||
// module.exports.decodeEvent = _decode("event");
|
||||
// module.exports.encodeAction = _encode;
|
||||
// module.exports.decodeAction = _decode("action");
|
||||
|
||||
var Codec = {
|
||||
encodeEvent: _encode,
|
||||
decodeEvent: _decode("event"),
|
||||
encodeAction: _encode,
|
||||
decodeAction: _decode("action")
|
||||
};
|
|
@ -5,12 +5,9 @@
|
|||
<meta charset="utf-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<link href="style.css" rel="stylesheet">
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/immutable/3.8.1/immutable.min.js"></script> <!-- TODO: ??? -->
|
||||
<script src="../../third-party/jquery-2.2.0.min.js"></script>
|
||||
<script src="../../dist/syndicatecompiler.js"></script>
|
||||
<script src="../../dist/syndicate.js"></script>
|
||||
<script src="codec.js"></script>
|
||||
<script src="broker-client.js"></script>
|
||||
<script type="text/syndicate-js" src="index.js"></script>
|
||||
</head>
|
||||
<body>
|
||||
|
|
|
@ -4,6 +4,12 @@ assertion type says(who, message);
|
|||
var DOM = Syndicate.DOM.DOM;
|
||||
var jQueryEvent = Syndicate.JQuery.jQueryEvent;
|
||||
|
||||
var brokerConnected = Syndicate.Broker.brokerConnected;
|
||||
var brokerConnection = Syndicate.Broker.brokerConnection;
|
||||
var toBroker = Syndicate.Broker.toBroker;
|
||||
var fromBroker = Syndicate.Broker.fromBroker;
|
||||
var forceBrokerDisconnect = Syndicate.Broker.forceBrokerDisconnect;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Application
|
||||
|
||||
|
@ -105,7 +111,7 @@ $(document).ready(function () {
|
|||
Syndicate.JQuery.spawnJQueryDriver();
|
||||
Syndicate.DOM.spawnDOMDriver();
|
||||
Syndicate.WakeDetector.spawnWakeDetector();
|
||||
spawnBrokerClientDriver();
|
||||
Syndicate.Broker.spawnBrokerClientDriver();
|
||||
spawnInputChangeMonitor();
|
||||
spawnChatApp();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue