diff --git a/packages/broker/TODO.md b/packages/broker/TODO.md new file mode 100644 index 0000000..4ead00f --- /dev/null +++ b/packages/broker/TODO.md @@ -0,0 +1,2 @@ + - TLS for both WS and TCP + - authn, authz diff --git a/packages/broker/src/index.js b/packages/broker/src/index.js index 39a3291..ec04f16 100644 --- a/packages/broker/src/index.js +++ b/packages/broker/src/index.js @@ -6,10 +6,35 @@ const UI = require("@syndicate-lang/driver-browser-ui"); const Http = activate require("@syndicate-lang/driver-http-node"); const Tcp = activate require("@syndicate-lang/driver-tcp-node"); -import { Decoder, Bytes } from "@syndicate-lang/core"; +import { + Set, Bytes, + Decoder, Encoder, + Discard, Capture, Observe, + Dataspace, Skeleton, currentFacet, +} from "@syndicate-lang/core"; const server = Http.HttpServer(null, 8000); +assertion type Connection(connId); +message type Fragment(connId, data); +message type Request(connId, body); +message type Response(connId, body); + +// Internal isolation +assertion type Envelope(body); + +// Client ---> Broker +message type Assert(endpointName, assertion); +message type Clear(endpointName); +message type Message(body); + +// Client <--- Broker +message type Add(endpointName, captures); +message type Del(endpointName, captures); +message type Msg(endpointName, captures); + +assertion type Endpoint(connId, endpointName, assertion); + spawn named 'serverLogger' { on asserted Http.Request(_, server, $method, $path, $query, $req) { console.log(method, path.toJS(), query.toJS()); @@ -33,21 +58,89 @@ spawn named 'rootServer' { spawn named 'websocketListener' { during Http.WebSocket($reqId, server, ['broker'], _) spawn named ['wsConnection', reqId] { - on message Http.DataIn(reqId, $message) { - console.log('got', reqId, new Decoder(message).next()); - send Http.DataOut(reqId, message); - } - - stop on message Http.DataIn(reqId, Bytes.from("quit")); + assert Connection(reqId); + on message Http.DataIn(reqId, $data) send Fragment(reqId, data); + on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents()); } } spawn named 'tcpListener' { during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] { assert Tcp.TcpAccepted(id); - on message Tcp.DataIn(id, $data) { - console.log('got', id, new Decoder(data).next()); - send Tcp.DataOut(id, data); - } + assert Connection(id); + on message Tcp.DataIn(id, $data) send Fragment(id, data); + on message Response(id, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents()); + } +} + +spawn named 'connectionHandler' { + during Connection($connId) spawn named Connection(connId) { + on start console.log(connId, 'connected'); + on stop console.log(connId, 'disconnected'); + + let endpoints = Set(); + + const decoder = new Decoder(null, { + shortForms: { + 0: Discard.constructorInfo.label, + 1: Capture.constructorInfo.label, + 2: Observe.constructorInfo.label, + } + }); + on message Fragment(connId, $data) { + decoder.write(data); + let v; + while ((v = decoder.try_next())) { + send Request(connId, v); + } + } + + on message Request(connId, Assert($ep, $a)) { + if (!endpoints.includes(ep)) { + endpoints = endpoints.add(ep); + react { + on stop { endpoints = endpoints.remove(ep); } + + field this.assertion = a; + + currentFacet().addEndpoint(() => { + if (Observe.isClassOf(this.assertion)) { + console.log("Subscription", connId, ep, this.assertion.toString()); + const spec = Envelope(this.assertion.get(0)); + const analysis = Skeleton.analyzeAssertion(spec); + analysis.callback = Dataspace.wrap((evt, vs) => { + currentFacet().actor.scheduleScript(() => { + console.log('EVENT', currentFacet().toString(), connId, ep, evt, vs); + switch (evt) { + case Skeleton.EVENT_ADDED: + send Response(connId, Add(ep, vs)); + break; + case Skeleton.EVENT_REMOVED: + send Response(connId, Del(ep, vs)); + break; + case Skeleton.EVENT_MESSAGE: + send Response(connId, Msg(ep, vs)); + break; + } + }); + }); + return [Observe(spec), analysis]; + } else { + return [Envelope(this.assertion), null]; + } + }, true); + + on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion; + stop on message Request(connId, Clear(ep)); + } + } + } + + on message Request(connId, Message($body)) { + send Envelope(body); + } + + on message Request(connId, $req) console.log('IN: ', connId, req.toString()); + on message Response(connId, $resp) console.log('OUT:', connId, resp.toString()); } }