Scoped broker

This commit is contained in:
Tony Garnock-Jones 2018-11-21 13:21:08 +00:00
parent 96201e7c0b
commit 04403f1cb3
1 changed files with 20 additions and 17 deletions

View File

@ -14,12 +14,13 @@ import {
const server = Http.HttpServer(null, 8000); const server = Http.HttpServer(null, 8000);
assertion type ConnectionName(scope, id);
assertion type Connection(connId); assertion type Connection(connId);
message type Request(connId, body); message type Request(connId, body);
message type Response(connId, body); message type Response(connId, body);
// Internal isolation // Internal isolation
assertion type Envelope(body); assertion type Envelope(scope, body);
const { const {
Assert, Clear, Message, Assert, Clear, Message,
@ -49,37 +50,39 @@ spawn named 'rootServer' {
} }
spawn named 'websocketListener' { spawn named 'websocketListener' {
during Http.WebSocket($reqId, server, ['broker'], _) spawn named ['wsConnection', reqId] { during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] {
assert Connection(reqId); const name = ConnectionName(scope, reqId);
assert Connection(name);
on message Http.DataIn(reqId, $data) { on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) { if (data instanceof Bytes) {
send Request(reqId, makeDecoder(data).next()); send Request(name, makeDecoder(data).next());
} }
} }
on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); on message Response(name, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
} }
} }
spawn named 'tcpListener' { spawn named 'tcpListener' {
during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] { during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] {
const name = ConnectionName(scope, id);
assert Tcp.TcpAccepted(id); assert Tcp.TcpAccepted(id);
assert Connection(id); assert Connection(name);
const decoder = makeDecoder(null); const decoder = makeDecoder(null);
on message Tcp.DataIn(id, $data) { on message Tcp.DataIn(id, $data) {
decoder.write(data); decoder.write(data);
let v; let v;
while ((v = decoder.try_next())) { while ((v = decoder.try_next())) {
send Request(id, v); send Request(name, v);
} }
} }
on message Response(id, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents()); on message Response(name, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents());
} }
} }
spawn named 'connectionHandler' { spawn named 'connectionHandler' {
during Connection($connId) spawn named Connection(connId) { during Connection($connId(ConnectionName($scope,_))) spawn named Connection(connId) {
on start console.log(connId, 'connected'); on start console.log(connId.toString(), 'connected');
on stop console.log(connId, 'disconnected'); on stop console.log(connId.toString(), 'disconnected');
let endpoints = Set(); let endpoints = Set();
@ -93,11 +96,11 @@ spawn named 'connectionHandler' {
currentFacet().addEndpoint(() => { currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) { if (Observe.isClassOf(this.assertion)) {
const spec = Envelope(this.assertion.get(0)); const spec = Envelope(scope, this.assertion.get(0));
const analysis = Skeleton.analyzeAssertion(spec); const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => { analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => { currentFacet().actor.scheduleScript(() => {
console.log('EVENT', currentFacet().toString(), connId, ep, evt, vs); console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs);
switch (evt) { switch (evt) {
case Skeleton.EVENT_ADDED: case Skeleton.EVENT_ADDED:
send Response(connId, Add(ep, vs)); send Response(connId, Add(ep, vs));
@ -113,7 +116,7 @@ spawn named 'connectionHandler' {
}); });
return [Observe(spec), analysis]; return [Observe(spec), analysis];
} else { } else {
return [Envelope(this.assertion), null]; return [Envelope(scope, this.assertion), null];
} }
}, true); }, true);
@ -124,10 +127,10 @@ spawn named 'connectionHandler' {
} }
on message Request(connId, Message($body)) { on message Request(connId, Message($body)) {
send Envelope(body); send Envelope(scope, body);
} }
on message Request(connId, $req) console.log('IN: ', connId, req.toString()); on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString());
on message Response(connId, $resp) console.log('OUT:', connId, resp.toString()); on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString());
} }
} }