2016-02-03 02:11:50 +00:00
|
|
|
"use strict";
|
|
|
|
|
2016-02-02 19:36:31 +00:00
|
|
|
var Immutable = require('immutable');
|
2016-05-08 15:33:39 +00:00
|
|
|
var Trie = require('./trie.js');
|
2016-02-02 19:36:31 +00:00
|
|
|
var Patch = require('./patch.js');
|
|
|
|
|
|
|
|
function Mux(nextPid, routingTable, interestTable) {
|
|
|
|
this.nextPid = nextPid || 0;
|
2016-05-08 15:33:39 +00:00
|
|
|
this.routingTable = routingTable || Trie.emptyTrie;
|
2016-02-02 19:36:31 +00:00
|
|
|
this.interestTable = interestTable || Immutable.Map(); // pid -> Trie
|
|
|
|
}
|
|
|
|
|
|
|
|
Mux.prototype.shallowCopy = function () {
|
|
|
|
return new Mux(this.nextPid, this.routingTable, this.interestTable);
|
|
|
|
};
|
|
|
|
|
|
|
|
Mux.prototype.addStream = function (initialPatch) {
|
|
|
|
var newPid = this.nextPid++;
|
|
|
|
return this.updateStream(newPid, initialPatch);
|
|
|
|
};
|
|
|
|
|
|
|
|
Mux.prototype.removeStream = function (pid) {
|
|
|
|
return this.updateStream(pid, Patch.removeEverythingPatch);
|
|
|
|
};
|
|
|
|
|
|
|
|
Mux.prototype.updateStream = function (pid, unclampedPatch) {
|
|
|
|
var oldInterests = this.interestsOf(pid);
|
|
|
|
var oldRoutingTable = this.routingTable;
|
|
|
|
var delta = unclampedPatch.label(Immutable.Set.of(pid)).limit(oldInterests);
|
|
|
|
var deltaAggregate = delta.computeAggregate(pid, oldRoutingTable);
|
|
|
|
var newInterests = delta.applyTo(oldInterests);
|
|
|
|
var newRoutingTable = delta.applyTo(oldRoutingTable);
|
|
|
|
|
|
|
|
this.routingTable = newRoutingTable;
|
|
|
|
|
2016-05-08 15:33:39 +00:00
|
|
|
if (Trie.is_emptyTrie(newInterests)) {
|
2016-02-02 19:36:31 +00:00
|
|
|
this.interestTable = this.interestTable.remove(pid);
|
|
|
|
} else {
|
|
|
|
this.interestTable = this.interestTable.set(pid, newInterests);
|
|
|
|
}
|
|
|
|
|
|
|
|
return { pid: pid,
|
|
|
|
delta: delta,
|
|
|
|
deltaAggregate: deltaAggregate };
|
|
|
|
};
|
|
|
|
|
2016-05-08 15:33:39 +00:00
|
|
|
var atMetaEverything = Trie.compilePattern(true, Patch.atMeta(Trie.__));
|
2016-05-10 04:40:53 +00:00
|
|
|
var atMetaBranchKeys = Immutable.List([[Patch.atMeta.meta.arity, Patch.atMeta.meta]]);
|
2016-05-08 15:33:39 +00:00
|
|
|
var onlyMeta = Trie.trieSuccess(Immutable.Set.of("meta"));
|
2016-02-12 04:56:42 +00:00
|
|
|
|
|
|
|
function echoCancelledTrie(t) {
|
2016-05-08 15:33:39 +00:00
|
|
|
return Trie.subtract(t, atMetaEverything, function (v1, v2) {
|
|
|
|
return v1.has("meta") ? onlyMeta : Trie.emptyTrie;
|
2016-02-12 04:56:42 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2016-02-02 20:52:48 +00:00
|
|
|
function computeEvents(oldMux, newMux, updateStreamResult) {
|
2016-02-02 19:36:31 +00:00
|
|
|
var actingPid = updateStreamResult.pid;
|
|
|
|
var delta = updateStreamResult.delta;
|
|
|
|
var deltaAggregate = updateStreamResult.deltaAggregate;
|
2016-02-12 04:56:42 +00:00
|
|
|
var deltaAggregateNoEcho = (actingPid === "meta")
|
|
|
|
? delta // because echo-cancellation means that meta-SCNs are always new information
|
2016-05-10 04:40:53 +00:00
|
|
|
: deltaAggregate.withoutAtMeta();
|
2016-02-02 19:36:31 +00:00
|
|
|
var oldRoutingTable = oldMux.routingTable;
|
|
|
|
var newRoutingTable = newMux.routingTable;
|
2016-02-12 04:56:42 +00:00
|
|
|
var affectedPids =
|
|
|
|
computeAffectedPids(oldRoutingTable, deltaAggregateNoEcho).add(actingPid).remove("meta");
|
2016-02-02 19:36:31 +00:00
|
|
|
return {
|
|
|
|
eventMap: Immutable.Map().withMutations(function (result) {
|
|
|
|
affectedPids.forEach(function (pid) {
|
2016-02-02 20:52:48 +00:00
|
|
|
var patchForPid;
|
2016-02-02 19:36:31 +00:00
|
|
|
if (pid === actingPid) {
|
2016-02-12 04:56:42 +00:00
|
|
|
var part1 = new Patch.Patch(
|
|
|
|
echoCancelledTrie(Patch.biasedIntersection(newRoutingTable, delta.added)),
|
|
|
|
echoCancelledTrie(Patch.biasedIntersection(oldRoutingTable, delta.removed)));
|
|
|
|
var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregateNoEcho.added,
|
2016-02-02 19:36:31 +00:00
|
|
|
newMux.interestsOf(pid)),
|
2016-02-12 04:56:42 +00:00
|
|
|
Patch.biasedIntersection(deltaAggregateNoEcho.removed,
|
2016-02-02 19:36:31 +00:00
|
|
|
oldMux.interestsOf(pid)));
|
2016-02-02 20:52:48 +00:00
|
|
|
patchForPid = part1.unsafeUnion(part2);
|
2016-02-02 19:36:31 +00:00
|
|
|
} else {
|
2016-02-12 04:56:42 +00:00
|
|
|
patchForPid = deltaAggregateNoEcho.viewFrom(oldMux.interestsOf(pid));
|
2016-02-02 20:52:48 +00:00
|
|
|
}
|
|
|
|
if (patchForPid.isNonEmpty()) {
|
|
|
|
result.set(pid, patchForPid);
|
2016-02-02 19:36:31 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}),
|
|
|
|
metaEvents: (actingPid === "meta")
|
|
|
|
? Immutable.List()
|
|
|
|
: Immutable.List.of(delta.computeAggregate(actingPid, oldRoutingTable, true).drop())
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
function computeAffectedPids(routingTable, delta) {
|
2016-05-08 15:33:39 +00:00
|
|
|
var cover = Trie._union(delta.added, delta.removed);
|
2016-05-10 04:40:53 +00:00
|
|
|
routingTable = Trie.trieStep(routingTable, Patch.observe.meta.arity, Patch.observe.meta);
|
2016-05-08 15:33:39 +00:00
|
|
|
return Trie.matchTrie(cover, routingTable, Immutable.Set(),
|
|
|
|
function (v1, v2, acc) { return acc.union(v2); });
|
2016-02-02 19:36:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Mux.prototype.routeMessage = function (body) {
|
2016-06-27 18:59:53 +00:00
|
|
|
if (Trie.matchValue(this.routingTable, body, null, function(a, b) { return a||b }) === null) {
|
|
|
|
return Trie.matchValue(this.routingTable, Patch.observe(body), Immutable.Set(),
|
|
|
|
function (a, b) { return a.union(b) });
|
2016-02-02 19:36:31 +00:00
|
|
|
} else {
|
|
|
|
// Some other stream has declared body
|
|
|
|
return Immutable.Set();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
Mux.prototype.interestsOf = function (pid) {
|
2016-05-08 15:33:39 +00:00
|
|
|
return this.interestTable.get(pid, Trie.emptyTrie);
|
2016-02-02 19:36:31 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
module.exports.Mux = Mux;
|
2016-02-02 20:52:48 +00:00
|
|
|
module.exports.computeEvents = computeEvents;
|
2016-02-02 19:36:31 +00:00
|
|
|
module.exports.computeAffectedPids = computeAffectedPids;
|