syndicate-js/packages/broker/src/index.js

137 lines
4.2 KiB
JavaScript
Raw Normal View History

2018-11-13 21:27:26 +00:00
"use strict";
const UI = require("@syndicate-lang/driver-browser-ui");
// @jsx UI.html
// @jsxFrag UI.htmlFragment
const Http = activate require("@syndicate-lang/driver-http-node");
const Tcp = activate require("@syndicate-lang/driver-tcp-node");
2018-11-19 16:57:22 +00:00
import {
Set, Bytes,
Encoder, Observe,
2018-11-19 16:57:22 +00:00
Dataspace, Skeleton, currentFacet,
} from "@syndicate-lang/core";
2018-11-13 21:27:26 +00:00
const server = Http.HttpServer(null, 8000);
2018-11-21 13:21:08 +00:00
assertion type ConnectionName(scope, id);
2018-11-19 16:57:22 +00:00
assertion type Connection(connId);
message type Request(connId, body);
message type Response(connId, body);
// Internal isolation
2018-11-21 13:21:08 +00:00
assertion type Envelope(scope, body);
2018-11-19 16:57:22 +00:00
const {
Assert, Clear, Message,
Add, Del, Msg,
makeDecoder,
} = activate require("./protocol");
2018-11-19 16:57:22 +00:00
2018-11-13 21:27:26 +00:00
spawn named 'serverLogger' {
on asserted Http.Request(_, server, $method, $path, $query, $req) {
console.log(method, path.toJS(), query.toJS());
}
2018-11-15 11:00:30 +00:00
on asserted Http.WebSocket(_, server, $path, $query) {
console.log(path.toJS(), query.toJS());
}
2018-11-13 21:27:26 +00:00
}
spawn named 'rootServer' {
during Http.Request($reqId, server, 'get', [], _, _) {
assert :snapshot Http.Response(
reqId, 200, "OK", {"Content-type": "text/html"},
'<!DOCTYPE html>' + UI.htmlToString(
<div>
<p>Hello</p>
</div>
));
}
}
spawn named 'websocketListener' {
2018-11-21 13:21:08 +00:00
during Http.WebSocket($reqId, server, [$scope], _) spawn named ['wsConnection', scope, reqId] {
const name = ConnectionName(scope, reqId);
assert Connection(name);
on message Http.DataIn(reqId, $data) {
if (data instanceof Bytes) {
2018-11-21 13:21:08 +00:00
send Request(name, makeDecoder(data).next());
}
}
2018-11-21 13:21:08 +00:00
on message Response(name, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
2018-11-13 21:27:26 +00:00
}
}
spawn named 'tcpListener' {
during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] {
2018-11-21 13:21:08 +00:00
const name = ConnectionName(scope, id);
2018-11-13 21:27:26 +00:00
assert Tcp.TcpAccepted(id);
2018-11-21 13:21:08 +00:00
assert Connection(name);
const decoder = makeDecoder(null);
on message Tcp.DataIn(id, $data) {
decoder.write(data);
let v;
while ((v = decoder.try_next())) {
2018-11-21 13:21:08 +00:00
send Request(name, v);
}
}
2018-11-21 13:21:08 +00:00
on message Response(name, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents());
2018-11-19 16:57:22 +00:00
}
}
spawn named 'connectionHandler' {
2018-11-21 13:21:08 +00:00
during Connection($connId(ConnectionName($scope,_))) spawn named Connection(connId) {
on start console.log(connId.toString(), 'connected');
on stop console.log(connId.toString(), 'disconnected');
2018-11-19 16:57:22 +00:00
let endpoints = Set();
on message Request(connId, Assert($ep, $a)) {
if (!endpoints.includes(ep)) {
endpoints = endpoints.add(ep);
react {
on stop { endpoints = endpoints.remove(ep); }
field this.assertion = a;
currentFacet().addEndpoint(() => {
if (Observe.isClassOf(this.assertion)) {
2018-11-21 13:21:08 +00:00
const spec = Envelope(scope, this.assertion.get(0));
2018-11-19 16:57:22 +00:00
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = Dataspace.wrap((evt, vs) => {
currentFacet().actor.scheduleScript(() => {
2018-11-21 13:21:08 +00:00
console.log('EVENT', currentFacet().toString(), connId.toString(), ep, evt, vs);
2018-11-19 16:57:22 +00:00
switch (evt) {
case Skeleton.EVENT_ADDED:
send Response(connId, Add(ep, vs));
break;
case Skeleton.EVENT_REMOVED:
send Response(connId, Del(ep, vs));
break;
case Skeleton.EVENT_MESSAGE:
send Response(connId, Msg(ep, vs));
break;
}
});
});
return [Observe(spec), analysis];
} else {
2018-11-21 13:21:08 +00:00
return [Envelope(scope, this.assertion), null];
2018-11-19 16:57:22 +00:00
}
}, true);
on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion;
stop on message Request(connId, Clear(ep));
}
}
2018-11-13 21:27:26 +00:00
}
2018-11-19 16:57:22 +00:00
on message Request(connId, Message($body)) {
2018-11-21 13:21:08 +00:00
send Envelope(scope, body);
2018-11-19 16:57:22 +00:00
}
2018-11-21 13:21:08 +00:00
on message Request(connId, $req) console.log('IN: ', connId.toString(), req.toString());
on message Response(connId, $resp) console.log('OUT:', connId.toString(), resp.toString());
2018-11-13 21:27:26 +00:00
}
}