Mux; beginnings of tests for it
This commit is contained in:
parent
6b9c7fee67
commit
4d87f071da
|
@ -0,0 +1,101 @@
|
|||
var Immutable = require('immutable');
|
||||
var Route = require('./route.js');
|
||||
var Patch = require('./patch.js');
|
||||
|
||||
function Mux(nextPid, routingTable, interestTable) {
|
||||
this.nextPid = nextPid || 0;
|
||||
this.routingTable = routingTable || Route.emptyTrie;
|
||||
this.interestTable = interestTable || Immutable.Map(); // pid -> Trie
|
||||
}
|
||||
|
||||
Mux.prototype.shallowCopy = function () {
|
||||
return new Mux(this.nextPid, this.routingTable, this.interestTable);
|
||||
};
|
||||
|
||||
Mux.prototype.addStream = function (initialPatch) {
|
||||
var newPid = this.nextPid++;
|
||||
return this.updateStream(newPid, initialPatch);
|
||||
};
|
||||
|
||||
Mux.prototype.removeStream = function (pid) {
|
||||
return this.updateStream(pid, Patch.removeEverythingPatch);
|
||||
};
|
||||
|
||||
Mux.prototype.updateStream = function (pid, unclampedPatch) {
|
||||
var oldInterests = this.interestsOf(pid);
|
||||
var oldRoutingTable = this.routingTable;
|
||||
var delta = unclampedPatch.label(Immutable.Set.of(pid)).limit(oldInterests);
|
||||
var deltaAggregate = delta.computeAggregate(pid, oldRoutingTable);
|
||||
var newInterests = delta.applyTo(oldInterests);
|
||||
var newRoutingTable = delta.applyTo(oldRoutingTable);
|
||||
|
||||
this.routingTable = newRoutingTable;
|
||||
|
||||
if (Route.is_emptyTrie(newInterests)) {
|
||||
this.interestTable = this.interestTable.remove(pid);
|
||||
} else {
|
||||
this.interestTable = this.interestTable.set(pid, newInterests);
|
||||
}
|
||||
|
||||
return { pid: pid,
|
||||
delta: delta,
|
||||
deltaAggregate: deltaAggregate };
|
||||
};
|
||||
|
||||
function computePatches(oldMux, newMux, updateStreamResult) {
|
||||
var actingPid = updateStreamResult.pid;
|
||||
var delta = updateStreamResult.delta;
|
||||
var deltaAggregate = updateStreamResult.deltaAggregate;
|
||||
var oldRoutingTable = oldMux.routingTable;
|
||||
var newRoutingTable = newMux.routingTable;
|
||||
var affectedPids = computeAffectedPids(oldRoutingTable, delta).remove("meta");
|
||||
return {
|
||||
eventMap: Immutable.Map().withMutations(function (result) {
|
||||
affectedPids.forEach(function (pid) {
|
||||
if (pid === actingPid) {
|
||||
var part1 = new Patch.Patch(Patch.biasedIntersection(newRoutingTable, delta.added),
|
||||
Patch.biasedIntersection(oldRoutingTable, delta.removed));
|
||||
var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregate.added,
|
||||
newMux.interestsOf(pid)),
|
||||
Patch.biasedIntersection(deltaAggregate.removed,
|
||||
oldMux.interestsOf(pid)));
|
||||
results.set(pid, part1.unsafeUnion(part2));
|
||||
} else {
|
||||
result.set(pid, updateStreamResult.deltaAggregate.viewFrom(oldMux.interestsOf(pid)));
|
||||
}
|
||||
});
|
||||
}),
|
||||
metaEvents: (actingPid === "meta")
|
||||
? Immutable.List()
|
||||
: Immutable.List.of(delta.computeAggregate(actingPid, oldRoutingTable, true).drop())
|
||||
};
|
||||
}
|
||||
|
||||
function computeAffectedPids(routingTable, delta) {
|
||||
var cover = Route._union(delta.added, delta.removed);
|
||||
routingTable = Route.trieStep(routingTable, Route.SOA);
|
||||
routingTable = Route.trieStep(routingTable, Patch.$Observe);
|
||||
return Route.matchTrie(cover, routingTable, Immutable.Set(),
|
||||
function (v, r, acc) {
|
||||
return acc.union(Route.trieStep(r, Route.EOA).value);
|
||||
});
|
||||
}
|
||||
|
||||
Mux.prototype.routeMessage = function (body) {
|
||||
if (Route.matchValue(this.routingTable, body) === null) {
|
||||
return Route.matchValue(m.routingTable, Patch.observe(body)) || Immutable.Set();
|
||||
} else {
|
||||
// Some other stream has declared body
|
||||
return Immutable.Set();
|
||||
}
|
||||
};
|
||||
|
||||
Mux.prototype.interestsOf = function (pid) {
|
||||
return this.interestTable.get(pid, Route.emptyTrie);
|
||||
};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
module.exports.Mux = Mux;
|
||||
module.exports.computePatches = computePatches;
|
||||
module.exports.computeAffectedPids = computeAffectedPids;
|
|
@ -10,6 +10,7 @@ function Patch(added, removed) {
|
|||
}
|
||||
|
||||
var emptyPatch = new Patch(Route.emptyTrie, Route.emptyTrie);
|
||||
var removeEverythingPatch = new Patch(Route.emptyTrie, Route.compilePattern(true, __));
|
||||
|
||||
var $Observe = new Route.$Special("$Observe");
|
||||
var $AtMeta = new Route.$Special("$AtMeta");
|
||||
|
@ -206,6 +207,7 @@ function prettyPatch(p) {
|
|||
|
||||
module.exports.Patch = Patch;
|
||||
module.exports.emptyPatch = emptyPatch;
|
||||
module.exports.removeEverythingPatch = removeEverythingPatch;
|
||||
|
||||
module.exports.$Observe = $Observe;
|
||||
module.exports.$AtMeta = $AtMeta;
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
var expect = require('expect.js');
|
||||
var Immutable = require('immutable');
|
||||
|
||||
var Route = require('../src/route.js');
|
||||
var Patch = require('../src/patch.js');
|
||||
var Mux = require('../src/mux.js');
|
||||
|
||||
var __ = Route.__;
|
||||
var _$ = Route._$;
|
||||
|
||||
function checkPrettyTrie(m, expected) {
|
||||
expect(Route.prettyTrie(m)).to.equal(expected.join('\n'));
|
||||
}
|
||||
|
||||
function checkPrettyPatch(p, expectedAdded, expectedRemoved) {
|
||||
expect(Patch.prettyPatch(p)).to.equal(
|
||||
('<<<<<<<< Removed:\n' + expectedRemoved.join('\n') +
|
||||
'======== Added:\n' + expectedAdded.join('\n') +
|
||||
'>>>>>>>>\n'));
|
||||
}
|
||||
|
||||
describe('mux stream', function () {
|
||||
function getM() {
|
||||
var m = new Mux.Mux();
|
||||
expect(m.addStream(Patch.assert(1).andThen(Patch.assert(2))).pid).to.equal(0);
|
||||
expect(m.addStream(Patch.assert(3).andThen(Patch.assert(2))).pid).to.equal(1);
|
||||
return m;
|
||||
}
|
||||
|
||||
describe('addition', function () {
|
||||
it('should union interests appropriately', function () {
|
||||
var m = getM();
|
||||
checkPrettyTrie(m.routingTable, [' 1 >{[0]}',
|
||||
' 2 >{[0,1]}',
|
||||
' 3 >{[1]}']);
|
||||
checkPrettyTrie(m.interestsOf(0), [' 1 >{[0]}',
|
||||
' 2 >{[0]}']);
|
||||
checkPrettyTrie(m.interestsOf(1), [' 2 >{[1]}',
|
||||
' 3 >{[1]}']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('update', function () {
|
||||
it('should update interests appropriately', function () {
|
||||
var rawPatch =
|
||||
Patch.assert(1)
|
||||
.andThen(Patch.retract(2))
|
||||
.andThen(Patch.retract(3))
|
||||
.andThen(Patch.assert(4))
|
||||
.andThen(Patch.retract(99));
|
||||
checkPrettyPatch(rawPatch,
|
||||
[' 1 >{true}',
|
||||
' 4 >{true}'],
|
||||
[' 2 >{true}',
|
||||
' 3 >{true}',
|
||||
' 99 >{true}']);
|
||||
|
||||
var m = getM();
|
||||
var updateStreamResult = m.updateStream(1, rawPatch);
|
||||
expect(updateStreamResult.pid).to.equal(1);
|
||||
checkPrettyPatch(updateStreamResult.delta,
|
||||
[' 1 >{[1]}',
|
||||
' 4 >{[1]}'],
|
||||
[' 2 >{[1]}',
|
||||
' 3 >{[1]}']);
|
||||
checkPrettyTrie(m.routingTable, [' 1 >{[0,1]}',
|
||||
' 2 >{[0]}',
|
||||
' 4 >{[1]}']);
|
||||
checkPrettyTrie(m.interestsOf(0), [' 1 >{[0]}',
|
||||
' 2 >{[0]}']);
|
||||
checkPrettyTrie(m.interestsOf(1), [' 1 >{[1]}',
|
||||
' 4 >{[1]}']);
|
||||
checkPrettyPatch(updateStreamResult.deltaAggregate,
|
||||
[' 4 >{[1]}'],
|
||||
[' 3 >{[1]}']);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue