diff --git a/packages/server/src/index.js b/packages/server/src/index.js index 606f763..e4e95f0 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -7,6 +7,8 @@ const UI = require("@syndicate-lang/driver-browser-ui"); const Http = activate require("@syndicate-lang/driver-http-node"); const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); +const P = activate require("./internal_protocol"); +const Server = activate require("./server"); import { Set, Bytes, @@ -25,26 +27,6 @@ const gatewayId = dataspaceId + ':' + localId; const fs = require('fs'); -assertion type Connection(connId); -message type Request(connId, body); -message type Response(connId, body); - -message type Disconnect(connId); - -// Internal isolation -assertion type Proposal(scope, body); -assertion type Envelope(scope, body); - -// Monitoring -assertion type ConnectionScope(connId, scope); - -const { - Connect, Peer, - Assert, Clear, Message, - Add, Del, Msg, Err, - makeDecoder, -} = activate require("./protocol"); - spawn named 'serverLogger' { on asserted Http.Request(_, server, $method, $path, $query, $req) { console.log(method, path.toJS(), query.toJS()); @@ -85,8 +67,8 @@ spawn named 'rootServer' { assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); } - during ConnectionScope($connId, $scope) assert Envelope('monitor', ConnectionScope(connId, scope)); - on message Envelope('monitor', Disconnect($connId)) send Disconnect(connId); + during P.POAScope($connId, $scope) assert P.Envelope('monitor', P.POAScope(connId, scope)); + on message P.Envelope('monitor', P.Disconnect($connId)) send P.Disconnect(connId); } spawn named 'websocketListener' { @@ -96,112 +78,20 @@ spawn named 'websocketListener' { null, HTTP_PORT, ["tier=0", "path=/monitor"]); during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] { - assert Connection(reqId); - on message Http.DataIn(reqId, $data) { - if (data instanceof Bytes) { - send Request(reqId, makeDecoder(data).next()); - } - } - on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); - stop on message Disconnect(reqId); + Server.websocketServerFacet(reqId); } } spawn named 'tcpListener' { assert M.Publish(M.Service(gatewayId, '_syndicate._tcp'), null, TCP_PORT, ["tier=0"]); on asserted S.IncomingConnection($id, S.TcpListener(TCP_PORT)) { - spawnStreamConnection('tcpServer', id); + Server.streamServerActor(id, 'tcpServer'); } } spawn named 'unixListener' { on asserted S.IncomingConnection($id, S.UnixSocketServer("./sock")) { - spawnStreamConnection('unixServer', id); - } -} - -function spawnStreamConnection(debugLabel, id) { - spawn named [debugLabel, id] { - stop on retracted S.Duplex(id); - assert Connection(id); - const decoder = makeDecoder(null); - on message S.Data(id, $data) { - decoder.write(data); - let v; - while ((v = decoder.try_next())) { - send Request(id, v); - } - } - on message Response(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); - stop on message Disconnect(id); - } -} - -spawn named 'connectionHandler' { - during Proposal($scope, $assertion) assert Envelope(scope, assertion); - on message Proposal($scope, $assertion) send Envelope(scope, assertion); - - during Connection($connId) spawn named Connection(connId) { - on start console.log(connId.toString(), 'connected'); - on stop console.log(connId.toString(), 'disconnected'); - - field this.scope = null; - assert ConnectionScope(connId, this.scope) when (this.scope !== null); - - let endpoints = Set(); - - on message Request(connId, Connect($scope)) { - // TODO: Enforce requirement that Connect appear exactly once, before anything else - this.scope = scope; - } - - on message Request(connId, Assert($ep, $a)) { - if (!endpoints.includes(ep)) { - endpoints = endpoints.add(ep); - react { - on stop { endpoints = endpoints.remove(ep); } - - field this.assertion = a; - assert Proposal(this.scope, this.assertion); - - currentFacet().addEndpoint(() => { - if (Observe.isClassOf(this.assertion)) { - const spec = Envelope(this.scope, this.assertion.get(0)); - const analysis = Skeleton.analyzeAssertion(spec); - analysis.callback = Dataspace.wrap((evt, vs) => { - currentFacet().actor.scheduleScript(() => { - console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs); - switch (evt) { - case Skeleton.EVENT_ADDED: - send Response(connId, Add(ep, vs)); - break; - case Skeleton.EVENT_REMOVED: - send Response(connId, Del(ep, vs)); - break; - case Skeleton.EVENT_MESSAGE: - send Response(connId, Msg(ep, vs)); - break; - } - }); - }); - return [Observe(spec), analysis]; - } else { - return [void 0, null]; - } - }, true); - - on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion; - stop on message Request(connId, Clear(ep)); - } - } - } - - on message Request(connId, Message($body)) { - send Proposal(this.scope, body); - } - - on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString()); - on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString()); + Server.streamServerActor(id, 'unixServer'); } } diff --git a/packages/server/src/internal_protocol.js b/packages/server/src/internal_protocol.js new file mode 100644 index 0000000..e3cc744 --- /dev/null +++ b/packages/server/src/internal_protocol.js @@ -0,0 +1,24 @@ +"use strict"; + +assertion type ServerActive(scope) = Symbol.for('server-active'); + +assertion type POA(connId) = Symbol.for('server-poa'); +message type FromPOA(connId, body) = Symbol.for('message-poa->server'); +message type ToPOA(connId, body) = Symbol.for('message-server->poa'); + +message type Disconnect(connId) = Symbol.for('disconnect-poa'); + +// Internal isolation +assertion type Proposal(scope, body) = Symbol.for('server-proposal'); +assertion type Envelope(scope, body) = Symbol.for('server-envelope'); + +// Monitoring +assertion type POAScope(connId, scope) = Symbol.for('server-poa-scope'); + +Object.assign(module.exports, { + ServerActive, + POA, FromPOA, ToPOA, + Disconnect, + Proposal, Envelope, + POAScope, +}); diff --git a/packages/server/src/monitor.js b/packages/server/src/monitor.js index cc3f813..05ddd96 100644 --- a/packages/server/src/monitor.js +++ b/packages/server/src/monitor.js @@ -5,9 +5,7 @@ const UI = activate require("@syndicate-lang/driver-browser-ui"); // @jsxFrag UI.htmlFragment const { WSServer, ToServer, FromServer, ServerConnected } = activate require("./client"); - -assertion type ConnectionScope(connId, scope); -message type Disconnect(connId); +const P = activate require("./internal_protocol"); spawn { const ui = new UI.Anchor(); @@ -26,19 +24,19 @@ spawn { const addr = WSServer(url, "monitor"); during ServerConnected(addr) { - during FromServer(addr, ConnectionScope(_, $scope)) { + during FromServer(addr, P.POAScope(_, $scope)) { const ui = new UI.Anchor(); assert ui.html('#scopes',

Scope: {scope}

); - during FromServer(addr, ConnectionScope($id, scope)) { + during FromServer(addr, P.POAScope($id, scope)) { const ui = new UI.Anchor(); assert ui.html(`#scopes div.scope_${scope} ul`,
  • {id.toString()}
  • ); on message UI.UIEvent(ui.fragmentId, 'button.disconnect', 'click', _) { - send ToServer(addr, Disconnect(id)); + send ToServer(addr, P.Disconnect(id)); } } } diff --git a/packages/server/src/server.js b/packages/server/src/server.js new file mode 100644 index 0000000..ec0c29b --- /dev/null +++ b/packages/server/src/server.js @@ -0,0 +1,100 @@ +"use strict"; + +const Http = activate require("@syndicate-lang/driver-http-node"); +const S = activate require("@syndicate-lang/driver-streams-node"); + +import { + Set, Bytes, + Encoder, Observe, + Dataspace, Skeleton, currentFacet, genUuid, RandomID +} from "@syndicate-lang/core"; + +const P = activate require("./internal_protocol"); +const W = activate require("./protocol"); + +export function websocketServerFacet(reqId) { + assert P.POA(reqId); + on message Http.DataIn(reqId, $data) { + if (data instanceof Bytes) send P.FromPOA(reqId, W.makeDecoder(data).next()); + } + on message P.ToPOA(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); + stop on message P.Disconnect(reqId); + stop on retracted P.POAScope(reqId, _); +} + +export function streamServerFacet(id) { + assert P.POA(id); + const decoder = W.makeDecoder(null); + on message S.Data(id, $data) { + decoder.write(data); + let v; + while ((v = decoder.try_next())) send P.FromPOA(id, v); + } + on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); + stop on message P.Disconnect(id); + stop on retracted P.POAScope(id, _); +} + +export function streamServerActor(id, debugLabel) { + spawn named [debugLabel || 'stream-poa', id] { + stop on retracted S.Duplex(id); + streamServerFacet(id); + } +} + +spawn named '@syndicate-lang/server/server/POAHandler' { + during P.Proposal($scope, $assertion) assert P.Envelope(scope, assertion); + on message P.Proposal($scope, $assertion) send P.Envelope(scope, assertion); + during Observe(P.Envelope($scope, $spec)) assert P.Proposal(scope, Observe(spec)); + + during P.POA($connId) spawn named P.POA(connId) { + field this.scope = null; + assert P.POAScope(connId, this.scope) when (this.scope !== null); + assert P.ServerActive(this.scope) when (this.scope !== null); + + let endpoints = Set(); + + on message P.FromPOA(connId, W.Connect($scope)) { + // TODO: Enforce requirement that Connect appear exactly once, before anything else + this.scope = scope; + } + + on message P.FromPOA(connId, W.Assert($ep, $a)) { + if (!endpoints.includes(ep)) { + endpoints = endpoints.add(ep); + react { + on stop { endpoints = endpoints.remove(ep); } + + field this.assertion = a; + assert P.Proposal(this.scope, this.assertion); + + currentFacet().addEndpoint(() => { + if (Observe.isClassOf(this.assertion)) { + const spec = P.Envelope(this.scope, this.assertion.get(0)); + const analysis = Skeleton.analyzeAssertion(spec); + analysis.callback = Dataspace.wrap((evt, vs) => { + currentFacet().actor.scheduleScript(() => { + switch (evt) { + case Skeleton.EVENT_ADDED: send P.ToPOA(connId, W.Add(ep, vs)); break; + case Skeleton.EVENT_REMOVED: send P.ToPOA(connId, W.Del(ep, vs)); break; + case Skeleton.EVENT_MESSAGE: send P.ToPOA(connId, W.Msg(ep, vs)); break; + } + }); + }); + return [Observe(spec), analysis]; + } else { + return [void 0, null]; + } + }, true); + + on message P.FromPOA(connId, W.Assert(ep, $newAssertion)) this.assertion = newAssertion; + stop on message P.FromPOA(connId, W.Clear(ep)); + } + } + } + + on message P.FromPOA(connId, W.Message($body)) { + send P.Proposal(this.scope, body); + } + } +}