From 864c54f53b697c4afdb11ca2d5afae745073644b Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 20 Jun 2019 23:09:55 +0100 Subject: [PATCH] Improved LAN tree construction --- packages/server/src/disco.js | 116 ++++++++++++++++++++++++++--------- 1 file changed, 86 insertions(+), 30 deletions(-) diff --git a/packages/server/src/disco.js b/packages/server/src/disco.js index b8a2d4d..21998c9 100644 --- a/packages/server/src/disco.js +++ b/packages/server/src/disco.js @@ -24,6 +24,7 @@ const M = activate require("@syndicate-lang/driver-mdns"); const P = activate require("./internal_protocol"); const S = activate require("@syndicate-lang/driver-streams-node"); const Federation = activate require("./federation"); +const { TimeLaterThan } = activate require("@syndicate-lang/driver-timer"); import { Set, Map, @@ -115,20 +116,24 @@ spawn named 'uplinkSelection' { field this.gatewayIp = null; on asserted M.DefaultGateway(_, $gatewayIp) this.gatewayIp = gatewayIp; + function orderByNodeId(peers) { + return peers.toList().sortBy(Peer._nodeId); + } + during OverlayNode($localId) { during Federation.ManagementScope($managementScope) { during P.Envelope(managementScope, Overlay($overlayId, $rootAddr)) { - // For each overlay: - // - // Collect all peers. - // Partition them into two sets: those on our actual gateway, and those not. - // For each set, pick the best element, measured by smallness of nodeId. - // If there's a best gateway peer, choose that. - // Otherwise, if there's a best non-gateway peer, choose that. - // - // Now, if we have chosen a peer, and that peer is not ourselves, use it; - // Otherwise, fall back to a direct connection to the root. + // We constantly maintain a notion of the best uplink to establish. + // Simultaneously, we try to maintain a stable connection to some upstream peer. + + // To figure out the best uplink: + // Collect all peers into two sets: + // 1. Those on our current gateway IP whose _nodeId is not equal to ours. + // 2. Those not on the gateway IP whose _nodeId is strictly less than ours. + // If there are any nodes in set 1, select the node with the smallest _nodeId. + // Otherwise, if there are any nodes in set 2, sort by _nodeId and select the middle one. + // Otherwise, select the root. field this.peers = Set(); on asserted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.add(p); @@ -137,19 +142,21 @@ spawn named 'uplinkSelection' { field this.bestAddr = null; field this.bestPeer = null; dataflow { + const gwPeers = orderByNodeId(this.peers.filter( + (p) => (Peer._ip(p) === this.gatewayIp) && (Peer._nodeId(p) !== localId))); + const others = orderByNodeId(this.peers.filter( + (p) => (Peer._ip(p) !== this.gatewayIp) && (Peer._nodeId(p) < localId))); + let best = null; - const better = (a) => { - if (!best) return true; - if (Peer._ip(a) === this.gatewayIp) { - if (Peer._ip(best) !== this.gatewayIp) return true; - return (Peer._nodeId(a) < Peer._nodeId(best)); - } else { - if (Peer._ip(best) === this.gatewayIp) return false; - return (Peer._nodeId(a) < Peer._nodeId(best)); - } - }; - this.peers.forEach((p) => { if (better(p)) best = p; }); - if (best && (Peer._nodeId(best) !== localId)) { + if (!gwPeers.isEmpty()) { + best = gwPeers.first(); + } else if (!others.isEmpty()) { + best = others.get(others.size >> 1); + } else { + // Use the root + } + + if (best) { this.bestAddr = Peer._addr(best); this.bestPeer = OverlayNode(Peer._nodeId(best)); } else { @@ -159,17 +166,66 @@ spawn named 'uplinkSelection' { } dataflow if (this.bestAddr) { - debug('Selected uplink peer for overlay', overlayId, 'is', this.bestPeer.toString(), 'at', this.bestAddr.toString()); + debug('Current best uplink peer for overlay', overlayId, + 'is', this.bestPeer.toString(), + 'at', this.bestAddr.toString()); } - assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId)) - when (this.bestAddr); + //--------------------------------------------------------------------------- - const loopbackAddr = C.Loopback(overlayId); - during C.ServerConnected(loopbackAddr) { - assert C.ToServer(loopbackAddr, OverlayLink(OverlayNode(localId), this.bestPeer)) - when (this.bestAddr); - } + const futureTime = (deltaMs) => { + return (+(new Date())) + deltaMs; + }; + + const assertSelectedUplink = (link) => { + assert P.Proposal(managementScope, link); + }; + + const START = () => { + react { + // Wait for stability: + const timeout = futureTime(5000); + stop on asserted TimeLaterThan(timeout) CONNECT(null); + } + }; + + const CONNECT = (prevBestPeer) => { + // We've settled on something to try. + const peer = this.bestPeer; + const addr = this.bestAddr; + const link = Federation.Uplink(overlayId, addr, overlayId); + if (!prevBestPeer || !prevBestPeer.equals(peer)) { + debug('Selecting uplink', peer.toString(), 'for overlay', overlayId); + } + + react { + assertSelectedUplink(link); + const timeout = futureTime(15000); + stop on asserted TimeLaterThan(timeout) CONNECT(peer); + stop on retracted peer CONNECT(null); + stop on asserted P.Envelope(managementScope, Federation.UplinkConnected(link)) { + MAINTAIN(peer, link); + } + } + }; + + const MAINTAIN = (peer, link) => { + // We're connected. + react { + assertSelectedUplink(link); + assert C.ToServer(C.Loopback(overlayId), OverlayLink(OverlayNode(localId), peer)); + stop on retracted P.Envelope(managementScope, Federation.UplinkConnected(link)) { + CONNECT(peer); + } + if (OverlayRoot.isClassOf(peer)) { + // See if something better (more local) comes along every now and then. + const timeout = futureTime(5000); + stop on asserted TimeLaterThan(timeout) CONNECT(peer); + } + } + }; + + on start START(); } } }