Relay streams over the client-server protocol (!)
This commit is contained in:
parent
b6017f1501
commit
e457e270ba
|
@ -22,6 +22,8 @@
|
||||||
"@syndicate-lang/driver-timer": "^0.1.2",
|
"@syndicate-lang/driver-timer": "^0.1.2",
|
||||||
"@syndicate-lang/driver-udp-node": "^0.1.2",
|
"@syndicate-lang/driver-udp-node": "^0.1.2",
|
||||||
"@syndicate-lang/driver-websocket": "^0.1.2",
|
"@syndicate-lang/driver-websocket": "^0.1.2",
|
||||||
|
"@syndicate-lang/server": "^0.1.4",
|
||||||
|
"debug": "^4.1.1",
|
||||||
"webpack": "^4.27.1",
|
"webpack": "^4.27.1",
|
||||||
"webpack-cli": "^3.1.2"
|
"webpack-cli": "^3.1.2"
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,60 @@
|
||||||
// https://www.ietf.org/rfc/rfc1928.txt
|
// https://www.ietf.org/rfc/rfc1928.txt
|
||||||
|
|
||||||
const { currentFacet, genUuid, Bytes, List, Set, Observe } = require("@syndicate-lang/core");
|
const { currentFacet, genUuid, Bytes, Map, Observe, Skeleton } = require("@syndicate-lang/core");
|
||||||
|
const C = activate require("@syndicate-lang/server/lib/client");
|
||||||
const S = activate require("@syndicate-lang/driver-streams-node");
|
const S = activate require("@syndicate-lang/driver-streams-node");
|
||||||
|
const debugFactory = require('debug');
|
||||||
|
|
||||||
assertion type VirtualTcpAddress(host, port);
|
assertion type VirtualTcpAddress(host, port);
|
||||||
|
assertion type AddressMap(from, nodeId, to);
|
||||||
|
|
||||||
|
assertion type ToNode(nodeId, assertion);
|
||||||
|
assertion type FromNode(nodeId, assertion);
|
||||||
|
assertion type RestrictedFromNode(nodeId, spec, captures);
|
||||||
|
|
||||||
|
function usage() {
|
||||||
|
console.info('Usage: syndicate-socks --server WEBSOCKETURL SCOPE');
|
||||||
|
console.info('');
|
||||||
|
console.info(' --help, -h Produce this message and terminate');
|
||||||
|
}
|
||||||
|
|
||||||
|
let server_url = null;
|
||||||
|
let server_scope = null;
|
||||||
|
function process_command_line(args) {
|
||||||
|
const notUndefined = (x, w) => {
|
||||||
|
if (x === void 0) {
|
||||||
|
console.error('Missing '+w+' argument on command line');
|
||||||
|
usage();
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
return x;
|
||||||
|
};
|
||||||
|
const strArg = (w) => notUndefined(args.shift(), w);
|
||||||
|
const numArg = (w) => Number.parseInt(notUndefined(args.shift(), w));
|
||||||
|
while (args.length) {
|
||||||
|
const opt = args.shift();
|
||||||
|
switch (opt) {
|
||||||
|
case '--server':
|
||||||
|
server_url = strArg('server WebSocket URL');
|
||||||
|
server_scope = strArg('server scope name');
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
console.error("Unsupported command-line argument: " + opt);
|
||||||
|
/* FALL THROUGH */
|
||||||
|
case '--help':
|
||||||
|
case '-h':
|
||||||
|
usage();
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
process_command_line(process.argv.slice(2));
|
||||||
|
if (!server_url || !server_scope) {
|
||||||
|
usage();
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
const server_addr = C.WSServer(server_url, server_scope);
|
||||||
|
|
||||||
spawn named 'socks-server' {
|
spawn named 'socks-server' {
|
||||||
on asserted S.Stream($conn, S.Incoming(S.TcpListener(1080))) {
|
on asserted S.Stream($conn, S.Incoming(S.TcpListener(1080))) {
|
||||||
|
@ -197,41 +248,191 @@ spawn named 'socks-server' {
|
||||||
}
|
}
|
||||||
|
|
||||||
spawn named 'remap-service' {
|
spawn named 'remap-service' {
|
||||||
field this.mapped = Set();
|
field this.table = Map();
|
||||||
|
|
||||||
on asserted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) {
|
during C.ServerConnected(server_addr) {
|
||||||
this.mapped = this.mapped.add(a);
|
on asserted C.FromServer(server_addr, $entry(AddressMap(_, _, _))) {
|
||||||
}
|
this.table = this.table.set(AddressMap._from(entry), entry);
|
||||||
on retracted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) {
|
}
|
||||||
this.mapped = this.mapped.remove(a);
|
on retracted C.FromServer(server_addr, $entry(AddressMap(_, _, _))) {
|
||||||
}
|
this.table = this.table.remove(AddressMap._from(entry));
|
||||||
|
}
|
||||||
|
|
||||||
during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) {
|
during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) {
|
||||||
if (host.endsWith('.fruit')) {
|
if (host.endsWith('.fruit')) {
|
||||||
if (!this.mapped.includes(a)) {
|
if (this.table.has(a)) {
|
||||||
console.error("No virtual mapping for", a.toString());
|
const entry = this.table.get(a);
|
||||||
const err = new Error(`No virtual mapping for ${a.toString()}`);
|
const A = server_addr;
|
||||||
err.errno = err.code = 'ENOTFOUND';
|
const N = AddressMap._nodeId(entry);
|
||||||
err.hostname = err.host = host;
|
const L = id;
|
||||||
err.port = port;
|
const R = genUuid('proxiedStream');
|
||||||
on start send S.Stream(id, S.Rejected(err));
|
assert C.ToServer(A, ToNode(N, S.Stream(R, S.Outgoing(AddressMap._to(entry)))));
|
||||||
|
stop on message C.FromServer(A, FromNode(N, S.Stream(R, S.Rejected($err)))) {
|
||||||
|
send S.Stream(L, S.Rejected(err));
|
||||||
|
}
|
||||||
|
stop on message C.FromServer(A, FromNode(N, S.Stream(R, S.Accepted()))) {
|
||||||
|
react {
|
||||||
|
on start send S.Stream(L, S.Accepted());
|
||||||
|
|
||||||
|
assert S.Stream(L, S.Info(Symbol.for("Duplex"), false)); // TODO
|
||||||
|
|
||||||
|
assert S.Stream(L, S.Duplex());
|
||||||
|
stop on retracted Observe(S.Stream(L, S.Duplex()));
|
||||||
|
stop on retracted C.FromServer(A, FromNode(N, S.Stream(R, S.Duplex())));
|
||||||
|
|
||||||
|
// Readable
|
||||||
|
|
||||||
|
during Observe(S.Stream(L, S.End()))
|
||||||
|
during C.FromServer(A, FromNode(N, S.Stream(R, S.End())))
|
||||||
|
assert S.Stream(L, S.End());
|
||||||
|
|
||||||
|
on message S.Stream(L, S.Pushback($chunk))
|
||||||
|
send C.ToServer(A, ToNode(N, S.Stream(R, S.Pushback(chunk))));
|
||||||
|
|
||||||
|
during S.Stream(L, S.BackPressure($sinkL)) {
|
||||||
|
const sinkR = genUuid('sink');
|
||||||
|
assert C.ToServer(A, ToNode(N, S.Stream(R, S.BackPressure(sinkR))));
|
||||||
|
field this.seqno = -1;
|
||||||
|
field this.amount = 0;
|
||||||
|
on asserted S.Stream(sinkL, S.Window($seqno, $amount)) {
|
||||||
|
this.seqno = seqno;
|
||||||
|
this.amount = amount;
|
||||||
|
}
|
||||||
|
assert C.ToServer(A, ToNode(N, S.Stream(sinkR, S.Window(this.seqno, this.amount))))
|
||||||
|
when (this.seqno >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
during C.FromServer(A, FromNode(N, S.Stream(R, S.DataReady())))
|
||||||
|
assert S.Stream(L, S.DataReady());
|
||||||
|
|
||||||
|
during Observe(S.Stream(L, S.Data(_)))
|
||||||
|
on message C.FromServer(A, FromNode(N, S.Stream(R, S.Data($chunk))))
|
||||||
|
send S.Stream(L, S.Data(chunk));
|
||||||
|
|
||||||
|
// Writable
|
||||||
|
|
||||||
|
during Observe(S.Stream(L, S.Window(_, _))) {
|
||||||
|
field this.seqno = -1;
|
||||||
|
field this.amount = 0;
|
||||||
|
on asserted C.FromServer(A, FromNode(N, S.Stream(R, S.Window($seqno, $amount)))) {
|
||||||
|
this.seqno = seqno;
|
||||||
|
this.amount = amount;
|
||||||
|
}
|
||||||
|
assert S.Stream(L, S.Window(this.seqno, this.amount)) when (this.seqno >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
const withCallback = (ackL, f) => {
|
||||||
|
if (ackL === false) {
|
||||||
|
f(false);
|
||||||
|
} else {
|
||||||
|
react {
|
||||||
|
const ackR = genUuid('ack');
|
||||||
|
on message C.FromServer(A, FromNode(N, ackR)) send ackL;
|
||||||
|
on start f(ackR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
on message S.Stream(L, S.Push($chunk, $ackL)) {
|
||||||
|
withCallback(ackL, (ackR) => {
|
||||||
|
send C.ToServer(A, ToNode(N, S.Stream(R, S.Push(chunk, ackR))));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
on message S.Stream(L, S.Close($ackL)) {
|
||||||
|
withCallback(ackL, (ackR) => {
|
||||||
|
send C.ToServer(A, ToNode(N, S.Stream(R, S.Close(ackR))));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
console.error("No virtual mapping for", a.toString());
|
||||||
|
const err = new Error(`No virtual mapping for ${a.toString()}`);
|
||||||
|
err.errno = err.code = 'ENOTFOUND';
|
||||||
|
err.hostname = err.host = host;
|
||||||
|
err.port = port;
|
||||||
|
on start send S.Stream(id, S.Rejected(err));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port)));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
spawn named 'test-remap' {
|
spawn named 'from-node-relay' {
|
||||||
during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9999))) {
|
const debug = debugFactory('syndicate/server:socks:from-node-relay');
|
||||||
assert S.Stream(id, S.Outgoing(S.TcpAddress('steam.eighty-twenty.org', 22)));
|
during Observe(C.FromServer($addr, FromNode($node, $spec))) {
|
||||||
}
|
on start debug('?+', addr.toString(), node.toString(), spec.toString());
|
||||||
|
on stop debug('?-', addr.toString(), node.toString(), spec.toString());
|
||||||
during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9998))) {
|
during C.FromServer(addr, RestrictedFromNode(node, spec.toString(), $vs)) {
|
||||||
assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/sh', [], {})));
|
// ^ TODO: Use real quoting instead of spec.toString() hack!!
|
||||||
}
|
// TODO: Shouldn't the dataspace/client be doing the necessary quoting for us??
|
||||||
|
const a = Skeleton.instantiateAssertion(C.FromServer(addr, FromNode(node, spec)), vs);
|
||||||
during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9997))) {
|
on start debug('+', a.toString());
|
||||||
assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {})));
|
on stop debug('-', a.toString());
|
||||||
|
assert a;
|
||||||
|
}
|
||||||
|
on message C.FromServer(addr, RestrictedFromNode(node, spec.toString(), $vs)) {
|
||||||
|
const a = Skeleton.instantiateAssertion(C.FromServer(addr, FromNode(node, spec)), vs);
|
||||||
|
debug('!', a.toString());
|
||||||
|
send a;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
const nodeId = genUuid('node');
|
||||||
|
|
||||||
|
spawn named 'test-remap' {
|
||||||
|
during C.ServerConnected(server_addr) {
|
||||||
|
assert C.ToServer(server_addr, AddressMap(VirtualTcpAddress("steam.fruit", 22),
|
||||||
|
nodeId,
|
||||||
|
S.TcpAddress('steam.eighty-twenty.org', 22)));
|
||||||
|
|
||||||
|
assert C.ToServer(server_addr, AddressMap(VirtualTcpAddress("shell.fruit", 9999),
|
||||||
|
nodeId,
|
||||||
|
S.SubprocessAddress('/bin/sh', [], {})));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spawn named 'to-node-relay' {
|
||||||
|
const debug = debugFactory('syndicate/server:socks:to-node-relay');
|
||||||
|
during C.ServerConnected(server_addr) {
|
||||||
|
during C.FromServer(server_addr, ToNode(nodeId, $a)) {
|
||||||
|
on start debug('Remote peer has asserted', a && a.toString());
|
||||||
|
on stop debug('Remote peer has retracted', a && a.toString());
|
||||||
|
assert a;
|
||||||
|
}
|
||||||
|
on message C.FromServer(server_addr, ToNode(nodeId, $a)) {
|
||||||
|
send a;
|
||||||
|
}
|
||||||
|
during C.FromServer(server_addr, Observe(FromNode(nodeId, $spec))) {
|
||||||
|
on start debug('Remote peer has asserted interest in', spec && spec.toString());
|
||||||
|
on stop debug('Remote peer has retracted interest in', spec && spec.toString());
|
||||||
|
currentFacet().addObserverEndpoint(() => spec, {
|
||||||
|
add: (vs) => {
|
||||||
|
const a = RestrictedFromNode(nodeId, spec.toString(), vs);
|
||||||
|
debug('+', a && a.toString());
|
||||||
|
// The "react { assert; stop on retracted ... }" pattern won't work here because of
|
||||||
|
// the `VisibilityRestriction`s. We'll never see the "retracted" event if we "stop on
|
||||||
|
// retracted aLocal" where aLocal = Skeleton.instantiateAssertion(spec, vs). Instead,
|
||||||
|
// we need to use `adhocAssert` and the `del` callback.
|
||||||
|
currentFacet().actor.adhocAssert(C.ToServer(server_addr, a));
|
||||||
|
},
|
||||||
|
del: (vs) => {
|
||||||
|
const a = RestrictedFromNode(nodeId, spec.toString(), vs);
|
||||||
|
debug('-', a && a.toString());
|
||||||
|
currentFacet().actor.adhocRetract(C.ToServer(server_addr, a));
|
||||||
|
},
|
||||||
|
msg: (vs) => {
|
||||||
|
const a = RestrictedFromNode(nodeId, spec.toString(), vs);
|
||||||
|
debug('!', a && a.toString());
|
||||||
|
send C.ToServer(server_addr, a);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue