Prepare for separate transport advertisement and spanning-tree construction

This commit is contained in:
Tony Garnock-Jones 2019-05-28 11:56:29 +01:00
parent 0690660af8
commit caf75f3d1e
2 changed files with 70 additions and 83 deletions

View File

@ -16,7 +16,7 @@ const {
makeDecoder, makeDecoder,
} = activate require("./protocol"); } = activate require("./protocol");
assertion type WSServer(url, scope); assertion type WSServer(url, scope) = Symbol.for('server-websocket-connection');
assertion type ToServer(addr, assertion); assertion type ToServer(addr, assertion);
assertion type FromServer(addr, assertion); assertion type FromServer(addr, assertion);

View File

@ -6,7 +6,6 @@ const UI = require("@syndicate-lang/driver-browser-ui");
const Http = activate require("@syndicate-lang/driver-http-node"); const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node"); const S = activate require("@syndicate-lang/driver-streams-node");
const M = activate require("@syndicate-lang/driver-mdns");
const P = activate require("./internal_protocol"); const P = activate require("./internal_protocol");
const C = activate require("./client"); const C = activate require("./client");
const Server = activate require("./server"); const Server = activate require("./server");
@ -18,6 +17,11 @@ import {
Dataspace, Skeleton, currentFacet, genUuid, RandomID Dataspace, Skeleton, currentFacet, genUuid, RandomID
} from "@syndicate-lang/core"; } from "@syndicate-lang/core";
assertion type AvailableTransport(spec);
assertion type WebSocketTransport(port, path);
// S.TcpListener specifies TCP transport
// S.UnixSocketServer specifies Unix socket transport
const fs = require('fs'); const fs = require('fs');
let currentManagementScope = 'local'; let currentManagementScope = 'local';
@ -28,9 +32,6 @@ function usage() {
console.info(''); console.info('');
console.info('where OPTION may be repeated any number of times and is drawn from:'); console.info('where OPTION may be repeated any number of times and is drawn from:');
console.info(''); console.info('');
console.info(' --advertise Enable mDNS advertisement for subsequent services');
console.info(' --no-advertise Disable mDNS advertisement for subsequent services');
console.info('');
console.info(' --tcp PORTNUMBER Create a plain TCP service on the given port'); console.info(' --tcp PORTNUMBER Create a plain TCP service on the given port');
console.info(' --http PORTNUMBER Create an HTTP WebSocket service on the given port'); console.info(' --http PORTNUMBER Create an HTTP WebSocket service on the given port');
console.info(' --unix PATH Create a Unix socket service at the given path'); console.info(' --unix PATH Create a Unix socket service at the given path');
@ -41,24 +42,19 @@ function usage() {
console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE'); console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE');
console.info(' Establish a federation uplink from the named local'); console.info(' Establish a federation uplink from the named local');
console.info(' scope to the remote scope within the server at the URL'); console.info(' scope to the remote scope within the server at the URL');
console.info('');
console.info('mDNS advertisement starts out enabled.');
} }
const uplinks = []; const uplinks = [];
function process_command_line(args) { function process_command_line(args) {
const strArg = () => args.shift(); const strArg = () => args.shift();
const numArg = () => Number.parseInt(args.shift()); const numArg = () => Number.parseInt(args.shift());
let advertise = true;
while (args.length) { while (args.length) {
const opt = args.shift(); const opt = args.shift();
switch (opt) { switch (opt) {
case "--advertise": advertise = true; break; case "--tcp": spawnTcpServer(numArg()); break;
case "--no-advertise": advertise = false; break; case "--http": spawnWebSocketServer(numArg()); break;
case "--tcp": spawnTcpServer(numArg(), advertise); break; case "--unix": spawnUnixSocketServer(strArg()); break;
case "--http": spawnWebSocketServer(numArg(), advertise); break; case "--monitor": spawnMonitorAppServer(numArg()); break;
case "--unix": spawnUnixSocketServer(strArg(), advertise); break;
case "--monitor": spawnMonitorAppServer(numArg(), advertise); break;
case "--management": currentManagementScope = strArg(); break; case "--management": currentManagementScope = strArg(); break;
case "--uplink": { case "--uplink": {
const localScope = strArg(); const localScope = strArg();
@ -80,10 +76,6 @@ function process_command_line(args) {
} }
} }
const localId = RandomID.randomId(8, false);
const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server
const gatewayId = dataspaceId + ':' + localId;
process_command_line(process.argv.slice(2)); process_command_line(process.argv.slice(2));
spawn named 'server' { spawn named 'server' {
@ -93,45 +85,44 @@ spawn named 'server' {
}); });
} }
function spawnTcpServer(port, advertise) { spawn named 'helpful info output' {
console.info('TCP server on port', port, advertise ? '(advertised)' : '(not advertised)'); on asserted AvailableTransport($spec) console.info('Transport:', spec.toString());
spawn named ['tcpServer', port] { }
if (advertise) {
assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, ["tier=0"]); spawn named 'federationRoutingInfo' {
} during Federation.ManagementScope($managementScope) {
on asserted S.IncomingConnection($id, S.TcpListener(port)) { during $t(AvailableTransport(_)) assert P.Proposal(managementScope, t);
Server.streamServerActor(id, ['tcpServer', port, id]);
}
} }
} }
function spawnWebSocketServer(port, advertise) { function _spawnStreamServer(spec) {
console.info('WebSocket server on port', port, advertise ? '(advertised)' : '(not advertised)'); spawn named spec {
spawn named ['wsConnection', port] { assert AvailableTransport(spec);
on asserted S.IncomingConnection($id, spec) Server.streamServerActor(id, [spec, id]);
}
}
function spawnTcpServer(port) {
_spawnStreamServer(S.TcpListener(port));
}
function spawnUnixSocketServer(path) {
_spawnStreamServer(S.UnixSocketServer(path));
}
function spawnWebSocketServer(port) {
const spec = WebSocketTransport(port, '/');
spawn named spec {
const server = Http.HttpServer(null, port); const server = Http.HttpServer(null, port);
if (advertise) { assert AvailableTransport(spec);
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, during Http.WebSocket($reqId, server, [], _) spawn named [spec, reqId] {
["tier=0", "path=/local"]);
assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), null, port,
["tier=0", "path=/monitor"]);
}
during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', port, reqId] {
Server.websocketServerFacet(reqId); Server.websocketServerFacet(reqId);
} }
} }
} }
function spawnUnixSocketServer(path) {
console.info('Unix socket server on path', path, advertise ? '(advertised)' : '(not advertised)');
spawn named ['unixServer', path] {
on asserted S.IncomingConnection($id, S.UnixSocketServer(path)) {
Server.streamServerActor(id, ['unixServer', path, id]);
}
}
}
function spawnMonitorAppServer(port) { function spawnMonitorAppServer(port) {
console.info('Monitor app on port', port, advertise ? '(advertised)' : '(not advertised)'); console.info('Monitor app on port', port);
spawn named ['monitorAppServer', port] { spawn named ['monitorAppServer', port] {
const server = Http.HttpServer(null, port); const server = Http.HttpServer(null, port);
@ -164,39 +155,35 @@ spawn named 'monitorApp' {
on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId); on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId);
} }
spawn named 'peerDiscovery' { // const localId = RandomID.randomId(8, false);
console.info('Peer discovery running'); // const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server
// during M.DefaultGateway($gwif, _) { // const gatewayId = dataspaceId + ':' + localId;
// on start console.log('GW+', gwif); //
// on stop console.log('GW-', gwif); // const M = activate require("@syndicate-lang/driver-mdns");
during M.Discovered( // // assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, []);
M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif) // // assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, ["path=/"]);
{ // spawn named 'peerDiscovery' {
const [dsId, peerId] = name.split(':'); // console.info('Peer discovery running');
// // during M.DefaultGateway($gwif, _) {
let tier = null; // // on start console.log('GW+', gwif);
txt.forEach((t) => { // // on stop console.log('GW-', gwif);
t.split(' ').forEach((kv) => { // during M.Discovered(
const [k, v] = kv.split('='); // M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif)
if (k === 'tier') { // {
tier = Number.parseInt(v); // const [dsId, peerId] = name.split(':');
} //
}); // let tier = null;
}); // txt.forEach((t) => {
// t.split(' ').forEach((kv) => {
on start console.log('+ws', gwif, tier, name, host, port, addr); // const [k, v] = kv.split('=');
on stop console.log('-ws', gwif, tier, name, host, port, addr); // if (k === 'tier') {
} // tier = Number.parseInt(v);
// } // }
// });
/* // });
//
If there's a server on our gateway interface, see if it's better than us. // on start console.log('+ws', gwif, tier, name, host, port, addr);
- if it is, use it. // on stop console.log('-ws', gwif, tier, name, host, port, addr);
- if it's not, pretend it isn't there. // }
// // }
If there's no server on our gateway interface (or we're pretending // }
none exists), try to connect to the top.
*/
}