Factor out server.js module

This commit is contained in:
Tony Garnock-Jones 2019-05-23 10:33:59 +01:00
parent 7a9eed8f0d
commit 8fdf6b7032
4 changed files with 135 additions and 123 deletions

View File

@ -7,6 +7,8 @@ const UI = require("@syndicate-lang/driver-browser-ui");
const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node");
const M = activate require("@syndicate-lang/driver-mdns");
const P = activate require("./internal_protocol");
const Server = activate require("./server");
import {
Set, Bytes,
@ -25,26 +27,6 @@ const gatewayId = dataspaceId + ':' + localId;
const fs = require('fs');
assertion type Connection(connId);
message type Request(connId, body);
message type Response(connId, body);
message type Disconnect(connId);
// Internal isolation
assertion type Proposal(scope, body);
assertion type Envelope(scope, body);
// Monitoring
assertion type ConnectionScope(connId, scope);
const {
Connect, Peer,
Assert, Clear, Message,
Add, Del, Msg, Err,
makeDecoder,
} = activate require("./protocol");
spawn named 'serverLogger' {
on asserted Http.Request(_, server, $method, $path, $query, $req) {
console.log(method, path.toJS(), query.toJS());
@ -85,8 +67,8 @@ spawn named 'rootServer' {
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
during ConnectionScope($connId, $scope) assert Envelope('monitor', ConnectionScope(connId, scope));
on message Envelope('monitor', Disconnect($connId)) send Disconnect(connId);
during P.POAScope($connId, $scope) assert P.Envelope('monitor', P.POAScope(connId, scope));
on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId);
}
spawn named 'websocketListener' {
@ -96,112 +78,20 @@ spawn named 'websocketListener' {
null, HTTP_PORT, ["tier=0", "path=/monitor"]);
during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] {
assert Connection(reqId);
on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) {
send Request(reqId, makeDecoder(data).next());
}
}
on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
stop on message Disconnect(reqId);
Server.websocketServerFacet(reqId);
}
}
spawn named 'tcpListener' {
assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]);
on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) {
spawnStreamConnection('tcpServer', id);
Server.streamServerActor(id, 'tcpServer');
}
}
spawn named 'unixListener' {
on asserted S.IncomingConnection($id, S.UnixSocketServer("./sock")) {
spawnStreamConnection('unixServer', id);
}
}
function spawnStreamConnection(debugLabel, id) {
spawn named [debugLabel, id] {
stop on retracted S.Duplex(id);
assert Connection(id);
const decoder = makeDecoder(null);
on message S.Data(id, $data) {
decoder.write(data);
let v;
while ((v = decoder.try_next())) {
send Request(id, v);
}
}
on message Response(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null);
stop on message Disconnect(id);
}
}
spawn named 'connectionHandler' {
during Proposal($scope, $assertion) assert Envelope(scope, assertion);
on message Proposal($scope, $assertion) send Envelope(scope, assertion);
during Connection($connId) spawn named Connection(connId) {
on start console.log(connId.toString(), 'connected');
on stop console.log(connId.toString(), 'disconnected');
field this.scope = null;
assert ConnectionScope(connId, this.scope) when (this.scope !== null);
let endpoints = Set();
on message Request(connId, Connect($scope)) {
// TODO: Enforce requirement that Connect appear exactly once, before anything else
this.scope = scope;
}
on message Request(connId, Assert($ep, $a)) {
if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep);
react {
on stop { endpoints = endpoints.remove(ep); }
field this.assertion = a;
assert Proposal(this.scope, this.assertion);
currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) {
const spec = Envelope(this.scope, this.assertion.get(0));
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => {
console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs);
switch (evt) {
case Skeleton.EVENT_ADDED:
send Response(connId, Add(ep, vs));
break;
case Skeleton.EVENT_REMOVED:
send Response(connId, Del(ep, vs));
break;
case Skeleton.EVENT_MESSAGE:
send Response(connId, Msg(ep, vs));
break;
}
});
});
return [Observe(spec), analysis];
} else {
return [void 0, null];
}
}, true);
on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion;
stop on message Request(connId, Clear(ep));
}
}
}
on message Request(connId, Message($body)) {
send Proposal(this.scope, body);
}
on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString());
on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString());
Server.streamServerActor(id, 'unixServer');
}
}

View File

