Federated uplinks; server command-line parsing

This commit is contained in:
Tony Garnock-Jones 2019-05-23 15:52:10 +01:00
parent 6dac70199d
commit ef1c849d87
3 changed files with 183 additions and 67 deletions

View File

@ -1,4 +1,4 @@
#!/bin/sh
s6-svwait ../avahi-daemon
cd /data/packages/server
exec node lib/index.js
exec node lib/index.js --advertise --http 8000 --tcp 8001 --monitor 8000

View File

@ -2,18 +2,72 @@
const P = activate require("./internal_protocol");
const W = activate require("./protocol");
const C = activate require("./client");
assertion type ManagementScope(scope) = Symbol.for('federation-management-scope');
module.exports.ManagementScope = ManagementScope;
assertion type Uplink(localScope, peer, remoteScope) = Symbol.for('federated-uplink');
assertion type UplinkConnected(link) = Symbol.for('federated-uplink-connected');
Object.assign(module.exports, {
ManagementScope,
Uplink, UplinkConnected,
});
import {
Set, Map,
Set, Map, List,
Observe,
Skeleton, Dataspace, currentFacet,
genUuid,
} from "@syndicate-lang/core";
spawn named '@syndicate-lang/server/federation/UplinkFactory' {
during ManagementScope($managementScope) {
during P.Envelope(managementScope, $link(Uplink($localScope, $peerAddr, $remoteScope)))
spawn named link
{
during C.ServerConnected(peerAddr) {
const sessionId = genUuid('peer');
assert P.Proposal(managementScope, UplinkConnected(link));
assert P.Proposal(managementScope, P.FederatedLink(sessionId, localScope));
assert C.ToServer(peerAddr, P.FederatedLink(sessionId, remoteScope));
field this.pendingIn = List();
field this.pendingOut = List();
on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) {
this.pendingIn = this.pendingIn.push(p);
}
on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) {
this.pendingOut = this.pendingOut.push(p);
}
during P.Envelope(managementScope, Observe(P.FromPOA(sessionId, _))) {
during C.FromServer(peerAddr, Observe(P.FromPOA(sessionId, _))) {
dataflow {
if (!this.pendingIn.isEmpty()) {
this.pendingIn.forEach((p) => {
send P.Proposal(managementScope, P.FromPOA(sessionId, p));
});
this.pendingIn = List();
}
}
dataflow {
if (!this.pendingOut.isEmpty()) {
this.pendingOut.forEach((p) => {
send C.ToServer(peerAddr, P.FromPOA(sessionId, p));
});
this.pendingOut = List();
}
}
}
}
}
}
}
}
spawn named '@syndicate-lang/server/federation/LocalLinkFactory' {
during ManagementScope($managementScope) {
during P.Envelope(managementScope, P.FederatedLink(_, $scope)) {

View File

@ -8,6 +8,7 @@ const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node");
const M = activate require("@syndicate-lang/driver-mdns");
const P = activate require("./internal_protocol");
const C = activate require("./client");
const Server = activate require("./server");
const Federation = activate require("./federation");
@ -17,87 +18,148 @@ import {
Dataspace, Skeleton, currentFacet, genUuid, RandomID
} from "@syndicate-lang/core";
const HTTP_PORT = 8000;
const TCP_PORT = 8001;
const server = Http.HttpServer(null, HTTP_PORT);
const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server
const localId = RandomID.randomId(8, false);
const gatewayId = dataspaceId + ':' + localId;
const fs = require('fs');
spawn named 'serverLogger' {
on asserted Http.Request(_, server, $method, $path, $query, $req) {
console.log(method, path.toJS(), query.toJS());
}
on asserted Http.WebSocket(_, server, $path, $query) {
console.log(path.toJS(), query.toJS());
let currentManagementScope = 'local';
function usage() {
// --------------------------------------------------------------------------------
console.info('Usage: syndicate-server [ OPTION [ OPTION ... ] ]');
console.info('');
console.info('where OPTION may be repeated any number of times and is drawn from:');
console.info('');
console.info(' --advertise Enable mDNS advertisement for subsequent services');
console.info(' --no-advertise Disable mDNS advertisement for subsequent services');
console.info('');
console.info(' --tcp PORTNUMBER Create a plain TCP service on the given port');
console.info(' --http PORTNUMBER Create an HTTP WebSocket service on the given port');
console.info(' --unix PATH Create a Unix socket service at the given path');
console.info('');
console.info(' --monitor PORTNUMBER Serve a simple HTML/JS monitoring app on the port');
console.info('');
console.info(' --management SCOPE Set the management scope for --uplink etc to use');
console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE');
console.info(' Establish a federation uplink from the named local');
console.info(' scope to the remote scope within the server at the URL');
console.info('');
console.info('mDNS advertisement starts out enabled.');
}
const uplinks = [];
function process_command_line(args) {
const strArg = () => args.shift();
const numArg = () => Number.parseInt(args.shift());
let advertise = true;
while (args.length) {
const opt = args.shift();
switch (opt) {
case "--advertise": advertise = true; break;
case "--no-advertise": advertise = false; break;
case "--tcp": spawnTcpServer(numArg(), advertise); break;
case "--http": spawnWebSocketServer(numArg(), advertise); break;
case "--unix": spawnUnixSocketServer(strArg(), advertise); break;
case "--monitor": spawnMonitorAppServer(numArg(), advertise); break;
case "--management": currentManagementScope = strArg(); break;
case "--uplink": {
const localScope = strArg();
const target = strArg();
const remoteScope = strArg();
uplinks.push(Federation.Uplink(localScope,
C.WSServer(target, currentManagementScope),
remoteScope));
break;
}
default:
console.error("Unsupported command-line argument: " + opt);
/* FALL THROUGH */
case '--help':
case '-h':
usage();
process.exit(1);
}
}
}
spawn named 'rootServer' {
assert Federation.ManagementScope('local');
const localId = RandomID.randomId(8, false);
const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server
const gatewayId = dataspaceId + ':' + localId;
during Http.Request($reqId, server, 'get', [], _, _) {
assert :snapshot Http.Response(
reqId, 200, "OK", {"Content-type": "text/html"},
'<!DOCTYPE html>' + UI.htmlToString(
<html>
<head>
<meta charset="utf-8"></meta>
</head>
<body>
<script src="dist/monitor.js"></script>
</body>
</html>
));
process_command_line(process.argv.slice(2));
spawn named 'server' {
assert Federation.ManagementScope(currentManagementScope);
uplinks.forEach((link) => {
assert P.Proposal(currentManagementScope, link);
});
}
function spawnTcpServer(port, advertise) {
spawn named ['tcpServer', port] {
if (advertise) {
assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, ["tier=0"]);
}
on asserted S.IncomingConnection($id, S.TcpListener(port)) {
Server.streamServerActor(id, ['tcpServer', port, id]);
}
}
}
during Http.Request($reqId, server, 'get', ['chat.html'], _, _) {
const contents = fs.readFileSync(__dirname + '/../chat.html');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
function spawnWebSocketServer(port, advertise) {
spawn named ['wsConnection', port] {
const server = Http.HttpServer(null, port);
if (advertise) {
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port,
["tier=0", "path=/local"]);
assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), null, port,
["tier=0", "path=/monitor"]);
}
during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', port, reqId] {
Server.websocketServerFacet(reqId);
}
}
}
during Http.Request($reqId, server, 'get', ['style.css'], _, _) {
const contents = fs.readFileSync(__dirname + '/../style.css');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
function spawnUnixSocketServer(path) {
spawn named ['unixServer', path] {
on asserted S.IncomingConnection($id, S.UnixSocketServer(path)) {
Server.streamServerActor(id, ['unixServer', path, id]);
}
}
}
during Http.Request($reqId, server, 'get', ['dist', $file], _, _) {
const contents = fs.readFileSync(__dirname + '/../dist/' + file);
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
function spawnMonitorAppServer(port) {
spawn named ['monitorAppServer', port] {
const server = Http.HttpServer(null, port);
during Http.Request($reqId, server, 'get', [], _, _) {
assert :snapshot Http.Response(reqId, 200, "OK", {"Content-type": "text/html"},
'<!DOCTYPE html>' + UI.htmlToString(
<html>
<head><meta charset="utf-8"></meta></head>
<body><script src="dist/monitor.js"></script></body>
</html>));
}
function assertFileResponse(reqId, path) {
assert :snapshot Http.Response(reqId, 200, "OK", {}, fs.readFileSync(path));
}
during Http.Request($reqId, server, 'get', ['chat.html'], _, _)
assertFileResponse(reqId, __dirname + '/../chat.html');
during Http.Request($reqId, server, 'get', ['style.css'], _, _)
assertFileResponse(reqId, __dirname + '/../style.css');
during Http.Request($reqId, server, 'get', ['dist', $file], _, _)
assertFileResponse(reqId, __dirname + '/../dist/' + file);
}
}
during P.POAScope($connId, $scope) assert P.Envelope('monitor', P.POAScope(connId, scope));
spawn named 'monitorApp' {
during P.POAScope($connId, $scope) assert P.Proposal('monitor', P.POAScope(connId, scope));
on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId);
}
spawn named 'websocketListener' {
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'),
null, HTTP_PORT, ["tier=0", "path=/local"]);
assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'),
null, HTTP_PORT, ["tier=0", "path=/monitor"]);
during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] {
Server.websocketServerFacet(reqId);
}
}
spawn named 'tcpListener' {
assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]);
on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) {
Server.streamServerActor(id, 'tcpServer');
}
}
spawn named 'unixListener' {
on asserted S.IncomingConnection($id, S.UnixSocketServer("./sock")) {
Server.streamServerActor(id, 'unixServer');
}
}
spawn named 'peerDiscovery' {
// during M.DefaultGateway($gwif, _) {
// on start console.log('GW+', gwif);