syndicate-js/packages/server/src/index.js

243 lines
7.6 KiB
JavaScript
Raw Normal View History

2018-11-13 21:27:26 +00:00
"use strict";
const UI = require("@syndicate-lang/driver-browser-ui");
// @jsx UI.html
// @jsxFrag UI.htmlFragment
const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node");
2018-12-12 17:16:10 +00:00
const M = activate require("@syndicate-lang/driver-mdns");
2018-11-19 16:57:22 +00:00
import {
Set, Bytes,
Encoder, Observe,
2018-12-12 17:16:10 +00:00
Dataspace, Skeleton, currentFacet, genUuid, RandomID
2018-11-19 16:57:22 +00:00
} from "@syndicate-lang/core";
2018-11-13 21:27:26 +00:00
2018-12-12 17:16:10 +00:00
// Port given to the HTTP server below; also advertised in the WebSocket mDNS records.
const HTTP_PORT = 8000;
// Port used for the raw TCP stream listener and advertised in its mDNS record.
const TCP_PORT = 8001;

const server = Http.HttpServer(null, HTTP_PORT);

// public key of root server
const dataspaceId = 'EToUNUJI0ykSfudmN9Z99wu62qGQB1nd8SHvjNtL5tM';

// Per-process random identifier; also published on its own as an mDNS service name.
const localId = RandomID.randomId(8, false);

