Progress
This commit is contained in:
parent
3dc6559d26
commit
5dcd794f7c
|
@ -0,0 +1,2 @@
|
||||||
|
- TLS for both WS and TCP
|
||||||
|
- authn, authz
|
|
@ -6,10 +6,35 @@ const UI = require("@syndicate-lang/driver-browser-ui");
|
||||||
|
|
||||||
const Http = activate require("@syndicate-lang/driver-http-node");
|
const Http = activate require("@syndicate-lang/driver-http-node");
|
||||||
const Tcp = activate require("@syndicate-lang/driver-tcp-node");
|
const Tcp = activate require("@syndicate-lang/driver-tcp-node");
|
||||||
import { Decoder, Bytes } from "@syndicate-lang/core";
|
import {
|
||||||
|
Set, Bytes,
|
||||||
|
Decoder, Encoder,
|
||||||
|
Discard, Capture, Observe,
|
||||||
|
Dataspace, Skeleton, currentFacet,
|
||||||
|
} from "@syndicate-lang/core";
|
||||||
|
|
||||||
const server = Http.HttpServer(null, 8000);
|
const server = Http.HttpServer(null, 8000);
|
||||||
|
|
||||||
|
assertion type Connection(connId);
|
||||||
|
message type Fragment(connId, data);
|
||||||
|
message type Request(connId, body);
|
||||||
|
message type Response(connId, body);
|
||||||
|
|
||||||
|
// Internal isolation
|
||||||
|
assertion type Envelope(body);
|
||||||
|
|
||||||
|
// Client ---> Broker
|
||||||
|
message type Assert(endpointName, assertion);
|
||||||
|
message type Clear(endpointName);
|
||||||
|
message type Message(body);
|
||||||
|
|
||||||
|
// Client <--- Broker
|
||||||
|
message type Add(endpointName, captures);
|
||||||
|
message type Del(endpointName, captures);
|
||||||
|
message type Msg(endpointName, captures);
|
||||||
|
|
||||||
|
assertion type Endpoint(connId, endpointName, assertion);
|
||||||
|
|
||||||
spawn named 'serverLogger' {
|
spawn named 'serverLogger' {
|
||||||
on asserted Http.Request(_, server, $method, $path, $query, $req) {
|
on asserted Http.Request(_, server, $method, $path, $query, $req) {
|
||||||
console.log(method, path.toJS(), query.toJS());
|
console.log(method, path.toJS(), query.toJS());
|
||||||
|
@ -33,21 +58,89 @@ spawn named 'rootServer' {
|
||||||
|
|
||||||
spawn named 'websocketListener' {
|
spawn named 'websocketListener' {
|
||||||
during Http.WebSocket($reqId, server, ['broker'], _) spawn named ['wsConnection', reqId] {
|
during Http.WebSocket($reqId, server, ['broker'], _) spawn named ['wsConnection', reqId] {
|
||||||
on message Http.DataIn(reqId, $message) {
|
assert Connection(reqId);
|
||||||
console.log('got', reqId, new Decoder(message).next());
|
on message Http.DataIn(reqId, $data) send Fragment(reqId, data);
|
||||||
send Http.DataOut(reqId, message);
|
on message Response(reqId, $resp) send Http.DataOut(reqId, new Encoder().push(resp).contents());
|
||||||
}
|
|
||||||
|
|
||||||
stop on message Http.DataIn(reqId, Bytes.from("quit"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
spawn named 'tcpListener' {
|
spawn named 'tcpListener' {
|
||||||
during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] {
|
during Tcp.TcpConnection($id, Tcp.TcpListener(8001)) spawn named ['tcpConnection', id] {
|
||||||
assert Tcp.TcpAccepted(id);
|
assert Tcp.TcpAccepted(id);
|
||||||
on message Tcp.DataIn(id, $data) {
|
assert Connection(id);
|
||||||
console.log('got', id, new Decoder(data).next());
|
on message Tcp.DataIn(id, $data) send Fragment(id, data);
|
||||||
send Tcp.DataOut(id, data);
|
on message Response(id, $resp) send Tcp.DataOut(id, new Encoder().push(resp).contents());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spawn named 'connectionHandler' {
|
||||||
|
during Connection($connId) spawn named Connection(connId) {
|
||||||
|
on start console.log(connId, 'connected');
|
||||||
|
on stop console.log(connId, 'disconnected');
|
||||||
|
|
||||||
|
let endpoints = Set();
|
||||||
|
|
||||||
|
const decoder = new Decoder(null, {
|
||||||
|
shortForms: {
|
||||||
|
0: Discard.constructorInfo.label,
|
||||||
|
1: Capture.constructorInfo.label,
|
||||||
|
2: Observe.constructorInfo.label,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
on message Fragment(connId, $data) {
|
||||||
|
decoder.write(data);
|
||||||
|
let v;
|
||||||
|
while ((v = decoder.try_next())) {
|
||||||
|
send Request(connId, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
on message Request(connId, Assert($ep, $a)) {
|
||||||
|
if (!endpoints.includes(ep)) {
|
||||||
|
endpoints = endpoints.add(ep);
|
||||||
|
react {
|
||||||
|
on stop { endpoints = endpoints.remove(ep); }
|
||||||
|
|
||||||
|
field this.assertion = a;
|
||||||
|
|
||||||
|
currentFacet().addEndpoint(() => {
|
||||||
|
if (Observe.isClassOf(this.assertion)) {
|
||||||
|
console.log("Subscription", connId, ep, this.assertion.toString());
|
||||||
|
const spec = Envelope(this.assertion.get(0));
|
||||||
|
const analysis = Skeleton.analyzeAssertion(spec);
|
||||||
|
analysis.callback = Dataspace.wrap((evt, vs) => {
|
||||||
|
currentFacet().actor.scheduleScript(() => {
|
||||||
|
console.log('EVENT', currentFacet().toString(), connId, ep, evt, vs);
|
||||||
|
switch (evt) {
|
||||||
|
case Skeleton.EVENT_ADDED:
|
||||||
|
send Response(connId, Add(ep, vs));
|
||||||
|
break;
|
||||||
|
case Skeleton.EVENT_REMOVED:
|
||||||
|
send Response(connId, Del(ep, vs));
|
||||||
|
break;
|
||||||
|
case Skeleton.EVENT_MESSAGE:
|
||||||
|
send Response(connId, Msg(ep, vs));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return [Observe(spec), analysis];
|
||||||
|
} else {
|
||||||
|
return [Envelope(this.assertion), null];
|
||||||
|
}
|
||||||
|
}, true);
|
||||||
|
|
||||||
|
on message Request(connId, Assert(ep, $newAssertion)) this.assertion = newAssertion;
|
||||||
|
stop on message Request(connId, Clear(ep));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
on message Request(connId, Message($body)) {
|
||||||
|
send Envelope(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
on message Request(connId, $req) console.log('IN: ', connId, req.toString());
|
||||||
|
on message Response(connId, $resp) console.log('OUT:', connId, resp.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue