Echo-cancel assertions, following syndicate/racket.

This commit is contained in:
Tony Garnock-Jones 2016-02-11 23:56:42 -05:00
parent e9b431c50f
commit 935fb98a1f
2 changed files with 139 additions and 71 deletions

View File

@ -44,27 +44,43 @@ Mux.prototype.updateStream = function (pid, unclampedPatch) {
deltaAggregate: deltaAggregate };
};
var atMetaEverything = Route.compilePattern(true, Patch.atMeta(Route.__));
var atMetaBranchKeys = Immutable.List([Route.SOA, Patch.$AtMeta]);
var onlyMeta = Immutable.Set.of("meta");
function echoCancelledTrie(t) {
return Route.subtract(t, atMetaEverything, function (v1, v2) {
return v1.has("meta") ? onlyMeta : null;
});
}
function computeEvents(oldMux, newMux, updateStreamResult) {
var actingPid = updateStreamResult.pid;
var delta = updateStreamResult.delta;
var deltaAggregate = updateStreamResult.deltaAggregate;
var deltaAggregateNoEcho = (actingPid === "meta")
? delta // because echo-cancellation means that meta-SCNs are always new information
: new Patch.Patch(Route.triePruneBranch(deltaAggregate.added, atMetaBranchKeys),
Route.triePruneBranch(deltaAggregate.removed, atMetaBranchKeys));
var oldRoutingTable = oldMux.routingTable;
var newRoutingTable = newMux.routingTable;
var affectedPids = computeAffectedPids(oldRoutingTable, delta).add(actingPid).remove("meta");
var affectedPids =
computeAffectedPids(oldRoutingTable, deltaAggregateNoEcho).add(actingPid).remove("meta");
return {
eventMap: Immutable.Map().withMutations(function (result) {
affectedPids.forEach(function (pid) {
var patchForPid;
if (pid === actingPid) {
var part1 = new Patch.Patch(Patch.biasedIntersection(newRoutingTable, delta.added),
Patch.biasedIntersection(oldRoutingTable, delta.removed));
var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregate.added,
var part1 = new Patch.Patch(
echoCancelledTrie(Patch.biasedIntersection(newRoutingTable, delta.added)),
echoCancelledTrie(Patch.biasedIntersection(oldRoutingTable, delta.removed)));
var part2 = new Patch.Patch(Patch.biasedIntersection(deltaAggregateNoEcho.added,
newMux.interestsOf(pid)),
Patch.biasedIntersection(deltaAggregate.removed,
Patch.biasedIntersection(deltaAggregateNoEcho.removed,
oldMux.interestsOf(pid)));
patchForPid = part1.unsafeUnion(part2);
} else {
patchForPid = updateStreamResult.deltaAggregate.viewFrom(oldMux.interestsOf(pid));
patchForPid = deltaAggregateNoEcho.viewFrom(oldMux.interestsOf(pid));
}
if (patchForPid.isNonEmpty()) {
result.set(pid, patchForPid);

View File

@ -1,83 +1,135 @@
"use strict";
var expect = require('expect.js');
var Immutable = require('immutable');
var Syndicate = require('../src/main.js');
var Network = Syndicate.Network;
var Patch = Syndicate.Patch;
// var sub = Syndicate.sub;
// var pub = Syndicate.pub;
// var __ = Syndicate.__;
// var _$ = Syndicate._$;
var __ = Syndicate.__;
var _$ = Syndicate._$;
// function configurationTrace(bootConfiguration) {
// var eventLog = [];
// function trace(item) {
// eventLog.push(item);
// }
function configurationTrace(bootConfiguration) {
var eventLog = [];
function trace(item) {
eventLog.push(item);
}
// var G = new Syndicate.Ground(function () {
// bootConfiguration(trace);
// });
var G = new Syndicate.Ground(function () {
bootConfiguration(trace);
});
// while (G.step()) {
// // do nothing until G becomes inert
// }
while (G.step()) {
// do nothing until G becomes inert
}
// return eventLog;
// }
return eventLog;
}
// function checkTrace(bootConfiguration, expected) {
// expect(configurationTrace(bootConfiguration)).to.eql(expected);
// }
function traceEvent(trace) {
return function(item) {
trace((item.type === "stateChange") ? item.patch.pretty() : item);
}
}
// describe("configurationTrace", function() {
// describe("with an inert configuration", function () {
// it("should yield an empty trace", function () {
// checkTrace(function (trace) {}, []);
// });
// });
function checkTrace(bootConfiguration, expected) {
expect(configurationTrace(bootConfiguration)).to.eql(expected);
}
// describe("with a single trace in an inert configuration", function () {
// it("should yield that trace", function () {
// checkTrace(function (trace) { trace(1) }, [1]);
// });
// });
describe("configurationTrace", function() {
describe("with an inert configuration", function () {
it("should yield an empty trace", function () {
checkTrace(function (trace) {}, []);
});
});
// describe("with some traced communication", function () {
// it("should yield an appropriate trace", function () {
// checkTrace(function (trace) {
// Network.spawn({
// boot: function () { return [sub(__)] },
// handleEvent: function (e) {
// trace(e);
// }
// });
// Network.send(123);
// Network.send(234);
// }, [Syndicate.updateRoutes([]),
// Syndicate.sendMessage(123),
// Syndicate.sendMessage(234)]);
// });
// });
// });
describe("with a single trace in an inert configuration", function () {
it("should yield that trace", function () {
checkTrace(function (trace) { trace(1) }, [1]);
});
});
// describe("nonempty initial routes", function () {
// it("should be immediately signalled to the process", function () {
// // Specifically, no Syndicate.updateRoutes([]) first.
// checkTrace(function (trace) {
// Network.spawn({
// boot: function () { return [pub(["A", __])] },
// handleEvent: function (e) {
// Network.spawn({
// boot: function () { return [sub(["A", __], 0, 1)] },
// handleEvent: trace
// });
// }
// });
// }, [Syndicate.updateRoutes([pub(["A", __]).label(1)])]);
// });
// });
describe("with some traced communication", function () {
it("should yield an appropriate trace", function () {
checkTrace(function (trace) {
Network.spawn({
boot: function () { return Syndicate.sub(__); },
handleEvent: traceEvent(trace)
});
Network.send(123);
Network.send(234);
}, ['<<<<<<<< Removed:\n'+
'::: nothing\n'+
'======== Added:\n'+
' < $Observe ★ > >{[0]}\n'
+' >::: nothing\n'+
'>>>>>>>>',
Syndicate.message(123),
Syndicate.message(234)]);
});
});
});
describe("nonempty initial routes", function () {
it("should be immediately signalled to the process", function () {
// Specifically, no Syndicate.updateRoutes([]) first.
checkTrace(function (trace) {
Network.spawn({
boot: function () { return Patch.assert(["A", __]); },
handleEvent: function (e) {}
});
Network.spawn({
boot: function () { return Patch.sub(["A", __]); },
handleEvent: traceEvent(trace)
});
}, ['<<<<<<<< Removed:\n'+
'::: nothing\n'+
'======== Added:\n'+
' < "A" ★ > >{[1]}\n'+
'>>>>>>>>']);
});
});
describe("nested actor with an echoey protocol", function () {
it("shouldn't see an echoed assertion", function () {
checkTrace(function (trace) {
Network.spawn(new Network(function () {
Network.spawn({
boot: function () {
Network.stateChange(Patch.retract("X", 1)); // happens after subs on next line!
return Patch.sub("X", 1).andThen(Patch.assert("X", 1));
},
handleEvent: traceEvent(trace)
});
}));
}, ['<<<<<<<< Removed:\n'+
'::: nothing\n'+
'======== Added:\n'+
' < $AtMeta "X" > >{[0]}\n'+
'>>>>>>>>',
'<<<<<<<< Removed:\n'+
' < $AtMeta "X" > >{[0]}\n'+
'======== Added:\n'+
'::: nothing\n'+
'>>>>>>>>']);
})
it("shouldn't see an echoed message", function () {
checkTrace(function (trace) {
Network.spawn(new Network(function () {
Network.spawn({
boot: function () {
Network.send("X", 1); // happens after subs on next line!
return Patch.sub("X", 1);
},
handleEvent: traceEvent(trace)
});
}));
}, [Syndicate.message(Patch.atMeta("X"))]);
});
it("shouldn't see an echoed assertion", function () {
});
});
// describe("actor with nonempty initial routes", function () {
// it("shouldn't see initial empty conversational context", function () {