@ -0,0 +1,24 @@
"use strict";
assertion type ServerActive(scope) = Symbol.for('server-active');
assertion type POA(connId) = Symbol.for('server-poa');
message type FromPOA(connId, body) = Symbol.for('message-poa->server');
message type ToPOA(connId, body) = Symbol.for('message-server->poa');
message type Disconnect(connId) = Symbol.for('disconnect-poa');
// Internal isolation
assertion type Proposal(scope, body) = Symbol.for('server-proposal');
assertion type Envelope(scope, body) = Symbol.for('server-envelope');
// Monitoring
assertion type POAScope(connId, scope) = Symbol.for('server-poa-scope');
Object.assign(module.exports, {
ServerActive,
POA, FromPOA, ToPOA,
Disconnect,
Proposal, Envelope,
POAScope,
});

View File

@ -5,9 +5,7 @@ const UI = activate require("@syndicate-lang/driver-browser-ui");
// @jsxFrag UI.htmlFragment
const { WSServer, ToServer, FromServer, ServerConnected } = activate require("./client");
assertion type ConnectionScope(connId, scope);
message type Disconnect(connId);
const P = activate require("./internal_protocol");
spawn {
const ui = new UI.Anchor();
@ -26,19 +24,19 @@ spawn {
const addr = WSServer(url, "monitor");
during ServerConnected(addr) {
during FromServer(addr, ConnectionScope(_, $scope)) {
during FromServer(addr, P.POAScope(_, $scope)) {
const ui = new UI.Anchor();
assert ui.html('#scopes',
<div class={`scope_${scope}`}>
<p>Scope: <tt>{scope}</tt></p>
<ul></ul>
</div>);
during FromServer(addr, ConnectionScope($id, scope)) {
during FromServer(addr, P.POAScope($id, scope)) {
const ui = new UI.Anchor();
assert ui.html(`#scopes div.scope_${scope} ul`,
<li>{id.toString()} <button class="disconnect">Disconnect</button></li>);
on message UI.UIEvent(ui.fragmentId, 'button.disconnect', 'click', _) {
send ToServer(addr, Disconnect(id));
send ToServer(addr, P.Disconnect(id));
}
}
}

View File

@ -0,0 +1,100 @@
"use strict";
const Http = activate require("@syndicate-lang/driver-http-node");
const S = activate require("@syndicate-lang/driver-streams-node");
import {
Set, Bytes,
Encoder, Observe,
Dataspace, Skeleton, currentFacet, genUuid, RandomID
} from "@syndicate-lang/core";
const P = activate require("./internal_protocol");
const W = activate require("./protocol");
export function websocketServerFacet(reqId) {
assert P.POA(reqId);
on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next());
}
on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
stop on message P.Disconnect(reqId);
stop on retracted P.POAScope(reqId, _);
}
export function streamServerFacet(id) {
assert P.POA(id);
const decoder = W.makeDecoder(null);
on message S.Data(id, $data) {
decoder.write(data);
let v;
while ((v = decoder.try_next())) send P.FromPOA(id, v);
}
on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null);
stop on message P.Disconnect(id);
stop on retracted P.POAScope(id, _);
}
export function streamServerActor(id, debugLabel) {
spawn named [debugLabel || 'stream-poa', id] {
stop on retracted S.Duplex(id);
streamServerFacet(id);
}
}
spawn named '@syndicate-lang/server/server/POAHandler' {
during P.Proposal($scope, $assertion) assert P.Envelope(scope, assertion);
on message P.Proposal($scope, $assertion) send P.Envelope(scope, assertion);
during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec));
during P.POA($connId) spawn named P.POA(connId) {
field this.scope = null;
assert P.POAScope(connId, this.scope) when (this.scope !== null);
assert P.ServerActive(this.scope) when (this.scope !== null);
let endpoints = Set();
on message P.FromPOA(connId, W.Connect($scope)) {
// TODO: Enforce requirement that Connect appear exactly once, before anything else
this.scope = scope;
}
on message P.FromPOA(connId, W.Assert($ep, $a)) {
if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep);
react {
on stop { endpoints = endpoints.remove(ep); }
field this.assertion = a;
assert P.Proposal(this.scope, this.assertion);
currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) {
const spec = P.Envelope(this.scope, this.assertion.get(0));
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => {
switch (evt) {
case Skeleton.EVENT_ADDED: send P.ToPOA(connId, W.Add(ep, vs)); break;
case Skeleton.EVENT_REMOVED: send P.ToPOA(connId, W.Del(ep, vs)); break;
case Skeleton.EVENT_MESSAGE: send P.ToPOA(connId, W.Msg(ep, vs)); break;
}
});
});
return [Observe(spec), analysis];
} else {
return [void 0, null];
}
}, true);
on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) this.assertion = newAssertion;
stop on message P.FromPOA(connId, W.Clear(ep));
}
}
}
on message P.FromPOA(connId, W.Message($body)) {
send P.Proposal(this.scope, body);
}
}
}