diff --git a/packages/syntax-playground/package.json b/packages/syntax-playground/package.json index 4088fdd..57b0448 100644 --- a/packages/syntax-playground/package.json +++ b/packages/syntax-playground/package.json @@ -22,6 +22,8 @@ "@syndicate-lang/driver-timer": "^0.1.2", "@syndicate-lang/driver-udp-node": "^0.1.2", "@syndicate-lang/driver-websocket": "^0.1.2", + "@syndicate-lang/server": "^0.1.4", + "debug": "^4.1.1", "webpack": "^4.27.1", "webpack-cli": "^3.1.2" } diff --git a/packages/syntax-playground/src/socks.js b/packages/syntax-playground/src/socks.js index a743e6d..5e6a6e5 100644 --- a/packages/syntax-playground/src/socks.js +++ b/packages/syntax-playground/src/socks.js @@ -1,9 +1,60 @@ // https://www.ietf.org/rfc/rfc1928.txt -const { currentFacet, genUuid, Bytes, List, Set, Observe } = require("@syndicate-lang/core"); +const { currentFacet, genUuid, Bytes, Map, Observe, Skeleton } = require("@syndicate-lang/core"); +const C = activate require("@syndicate-lang/server/lib/client"); const S = activate require("@syndicate-lang/driver-streams-node"); +const debugFactory = require('debug'); assertion type VirtualTcpAddress(host, port); +assertion type AddressMap(from, nodeId, to); + +assertion type ToNode(nodeId, assertion); +assertion type FromNode(nodeId, assertion); +assertion type RestrictedFromNode(nodeId, spec, captures); + +function usage() { + console.info('Usage: syndicate-socks --server WEBSOCKETURL SCOPE'); + console.info(''); + console.info(' --help, -h Produce this message and terminate'); +} + +let server_url = null; +let server_scope = null; +function process_command_line(args) { + const notUndefined = (x, w) => { + if (x === void 0) { + console.error('Missing '+w+' argument on command line'); + usage(); + process.exit(1); + } + return x; + }; + const strArg = (w) => notUndefined(args.shift(), w); + const numArg = (w) => Number.parseInt(notUndefined(args.shift(), w)); + while (args.length) { + const opt = args.shift(); + switch (opt) { + case '--server': + server_url = strArg('server WebSocket URL'); + server_scope = strArg('server scope name'); + break; + default: + console.error("Unsupported command-line argument: " + opt); + /* FALL THROUGH */ + case '--help': + case '-h': + usage(); + process.exit(1); + } + } +} + +process_command_line(process.argv.slice(2)); +if (!server_url || !server_scope) { + usage(); + process.exit(1); +} +const server_addr = C.WSServer(server_url, server_scope); spawn named 'socks-server' { on asserted S.Stream($conn, S.Incoming(S.TcpListener(1080))) { @@ -197,41 +248,191 @@ spawn named 'socks-server' { } spawn named 'remap-service' { - field this.mapped = Set(); + field this.table = Map(); - on asserted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) { - this.mapped = this.mapped.add(a); - } - on retracted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) { - this.mapped = this.mapped.remove(a); - } + during C.ServerConnected(server_addr) { + on asserted C.FromServer(server_addr, $entry(AddressMap(_, _, _))) { + this.table = this.table.set(AddressMap._from(entry), entry); + } + on retracted C.FromServer(server_addr, $entry(AddressMap(_, _, _))) { + this.table = this.table.remove(AddressMap._from(entry)); + } - during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) { - if (host.endsWith('.fruit')) { - if (!this.mapped.includes(a)) { - console.error("No virtual mapping for", a.toString()); - const err = new Error(`No virtual mapping for ${a.toString()}`); - err.errno = err.code = 'ENOTFOUND'; - err.hostname = err.host = host; - err.port = port; - on start send S.Stream(id, S.Rejected(err)); + during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) { + if (host.endsWith('.fruit')) { + if (this.table.has(a)) { + const entry = this.table.get(a); + const A = server_addr; + const N = AddressMap._nodeId(entry); + const L = id; + const R = genUuid('proxiedStream'); + assert C.ToServer(A, ToNode(N, S.Stream(R, S.Outgoing(AddressMap._to(entry))))); + stop on message C.FromServer(A, FromNode(N, S.Stream(R, S.Rejected($err)))) { + send S.Stream(L, S.Rejected(err)); + } + stop on message C.FromServer(A, FromNode(N, S.Stream(R, S.Accepted()))) { + react { + on start send S.Stream(L, S.Accepted()); + + assert S.Stream(L, S.Info(Symbol.for("Duplex"), false)); // TODO + + assert S.Stream(L, S.Duplex()); + stop on retracted Observe(S.Stream(L, S.Duplex())); + stop on retracted C.FromServer(A, FromNode(N, S.Stream(R, S.Duplex()))); + + // Readable + + during Observe(S.Stream(L, S.End())) + during C.FromServer(A, FromNode(N, S.Stream(R, S.End()))) + assert S.Stream(L, S.End()); + + on message S.Stream(L, S.Pushback($chunk)) + send C.ToServer(A, ToNode(N, S.Stream(R, S.Pushback(chunk)))); + + during S.Stream(L, S.BackPressure($sinkL)) { + const sinkR = genUuid('sink'); + assert C.ToServer(A, ToNode(N, S.Stream(R, S.BackPressure(sinkR)))); + field this.seqno = -1; + field this.amount = 0; + on asserted S.Stream(sinkL, S.Window($seqno, $amount)) { + this.seqno = seqno; + this.amount = amount; + } + assert C.ToServer(A, ToNode(N, S.Stream(sinkR, S.Window(this.seqno, this.amount)))) + when (this.seqno >= 0); + } + + during C.FromServer(A, FromNode(N, S.Stream(R, S.DataReady()))) + assert S.Stream(L, S.DataReady()); + + during Observe(S.Stream(L, S.Data(_))) + on message C.FromServer(A, FromNode(N, S.Stream(R, S.Data($chunk)))) + send S.Stream(L, S.Data(chunk)); + + // Writable + + during Observe(S.Stream(L, S.Window(_, _))) { + field this.seqno = -1; + field this.amount = 0; + on asserted C.FromServer(A, FromNode(N, S.Stream(R, S.Window($seqno, $amount)))) { + this.seqno = seqno; + this.amount = amount; + } + assert S.Stream(L, S.Window(this.seqno, this.amount)) when (this.seqno >= 0); + } + + const withCallback = (ackL, f) => { + if (ackL === false) { + f(false); + } else { + react { + const ackR = genUuid('ack'); + on message C.FromServer(A, FromNode(N, ackR)) send ackL; + on start f(ackR); + } + } + } + + on message S.Stream(L, S.Push($chunk, $ackL)) { + withCallback(ackL, (ackR) => { + send C.ToServer(A, ToNode(N, S.Stream(R, S.Push(chunk, ackR)))); + }); + } + + on message S.Stream(L, S.Close($ackL)) { + withCallback(ackL, (ackR) => { + send C.ToServer(A, ToNode(N, S.Stream(R, S.Close(ackR)))); + }); + } + } + } + } else { + console.error("No virtual mapping for", a.toString()); + const err = new Error(`No virtual mapping for ${a.toString()}`); + err.errno = err.code = 'ENOTFOUND'; + err.hostname = err.host = host; + err.port = port; + on start send S.Stream(id, S.Rejected(err)); + } + } else { + assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port))); } - } else { - assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port))); } } } -spawn named 'test-remap' { - during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9999))) { - assert S.Stream(id, S.Outgoing(S.TcpAddress('steam.eighty-twenty.org', 22))); - } - - during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9998))) { - assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/sh', [], {}))); - } - - during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9997))) { - assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {}))); +spawn named 'from-node-relay' { + const debug = debugFactory('syndicate/server:socks:from-node-relay'); + during Observe(C.FromServer($addr, FromNode($node, $spec))) { + on start debug('?+', addr.toString(), node.toString(), spec.toString()); + on stop debug('?-', addr.toString(), node.toString(), spec.toString()); + during C.FromServer(addr, RestrictedFromNode(node, spec.toString(), $vs)) { + // ^ TODO: Use real quoting instead of spec.toString() hack!! + // TODO: Shouldn't the dataspace/client be doing the necessary quoting for us?? + const a = Skeleton.instantiateAssertion(C.FromServer(addr, FromNode(node, spec)), vs); + on start debug('+', a.toString()); + on stop debug('-', a.toString()); + assert a; + } + on message C.FromServer(addr, RestrictedFromNode(node, spec.toString(), $vs)) { + const a = Skeleton.instantiateAssertion(C.FromServer(addr, FromNode(node, spec)), vs); + debug('!', a.toString()); + send a; + } + } +} + +/////////////////////////////////////////////////////////////////////////// + +const nodeId = genUuid('node'); + +spawn named 'test-remap' { + during C.ServerConnected(server_addr) { + assert C.ToServer(server_addr, AddressMap(VirtualTcpAddress("steam.fruit", 22), + nodeId, + S.TcpAddress('steam.eighty-twenty.org', 22))); + + assert C.ToServer(server_addr, AddressMap(VirtualTcpAddress("shell.fruit", 9999), + nodeId, + S.SubprocessAddress('/bin/sh', [], {}))); + } +} + +spawn named 'to-node-relay' { + const debug = debugFactory('syndicate/server:socks:to-node-relay'); + during C.ServerConnected(server_addr) { + during C.FromServer(server_addr, ToNode(nodeId, $a)) { + on start debug('Remote peer has asserted', a && a.toString()); + on stop debug('Remote peer has retracted', a && a.toString()); + assert a; + } + on message C.FromServer(server_addr, ToNode(nodeId, $a)) { + send a; + } + during C.FromServer(server_addr, Observe(FromNode(nodeId, $spec))) { + on start debug('Remote peer has asserted interest in', spec && spec.toString()); + on stop debug('Remote peer has retracted interest in', spec && spec.toString()); + currentFacet().addObserverEndpoint(() => spec, { + add: (vs) => { + const a = RestrictedFromNode(nodeId, spec.toString(), vs); + debug('+', a && a.toString()); + // The "react { assert; stop on retracted ... }" pattern won't work here because of + // the `VisibilityRestriction`s. We'll never see the "retracted" event if we "stop on + // retracted aLocal" where aLocal = Skeleton.instantiateAssertion(spec, vs). Instead, + // we need to use `adhocAssert` and the `del` callback. + currentFacet().actor.adhocAssert(C.ToServer(server_addr, a)); + }, + del: (vs) => { + const a = RestrictedFromNode(nodeId, spec.toString(), vs); + debug('-', a && a.toString()); + currentFacet().actor.adhocRetract(C.ToServer(server_addr, a)); + }, + msg: (vs) => { + const a = RestrictedFromNode(nodeId, spec.toString(), vs); + debug('!', a && a.toString()); + send C.ToServer(server_addr, a); + } + }); + } } }