Improved LAN tree construction
This commit is contained in:
parent
286bf7ecc1
commit
864c54f53b
|
@ -24,6 +24,7 @@ const M = activate require("@syndicate-lang/driver-mdns");
|
|||
const P = activate require("./internal_protocol");
|
||||
const S = activate require("@syndicate-lang/driver-streams-node");
|
||||
const Federation = activate require("./federation");
|
||||
const { TimeLaterThan } = activate require("@syndicate-lang/driver-timer");
|
||||
|
||||
import {
|
||||
Set, Map,
|
||||
|
@ -115,20 +116,24 @@ spawn named 'uplinkSelection' {
|
|||
field this.gatewayIp = null;
|
||||
on asserted M.DefaultGateway(_, $gatewayIp) this.gatewayIp = gatewayIp;
|
||||
|
||||
function orderByNodeId(peers) {
|
||||
return peers.toList().sortBy(Peer._nodeId);
|
||||
}
|
||||
|
||||
during OverlayNode($localId) {
|
||||
during Federation.ManagementScope($managementScope) {
|
||||
during P.Envelope(managementScope, Overlay($overlayId, $rootAddr)) {
|
||||
|
||||
// For each overlay:
|
||||
//
|
||||
// Collect all peers.
|
||||
// Partition them into two sets: those on our actual gateway, and those not.
|
||||
// For each set, pick the best element, measured by smallness of nodeId.
|
||||
// If there's a best gateway peer, choose that.
|
||||
// Otherwise, if there's a best non-gateway peer, choose that.
|
||||
//
|
||||
// Now, if we have chosen a peer, and that peer is not ourselves, use it;
|
||||
// Otherwise, fall back to a direct connection to the root.
|
||||
// We constantly maintain a notion of the best uplink to establish.
|
||||
// Simultaneously, we try to maintain a stable connection to some upstream peer.
|
||||
|
||||
// To figure out the best uplink:
|
||||
// Collect all peers into two sets:
|
||||
// 1. Those on our current gateway IP whose _nodeId is not equal to ours.
|
||||
// 2. Those not on the gateway IP whose _nodeId is strictly less than ours.
|
||||
// If there are any nodes in set 1, select the node with the smallest _nodeId.
|
||||
// Otherwise, if there are any nodes in set 2, sort by _nodeId and select the middle one.
|
||||
// Otherwise, select the root.
|
||||
|
||||
field this.peers = Set();
|
||||
on asserted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.add(p);
|
||||
|
@ -137,19 +142,21 @@ spawn named 'uplinkSelection' {
|
|||
field this.bestAddr = null;
|
||||
field this.bestPeer = null;
|
||||
dataflow {
|
||||
const gwPeers = orderByNodeId(this.peers.filter(
|
||||
(p) => (Peer._ip(p) === this.gatewayIp) && (Peer._nodeId(p) !== localId)));
|
||||
const others = orderByNodeId(this.peers.filter(
|
||||
(p) => (Peer._ip(p) !== this.gatewayIp) && (Peer._nodeId(p) < localId)));
|
||||
|
||||
let best = null;
|
||||
const better = (a) => {
|
||||
if (!best) return true;
|
||||
if (Peer._ip(a) === this.gatewayIp) {
|
||||
if (Peer._ip(best) !== this.gatewayIp) return true;
|
||||
return (Peer._nodeId(a) < Peer._nodeId(best));
|
||||
} else {
|
||||
if (Peer._ip(best) === this.gatewayIp) return false;
|
||||
return (Peer._nodeId(a) < Peer._nodeId(best));
|
||||
}
|
||||
};
|
||||
this.peers.forEach((p) => { if (better(p)) best = p; });
|
||||
if (best && (Peer._nodeId(best) !== localId)) {
|
||||
if (!gwPeers.isEmpty()) {
|
||||
best = gwPeers.first();
|
||||
} else if (!others.isEmpty()) {
|
||||
best = others.get(others.size >> 1);
|
||||
} else {
|
||||
// Use the root
|
||||
}
|
||||
|
||||
if (best) {
|
||||
this.bestAddr = Peer._addr(best);
|
||||
this.bestPeer = OverlayNode(Peer._nodeId(best));
|
||||
} else {
|
||||
|
@ -159,17 +166,66 @@ spawn named 'uplinkSelection' {
|
|||
}
|
||||
|
||||
dataflow if (this.bestAddr) {
|
||||
debug('Selected uplink peer for overlay', overlayId, 'is', this.bestPeer.toString(), 'at', this.bestAddr.toString());
|
||||
debug('Current best uplink peer for overlay', overlayId,
|
||||
'is', this.bestPeer.toString(),
|
||||
'at', this.bestAddr.toString());
|
||||
}
|
||||
|
||||
assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId))
|
||||
when (this.bestAddr);
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
const loopbackAddr = C.Loopback(overlayId);
|
||||
during C.ServerConnected(loopbackAddr) {
|
||||
assert C.ToServer(loopbackAddr, OverlayLink(OverlayNode(localId), this.bestPeer))
|
||||
when (this.bestAddr);
|
||||
}
|
||||
const futureTime = (deltaMs) => {
|
||||
return (+(new Date())) + deltaMs;
|
||||
};
|
||||
|
||||
const assertSelectedUplink = (link) => {
|
||||
assert P.Proposal(managementScope, link);
|
||||
};
|
||||
|
||||
const START = () => {
|
||||
react {
|
||||
// Wait for stability:
|
||||
const timeout = futureTime(5000);
|
||||
stop on asserted TimeLaterThan(timeout) CONNECT(null);
|
||||
}
|
||||
};
|
||||
|
||||
const CONNECT = (prevBestPeer) => {
|
||||
// We've settled on something to try.
|
||||
const peer = this.bestPeer;
|
||||
const addr = this.bestAddr;
|
||||
const link = Federation.Uplink(overlayId, addr, overlayId);
|
||||
if (!prevBestPeer || !prevBestPeer.equals(peer)) {
|
||||
debug('Selecting uplink', peer.toString(), 'for overlay', overlayId);
|
||||
}
|
||||
|
||||
react {
|
||||
assertSelectedUplink(link);
|
||||
const timeout = futureTime(15000);
|
||||
stop on asserted TimeLaterThan(timeout) CONNECT(peer);
|
||||
stop on retracted peer CONNECT(null);
|
||||
stop on asserted P.Envelope(managementScope, Federation.UplinkConnected(link)) {
|
||||
MAINTAIN(peer, link);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const MAINTAIN = (peer, link) => {
|
||||
// We're connected.
|
||||
react {
|
||||
assertSelectedUplink(link);
|
||||
assert C.ToServer(C.Loopback(overlayId), OverlayLink(OverlayNode(localId), peer));
|
||||
stop on retracted P.Envelope(managementScope, Federation.UplinkConnected(link)) {
|
||||
CONNECT(peer);
|
||||
}
|
||||
if (OverlayRoot.isClassOf(peer)) {
|
||||
// See if something better (more local) comes along every now and then.
|
||||
const timeout = futureTime(5000);
|
||||
stop on asserted TimeLaterThan(timeout) CONNECT(peer);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
on start START();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue