From 4d87f071da81ea95f29800c05f14a1f171f1a57b Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 2 Feb 2016 14:36:31 -0500 Subject: [PATCH] Mux; beginnings of tests for it --- js/src/mux.js | 101 ++++++++++++++++++++++++++++++++++++++++++++ js/src/patch.js | 2 + js/test/test-mux.js | 79 ++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+) create mode 100644 js/src/mux.js create mode 100644 js/test/test-mux.js diff --git a/js/src/mux.js b/js/src/mux.js new file mode 100644 index 0000000..e15c0d6 --- /dev/null +++ b/js/src/mux.js @@ -0,0 +1,101 @@ +var Immutable = require('immutable'); +var Route = require('./route.js'); +var Patch = require('./patch.js'); + +function Mux(nextPid, routingTable, interestTable) { + this.nextPid = nextPid || 0; + this.routingTable = routingTable || Route.emptyTrie; + this.interestTable = interestTable || Immutable.Map(); // pid -> Trie +} + +Mux.prototype.shallowCopy = function () { + return new Mux(this.nextPid, this.routingTable, this.interestTable); +}; + +Mux.prototype.addStream = function (initialPatch) { + var newPid = this.nextPid++; + return this.updateStream(newPid, initialPatch); +}; + +Mux.prototype.removeStream = function (pid) { + return this.updateStream(pid, Patch.removeEverythingPatch); +}; + +Mux.prototype.updateStream = function (pid, unclampedPatch) { + var oldInterests = this.interestsOf(pid); + var oldRoutingTable = this.routingTable; + var delta = unclampedPatch.label(Immutable.Set.of(pid)).limit(oldInterests); + var deltaAggregate = delta.computeAggregate(pid, oldRoutingTable); + var newInterests = delta.applyTo(oldInterests); + var newRoutingTable = delta.applyTo(oldRoutingTable); + + this.routingTable = newRoutingTable; + + if (Route.is_emptyTrie(newInterests)) { + this.interestTable = this.interestTable.remove(pid); + } else { + this.interestTable = this.interestTable.set(pid, newInterests); + } + + return { pid: pid, + delta: delta, + deltaAggregate: deltaAggregate }; +}; + +function computePatches(oldMux, newMux, updateStreamResult) { + var actingPid = updateStreamResult.pid; + var delta = updateStreamResult.delta; + var deltaAggregate = updateStreamResult.deltaAggregate; + var oldRoutingTable = oldMux.routingTable; + var newRoutingTable = newMux.routingTable; + var affectedPids = computeAffectedPids(oldRoutingTable, delta).remove("meta"); + return { + eventMap: Immutable.Map().withMutations(function (result) { + affectedPids.forEach(function (pid) { + if (pid === actingPid) { + var part1 = new Patch.Patch(Patch.biasedIntersection(newRoutingTable, delta.added), + Patch.biasedIntersection(oldRoutingTable, delta.removed)); + var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregate.added, + newMux.interestsOf(pid)), + Patch.biasedIntersection(deltaAggregate.removed, + oldMux.interestsOf(pid))); + results.set(pid, part1.unsafeUnion(part2)); + } else { + result.set(pid, updateStreamResult.deltaAggregate.viewFrom(oldMux.interestsOf(pid))); + } + }); + }), + metaEvents: (actingPid === "meta") + ? Immutable.List() + : Immutable.List.of(delta.computeAggregate(actingPid, oldRoutingTable, true).drop()) + }; +} + +function computeAffectedPids(routingTable, delta) { + var cover = Route._union(delta.added, delta.removed); + routingTable = Route.trieStep(routingTable, Route.SOA); + routingTable = Route.trieStep(routingTable, Patch.$Observe); + return Route.matchTrie(cover, routingTable, Immutable.Set(), + function (v, r, acc) { + return acc.union(Route.trieStep(r, Route.EOA).value); + }); +} + +Mux.prototype.routeMessage = function (body) { + if (Route.matchValue(this.routingTable, body) === null) { + return Route.matchValue(m.routingTable, Patch.observe(body)) || Immutable.Set(); + } else { + // Some other stream has declared body + return Immutable.Set(); + } +}; + +Mux.prototype.interestsOf = function (pid) { + return this.interestTable.get(pid, Route.emptyTrie); +}; + +/////////////////////////////////////////////////////////////////////////// + +module.exports.Mux = Mux; +module.exports.computePatches = computePatches; +module.exports.computeAffectedPids = computeAffectedPids; diff --git a/js/src/patch.js b/js/src/patch.js index 427eb76..c2bcae0 100644 --- a/js/src/patch.js +++ b/js/src/patch.js @@ -10,6 +10,7 @@ function Patch(added, removed) { } var emptyPatch = new Patch(Route.emptyTrie, Route.emptyTrie); +var removeEverythingPatch = new Patch(Route.emptyTrie, Route.compilePattern(true, __)); var $Observe = new Route.$Special("$Observe"); var $AtMeta = new Route.$Special("$AtMeta"); @@ -206,6 +207,7 @@ function prettyPatch(p) { module.exports.Patch = Patch; module.exports.emptyPatch = emptyPatch; +module.exports.removeEverythingPatch = removeEverythingPatch; module.exports.$Observe = $Observe; module.exports.$AtMeta = $AtMeta; diff --git a/js/test/test-mux.js b/js/test/test-mux.js new file mode 100644 index 0000000..dfdf244 --- /dev/null +++ b/js/test/test-mux.js @@ -0,0 +1,79 @@ +var expect = require('expect.js'); +var Immutable = require('immutable'); + +var Route = require('../src/route.js'); +var Patch = require('../src/patch.js'); +var Mux = require('../src/mux.js'); + +var __ = Route.__; +var _$ = Route._$; + +function checkPrettyTrie(m, expected) { + expect(Route.prettyTrie(m)).to.equal(expected.join('\n')); +} + +function checkPrettyPatch(p, expectedAdded, expectedRemoved) { + expect(Patch.prettyPatch(p)).to.equal( + ('<<<<<<<< Removed:\n' + expectedRemoved.join('\n') + + '======== Added:\n' + expectedAdded.join('\n') + + '>>>>>>>>\n')); +} + +describe('mux stream', function () { + function getM() { + var m = new Mux.Mux(); + expect(m.addStream(Patch.assert(1).andThen(Patch.assert(2))).pid).to.equal(0); + expect(m.addStream(Patch.assert(3).andThen(Patch.assert(2))).pid).to.equal(1); + return m; + } + + describe('addition', function () { + it('should union interests appropriately', function () { + var m = getM(); + checkPrettyTrie(m.routingTable, [' 1 >{[0]}', + ' 2 >{[0,1]}', + ' 3 >{[1]}']); + checkPrettyTrie(m.interestsOf(0), [' 1 >{[0]}', + ' 2 >{[0]}']); + checkPrettyTrie(m.interestsOf(1), [' 2 >{[1]}', + ' 3 >{[1]}']); + }); + }); + + describe('update', function () { + it('should update interests appropriately', function () { + var rawPatch = + Patch.assert(1) + .andThen(Patch.retract(2)) + .andThen(Patch.retract(3)) + .andThen(Patch.assert(4)) + .andThen(Patch.retract(99)); + checkPrettyPatch(rawPatch, + [' 1 >{true}', + ' 4 >{true}'], + [' 2 >{true}', + ' 3 >{true}', + ' 99 >{true}']); + + var m = getM(); + var updateStreamResult = m.updateStream(1, rawPatch); + expect(updateStreamResult.pid).to.equal(1); + checkPrettyPatch(updateStreamResult.delta, + [' 1 >{[1]}', + ' 4 >{[1]}'], + [' 2 >{[1]}', + ' 3 >{[1]}']); + checkPrettyTrie(m.routingTable, [' 1 >{[0,1]}', + ' 2 >{[0]}', + ' 4 >{[1]}']); + checkPrettyTrie(m.interestsOf(0), [' 1 >{[0]}', + ' 2 >{[0]}']); + checkPrettyTrie(m.interestsOf(1), [' 1 >{[1]}', + ' 4 >{[1]}']); + checkPrettyPatch(updateStreamResult.deltaAggregate, + [' 4 >{[1]}'], + [' 3 >{[1]}']); + }); + }); +}); +