From b682a3fc3fad3de93770365682f3f3e8c8148dee Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 12 May 2019 23:26:01 +0100 Subject: [PATCH] Updated client-server protocol --- packages/broker/chat.html | 2 +- packages/broker/src/chat.js | 19 +++++------ packages/broker/src/client.js | 51 ++++++++++++++++------------- packages/broker/src/index.js | 57 ++++++++++++++++++++++----------- packages/broker/src/monitor.js | 16 ++++----- packages/broker/src/protocol.js | 10 +++--- 6 files changed, 92 insertions(+), 63 deletions(-) diff --git a/packages/broker/chat.html b/packages/broker/chat.html index 45cde8d..a99596d 100644 --- a/packages/broker/chat.html +++ b/packages/broker/chat.html @@ -12,7 +12,7 @@
- + diff --git a/packages/broker/src/chat.js b/packages/broker/src/chat.js index 438f90b..7f2765e 100644 --- a/packages/broker/src/chat.js +++ b/packages/broker/src/chat.js @@ -4,7 +4,7 @@ const UI = activate require("@syndicate-lang/driver-browser-ui"); // @jsx UI.html // @jsxFrag UI.htmlFragment -const { ToBroker, FromBroker, BrokerConnected } = activate require("./client"); +const { WSBroker, ToBroker, FromBroker, BrokerConnected } = activate require("./client"); assertion type Present(name); assertion type Says(who, what); @@ -29,30 +29,31 @@ spawn { const ui = new UI.Anchor(); during UI.UIChangeableProperty('#wsurl', 'value', $url) { - during BrokerConnected(url) { - on start outputItem(connected to {url}, + const addr = WSBroker(url, "broker"); + during BrokerConnected(addr) { + on start outputItem(connected to {addr}, 'state_connected'); - on stop outputItem(disconnected from {url}, + on stop outputItem(disconnected from {addr}, 'state_disconnected'); - assert ToBroker(url, Present(this.nym)); - during FromBroker(url, Present($who)) { + assert ToBroker(addr, Present(this.nym)); + during FromBroker(addr, Present($who)) { assert ui.context(who).html('#nymlist',
  • {who}
  • ); } on message UI.GlobalEvent('#send_chat', 'click', _) { - if (this.next_chat) send ToBroker(url, Says(this.nym, this.next_chat)); + if (this.next_chat) send ToBroker(addr, Says(this.nym, this.next_chat)); send UI.SetProperty('#chat_input', 'value', ''); } - on message FromBroker(url, Says($who, $what)) { + on message FromBroker(addr, Says($who, $what)) { outputItem( {who}{what} ); } // on message Syndicate.WakeDetector.wakeEvent() { - // :: forceBrokerDisconnect(url); + // :: forceBrokerDisconnect(addr); // } } } diff --git a/packages/broker/src/client.js b/packages/broker/src/client.js index 7aa468b..808de40 100644 --- a/packages/broker/src/client.js +++ b/packages/broker/src/client.js @@ -9,68 +9,75 @@ import { const WS = activate require("@syndicate-lang/driver-websocket"); const { + Connect, Peer, Assert, Clear, Message, - Add, Del, Msg, + Add, Del, Msg, Err, Ping, Pong, makeDecoder, } = activate require("./protocol"); -assertion type ToBroker(url, assertion); -assertion type FromBroker(url, assertion); -assertion type BrokerConnection(url); -assertion type BrokerConnected(url); -message type ForceBrokerDisconnect(url); +assertion type WSBroker(url, scope); -message type _BrokerPacket(url, packet); +assertion type ToBroker(addr, assertion); +assertion type FromBroker(addr, assertion); +assertion type BrokerConnection(addr); +assertion type BrokerConnected(addr); +message type ForceBrokerDisconnect(addr); + +message type _BrokerPacket(addr, packet); Object.assign(module.exports, { + WSBroker, ToBroker, FromBroker, BrokerConnection, BrokerConnected, ForceBrokerDisconnect, }); spawn named "BrokerClientFactory" { - during ToBroker($url, _) assert BrokerConnection(url); - during Observe(FromBroker($url, _)) assert BrokerConnection(url); - during Observe(BrokerConnected($url)) assert BrokerConnection(url); + during ToBroker($addr, _) assert BrokerConnection(addr); + during Observe(FromBroker($addr, _)) assert BrokerConnection(addr); + during Observe(BrokerConnected($addr)) assert BrokerConnection(addr); - during BrokerConnection($url) spawn named ['Broker', url] { + during BrokerConnection($addr(WSBroker($url, $scope))) spawn named ['Broker', addr] { const wsId = genUuid('broker'); during WS.WebSocket(wsId, url, {}) { - assert BrokerConnected(url); + assert BrokerConnected(addr); function w(x) { send WS.DataOut(wsId, new Encoder().push(x).contents()); } + + on start w(Connect(scope)); + on message WS.DataIn(wsId, $data) { if (data instanceof Bytes) { - send _BrokerPacket(url, makeDecoder(data).next()); + send _BrokerPacket(addr, makeDecoder(data).next()); } } - during ToBroker(url, $a) { + during ToBroker(addr, $a) { const ep = genUuid('pub'); on start w(Assert(ep, a)); on stop w(Clear(ep)); } - on message ToBroker(url, $a) w(Message(a)); + on message ToBroker(addr, $a) w(Message(a)); - on message _BrokerPacket(url, Ping()) w(Pong()); + on message _BrokerPacket(addr, Ping()) w(Pong()); - during Observe(FromBroker(url, $spec)) { + during Observe(FromBroker(addr, $spec)) { const ep = genUuid('sub'); on start w(Assert(ep, Observe(spec))); on stop w(Clear(ep)); - on message _BrokerPacket(url, Add(ep, $vs)) { + on message _BrokerPacket(addr, Add(ep, $vs)) { react { - assert Skeleton.instantiateAssertion(FromBroker(url, spec), vs); - stop on message _BrokerPacket(url, Del(ep, vs)); + assert Skeleton.instantiateAssertion(FromBroker(addr, spec), vs); + stop on message _BrokerPacket(addr, Del(ep, vs)); } } - on message _BrokerPacket(url, Msg(ep, $vs)) { - send Skeleton.instantiateAssertion(FromBroker(url, spec), vs); + on message _BrokerPacket(addr, Msg(ep, $vs)) { + send Skeleton.instantiateAssertion(FromBroker(addr, spec), vs); } } } diff --git a/packages/broker/src/index.js b/packages/broker/src/index.js index 10ad75e..b94a833 100644 --- a/packages/broker/src/index.js +++ b/packages/broker/src/index.js @@ -25,7 +25,6 @@ const gatewayId = dataspaceId + ':' + localId; const fs = require('fs'); -assertion type ConnectionName(scope, id); assertion type Connection(connId); message type Request(connId, body); message type Response(connId, body); @@ -35,9 +34,13 @@ message type Disconnect(connId); // Internal isolation assertion type Envelope(scope, body); +// Monitoring +assertion type ConnectionScope(connId, scope); + const { + Connect, Peer, Assert, Clear, Message, - Add, Del, Msg, + Add, Del, Msg, Err, makeDecoder, } = activate require("./protocol"); @@ -66,13 +69,23 @@ spawn named 'rootServer' { )); } + during Http.Request($reqId, server, 'get', ['chat.html'], _, _) { + const contents = fs.readFileSync(__dirname + '/../chat.html'); + assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); + } + + during Http.Request($reqId, server, 'get', ['style.css'], _, _) { + const contents = fs.readFileSync(__dirname + '/../style.css'); + assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); + } + during Http.Request($reqId, server, 'get', ['dist', $file], _, _) { const contents = fs.readFileSync(__dirname + '/../dist/' + file); assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); } - during Connection($name) assert Envelope('monitor', Connection(name)); - on message Envelope('monitor', Disconnect($name)) send Disconnect(name); + during ConnectionScope($connId, $scope) assert Envelope('monitor', ConnectionScope(connId, scope)); + on message Envelope('monitor', Disconnect($connId)) send Disconnect(connId); } spawn named 'websocketListener' { @@ -81,16 +94,15 @@ spawn named 'websocketListener' { assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), null, HTTP_PORT, ["tier=0", "path=/monitor"]); - during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] { - const name = ConnectionName(scope, reqId); - assert Connection(name); + during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] { + assert Connection(reqId); on message Http.DataIn(reqId, $data) { if (data instanceof Bytes) { - send Request(name, makeDecoder(data).next()); + send Request(reqId, makeDecoder(data).next()); } } - on message Response(name, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); - stop on message Disconnect(name); + on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); + stop on message Disconnect(reqId); } } @@ -110,28 +122,35 @@ spawn named 'unixListener' { function spawnStreamConnection(debugLabel, id) { spawn named [debugLabel, id] { stop on retracted S.Duplex(id); - const name = ConnectionName('broker', id); - assert Connection(name); + assert Connection(id); const decoder = makeDecoder(null); on message S.Data(id, $data) { decoder.write(data); let v; while ((v = decoder.try_next())) { - send Request(name, v); + send Request(id, v); } } - on message Response(name, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); - stop on message Disconnect(name); + on message Response(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); + stop on message Disconnect(id); } } spawn named 'connectionHandler' { - during Connection($connId(ConnectionName($scope,_))) spawn named Connection(connId) { + during Connection($connId) spawn named Connection(connId) { on start console.log(connId.toString(), 'connected'); on stop console.log(connId.toString(), 'disconnected'); + field this.scope = null; + assert ConnectionScope(connId, this.scope) when (this.scope !== null); + let endpoints = Set(); + on message Request(connId, Connect($scope)) { + // TODO: Enforce requirement that Connect appear exactly once, before anything else + this.scope = scope; + } + on message Request(connId, Assert($ep, $a)) { if (!endpoints.includes(ep)) { endpoints = endpoints.add(ep); @@ -142,7 +161,7 @@ spawn named 'connectionHandler' { currentFacet().addEndpoint(() => { if (Observe.isClassOf(this.assertion)) { - const spec = Envelope(scope, this.assertion.get(0)); + const spec = Envelope(this.scope, this.assertion.get(0)); const analysis = Skeleton.analyzeAssertion(spec); analysis.callback = Dataspace.wrap((evt, vs) => { currentFacet().actor.scheduleScript(() => { @@ -162,7 +181,7 @@ spawn named 'connectionHandler' { }); return [Observe(spec), analysis]; } else { - return [Envelope(scope, this.assertion), null]; + return [Envelope(this.scope, this.assertion), null]; } }, true); @@ -173,7 +192,7 @@ spawn named 'connectionHandler' { } on message Request(connId, Message($body)) { - send Envelope(scope, body); + send Envelope(this.scope, body); } on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString()); diff --git a/packages/broker/src/monitor.js b/packages/broker/src/monitor.js index f570b3a..6e0b6fc 100644 --- a/packages/broker/src/monitor.js +++ b/packages/broker/src/monitor.js @@ -4,10 +4,9 @@ const UI = activate require("@syndicate-lang/driver-browser-ui"); // @jsx UI.html // @jsxFrag UI.htmlFragment -const { ToBroker, FromBroker, BrokerConnected } = activate require("./client"); +const { WSBroker, ToBroker, FromBroker, BrokerConnected } = activate require("./client"); -assertion type ConnectionName(scope, id); -assertion type Connection(connId); +assertion type ConnectionScope(connId, scope); message type Disconnect(connId); spawn { @@ -21,24 +20,25 @@ spawn { const url = (function () { const u = new URL(document.location); u.protocol = u.protocol.replace(/^http/, 'ws'); - u.pathname = '/monitor'; + u.pathname = '/'; return u.toString(); })(); + const addr = WSBroker(url, "monitor"); - during BrokerConnected(url) { - during FromBroker(url, Connection(ConnectionName($scope, _))) { + during BrokerConnected(addr) { + during FromBroker(addr, ConnectionScope(_, $scope)) { const ui = new UI.Anchor(); assert ui.html('#scopes',

    Scope: {scope}

      ); - during FromBroker(url, Connection(ConnectionName(scope, $id))) { + during FromBroker(addr, ConnectionScope($id, scope)) { const ui = new UI.Anchor(); assert ui.html(`#scopes div.scope_${scope} ul`,
    • {id.toString()}
    • ); on message UI.UIEvent(ui.fragmentId, 'button.disconnect', 'click', _) { - send ToBroker(url, Disconnect(ConnectionName(scope, id))); + send ToBroker(addr, Disconnect(id)); } } } diff --git a/packages/broker/src/protocol.js b/packages/broker/src/protocol.js index f8a7807..ea9fcf7 100644 --- a/packages/broker/src/protocol.js +++ b/packages/broker/src/protocol.js @@ -2,17 +2,18 @@ import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core"; -// Client ---> Broker +message type Connect(scope); +message type Peer(scope); + message type Assert(endpointName, assertion); message type Clear(endpointName); message type Message(body); -// Client <--- Broker message type Add(endpointName, captures); message type Del(endpointName, captures); message type Msg(endpointName, captures); +message type Err(detail); -// Bidirectional message type Ping(); message type Pong(); @@ -27,8 +28,9 @@ function makeDecoder(initialBuffer) { } Object.assign(module.exports, { + Connect, Peer, Assert, Clear, Message, - Add, Del, Msg, + Add, Del, Msg, Err, Ping, Pong, makeDecoder, });