Self-assembling overlays
This commit is contained in:
parent
283bbf03e5
commit
39468f3557
|
@ -1,2 +0,0 @@
|
|||
#!/usr/bin/env node
|
||||
require("@syndicate-lang/core").bootModule(require('../lib/disco.js'));
|
|
@ -25,7 +25,6 @@
|
|||
},
|
||||
"main": "lib/index.js",
|
||||
"bin": {
|
||||
"syndicate-server": "./bin/syndicate-server.js",
|
||||
"syndicate-tree-disco": "./bin/syndicate-tree-disco.js"
|
||||
"syndicate-server": "./bin/syndicate-server.js"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,59 +1,158 @@
|
|||
"use strict";
|
||||
|
||||
assertion type AvailableTransport(spec);
|
||||
assertion type WebSocketTransport(port, path);
|
||||
// S.TcpListener specifies TCP transport
|
||||
// S.UnixSocketServer specifies Unix socket transport
|
||||
|
||||
assertion type Overlay(id, rootAddr);
|
||||
assertion type OverlayNode(id);
|
||||
assertion type OverlayRoot();
|
||||
assertion type OverlayLink(downNode, upNode);
|
||||
|
||||
assertion type Peer(overlayId, nodeId, ip, addr);
|
||||
|
||||
Object.assign(module.exports, {
|
||||
AvailableTransport,
|
||||
WebSocketTransport,
|
||||
Overlay, OverlayNode, OverlayRoot, OverlayLink,
|
||||
Peer,
|
||||
});
|
||||
|
||||
const C = activate require("./client");
|
||||
const M = activate require("@syndicate-lang/driver-mdns");
|
||||
const P = activate require("./internal_protocol");
|
||||
const S = activate require("@syndicate-lang/driver-streams-node");
|
||||
const Federation = activate require("./federation");
|
||||
const D = activate require("./disco_protocol");
|
||||
|
||||
import {
|
||||
Set, Map,
|
||||
RandomID,
|
||||
} from "@syndicate-lang/core";
|
||||
|
||||
const fs = require('fs');
|
||||
|
||||
let currentManagementScope = 'local';
|
||||
spawn named 'peerAdvertisement' {
|
||||
const localId = RandomID.randomId(8, false);
|
||||
assert OverlayNode(localId);
|
||||
|
||||
const localId = RandomID.randomId(8, false);
|
||||
const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server
|
||||
const gatewayId = dataspaceId + ':' + localId;
|
||||
during Federation.ManagementScope($managementScope) {
|
||||
during P.Envelope(managementScope, Overlay($overlayId, _)) {
|
||||
const gatewayId = overlayId + ':' + localId;
|
||||
|
||||
const serverAddr = C.WSServer('ws://localhost:8000/', 'local');
|
||||
|
||||
spawn named 'advertise_server' {
|
||||
during C.ServerConnected(serverAddr) {
|
||||
during C.FromServer(serverAddr, D.AvailableTransport(D.WebSocketTransport($port, $path))) {
|
||||
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, ["path="+path]);
|
||||
during P.Envelope(managementScope, AvailableTransport(WebSocketTransport($port, $path))) {
|
||||
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, [
|
||||
"path="+path,
|
||||
"scope="+managementScope
|
||||
]);
|
||||
}
|
||||
|
||||
// Other variants for later:
|
||||
// assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []);
|
||||
// assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, ["path=/"]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'peerDiscovery' {
|
||||
console.info('Peer discovery running');
|
||||
// during M.DefaultGateway($gwif, _) {
|
||||
// on start console.log('GW+', gwif);
|
||||
// on stop console.log('GW-', gwif);
|
||||
during M.Discovered(
|
||||
M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif)
|
||||
{
|
||||
const [dsId, peerId] = name.split(':');
|
||||
|
||||
let tier = null;
|
||||
txt.forEach((t) => {
|
||||
t.split(' ').forEach((kv) => {
|
||||
const [k, v] = kv.split('=');
|
||||
if (k === 'tier') {
|
||||
tier = Number.parseInt(v);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
on start console.log('+ws', gwif, tier, name, host, port, addr);
|
||||
on stop console.log('-ws', gwif, tier, name, host, port, addr);
|
||||
}
|
||||
// }
|
||||
function txtsToMap(txts) {
|
||||
let m = Map();
|
||||
txts.forEach((t) => {
|
||||
t.split(' ').forEach((kv) => {
|
||||
const [k, v] = kv.split('=');
|
||||
m = m.set(k, v);
|
||||
});
|
||||
});
|
||||
return m;
|
||||
}
|
||||
|
||||
spawn named 'peerDiscovery' {
|
||||
during M.DefaultGateway($gatewayInterface, $gatewayIp) {
|
||||
on start console.log('Gateway IP is', gatewayIp, 'on interface', gatewayInterface);
|
||||
|
||||
during M.Discovered(M.Service($name, '_syndicate+ws._tcp'),
|
||||
_, // hostname
|
||||
$port,
|
||||
$txts,
|
||||
$addr,
|
||||
"IPv4",
|
||||
gatewayInterface)
|
||||
{
|
||||
const [overlayId, nodeId] = name.split(':');
|
||||
let params = txtsToMap(txts);
|
||||
assert Peer(overlayId,
|
||||
nodeId,
|
||||
addr,
|
||||
C.WSServer('ws://' + addr + ':' + port + params.get('path', '/'),
|
||||
params.get('scope', 'local')));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'helpful info output' {
|
||||
console.info('Peer discovery running');
|
||||
during Peer($overlayId, $nodeId, $ip, $addr) {
|
||||
on start console.info("+PEER", ip, overlayId, nodeId, addr.toString());
|
||||
on stop console.info("-PEER", ip, overlayId, nodeId, addr.toString());
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'uplinkSelection' {
|
||||
field this.gatewayIp = null;
|
||||
on asserted M.DefaultGateway(_, $gatewayIp) this.gatewayIp = gatewayIp;
|
||||
|
||||
during OverlayNode($localId) {
|
||||
during Federation.ManagementScope($managementScope) {
|
||||
during P.Envelope(managementScope, Overlay($overlayId, $rootAddr)) {
|
||||
|
||||
// For each overlay:
|
||||
//
|
||||
// Collect all peers.
|
||||
// Partition them into two sets: those on our actual gateway, and those not.
|
||||
// For each set, pick the best element, measured by smallness of nodeId.
|
||||
// If there's a best gateway peer, choose that.
|
||||
// Otherwise, if there's a best non-gateway peer, choose that.
|
||||
//
|
||||
// Now, if we have chosen a peer, and that peer is not ourselves, use it;
|
||||
// Otherwise, fall back to a direct connection to the root.
|
||||
|
||||
field this.peers = Set();
|
||||
on asserted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.add(p);
|
||||
on retracted $p(Peer(overlayId,_,_,_)) this.peers = this.peers.remove(p);
|
||||
|
||||
field this.bestAddr = null;
|
||||
field this.bestPeer = null;
|
||||
dataflow {
|
||||
let best = null;
|
||||
const better = (a) => {
|
||||
if (!best) return true;
|
||||
if ((a.get(2) === this.gatewayIp)) {
|
||||
if (best.get(2) !== this.gatewayIp) return true;
|
||||
return (a.get(1) < best.get(1));
|
||||
} else {
|
||||
if (best.get(2) === this.gatewayIp) return false;
|
||||
return (a.get(1) < best.get(1));
|
||||
}
|
||||
};
|
||||
this.peers.forEach((p) => { if (better(p)) best = p; });
|
||||
if (best && (best.get(1) !== localId)) {
|
||||
this.bestAddr = best.get(3);
|
||||
this.bestPeer = best;
|
||||
} else {
|
||||
this.bestAddr = rootAddr;
|
||||
this.bestPeer = null;
|
||||
}
|
||||
}
|
||||
|
||||
dataflow if (this.bestAddr) {
|
||||
console.log('Selected uplink for overlay', overlayId, 'is', this.bestAddr.toString());
|
||||
}
|
||||
|
||||
assert P.Proposal(managementScope, Federation.Uplink(overlayId, this.bestAddr, overlayId))
|
||||
when (this.bestAddr);
|
||||
|
||||
assert P.Proposal(overlayId, OverlayLink(OverlayNode(localId),
|
||||
this.bestPeer || OverlayRoot()))
|
||||
when (this.bestAddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
"use strict";
|
||||
|
||||
assertion type AvailableTransport(spec);
|
||||
assertion type WebSocketTransport(port, path);
|
||||
// S.TcpListener specifies TCP transport
|
||||
// S.UnixSocketServer specifies Unix socket transport
|
||||
|
||||
Object.assign(module.exports, {
|
||||
AvailableTransport,
|
||||
WebSocketTransport,
|
||||
});
|
|
@ -8,7 +8,7 @@ const Http = activate require("@syndicate-lang/driver-http-node");
|
|||
const S = activate require("@syndicate-lang/driver-streams-node");
|
||||
const C = activate require("./client");
|
||||
const P = activate require("./internal_protocol");
|
||||
const D = activate require("./disco_protocol");
|
||||
const D = activate require("./disco");
|
||||
const Server = activate require("./server");
|
||||
const Federation = activate require("./federation");
|
||||
const fs = require('fs');
|
||||
|
@ -31,29 +31,48 @@ function usage() {
|
|||
console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE');
|
||||
console.info(' Establish a federation uplink from the named local');
|
||||
console.info(' scope to the remote scope within the server at the URL');
|
||||
console.info('');
|
||||
console.info(' --overlay OVERLAYID WEBSOCKETURL');
|
||||
console.info(' Participate in a self-assembling overlay with the');
|
||||
console.info(' given ID and root node server URL');
|
||||
}
|
||||
|
||||
const uplinks = [];
|
||||
const overlays = [];
|
||||
function process_command_line(args) {
|
||||
const strArg = () => args.shift();
|
||||
const numArg = () => Number.parseInt(args.shift());
|
||||
const notUndefined = (x, w) => {
|
||||
if (x === void 0) {
|
||||
console.error('Missing '+w+' argument on command line');
|
||||
usage();
|
||||
process.exit(1);
|
||||
}
|
||||
return x;
|
||||
};
|
||||
const strArg = (w) => notUndefined(args.shift(), w);
|
||||
const numArg = (w) => Number.parseInt(notUndefined(args.shift(), w));
|
||||
while (args.length) {
|
||||
const opt = args.shift();
|
||||
switch (opt) {
|
||||
case "--tcp": spawnTcpServer(numArg()); break;
|
||||
case "--http": spawnWebSocketServer(numArg()); break;
|
||||
case "--unix": spawnUnixSocketServer(strArg()); break;
|
||||
case "--monitor": spawnMonitorAppServer(numArg()); break;
|
||||
case "--management": currentManagementScope = strArg(); break;
|
||||
case "--tcp": spawnTcpServer(numArg('TCP port')); break;
|
||||
case "--http": spawnWebSocketServer(numArg('HTTP port')); break;
|
||||
case "--unix": spawnUnixSocketServer(strArg('Unix socket path')); break;
|
||||
case "--monitor": spawnMonitorAppServer(numArg('monitor HTTP port')); break;
|
||||
case "--management": currentManagementScope = strArg('management scope'); break;
|
||||
case "--uplink": {
|
||||
const localScope = strArg();
|
||||
const target = strArg();
|
||||
const remoteScope = strArg();
|
||||
const localScope = strArg('local scope');
|
||||
const target = strArg('remote WebSocket URL');
|
||||
const remoteScope = strArg('remote scope');
|
||||
uplinks.push(Federation.Uplink(localScope,
|
||||
C.WSServer(target, currentManagementScope),
|
||||
remoteScope));
|
||||
break;
|
||||
}
|
||||
case "--overlay": {
|
||||
const overlayId = strArg('overlay id');
|
||||
const rootUrl = strArg('overlay root WebSocket URL');
|
||||
overlays.push(D.Overlay(overlayId, C.WSServer(rootUrl, currentManagementScope)));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
console.error("Unsupported command-line argument: " + opt);
|
||||
/* FALL THROUGH */
|
||||
|
@ -72,6 +91,9 @@ spawn named 'server' {
|
|||
uplinks.forEach((link) => {
|
||||
assert P.Proposal(currentManagementScope, link);
|
||||
});
|
||||
overlays.forEach((o) => {
|
||||
assert P.Proposal(currentManagementScope, o);
|
||||
});
|
||||
}
|
||||
|
||||
spawn named 'helpful info output' {
|
||||
|
@ -80,6 +102,7 @@ spawn named 'helpful info output' {
|
|||
|
||||
spawn named 'federationRoutingInfo' {
|
||||
during Federation.ManagementScope($managementScope) {
|
||||
// assert P.Proposal(managementScope, Federation.ManagementScope(managementScope));
|
||||
during $t(D.AvailableTransport(_)) assert P.Proposal(managementScope, t);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue