diff --git a/packages/server/bin/syndicate-tree-disco.js b/packages/server/bin/syndicate-tree-disco.js deleted file mode 100755 index 4f35521..0000000 --- a/packages/server/bin/syndicate-tree-disco.js +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env node -require("@syndicate-lang/core").bootModule(require('../lib/disco.js')); diff --git a/packages/server/package.json b/packages/server/package.json index 9dbf092..1f50844 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -25,7 +25,6 @@ }, "main": "lib/index.js", "bin": { - "syndicate-server": "./bin/syndicate-server.js", - "syndicate-tree-disco": "./bin/syndicate-tree-disco.js" + "syndicate-server": "./bin/syndicate-server.js" } } diff --git a/packages/server/src/disco.js b/packages/server/src/disco.js index 9f0bff1..eaca2b6 100644 --- a/packages/server/src/disco.js +++ b/packages/server/src/disco.js @@ -1,59 +1,158 @@ "use strict"; +assertion type AvailableTransport(spec); +assertion type WebSocketTransport(port, path); +// S.TcpListener specifies TCP transport +// S.UnixSocketServer specifies Unix socket transport + +assertion type Overlay(id, rootAddr); +assertion type OverlayNode(id); +assertion type OverlayRoot(); +assertion type OverlayLink(downNode, upNode); + +assertion type Peer(overlayId, nodeId, ip, addr); + +Object.assign(module.exports, { + AvailableTransport, + WebSocketTransport, + Overlay, OverlayNode, OverlayRoot, OverlayLink, + Peer, +}); + const C = activate require("./client"); const M = activate require("@syndicate-lang/driver-mdns"); const P = activate require("./internal_protocol"); const S = activate require("@syndicate-lang/driver-streams-node"); const Federation = activate require("./federation"); -const D = activate require("./disco_protocol"); import { + Set, Map, RandomID, } from "@syndicate-lang/core"; const fs = require('fs'); -let currentManagementScope = 'local'; +spawn named 'peerAdvertisement' { + const localId = RandomID.randomId(8, false); + assert OverlayNode(localId); -const localId = RandomID.randomId(8, false); -const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server -const gatewayId = dataspaceId + ':' + localId; + during Federation.ManagementScope($managementScope) { + during P.Envelope(managementScope, Overlay($overlayId, _)) { + const gatewayId = overlayId + ':' + localId; -const serverAddr = C.WSServer('ws://localhost:8000/', 'local'); - -spawn named 'advertise_server' { - during C.ServerConnected(serverAddr) { - during C.FromServer(serverAddr, D.AvailableTransport(D.WebSocketTransport($port, $path))) { - assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, ["path="+path]); + during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) { + assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [ + "path="+path, + "scope="+managementScope + ]); + } + // Other variants for later: // assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []); - // assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, ["path=/"]); } } } -spawn named 'peerDiscovery' { - console.info('Peer discovery running'); - // during M.DefaultGateway($gwif, _) { - // on start console.log('GW+', gwif); - // on stop console.log('GW-', gwif); - during M.Discovered( - M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif) - { - const [dsId, peerId] = name.split(':'); - - let tier = null; - txt.forEach((t) => { - t.split(' ').forEach((kv) => { - const [k, v] = kv.split('='); - if (k === 'tier') { - tier = Number.parseInt(v); - } - }); - }); - - on start console.log('+ws', gwif, tier, name, host, port, addr); - on stop console.log('-ws', gwif, tier, name, host, port, addr); - } - // } +function txtsToMap(txts) { + let m = Map(); + txts.forEach((t) => { + t.split(' ').forEach((kv) => { + const [k, v] = kv.split('='); + m = m.set(k, v); + }); + }); + return m; +} + +spawn named 'peerDiscovery' { + during M.DefaultGateway($gatewayInterface, $gatewayIp) { + on start console.log('Gateway IP is', gatewayIp, 'on interface', gatewayInterface); + + during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), + _, // hostname + $port, + $txts, + $addr, + "IPv4", + gatewayInterface) + { + const [overlayId, nodeId] = name.split(':'); + let params = txtsToMap(txts); + assert Peer(overlayId, + nodeId, + addr, + C.WSServer('ws://' + addr + ':' + port + params.get('path', '/'), + params.get('scope', 'local'))); + } + } +} + +spawn named 'helpful info output' { + console.info('Peer discovery running'); + during Peer($overlayId, $nodeId, $ip, $addr) { + on start console.info("+PEER", ip, overlayId, nodeId, addr.toString()); + on stop console.info("-PEER", ip, overlayId, nodeId, addr.toString()); + } +} + +spawn named 'uplinkSelection' { + field this.gatewayIp = null; + on asserted M.DefaultGateway(_, $gatewayIp) this.gatewayIp = gatewayIp; + + during OverlayNode($localId) { + during Federation.ManagementScope($managementScope) { + during P.Envelope(managementScope, Overlay($overlayId, $rootAddr)) { + + // For each overlay: + // + // Collect all peers. + // Partition them into two sets: those on our actual gateway, and those not. + // For each set, pick the best element, measured by smallness of nodeId. + // If there's a best gateway peer, choose that. + // Otherwise, if there's a best non-gateway peer, choose that. + // + // Now, if we have chosen a peer, and that peer is not ourselves, use it; + // Otherwise, fall back to a direct connection to the root. + + field this.peers = Set(); + on asserted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.add(p); + on retracted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.remove(p); + + field this.bestAddr = null; + field this.bestPeer = null; + dataflow { + let best = null; + const better = (a) => { + if (!best) return true; + if ((a.get(2) === this.gatewayIp)) { + if (best.get(2) !== this.gatewayIp) return true; + return (a.get(1) < best.get(1)); + } else { + if (best.get(2) === this.gatewayIp) return false; + return (a.get(1) < best.get(1)); + } + }; + this.peers.forEach((p) => { if (better(p)) best = p; }); + if (best && (best.get(1) !== localId)) { + this.bestAddr = best.get(3); + this.bestPeer = best; + } else { + this.bestAddr = rootAddr; + this.bestPeer = null; + } + } + + dataflow if (this.bestAddr) { + console.log('Selected uplink for overlay', overlayId, 'is', this.bestAddr.toString()); + } + + assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId)) + when (this.bestAddr); + + assert P.Proposal(overlayId, OverlayLink(OverlayNode(localId), + this.bestPeer || OverlayRoot())) + when (this.bestAddr); + } + } + } } diff --git a/packages/server/src/disco_protocol.js b/packages/server/src/disco_protocol.js deleted file mode 100644 index 0163e5a..0000000 --- a/packages/server/src/disco_protocol.js +++ /dev/null @@ -1,11 +0,0 @@ -"use strict"; - -assertion type AvailableTransport(spec); -assertion type WebSocketTransport(port, path); -// S.TcpListener specifies TCP transport -// S.UnixSocketServer specifies Unix socket transport - -Object.assign(module.exports, { - AvailableTransport, - WebSocketTransport, -}); diff --git a/packages/server/src/index.js b/packages/server/src/index.js index f162bca..637db89 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -8,7 +8,7 @@ const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); const C = activate require("./client"); const P = activate require("./internal_protocol"); -const D = activate require("./disco_protocol"); +const D = activate require("./disco"); const Server = activate require("./server"); const Federation = activate require("./federation"); const fs = require('fs'); @@ -31,29 +31,48 @@ function usage() { console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE'); console.info(' Establish a federation uplink from the named local'); console.info(' scope to the remote scope within the server at the URL'); + console.info(''); + console.info(' --overlay OVERLAYID WEBSOCKETURL'); + console.info(' Participate in a self-assembling overlay with the'); + console.info(' given ID and root node server URL'); } const uplinks = []; +const overlays = []; function process_command_line(args) { - const strArg = () => args.shift(); - const numArg = () => Number.parseInt(args.shift()); + const notUndefined = (x, w) => { + if (x === void 0) { + console.error('Missing '+w+' argument on command line'); + usage(); + process.exit(1); + } + return x; + }; + const strArg = (w) => notUndefined(args.shift(), w); + const numArg = (w) => Number.parseInt(notUndefined(args.shift(), w)); while (args.length) { const opt = args.shift(); switch (opt) { - case "--tcp": spawnTcpServer(numArg()); break; - case "--http": spawnWebSocketServer(numArg()); break; - case "--unix": spawnUnixSocketServer(strArg()); break; - case "--monitor": spawnMonitorAppServer(numArg()); break; - case "--management": currentManagementScope = strArg(); break; + case "--tcp": spawnTcpServer(numArg('TCP port')); break; + case "--http": spawnWebSocketServer(numArg('HTTP port')); break; + case "--unix": spawnUnixSocketServer(strArg('Unix socket path')); break; + case "--monitor": spawnMonitorAppServer(numArg('monitor HTTP port')); break; + case "--management": currentManagementScope = strArg('management scope'); break; case "--uplink": { - const localScope = strArg(); - const target = strArg(); - const remoteScope = strArg(); + const localScope = strArg('local scope'); + const target = strArg('remote WebSocket URL'); + const remoteScope = strArg('remote scope'); uplinks.push(Federation.Uplink(localScope, C.WSServer(target, currentManagementScope), remoteScope)); break; } + case "--overlay": { + const overlayId = strArg('overlay id'); + const rootUrl = strArg('overlay root WebSocket URL'); + overlays.push(D.Overlay(overlayId, C.WSServer(rootUrl, currentManagementScope))); + break; + } default: console.error("Unsupported command-line argument: " + opt); /* FALL THROUGH */ @@ -72,6 +91,9 @@ spawn named 'server' { uplinks.forEach((link) => { assert P.Proposal(currentManagementScope, link); }); + overlays.forEach((o) => { + assert P.Proposal(currentManagementScope, o); + }); } spawn named 'helpful info output' { @@ -80,6 +102,7 @@ spawn named 'helpful info output' { spawn named 'federationRoutingInfo' { during Federation.ManagementScope($managementScope) { + // assert P.Proposal(managementScope, Federation.ManagementScope(managementScope)); during $t(D.AvailableTransport(_)) assert P.Proposal(managementScope, t); } }