Updated client-server protocol

This commit is contained in:
Tony Garnock-Jones 2019-05-12 23:26:01 +01:00
parent 69968d6dea
commit b682a3fc3f
6 changed files with 92 additions and 63 deletions

View File

@ -12,7 +12,7 @@
<form name="nym_form"> <form name="nym_form">
<fieldset> <fieldset>
<label class="control-label" for="wsurl">Server:</label> <label class="control-label" for="wsurl">Server:</label>
<input type="text" id="wsurl" name="wsurl" value="ws://localhost:8000/broker"> <input type="text" id="wsurl" name="wsurl" value="ws://localhost:8000/">
<label class="control-label" for="nym">Nym:</label> <label class="control-label" for="nym">Nym:</label>
<input type="text" id="nym" name="nym" value=""> <input type="text" id="nym" name="nym" value="">

View File

@ -4,7 +4,7 @@ const UI = activate require("@syndicate-lang/driver-browser-ui");
// @jsx UI.html // @jsx UI.html
// @jsxFrag UI.htmlFragment // @jsxFrag UI.htmlFragment
const { ToBroker, FromBroker, BrokerConnected } = activate require("./client"); const { WSBroker, ToBroker, FromBroker, BrokerConnected } = activate require("./client");
assertion type Present(name); assertion type Present(name);
assertion type Says(who, what); assertion type Says(who, what);
@ -29,30 +29,31 @@ spawn {
const ui = new UI.Anchor(); const ui = new UI.Anchor();
during UI.UIChangeableProperty('#wsurl', 'value', $url) { during UI.UIChangeableProperty('#wsurl', 'value', $url) {
during BrokerConnected(url) { const addr = WSBroker(url, "broker");
on start outputItem(<span class="connected">connected to {url}</span>, during BrokerConnected(addr) {
on start outputItem(<span class="connected">connected to {addr}</span>,
'state_connected'); 'state_connected');
on stop outputItem(<span class="disconnected">disconnected from {url}</span>, on stop outputItem(<span class="disconnected">disconnected from {addr}</span>,
'state_disconnected'); 'state_disconnected');
assert ToBroker(url, Present(this.nym)); assert ToBroker(addr, Present(this.nym));
during FromBroker(url, Present($who)) { during FromBroker(addr, Present($who)) {
assert ui.context(who).html('#nymlist', <li><span class="nym">{who}</span></li>); assert ui.context(who).html('#nymlist', <li><span class="nym">{who}</span></li>);
} }
on message UI.GlobalEvent('#send_chat', 'click', _) { on message UI.GlobalEvent('#send_chat', 'click', _) {
if (this.next_chat) send ToBroker(url, Says(this.nym, this.next_chat)); if (this.next_chat) send ToBroker(addr, Says(this.nym, this.next_chat));
send UI.SetProperty('#chat_input', 'value', ''); send UI.SetProperty('#chat_input', 'value', '');
} }
on message FromBroker(url, Says($who, $what)) { on message FromBroker(addr, Says($who, $what)) {
outputItem(<span class="utterance"> outputItem(<span class="utterance">
<span class="nym">{who}</span><span class="utterance">{what}</span> <span class="nym">{who}</span><span class="utterance">{what}</span>
</span>); </span>);
} }
// on message Syndicate.WakeDetector.wakeEvent() { // on message Syndicate.WakeDetector.wakeEvent() {
// :: forceBrokerDisconnect(url); // :: forceBrokerDisconnect(addr);
// } // }
} }
} }

View File

@ -9,68 +9,75 @@ import {
const WS = activate require("@syndicate-lang/driver-websocket"); const WS = activate require("@syndicate-lang/driver-websocket");
const { const {
Connect, Peer,
Assert, Clear, Message, Assert, Clear, Message,
Add, Del, Msg, Add, Del, Msg, Err,
Ping, Pong, Ping, Pong,
makeDecoder, makeDecoder,
} = activate require("./protocol"); } = activate require("./protocol");
assertion type ToBroker(url, assertion); assertion type WSBroker(url, scope);
assertion type FromBroker(url, assertion);
assertion type BrokerConnection(url);
assertion type BrokerConnected(url);
message type ForceBrokerDisconnect(url);
message type _BrokerPacket(url, packet); assertion type ToBroker(addr, assertion);
assertion type FromBroker(addr, assertion);
assertion type BrokerConnection(addr);
assertion type BrokerConnected(addr);
message type ForceBrokerDisconnect(addr);
message type _BrokerPacket(addr, packet);
Object.assign(module.exports, { Object.assign(module.exports, {
WSBroker,
ToBroker, FromBroker, ToBroker, FromBroker,
BrokerConnection, BrokerConnected, BrokerConnection, BrokerConnected,
ForceBrokerDisconnect, ForceBrokerDisconnect,
}); });
spawn named "BrokerClientFactory" { spawn named "BrokerClientFactory" {
during ToBroker($url, _) assert BrokerConnection(url); during ToBroker($addr, _) assert BrokerConnection(addr);
during Observe(FromBroker($url, _)) assert BrokerConnection(url); during Observe(FromBroker($addr, _)) assert BrokerConnection(addr);
during Observe(BrokerConnected($url)) assert BrokerConnection(url); during Observe(BrokerConnected($addr)) assert BrokerConnection(addr);
during BrokerConnection($url) spawn named ['Broker', url] { during BrokerConnection($addr(WSBroker($url, $scope))) spawn named ['Broker', addr] {
const wsId = genUuid('broker'); const wsId = genUuid('broker');
during WS.WebSocket(wsId, url, {}) { during WS.WebSocket(wsId, url, {}) {
assert BrokerConnected(url); assert BrokerConnected(addr);
function w(x) { function w(x) {
send WS.DataOut(wsId, new Encoder().push(x).contents()); send WS.DataOut(wsId, new Encoder().push(x).contents());
} }
on start w(Connect(scope));
on message WS.DataIn(wsId, $data) { on message WS.DataIn(wsId, $data) {
if (data instanceof Bytes) { if (data instanceof Bytes) {
send _BrokerPacket(url, makeDecoder(data).next()); send _BrokerPacket(addr, makeDecoder(data).next());
} }
} }
during ToBroker(url, $a) { during ToBroker(addr, $a) {
const ep = genUuid('pub'); const ep = genUuid('pub');
on start w(Assert(ep, a)); on start w(Assert(ep, a));
on stop w(Clear(ep)); on stop w(Clear(ep));
} }
on message ToBroker(url, $a) w(Message(a)); on message ToBroker(addr, $a) w(Message(a));
on message _BrokerPacket(url, Ping()) w(Pong()); on message _BrokerPacket(addr, Ping()) w(Pong());
during Observe(FromBroker(url, $spec)) { during Observe(FromBroker(addr, $spec)) {
const ep = genUuid('sub'); const ep = genUuid('sub');
on start w(Assert(ep, Observe(spec))); on start w(Assert(ep, Observe(spec)));
on stop w(Clear(ep)); on stop w(Clear(ep));
on message _BrokerPacket(url, Add(ep, $vs)) { on message _BrokerPacket(addr, Add(ep, $vs)) {
react { react {
assert Skeleton.instantiateAssertion(FromBroker(url, spec), vs); assert Skeleton.instantiateAssertion(FromBroker(addr, spec), vs);
stop on message _BrokerPacket(url, Del(ep, vs)); stop on message _BrokerPacket(addr, Del(ep, vs));
} }
} }
on message _BrokerPacket(url, Msg(ep, $vs)) { on message _BrokerPacket(addr, Msg(ep, $vs)) {
send Skeleton.instantiateAssertion(FromBroker(url, spec), vs); send Skeleton.instantiateAssertion(FromBroker(addr, spec), vs);
} }
} }
} }

View File

@ -25,7 +25,6 @@ const gatewayId = dataspaceId + ':' + localId;
const fs = require('fs'); const fs = require('fs');
assertion type ConnectionName(scope, id);
assertion type Connection(connId); assertion type Connection(connId);
message type Request(connId, body); message type Request(connId, body);
message type Response(connId, body); message type Response(connId, body);
@ -35,9 +34,13 @@ message type Disconnect(connId);
// Internal isolation // Internal isolation
assertion type Envelope(scope, body); assertion type Envelope(scope, body);
// Monitoring
assertion type ConnectionScope(connId, scope);
const { const {
Connect, Peer,
Assert, Clear, Message, Assert, Clear, Message,
Add, Del, Msg, Add, Del, Msg, Err,
makeDecoder, makeDecoder,
} = activate require("./protocol"); } = activate require("./protocol");
@ -66,13 +69,23 @@ spawn named 'rootServer' {
)); ));
} }
during Http.Request($reqId, server, 'get', ['chat.html'], _, _) {
const contents = fs.readFileSync(__dirname + '/../chat.html');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
during Http.Request($reqId, server, 'get', ['style.css'], _, _) {
const contents = fs.readFileSync(__dirname + '/../style.css');
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
}
during Http.Request($reqId, server, 'get', ['dist', $file], _, _) { during Http.Request($reqId, server, 'get', ['dist', $file], _, _) {
const contents = fs.readFileSync(__dirname + '/../dist/' + file); const contents = fs.readFileSync(__dirname + '/../dist/' + file);
assert :snapshot Http.Response(reqId, 200, "OK", {}, contents); assert :snapshot Http.Response(reqId, 200, "OK", {}, contents);
} }
during Connection($name) assert Envelope('monitor', Connection(name)); during ConnectionScope($connId, $scope) assert Envelope('monitor', ConnectionScope(connId, scope));
on message Envelope('monitor', Disconnect($name)) send Disconnect(name); on message Envelope('monitor', Disconnect($connId)) send Disconnect(connId);
} }
spawn named 'websocketListener' { spawn named 'websocketListener' {
@ -81,16 +94,15 @@ spawn named 'websocketListener' {
assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'), assert M.Publish(M.Service(localId, '_syndicate+ws._tcp'),
null, HTTP_PORT, ["tier=0", "path=/monitor"]); null, HTTP_PORT, ["tier=0", "path=/monitor"]);
during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] { during Http.WebSocket($reqId, server, [], _) spawn named ['wsConnection', reqId] {
const name = ConnectionName(scope, reqId); assert Connection(reqId);
assert Connection(name);
on message Http.DataIn(reqId, $data) { on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) { if (data instanceof Bytes) {
send Request(name, makeDecoder(data).next()); send Request(reqId, makeDecoder(data).next());
} }
} }
on message Response(name, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
stop on message Disconnect(name); stop on message Disconnect(reqId);
} }
} }
@ -110,28 +122,35 @@ spawn named 'unixListener' {
function spawnStreamConnection(debugLabel, id) { function spawnStreamConnection(debugLabel, id) {
spawn named [debugLabel, id] { spawn named [debugLabel, id] {
stop on retracted S.Duplex(id); stop on retracted S.Duplex(id);
const name = ConnectionName('broker', id); assert Connection(id);
assert Connection(name);
const decoder = makeDecoder(null); const decoder = makeDecoder(null);
on message S.Data(id, $data) { on message S.Data(id, $data) {
decoder.write(data); decoder.write(data);
let v; let v;
while ((v = decoder.try_next())) { while ((v = decoder.try_next())) {
send Request(name, v); send Request(id, v);
} }
} }
on message Response(name, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); on message Response(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null);
stop on message Disconnect(name); stop on message Disconnect(id);
} }
} }
spawn named 'connectionHandler' { spawn named 'connectionHandler' {
during Connection($connId(ConnectionName($scope,_))) spawn named Connection(connId) { during Connection($connId) spawn named Connection(connId) {
on start console.log(connId.toString(), 'connected'); on start console.log(connId.toString(), 'connected');
on stop console.log(connId.toString(), 'disconnected'); on stop console.log(connId.toString(), 'disconnected');
field this.scope = null;
assert ConnectionScope(connId, this.scope) when (this.scope !== null);
let endpoints = Set(); let endpoints = Set();
on message Request(connId, Connect($scope)) {
// TODO: Enforce requirement that Connect appear exactly once, before anything else
this.scope = scope;
}
on message Request(connId, Assert($ep, $a)) { on message Request(connId, Assert($ep, $a)) {
if (!endpoints.includes(ep)) { if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep); endpoints = endpoints.add(ep);
@ -142,7 +161,7 @@ spawn named 'connectionHandler' {
currentFacet().addEndpoint(() => { currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) { if (Observe.isClassOf(this.assertion)) {
const spec = Envelope(scope, this.assertion.get(0)); const spec = Envelope(this.scope, this.assertion.get(0));
const analysis = Skeleton.analyzeAssertion(spec); const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => { analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => { currentFacet().actor.scheduleScript(() => {
@ -162,7 +181,7 @@ spawn named 'connectionHandler' {
}); });
return [Observe(spec), analysis]; return [Observe(spec), analysis];
} else { } else {
return [Envelope(scope, this.assertion), null]; return [Envelope(this.scope, this.assertion), null];
} }
}, true); }, true);
@ -173,7 +192,7 @@ spawn named 'connectionHandler' {
} }
on message Request(connId, Message($body)) { on message Request(connId, Message($body)) {
send Envelope(scope, body); send Envelope(this.scope, body);
} }
on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString()); on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString());

View File

@ -4,10 +4,9 @@ const UI = activate require("@syndicate-lang/driver-browser-ui");
// @jsx UI.html // @jsx UI.html
// @jsxFrag UI.htmlFragment // @jsxFrag UI.htmlFragment
const { ToBroker, FromBroker, BrokerConnected } = activate require("./client"); const { WSBroker, ToBroker, FromBroker, BrokerConnected } = activate require("./client");
assertion type ConnectionName(scope, id); assertion type ConnectionScope(connId, scope);
assertion type Connection(connId);
message type Disconnect(connId); message type Disconnect(connId);
spawn { spawn {
@ -21,24 +20,25 @@ spawn {
const url = (function () { const url = (function () {
const u = new URL(document.location); const u = new URL(document.location);
u.protocol = u.protocol.replace(/^http/, 'ws'); u.protocol = u.protocol.replace(/^http/, 'ws');
u.pathname = '/monitor'; u.pathname = '/';
return u.toString(); return u.toString();
})(); })();
const addr = WSBroker(url, "monitor");
during BrokerConnected(url) { during BrokerConnected(addr) {
during FromBroker(url, Connection(ConnectionName($scope, _))) { during FromBroker(addr, ConnectionScope(_, $scope)) {
const ui = new UI.Anchor(); const ui = new UI.Anchor();
assert ui.html('#scopes', assert ui.html('#scopes',
<div class={`scope_${scope}`}> <div class={`scope_${scope}`}>
<p>Scope: <tt>{scope}</tt></p> <p>Scope: <tt>{scope}</tt></p>
<ul></ul> <ul></ul>
</div>); </div>);
during FromBroker(url, Connection(ConnectionName(scope, $id))) { during FromBroker(addr, ConnectionScope($id, scope)) {
const ui = new UI.Anchor(); const ui = new UI.Anchor();
assert ui.html(`#scopes div.scope_${scope} ul`, assert ui.html(`#scopes div.scope_${scope} ul`,
<li>{id.toString()} <button class="disconnect">Disconnect</button></li>); <li>{id.toString()} <button class="disconnect">Disconnect</button></li>);
on message UI.UIEvent(ui.fragmentId, 'button.disconnect', 'click', _) { on message UI.UIEvent(ui.fragmentId, 'button.disconnect', 'click', _) {
send ToBroker(url, Disconnect(ConnectionName(scope, id))); send ToBroker(addr, Disconnect(id));
} }
} }
} }

View File

@ -2,17 +2,18 @@
import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core"; import { Decoder, Discard, Capture, Observe } from "@syndicate-lang/core";
// Client ---> Broker message type Connect(scope);
message type Peer(scope);
message type Assert(endpointName, assertion); message type Assert(endpointName, assertion);
message type Clear(endpointName); message type Clear(endpointName);
message type Message(body); message type Message(body);
// Client <--- Broker
message type Add(endpointName, captures); message type Add(endpointName, captures);
message type Del(endpointName, captures); message type Del(endpointName, captures);
message type Msg(endpointName, captures); message type Msg(endpointName, captures);
message type Err(detail);
// Bidirectional
message type Ping(); message type Ping();
message type Pong(); message type Pong();
@ -27,8 +28,9 @@ function makeDecoder(initialBuffer) {
} }
Object.assign(module.exports, { Object.assign(module.exports, {
Connect, Peer,
Assert, Clear, Message, Assert, Clear, Message,
Add, Del, Msg, Add, Del, Msg, Err,
Ping, Pong, Ping, Pong,
makeDecoder, makeDecoder,
}); });