diff --git a/js/src/mux.js b/js/src/mux.js index fd0e52a..70c274c 100644 --- a/js/src/mux.js +++ b/js/src/mux.js @@ -44,27 +44,43 @@ Mux.prototype.updateStream = function (pid, unclampedPatch) { deltaAggregate: deltaAggregate }; }; +var atMetaEverything = Route.compilePattern(true, Patch.atMeta(Route.__)); +var atMetaBranchKeys = Immutable.List([Route.SOA, Patch.$AtMeta]); +var onlyMeta = Immutable.Set.of("meta"); + +function echoCancelledTrie(t) { + return Route.subtract(t, atMetaEverything, function (v1, v2) { + return v1.has("meta") ? onlyMeta : null; + }); +} + function computeEvents(oldMux, newMux, updateStreamResult) { var actingPid = updateStreamResult.pid; var delta = updateStreamResult.delta; var deltaAggregate = updateStreamResult.deltaAggregate; + var deltaAggregateNoEcho = (actingPid === "meta") + ? delta // because echo-cancellation means that meta-SCNs are always new information + : new Patch.Patch(Route.triePruneBranch(deltaAggregate.added, atMetaBranchKeys), + Route.triePruneBranch(deltaAggregate.removed, atMetaBranchKeys)); var oldRoutingTable = oldMux.routingTable; var newRoutingTable = newMux.routingTable; - var affectedPids = computeAffectedPids(oldRoutingTable, delta).add(actingPid).remove("meta"); + var affectedPids = + computeAffectedPids(oldRoutingTable, deltaAggregateNoEcho).add(actingPid).remove("meta"); return { eventMap: Immutable.Map().withMutations(function (result) { affectedPids.forEach(function (pid) { var patchForPid; if (pid === actingPid) { - var part1 = new Patch.Patch(Patch.biasedIntersection(newRoutingTable, delta.added), - Patch.biasedIntersection(oldRoutingTable, delta.removed)); - var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregate.added, + var part1 = new Patch.Patch( + echoCancelledTrie(Patch.biasedIntersection(newRoutingTable, delta.added)), + echoCancelledTrie(Patch.biasedIntersection(oldRoutingTable, delta.removed))); + var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregateNoEcho.added, newMux.interestsOf(pid)), - Patch.biasedIntersection(deltaAggregate.removed, + Patch.biasedIntersection(deltaAggregateNoEcho.removed, oldMux.interestsOf(pid))); patchForPid = part1.unsafeUnion(part2); } else { - patchForPid = updateStreamResult.deltaAggregate.viewFrom(oldMux.interestsOf(pid)); + patchForPid = deltaAggregateNoEcho.viewFrom(oldMux.interestsOf(pid)); } if (patchForPid.isNonEmpty()) { result.set(pid, patchForPid); diff --git a/js/test/test-syndicate.js b/js/test/test-syndicate.js index b71f037..823a6b1 100644 --- a/js/test/test-syndicate.js +++ b/js/test/test-syndicate.js @@ -1,83 +1,135 @@ "use strict"; var expect = require('expect.js'); +var Immutable = require('immutable'); + var Syndicate = require('../src/main.js'); - var Network = Syndicate.Network; +var Patch = Syndicate.Patch; -// var sub = Syndicate.sub; -// var pub = Syndicate.pub; -// var __ = Syndicate.__; -// var _$ = Syndicate._$; +var __ = Syndicate.__; +var _$ = Syndicate._$; -// function configurationTrace(bootConfiguration) { -// var eventLog = []; -// function trace(item) { -// eventLog.push(item); -// } +function configurationTrace(bootConfiguration) { + var eventLog = []; + function trace(item) { + eventLog.push(item); + } -// var G = new Syndicate.Ground(function () { -// bootConfiguration(trace); -// }); + var G = new Syndicate.Ground(function () { + bootConfiguration(trace); + }); -// while (G.step()) { -// // do nothing until G becomes inert -// } + while (G.step()) { + // do nothing until G becomes inert + } -// return eventLog; -// } + return eventLog; +} -// function checkTrace(bootConfiguration, expected) { -// expect(configurationTrace(bootConfiguration)).to.eql(expected); -// } +function traceEvent(trace) { + return function(item) { + trace((item.type === "stateChange") ? item.patch.pretty() : item); + } +} -// describe("configurationTrace", function() { -// describe("with an inert configuration", function () { -// it("should yield an empty trace", function () { -// checkTrace(function (trace) {}, []); -// }); -// }); +function checkTrace(bootConfiguration, expected) { + expect(configurationTrace(bootConfiguration)).to.eql(expected); +} -// describe("with a single trace in an inert configuration", function () { -// it("should yield that trace", function () { -// checkTrace(function (trace) { trace(1) }, [1]); -// }); -// }); +describe("configurationTrace", function() { + describe("with an inert configuration", function () { + it("should yield an empty trace", function () { + checkTrace(function (trace) {}, []); + }); + }); -// describe("with some traced communication", function () { -// it("should yield an appropriate trace", function () { -// checkTrace(function (trace) { -// Network.spawn({ -// boot: function () { return [sub(__)] }, -// handleEvent: function (e) { -// trace(e); -// } -// }); -// Network.send(123); -// Network.send(234); -// }, [Syndicate.updateRoutes([]), -// Syndicate.sendMessage(123), -// Syndicate.sendMessage(234)]); -// }); -// }); -// }); + describe("with a single trace in an inert configuration", function () { + it("should yield that trace", function () { + checkTrace(function (trace) { trace(1) }, [1]); + }); + }); -// describe("nonempty initial routes", function () { -// it("should be immediately signalled to the process", function () { -// // Specifically, no Syndicate.updateRoutes([]) first. -// checkTrace(function (trace) { -// Network.spawn({ -// boot: function () { return [pub(["A", __])] }, -// handleEvent: function (e) { -// Network.spawn({ -// boot: function () { return [sub(["A", __], 0, 1)] }, -// handleEvent: trace -// }); -// } -// }); -// }, [Syndicate.updateRoutes([pub(["A", __]).label(1)])]); -// }); -// }); + describe("with some traced communication", function () { + it("should yield an appropriate trace", function () { + checkTrace(function (trace) { + Network.spawn({ + boot: function () { return Syndicate.sub(__); }, + handleEvent: traceEvent(trace) + }); + Network.send(123); + Network.send(234); + }, ['<<<<<<<< Removed:\n'+ + '::: nothing\n'+ + '======== Added:\n'+ + ' < $Observe ★ > >{[0]}\n' + +' >::: nothing\n'+ + '>>>>>>>>', + Syndicate.message(123), + Syndicate.message(234)]); + }); + }); +}); + +describe("nonempty initial routes", function () { + it("should be immediately signalled to the process", function () { + // Specifically, no Syndicate.updateRoutes([]) first. + checkTrace(function (trace) { + Network.spawn({ + boot: function () { return Patch.assert(["A", __]); }, + handleEvent: function (e) {} + }); + Network.spawn({ + boot: function () { return Patch.sub(["A", __]); }, + handleEvent: traceEvent(trace) + }); + }, ['<<<<<<<< Removed:\n'+ + '::: nothing\n'+ + '======== Added:\n'+ + ' < "A" ★ > >{[1]}\n'+ + '>>>>>>>>']); + }); +}); + +describe("nested actor with an echoey protocol", function () { + it("shouldn't see an echoed assertion", function () { + checkTrace(function (trace) { + Network.spawn(new Network(function () { + Network.spawn({ + boot: function () { + Network.stateChange(Patch.retract("X", 1)); // happens after subs on next line! + return Patch.sub("X", 1).andThen(Patch.assert("X", 1)); + }, + handleEvent: traceEvent(trace) + }); + })); + }, ['<<<<<<<< Removed:\n'+ + '::: nothing\n'+ + '======== Added:\n'+ + ' < $AtMeta "X" > >{[0]}\n'+ + '>>>>>>>>', + '<<<<<<<< Removed:\n'+ + ' < $AtMeta "X" > >{[0]}\n'+ + '======== Added:\n'+ + '::: nothing\n'+ + '>>>>>>>>']); + }) + it("shouldn't see an echoed message", function () { + checkTrace(function (trace) { + Network.spawn(new Network(function () { + Network.spawn({ + boot: function () { + Network.send("X", 1); // happens after subs on next line! + return Patch.sub("X", 1); + }, + handleEvent: traceEvent(trace) + }); + })); + }, [Syndicate.message(Patch.atMeta("X"))]); + }); + it("shouldn't see an echoed assertion", function () { + }); +}); // describe("actor with nonempty initial routes", function () { // it("shouldn't see initial empty conversational context", function () {