// "<dataspaceId>:<localId>", used as the mDNS service name of this gateway.
const gatewayId = `${dataspaceId}:${localId}`;
2018-11-13 21:27:26 +00:00
2018-11-21 14:23:30 +00:00
const fs = require('fs');
2018-11-19 16:57:22 +00:00
assertion type Connection(connId);
message type Request(connId, body);
message type Response(connId, body);
2018-11-21 14:23:30 +00:00
message type Disconnect(connId);
2018-11-19 16:57:22 +00:00
// Internal isolation
assertion type Proposal(scope, body);
2018-11-21 13:21:08 +00:00
assertion type Envelope(scope, body);
2018-11-19 16:57:22 +00:00
2019-05-12 22:26:01 +00:00
// Monitoring
assertion type ConnectionScope(connId, scope);
const {
2019-05-12 22:26:01 +00:00
Connect, Peer,
Assert, Clear, Message,
2019-05-12 22:26:01 +00:00
Add, Del, Msg, Err,
makeDecoder,
} = activate require("./protocol");
2018-11-19 16:57:22 +00:00
2018-11-13 21:27:26 +00:00
spawn named 'serverLogger' {
on asserted Http.Request(_, server, $method, $path, $query, $req) {
console.log(method, path.toJS(), query.toJS());
}
2018-11-15 11:00:30 +00:00
on asserted Http.WebSocket(_, server, $path, $query) {
console.log(path.toJS(), query.toJS());
}
2018-11-13 21:27:26 +00:00
}
spawn named 'rootServer' {
during Http.Request($reqId, server, 'get', [], _, _) {
assert :snapshot Http.Response(
reqId, 200, "OK", {"Content-type": "text/html"},
'<!DOCTYPE html>' + UI.htmlToString(
2018-11-21 14:23:30 +00:00
<html>
<head>
<meta charset="utf-8"></meta>
</head>
<body>
<script src="dist/monitor.js"></script>
</body>
</html>
2018-11-13 21:27:26 +00:00
));
}
2018-11-21 14:23:30 +00:00
2019-05-12 22:26:01 +00:00
during Http.Request($reqId, server, 'get', ['chat.html'], _, _) {
const contents = fs.readFileSync(__dirname + '/../chat.html');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
during Http.Request($reqId, server, 'get', ['style.css'], _, _) {
const contents = fs.readFileSync(__dirname + '/../style.css');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
2018-11-21 14:23:30 +00:00
during Http.Request($reqId, server, 'get', ['dist', $file], _, _) {
const contents = fs.readFileSync(__dirname + '/../dist/' + file);
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
2019-05-12 22:26:01 +00:00
during ConnectionScope($connId, $scope) assert Envelope('monitor', ConnectionScope(connId, scope));
on message Envelope('monitor', Disconnect($connId)) send Disconnect(connId);
2018-11-13 21:27:26 +00:00
}
spawn named 'websocketListener' {
2018-12-12 17:16:10 +00:00
assert M.Publish(M.Service(gatewayId, '_syndicate+ws._tcp'),
2019-05-16 19:14:38 +00:00
null, HTTP_PORT, ["tier=0", "path=/local"]);
2018-12-12 17:16:10 +00:00
assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'),
null, HTTP_PORT, ["tier=0", "path=/monitor"]);
2019-05-12 22:26:01 +00:00
during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] {
assert Connection(reqId);
on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) {
2019-05-12 22:26:01 +00:00
send Request(reqId, makeDecoder(data).next());
}
}
2019-05-12 22:26:01 +00:00
on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
stop on message Disconnect(reqId);
2018-11-13 21:27:26 +00:00
}
}
spawn named 'tcpListener' {
2018-12-12 17:16:10 +00:00
assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]);
on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) {
2019-05-15 16:26:39 +00:00
spawnStreamConnection('tcpServer', id);
2018-11-19 16:57:22 +00:00
}
}
spawn named 'unixListener' {
on asserted S.IncomingConnection($id, S.UnixSocketServer("./sock")) {
2019-05-15 16:26:39 +00:00
spawnStreamConnection('unixServer', id);
}
}
function spawnStreamConnection(debugLabel, id) {
spawn named [debugLabel, id] {
stop on retracted S.Duplex(id);
2019-05-12 22:26:01 +00:00
assert Connection(id);
const decoder = makeDecoder(null);
on message S.Data(id, $data) {
decoder.write(data);
let v;
while ((v = decoder.try_next())) {
2019-05-12 22:26:01 +00:00
send Request(id, v);
}
}
2019-05-12 22:26:01 +00:00
on message Response(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null);
stop on message Disconnect(id);
}
}
2018-11-19 16:57:22 +00:00
spawn named 'connectionHandler' {
during Proposal($scope, $assertion) assert Envelope(scope, assertion);
on message Proposal($scope, $assertion) send Envelope(scope, assertion);
2019-05-12 22:26:01 +00:00
during Connection($connId) spawn named Connection(connId) {
2018-11-21 13:21:08 +00:00
on start console.log(connId.toString(), 'connected');
on stop console.log(connId.toString(), 'disconnected');
2018-11-19 16:57:22 +00:00
2019-05-12 22:26:01 +00:00
field this.scope = null;
assert ConnectionScope(connId, this.scope) when (this.scope !== null);
2018-11-19 16:57:22 +00:00
let endpoints = Set();
2019-05-12 22:26:01 +00:00
on message Request(connId, Connect($scope)) {
// TODO: Enforce requirement that Connect appear exactly once, before anything else
this.scope = scope;
}
2018-11-19 16:57:22 +00:00
on message Request(connId, Assert($ep, $a)) {
if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep);
react {
on stop { endpoints = endpoints.remove(ep); }
field this.assertion = a;
assert Proposal(this.scope, this.assertion);
2018-11-19 16:57:22 +00:00
currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) {
2019-05-12 22:26:01 +00:00
const spec = Envelope(this.scope, this.assertion.get(0));
2018-11-19 16:57:22 +00:00
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => {
2018-11-21 13:21:08 +00:00
console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs);
2018-11-19 16:57:22 +00:00
switch (evt) {
case Skeleton.EVENT_ADDED:
send Response(connId, Add(ep, vs));
break;
case Skeleton.EVENT_REMOVED:
send Response(connId, Del(ep, vs));
break;
case Skeleton.EVENT_MESSAGE:
send Response(connId, Msg(ep, vs));
break;
}
});
});
return [Observe(spec), analysis];
} else {
return [void 0, null];
2018-11-19 16:57:22 +00:00
}
}, true);
on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion;
stop on message Request(connId, Clear(ep));
}
}
2018-11-13 21:27:26 +00:00
}
2018-11-19 16:57:22 +00:00
on message Request(connId, Message($body)) {
send Proposal(this.scope, body);
2018-11-19 16:57:22 +00:00
}
2018-11-21 13:21:08 +00:00
on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString());
on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString());
2018-11-13 21:27:26 +00:00
}
}
2018-12-12 17:16:10 +00:00
spawn named 'peerDiscovery' {
// during M.DefaultGateway($gwif, _) {
// on start console.log('GW+', gwif);
// on stop console.log('GW-', gwif);
during M.Discovered(
M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", $gwif)
{
const [dsId, peerId] = name.split(':');
let tier = null;
txt.forEach((t) => {
t.split(' ').forEach((kv) => {
const [k, v] = kv.split('=');
if (k === 'tier') {
tier = Number.parseInt(v);
}
});
});
on start console.log('+ws', gwif, tier, name, host, port, addr);
on stop console.log('-ws', gwif, tier, name, host, port, addr);
}
// }
/*
2019-05-15 16:26:39 +00:00
If there's a server on our gateway interface, see if it's better than us.
2018-12-12 17:16:10 +00:00
- if it is, use it.
- if it's not, pretend it isn't there.
2019-05-15 16:26:39 +00:00
If there's no server on our gateway interface (or we're pretending
2018-12-12 17:16:10 +00:00
none exists), try to connect to the top.
*/
}