diff --git a/docker/service/syndicate-server/run b/docker/service/syndicate-server/run index e327f47..a2de263 100755 --- a/docker/service/syndicate-server/run +++ b/docker/service/syndicate-server/run @@ -1,4 +1,4 @@ #!/bin/sh s6-svwait ../avahi-daemon cd /data/packages/server -exec node lib/index.js +exec node lib/index.js --advertise --http 8000 --tcp 8001 --monitor 8000 diff --git a/packages/server/src/federation.js b/packages/server/src/federation.js index b790a5a..6f6a197 100644 --- a/packages/server/src/federation.js +++ b/packages/server/src/federation.js @@ -2,18 +2,72 @@ const P = activate require("./internal_protocol"); const W = activate require("./protocol"); +const C = activate require("./client"); assertion type ManagementScope(scope) = Symbol.for('federation-management-scope'); -module.exports.ManagementScope = ManagementScope; +assertion type Uplink(localScope, peer, remoteScope) = Symbol.for('federated-uplink'); +assertion type UplinkConnected(link) = Symbol.for('federated-uplink-connected'); + +Object.assign(module.exports, { + ManagementScope, + Uplink, UplinkConnected, +}); import { - Set, Map, + Set, Map, List, Observe, Skeleton, Dataspace, currentFacet, genUuid, } from "@syndicate-lang/core"; +spawn named '@syndicate-lang/server/federation/UplinkFactory' { + during ManagementScope($managementScope) { + during P.Envelope(managementScope, $link(Uplink($localScope, $peerAddr, $remoteScope))) + spawn named link + { + during C.ServerConnected(peerAddr) { + const sessionId = genUuid('peer'); + assert P.Proposal(managementScope, UplinkConnected(link)); + assert P.Proposal(managementScope, P.FederatedLink(sessionId, localScope)); + assert C.ToServer(peerAddr, P.FederatedLink(sessionId, remoteScope)); + + field this.pendingIn = List(); + field this.pendingOut = List(); + + on message C.FromServer(peerAddr, P.ToPOA(sessionId, $p)) { + this.pendingIn = this.pendingIn.push(p); + } + + on message P.Envelope(managementScope, P.ToPOA(sessionId, $p)) { + this.pendingOut = this.pendingOut.push(p); + } + + during P.Envelope(managementScope, Observe(P.FromPOA(sessionId, _))) { + during C.FromServer(peerAddr, Observe(P.FromPOA(sessionId, _))) { + dataflow { + if (!this.pendingIn.isEmpty()) { + this.pendingIn.forEach((p) => { + send P.Proposal(managementScope, P.FromPOA(sessionId, p)); + }); + this.pendingIn = List(); + } + } + dataflow { + if (!this.pendingOut.isEmpty()) { + this.pendingOut.forEach((p) => { + send C.ToServer(peerAddr, P.FromPOA(sessionId, p)); + }); + this.pendingOut = List(); + } + } + } + } + } + } + } +} + spawn named '@syndicate-lang/server/federation/LocalLinkFactory' { during ManagementScope($managementScope) { during P.Envelope(managementScope, P.FederatedLink(_, $scope)) { diff --git a/packages/server/src/index.js b/packages/server/src/index.js index af044b5..d0d0e63 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -8,6 +8,7 @@ const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); const P = activate require("./internal_protocol"); +const C = activate require("./client"); const Server = activate require("./server"); const Federation = activate require("./federation"); @@ -17,87 +18,148 @@ import { Dataspace, Skeleton, currentFacet, genUuid, RandomID } from "@syndicate-lang/core"; -const HTTP_PORT = 8000; -const TCP_PORT = 8001; - -const server = Http.HttpServer(null, HTTP_PORT); - -const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server -const localId = RandomID.randomId(8, false); -const gatewayId = dataspaceId + ':' + localId; - const fs = require('fs'); -spawn named 'serverLogger' { - on asserted Http.Request(_, server, $method, $path, $query, $req) { - console.log(method, path.toJS(), query.toJS()); - } - on asserted Http.WebSocket(_, server, $path, $query) { - console.log(path.toJS(), query.toJS()); +let currentManagementScope = 'local'; + +function usage() { + // -------------------------------------------------------------------------------- + console.info('Usage: syndicate-server [ OPTION [ OPTION ... ] ]'); + console.info(''); + console.info('where OPTION may be repeated any number of times and is drawn from:'); + console.info(''); + console.info(' --advertise Enable mDNS advertisement for subsequent services'); + console.info(' --no-advertise Disable mDNS advertisement for subsequent services'); + console.info(''); + console.info(' --tcp PORTNUMBER Create a plain TCP service on the given port'); + console.info(' --http PORTNUMBER Create an HTTP WebSocket service on the given port'); + console.info(' --unix PATH Create a Unix socket service at the given path'); + console.info(''); + console.info(' --monitor PORTNUMBER Serve a simple HTML/JS monitoring app on the port'); + console.info(''); + console.info(' --management SCOPE Set the management scope for --uplink etc to use'); + console.info(' --uplink LOCALSCOPE WEBSOCKETURL REMOTESCOPE'); + console.info(' Establish a federation uplink from the named local'); + console.info(' scope to the remote scope within the server at the URL'); + console.info(''); + console.info('mDNS advertisement starts out enabled.'); +} + +const uplinks = []; +function process_command_line(args) { + const strArg = () => args.shift(); + const numArg = () => Number.parseInt(args.shift()); + let advertise = true; + while (args.length) { + const opt = args.shift(); + switch (opt) { + case "--advertise": advertise = true; break; + case "--no-advertise": advertise = false; break; + case "--tcp": spawnTcpServer(numArg(), advertise); break; + case "--http": spawnWebSocketServer(numArg(), advertise); break; + case "--unix": spawnUnixSocketServer(strArg(), advertise); break; + case "--monitor": spawnMonitorAppServer(numArg(), advertise); break; + case "--management": currentManagementScope = strArg(); break; + case "--uplink": { + const localScope = strArg(); + const target = strArg(); + const remoteScope = strArg(); + uplinks.push(Federation.Uplink(localScope, + C.WSServer(target, currentManagementScope), + remoteScope)); + break; + } + default: + console.error("Unsupported command-line argument: " + opt); + /* FALL THROUGH */ + case '--help': + case '-h': + usage(); + process.exit(1); + } } } -spawn named 'rootServer' { - assert Federation.ManagementScope('local'); +const localId = RandomID.randomId(8, false); +const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM'; // public key of root server +const gatewayId = dataspaceId + ':' + localId; - during Http.Request($reqId, server, 'get', [], _, _) { - assert :snapshot Http.Response( - reqId, 200, "OK", {"Content-type": "text/html"}, - '' + UI.htmlToString( - - - - - - - - - )); +process_command_line(process.argv.slice(2)); + +spawn named 'server' { + assert Federation.ManagementScope(currentManagementScope); + uplinks.forEach((link) => { + assert P.Proposal(currentManagementScope, link); + }); +} + +function spawnTcpServer(port, advertise) { + spawn named ['tcpServer', port] { + if (advertise) { + assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, port, ["tier=0"]); + } + on asserted S.IncomingConnection($id, S.TcpListener(port)) { + Server.streamServerActor(id, ['tcpServer', port, id]); + } } +} - during Http.Request($reqId, server, 'get', ['chat.html'], _, _) { - const contents = fs.readFileSync(__dirname + '/../chat.html'); - assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); +function spawnWebSocketServer(port, advertise) { + spawn named ['wsConnection', port] { + const server = Http.HttpServer(null, port); + if (advertise) { + assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), null, port, + ["tier=0", "path=/local"]); + assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), null, port, + ["tier=0", "path=/monitor"]); + } + during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', port, reqId] { + Server.websocketServerFacet(reqId); + } } +} - during Http.Request($reqId, server, 'get', ['style.css'], _, _) { - const contents = fs.readFileSync(__dirname + '/../style.css'); - assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); +function spawnUnixSocketServer(path) { + spawn named ['unixServer', path] { + on asserted S.IncomingConnection($id, S.UnixSocketServer(path)) { + Server.streamServerActor(id, ['unixServer', path, id]); + } } +} - during Http.Request($reqId, server, 'get', ['dist', $file], _, _) { - const contents = fs.readFileSync(__dirname + '/../dist/' + file); - assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); +function spawnMonitorAppServer(port) { + spawn named ['monitorAppServer', port] { + const server = Http.HttpServer(null, port); + + during Http.Request($reqId, server, 'get', [], _, _) { + assert :snapshot Http.Response(reqId, 200, "OK", {"Content-type": "text/html"}, + '' + UI.htmlToString( + + + + )); + } + + function assertFileResponse(reqId, path) { + assert :snapshot Http.Response(reqId, 200, "OK", {}, fs.readFileSync(path)); + } + + during Http.Request($reqId, server, 'get', ['chat.html'], _, _) + assertFileResponse(reqId, __dirname + '/../chat.html'); + + during Http.Request($reqId, server, 'get', ['style.css'], _, _) + assertFileResponse(reqId, __dirname + '/../style.css'); + + during Http.Request($reqId, server, 'get', ['dist', $file], _, _) + assertFileResponse(reqId, __dirname + '/../dist/' + file); } +} - during P.POAScope($connId, $scope) assert P.Envelope('monitor', P.POAScope(connId, scope)); +spawn named 'monitorApp' { + during P.POAScope($connId, $scope) assert P.Proposal('monitor', P.POAScope(connId, scope)); on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId); } -spawn named 'websocketListener' { - assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'), - null, HTTP_PORT, ["tier=0", "path=/local"]); - assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), - null, HTTP_PORT, ["tier=0", "path=/monitor"]); - - during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] { - Server.websocketServerFacet(reqId); - } -} - -spawn named 'tcpListener' { - assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]); - on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) { - Server.streamServerActor(id, 'tcpServer'); - } -} - -spawn named 'unixListener' { - on asserted S.IncomingConnection($id, S.UnixSocketServer("./sock")) { - Server.streamServerActor(id, 'unixServer'); - } -} - spawn named 'peerDiscovery' { // during M.DefaultGateway($gwif, _) { // on start console.log('GW+', gwif);