From 04403f1cb357f7e1710662922821bc889460551e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 21 Nov 2018 13:21:08 +0000 Subject: [PATCH] Scoped broker --- packages/broker/src/index.js | 37 +++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/packages/broker/src/index.js b/packages/broker/src/index.js index 8cf7071..face8b0 100644 --- a/packages/broker/src/index.js +++ b/packages/broker/src/index.js @@ -14,12 +14,13 @@ import { const server = Http.HttpServer(null, 8000); +assertion type ConnectionName(scope, id); assertion type Connection(connId); message type Request(connId, body); message type Response(connId, body); // Internal isolation -assertion type Envelope(body); +assertion type Envelope(scope, body); const { Assert, Clear, Message, @@ -49,37 +50,39 @@ spawn named 'rootServer' { } spawn named 'websocketListener' { - during Http.WebSocket($reqId, server, ['broker'], _) spawn named ['wsConnection', reqId] { - assert Connection(reqId); + during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] { + const name = ConnectionName(scope, reqId); + assert Connection(name); on message Http.DataIn(reqId, $data) { if (data instanceof Bytes) { - send Request(reqId, makeDecoder(data).next()); + send Request(name, makeDecoder(data).next()); } } - on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); + on message Response(name, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); } } spawn named 'tcpListener' { during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] { + const name = ConnectionName(scope, id); assert Tcp.TcpAccepted(id); - assert Connection(id); + assert Connection(name); const decoder = makeDecoder(null); on message Tcp.DataIn(id, $data) { decoder.write(data); let v; while ((v = decoder.try_next())) { - send Request(id, v); + send Request(name, v); } } - on message Response(id, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents()); + on message Response(name, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents()); } } spawn named 'connectionHandler' { - during Connection($connId) spawn named Connection(connId) { - on start console.log(connId, 'connected'); - on stop console.log(connId, 'disconnected'); + during Connection($connId(ConnectionName($scope,_))) spawn named Connection(connId) { + on start console.log(connId.toString(), 'connected'); + on stop console.log(connId.toString(), 'disconnected'); let endpoints = Set(); @@ -93,11 +96,11 @@ spawn named 'connectionHandler' { currentFacet().addEndpoint(() => { if (Observe.isClassOf(this.assertion)) { - const spec = Envelope(this.assertion.get(0)); + const spec = Envelope(scope, this.assertion.get(0)); const analysis = Skeleton.analyzeAssertion(spec); analysis.callback = Dataspace.wrap((evt, vs) => { currentFacet().actor.scheduleScript(() => { - console.log('EVENT', currentFacet().toString(), connId, ep, evt, vs); + console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs); switch (evt) { case Skeleton.EVENT_ADDED: send Response(connId, Add(ep, vs)); @@ -113,7 +116,7 @@ spawn named 'connectionHandler' { }); return [Observe(spec), analysis]; } else { - return [Envelope(this.assertion), null]; + return [Envelope(scope, this.assertion), null]; } }, true); @@ -124,10 +127,10 @@ spawn named 'connectionHandler' { } on message Request(connId, Message($body)) { - send Envelope(body); + send Envelope(scope, body); } - on message Request(connId, $req) console.log('IN: ', connId, req.toString()); - on message Response(connId, $resp) console.log('OUT:', connId, resp.toString()); + on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString()); + on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString()); } }