From 9a8898e8ec9aabd976e24cfd20d84db494e14145 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 31 May 2019 13:58:04 +0100 Subject: [PATCH] Significant refactoring of stream protocol --- packages/driver-http-node/src/index.js | 2 +- packages/driver-mdns/src/index.js | 8 +- packages/driver-streams-node/src/buffer.js | 18 +-- packages/driver-streams-node/src/net.js | 14 +-- packages/driver-streams-node/src/streams.js | 105 +++++++++--------- .../driver-streams-node/src/subprocess.js | 6 +- packages/server/src/index.js | 2 +- packages/server/src/server.js | 6 +- .../syntax-playground/src/avahipublish.js | 37 +----- packages/syntax-playground/src/chatclient.js | 30 +++-- packages/syntax-playground/src/chatserver.js | 12 +- .../syntax-playground/src/ncchatclient.js | 22 ++-- packages/syntax-playground/src/socks.js | 66 +++++------ packages/syntax-playground/src/ssh-relay.js | 22 ++-- packages/syntax-playground/src/subprocess.js | 8 +- 15 files changed, 176 insertions(+), 182 deletions(-) diff --git a/packages/driver-http-node/src/index.js b/packages/driver-http-node/src/index.js index 2646af8..d95183a 100644 --- a/packages/driver-http-node/src/index.js +++ b/packages/driver-http-node/src/index.js @@ -170,7 +170,7 @@ function _server(host, port, httpsOptions) { res.writeHead(500, "Internal server error", {}); res.end(); } - on asserted Observe(S.Readable(id)) react S.readableStreamBehaviour(id, req); + on asserted Observe(S.Stream(id, S.Readable())) react S.readableStreamBehaviour(id, req); on asserted Response(id, $code, $message, $headers, $detail) { res.writeHead(code, message, headers.toJS()); diff --git a/packages/driver-mdns/src/index.js b/packages/driver-mdns/src/index.js index c618a27..1f334fd 100644 --- a/packages/driver-mdns/src/index.js +++ b/packages/driver-mdns/src/index.js @@ -77,9 +77,9 @@ spawn named 'driver/avahi-publish' { field this.established = false; assert Published(svc, hostName, port, txtDataRecords) when (this.established); - on retracted S.Readable(stderr) topFacet.stop(); + on retracted S.Stream(stderr, S.Readable()) topFacet.stop(); - on message S.Line(stderr, $line) { + on message S.Stream(stderr, S.Line($line)) { line = line.toString('utf-8'); if (line.startsWith('Established')) { this.established = true; @@ -126,8 +126,8 @@ spawn named 'driver/avahi-browse' { on asserted S.SubprocessRunning(id, _, [_, $stdout, _]) { react { - on retracted S.Readable(stdout) topFacet.stop(); - on message S.Line(stdout, $line) { + on retracted S.Stream(stdout, S.Readable()) topFacet.stop(); + on message S.Stream(stdout, S.Line($line)) { // Parsing of TXT record data (appearing after the port // number in an '=' record) is unreliable given the way // avahi-browse formats it. diff --git a/packages/driver-streams-node/src/buffer.js b/packages/driver-streams-node/src/buffer.js index f6464ab..23adfa8 100644 --- a/packages/driver-streams-node/src/buffer.js +++ b/packages/driver-streams-node/src/buffer.js @@ -19,7 +19,7 @@ import { currentFacet, Observe, genUuid, Bytes, List } from "@syndicate-lang/core"; const S = activate require("./streams"); -message type PacketRequest(id, size); +message type PacketRequest(size); export { PacketRequest, @@ -39,26 +39,26 @@ export function spawnBufferStream() { function _spawnBufferStream(id) { spawn named id { - stop on retracted Observe(S.Duplex(id)); - assert S.Duplex(id); - assert S.StreamInfo(id, 'Duplex', null); + stop on retracted Observe(S.Stream(id, S.Duplex())); + assert S.Stream(id, S.Duplex()); + assert S.Stream(id, S.Info(Symbol.for('Duplex'), null)); field this.buffer = Bytes(); field this.queue = List(); - on message S.Push(id, $chunk, $ack) { + on message S.Stream(id, S.Push($chunk, $ack)) { this.buffer = Bytes.concat([this.buffer, chunk]); if (ack !== null) send ack; } - stop on message S.Close(id, $ack) { + stop on message S.Stream(id, S.Close($ack)) { if (ack !== null) send ack; } - on message PacketRequest(id, $size) { + on message S.Stream(id, PacketRequest($size)) { if (size === 0) { // Signal to terminate. - currentFacet().stop(() => { send S.Data(id, this.buffer); }); + currentFacet().stop(() => { send S.Stream(id, S.Data(this.buffer)); }); } else { this.queue = this.queue.push(size); } @@ -68,7 +68,7 @@ function _spawnBufferStream(id) { if (!this.queue.isEmpty()) { const expected = this.queue.first(); if (this.buffer.size >= expected) { - send S.Data(id, this.buffer.slice(0, expected)); + send S.Stream(id, S.Data(this.buffer.slice(0, expected))); this.buffer = this.buffer.slice(expected); this.queue = this.queue.shift(); } diff --git a/packages/driver-streams-node/src/net.js b/packages/driver-streams-node/src/net.js index da8317d..6b9a88d 100644 --- a/packages/driver-streams-node/src/net.js +++ b/packages/driver-streams-node/src/net.js @@ -30,7 +30,7 @@ assertion type UnixSocketServer(path); export { UnixSocketClient, UnixSocketServer }; spawn named 'NetDriver' { - during Observe(S.IncomingConnection(_, TcpListener($port))) spawn named ['TcpListener', port] { + during Observe(S.Stream(_, S.Incoming(TcpListener($port)))) spawn named ['TcpListener', port] { _netListener.call(this, () => genUuid('tcp' + port), TcpListener(port), @@ -38,7 +38,7 @@ spawn named 'NetDriver' { (server, err) => { throw err; }); } - during Observe(S.IncomingConnection(_, UnixSocketServer($path))) + during Observe(S.Stream(_, S.Incoming(UnixSocketServer($path)))) spawn named ['UnixSocketServer', path] { let retried = false; _netListener.call(this, @@ -94,14 +94,14 @@ spawn named 'NetDriver' { on stop try { server.close() } catch (e) { console.error(e); } } - during S.OutgoingConnection($id, TcpAddress($host, $port)) spawn named ['Tcp', id, host, port] { + during S.Stream($id, S.Outgoing(TcpAddress($host, $port))) spawn named ['Tcp', id, host, port] { _netConnector.call(this, id, (socket) => { socket.connect(port, host) }, TcpAddress(host, port)); } - during S.OutgoingConnection($id, UnixSocketClient($path)) spawn named ['Unix', id, path] { + during S.Stream($id, S.Outgoing(UnixSocketClient($path))) spawn named ['Unix', id, path] { _netConnector.call(this, id, (socket) => { socket.connect(path) }, @@ -118,11 +118,11 @@ spawn named 'NetDriver' { finish(); establishingFacet.stop(() => { socket.destroy(); - send S.ConnectionRejected(id, err); + send S.Stream(id, S.Rejected(err)); }); }); - on retracted S.OutgoingConnection(id, spec) { + on retracted S.Stream(id, S.Outgoing(spec)) { connectionErrorHandler(null); } @@ -130,7 +130,7 @@ spawn named 'NetDriver' { const readyHandler = Dataspace.wrapExternal(() => { socket.off('error', connectionErrorHandler); socket.off('ready', readyHandler); - send S.ConnectionAccepted(id); + send S.Stream(id, S.Accepted()); establishingFacet.stop(() => { react { on stop finish(); diff --git a/packages/driver-streams-node/src/streams.js b/packages/driver-streams-node/src/streams.js index 0ecda3f..65f4c32 100644 --- a/packages/driver-streams-node/src/streams.js +++ b/packages/driver-streams-node/src/streams.js @@ -22,10 +22,12 @@ import { } from "@syndicate-lang/core"; const stream = require('stream'); -assertion type IncomingConnection(id, spec); -assertion type OutgoingConnection(id, spec); -message type ConnectionAccepted(id); // for both incoming and outgoing connections -message type ConnectionRejected(id, err); // for both incoming and outgoing connections +assertion type Stream(id, detail); // for assertions and messages + +assertion type Incoming(spec); +assertion type Outgoing(spec); +message type Accepted(); // for both incoming and outgoing connections +message type Rejected(err); // for both incoming and outgoing connections // Each `chunk` to/from a stream in BINARY mode must be either a // String or a Uint8Array (or Buffer). Any `chunk` may be empty @@ -38,37 +40,38 @@ message type ConnectionRejected(id, err); // for both incoming and outgoing conn // when the corresponding chunk is completely processed. // Interest in StreamInfo is non-creative -assertion type StreamInfo(id, kind, stream); // kind ∈ "Readable", "Writable", "Duplex" +assertion type Info(kind, stream); // kind ∈ Readable, Writable, Duplex // Framing knowledge; interest in these is creative -assertion type Readable(id) = Symbol.for('stream-readable'); -assertion type Writable(id) = Symbol.for('stream-writable'); -assertion type Duplex(id) = Symbol.for('stream-duplex'); +assertion type Readable() = Symbol.for('stream-readable'); +assertion type Writable() = Symbol.for('stream-writable'); +assertion type Duplex() = Symbol.for('stream-duplex'); -message type Error(id, detail) = Symbol.for('stream-error'); +message type Error(detail) = Symbol.for('stream-error'); // From Readable: -message type Data(id, chunk) = Symbol.for('stream-data'); -assertion type End(id) = Symbol.for('stream-end'); // if no interest in this, frame torn down at end -assertion type DataReady(id) = Symbol.for('stream-data-ready'); +message type Data(chunk) = Symbol.for('stream-data'); +assertion type End() = Symbol.for('stream-end'); // if no interest in this, frame torn down at end +assertion type DataReady() = Symbol.for('stream-data-ready'); // To Writable: -message type Push(id, chunk, ack) = Symbol.for('stream-push'); -assertion type Close(id, ack) = Symbol.for('stream-close'); +message type Push(chunk, ack) = Symbol.for('stream-push'); +assertion type Close(ack) = Symbol.for('stream-close'); // From Writable: -assertion type BackPressure(readableId, writableId) = Symbol.for('stream-back-pressure'); -message type Window(writableId, seqno, amount) = Symbol.for('stream-credit'); +assertion type BackPressure(writableId) = Symbol.for('stream-back-pressure'); // readableId implicit +message type Window(seqno, amount) = Symbol.for('stream-credit'); // To Readable: -message type Pushback(id, chunk) = Symbol.for('stream-pushback'); +message type Pushback(chunk) = Symbol.for('stream-pushback'); // Readable output adapter: (TODO: move to separate module?) -message type Line(id, line) = Symbol.for('stream-line'); +message type Line(line) = Symbol.for('stream-line'); export { - IncomingConnection, OutgoingConnection, ConnectionAccepted, ConnectionRejected, - StreamInfo, Readable, Writable, Duplex, + Stream, + Incoming, Outgoing, Accepted, Rejected, + Info, Readable, Writable, Duplex, Error, Data, End, DataReady, Push, Close, @@ -102,7 +105,7 @@ function _readableStreamBehaviour(id, s) { const objectMode = s.objectMode || s.readableObjectMode || false; field this.endMonitorExists = false; - during Observe(End(id)) { + during Observe(Stream(id, End())) { on start this.endMonitorExists = true; on stop this.endMonitorExists = false; } @@ -110,7 +113,7 @@ function _readableStreamBehaviour(id, s) { s.on('end', Dataspace.wrapExternal(() => { if (this.endMonitorExists) { react { - assert End(id); + assert Stream(id, End()); stop on (!this.endMonitorExists) { this.stopBits |= READING_STOPPED; } @@ -120,16 +123,16 @@ function _readableStreamBehaviour(id, s) { } })); - on message Pushback(id, $chunk) s.unshift(chunk); + on message Stream(id, Pushback($chunk)) s.unshift(chunk); field this.outboundWindows = Map(); - during BackPressure(id, $writable) { - on asserted Window(writable, $seqno, $amount) { + during Stream(id, BackPressure($writable)) { + on asserted Stream(writable, Window($seqno, $amount)) { // Attend to `seqno` to allow otherwise-noop changes to // refresh the outboundWindow size. this.outboundWindows = this.outboundWindows.set(writable, amount); } - on retracted Window(writable, _, _) { + on retracted Stream(writable, Window(_, _)) { this.outboundWindows = this.outboundWindows.remove(writable); } } @@ -143,9 +146,9 @@ function _readableStreamBehaviour(id, s) { field this.readable = false; s.on('readable', Dataspace.wrapExternal(() => { this.readable = true; })); - assert DataReady(id) when (this.readable); + assert Stream(id, DataReady()) when (this.readable); - during Observe(Data(id, _)) { + during Observe(Stream(id, Data(_))) { dataflow { while (this.readable && (this.outboundWindow === null || this.outboundWindow > 0)) { const maxlen = (this.outboundWindow === null) @@ -159,7 +162,7 @@ function _readableStreamBehaviour(id, s) { // This is the adjustment that forces us to pay attention to seqno: this.outboundWindows = this.outboundWindows.mapEntries(([t, c]) => [t, c - amount]); if (this.outboundWindow !== null) this.outboundWindow -= amount; - send Data(id, chunk); + send Stream(id, Data(chunk)); } } } @@ -175,18 +178,20 @@ function _writableStreamBehaviour(id, s) { } field this.seqno = 0; field this.inboundWindow = refreshWindow(); - during Observe(Window(id, _, _)) assert Window(id, this.seqno, this.inboundWindow); + during Observe(Stream(id, Window(_, _))) { + assert Stream(id, Window(this.seqno, this.inboundWindow)); + } s.on('drain', Dataspace.wrapExternal(() => { this.inboundWindow = refreshWindow(); })); const callbackFor = (k) => (k === null ? void 0 : Dataspace.wrapExternal(() => { send k; })); - on message Push(id, $chunk, $ack) { + on message Stream(id, Push($chunk, $ack)) { s.write(objectMode ? chunk : Bytes.toIO(chunk), callbackFor(ack)); this.inboundWindow = refreshWindow(); } - on message Close(id, $ack) { + on message Stream(id, Close($ack)) { s.end(callbackFor(ack)); this.inboundWindow = refreshWindow(); } @@ -194,10 +199,10 @@ function _writableStreamBehaviour(id, s) { export function readableStreamBehaviour(id, s) { (function () { - assert StreamInfo(id, "Readable", s); + assert Stream(id, Info(Symbol.for("Readable"), s)); - assert Readable(id); - stop on retracted Observe(Readable(id)); + assert Stream(id, Readable()); + stop on retracted Observe(Stream(id, Readable())); _commonStreamBehaviour.call(this, s, WRITING_STOPPED); _readableStreamBehaviour.call(this, id, s); @@ -206,10 +211,10 @@ export function readableStreamBehaviour(id, s) { export function writableStreamBehaviour(id, s) { (function () { - assert StreamInfo(id, "Writable", s); + assert Stream(id, Info(Symbol.for("Writable"), s)); - assert Writable(id); - stop on retracted Observe(Writable(id)); + assert Stream(id, Writable()); + stop on retracted Observe(Stream(id, Writable())); _commonStreamBehaviour.call(this, s, READING_STOPPED); _writableStreamBehaviour.call(this, id, s); @@ -218,10 +223,10 @@ export function writableStreamBehaviour(id, s) { export function duplexStreamBehaviour(id, s) { (function () { - assert StreamInfo(id, "Duplex", s); + assert Stream(id, Info(Symbol.for("Duplex"), s)); - assert Duplex(id); - stop on retracted Observe(Duplex(id)); + assert Stream(id, Duplex()); + stop on retracted Observe(Stream(id, Duplex())); _commonStreamBehaviour.call(this, s, 0); _readableStreamBehaviour.call(this, id, s); @@ -230,26 +235,26 @@ export function duplexStreamBehaviour(id, s) { } spawn named 'driver/stream-line' { - during Observe(Line($id, _)) spawn named ['LineReader', id] { + during Observe(Stream($id, Line(_))) spawn named ['LineReader', id] { field this.buffer = Bytes(); - on message Data(id, $data) this.buffer = Bytes.concat([this.buffer, data]); + on message Stream(id, Data($data)) this.buffer = Bytes.concat([this.buffer, data]); dataflow { const pos = this.buffer.indexOf(10); if (pos !== -1) { const line = this.buffer.slice(0, pos); this.buffer = this.buffer.slice(pos + 1); - send Line(id, line); + send Stream(id, Line(line)); } } } } export function spawnConnection(id, spec, s) { - spawn named ['IncomingConnection', id, spec] { - assert IncomingConnection(id, spec); - stop on retracted Observe(IncomingConnection(_, spec)) s.destroy(); - stop on message ConnectionRejected(id, $err) s.destroy(err); - stop on asserted Observe(Duplex(id)) react duplexStreamBehaviour(id, s); - stop on message ConnectionAccepted(id) react duplexStreamBehaviour(id, s); + spawn named ['Incoming', id, spec] { + assert Stream(id, Incoming(spec)); + stop on retracted Observe(Stream(_, Incoming(spec))) s.destroy(); + stop on message Stream(id, Rejected($err)) s.destroy(err); + stop on asserted Observe(Stream(id, Duplex())) react duplexStreamBehaviour(id, s); + stop on message Stream(id, Accepted()) react duplexStreamBehaviour(id, s); } } diff --git a/packages/driver-streams-node/src/subprocess.js b/packages/driver-streams-node/src/subprocess.js index 9f17a0c..cc47dc4 100644 --- a/packages/driver-streams-node/src/subprocess.js +++ b/packages/driver-streams-node/src/subprocess.js @@ -37,7 +37,7 @@ export { }; spawn named 'driver/Subprocess' { - during S.OutgoingConnection($id, SubprocessAddress($command, $args, $options)) + during S.Stream($id, S.Outgoing(SubprocessAddress($command, $args, $options))) spawn named ['SubprocessConnection', id] { const establishingFacet = currentFacet(); @@ -46,7 +46,7 @@ spawn named 'driver/Subprocess' { (options || Map()).set('stdio', ['pipe', 'pipe', 'inherit']).toJS()); const rejecter = Dataspace.wrapExternal(() => { - send S.ConnectionRejected(id, null); + send S.Stream(id, S.Rejected(null)); establishingFacet.stop(); }); sp.on('exit', rejecter); @@ -55,7 +55,7 @@ spawn named 'driver/Subprocess' { process.nextTick(Dataspace.wrapExternal(() => { sp.off('exit', rejecter); sp.off('error', rejecter); - send S.ConnectionAccepted(id); + send S.Stream(id, S.Accepted()); const s = new Duplex(sp.stdout, sp.stdin); establishingFacet.stop(() => { react { diff --git a/packages/server/src/index.js b/packages/server/src/index.js index e340937..8c0511c 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -109,7 +109,7 @@ spawn named 'server' { function _spawnStreamServer(spec) { spawn named spec { assert D.AvailableTransport(spec); - on asserted S.IncomingConnection($id, spec) Server.streamServerActor(id, [spec, id]); + on asserted S.Stream($id, S.Incoming(spec)) Server.streamServerActor(id, [spec, id]); } } diff --git a/packages/server/src/server.js b/packages/server/src/server.js index 1a6207a..7116e31 100644 --- a/packages/server/src/server.js +++ b/packages/server/src/server.js @@ -31,20 +31,20 @@ export function streamServerFacet(id) { assert P.POA(id); const decoder = W.makeDecoder(null); const buf = B.buffer(this, 'chunks'); - on message S.Data(id, $data) buf.push(data); + on message S.Stream(id, S.Data($data)) buf.push(data); during P.POAReady(reqId) buf.drain((data) => { decoder.write(data); let v; while ((v = decoder.try_next())) send P.FromPOA(id, v); }); - on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); + on message P.ToPOA(id, $resp) send S.Stream(id, S.Push(new Encoder().push(resp).contents(), null)); stop on message P.Disconnect(id); stop on retracted P.POAReady(id); } export function streamServerActor(id, debugLabel) { spawn named [debugLabel || 'stream-poa', id] { - stop on retracted S.Duplex(id); + stop on retracted S.Stream(id, S.Duplex()); streamServerFacet(id); } } diff --git a/packages/syntax-playground/src/avahipublish.js b/packages/syntax-playground/src/avahipublish.js index 7792d67..5fb8e47 100644 --- a/packages/syntax-playground/src/avahipublish.js +++ b/packages/syntax-playground/src/avahipublish.js @@ -1,42 +1,17 @@ const { Observe, currentFacet, genUuid } = require("@syndicate-lang/core"); -const S = activate require("@syndicate-lang/driver-streams-node"); const M = activate require("@syndicate-lang/driver-mdns"); spawn named 'test' { - // const svc = M.Service((new Date()).toJSON(), '_syndicate._tcp'); - // assert M.Publish(svc, null, 8001, []); - - // during M.Discovered(M.Service($name, '_syndicate._tcp'), - // $hostName, - // $port, - // $txtData, - // $address, - // "IPv4", - // $interfaceName) - // { - // on start console.log('+', name, hostName, port, txtData, address, interfaceName); - // on stop console.log('-', name, hostName, port, txtData, address, interfaceName); - // } + const svc = M.Service((new Date()).toJSON(), '_syndicate+testing._tcp'); + assert M.Publish(svc, null, 8001, []); field this.count = 0; dataflow console.log('Service count:', this.count); - during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", _) + during M.Discovered(M.Service($name, '_syndicate+testing._tcp'), + $host, $port, _, $addr, "IPv4", $ifName) { - on start { this.count++; console.log('+ws', name, host, port, txt.get(0, 'N/A'), addr); } - on stop { this.count--; console.log('-ws', name, host, port, txt.get(0, 'N/A'), addr); } + on start { this.count++; console.log('+', name, host, port, addr, ifName); } + on stop { this.count--; console.log('-', name, host, port, addr, ifName); } } - - during M.Discovered(M.Service($name, '_syndicate._tcp'), $host, $port, _, $addr, "IPv4", _) - { - on start { this.count++; console.log('+tcp', name, host, port, addr); } - on stop { this.count--; console.log('-tcp', name, host, port, addr); } - } - - // during M.Discovered(M.Service($n, $t), $h, $p, $d, $a, "IPv4", $i) { - // if (t !== '_syndicate._tcp') { - // on start console.log('**', t, n, h, p, d, a, i); - // on stop console.log('==', t, n, h, p, d, a, i); - // } - // } } diff --git a/packages/syntax-playground/src/chatclient.js b/packages/syntax-playground/src/chatclient.js index b70fb97..dbb2547 100644 --- a/packages/syntax-playground/src/chatclient.js +++ b/packages/syntax-playground/src/chatclient.js @@ -23,27 +23,33 @@ const net = require('net'); const stdin = genUuid('stdin'); const stdout = genUuid('stdout'); spawn named 'stdioServer' { - during Observe(S.Readable(stdin)) spawn S.readableStreamBehaviour(stdin, process.stdin); - during Observe(S.Writable(stdout)) spawn S.writableStreamBehaviour(stdout, process.stdout); + during Observe(S.Stream(stdin, S.Readable())) + spawn S.readableStreamBehaviour(stdin, process.stdin); + during Observe(S.Stream(stdout, S.Writable())) + spawn S.writableStreamBehaviour(stdout, process.stdout); } spawn named 'chatclient' { const id = genUuid('tcpconn'); - assert S.OutgoingConnection(id, S.TcpAddress('localhost', 5999)); - stop on message S.ConnectionRejected(id, $err) { + assert S.Stream(id, S.Outgoing(S.TcpAddress('localhost', 5999))); + stop on message S.Stream(id, S.Rejected($err)) { console.error('Connection rejected', err); } - stop on message S.ConnectionAccepted(id) { + stop on message S.Stream(id, S.Accepted()) { react { - stop on retracted S.Duplex(id); - stop on retracted S.Readable(stdin); - stop on retracted S.Writable(stdout); + stop on retracted S.Stream(id, S.Duplex()); + stop on retracted S.Stream(stdin, S.Readable()); + stop on retracted S.Stream(stdout, S.Writable()); - assert S.BackPressure(stdin, id); - assert S.BackPressure(id, stdout); + assert S.Stream(stdin, S.BackPressure(id)); + assert S.Stream(id, S.BackPressure(stdout)); - on message S.Line(stdin, $line) send S.Push(id, line.toString('utf-8') + '\n', null); - on message S.Line(id, $line) send S.Push(stdout, line.toString('utf-8') + '\n', null); + on message S.Stream(stdin, S.Line($line)) { + send S.Stream(id, S.Push(line.toString('utf-8') + '\n', null)); + } + on message S.Stream(id, S.Line($line)) { + send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null)); + } } } } diff --git a/packages/syntax-playground/src/chatserver.js b/packages/syntax-playground/src/chatserver.js index 2201721..68e9bf8 100644 --- a/packages/syntax-playground/src/chatserver.js +++ b/packages/syntax-playground/src/chatserver.js @@ -23,17 +23,17 @@ message type Speak(who, what); assertion type Present(who); spawn named 'chatserver' { - on asserted S.IncomingConnection($id, S.TcpListener(5999)) { + on asserted S.Stream($id, S.Incoming(S.TcpListener(5999))) { const me = genUuid('user'); spawn named ['connectedUser', me] { - stop on retracted S.Duplex(id); + stop on retracted S.Stream(id, S.Duplex()); assert Present(me); - on asserted Present($who) send S.Push(id, `${who} arrived.\n`, null); - on retracted Present($who) send S.Push(id, `${who} departed.\n`, null); + on asserted Present($who) send S.Stream(id, S.Push(`${who} arrived.\n`, null)); + on retracted Present($who) send S.Stream(id, S.Push(`${who} departed.\n`, null)); - on message S.Line(id, $line) send Speak(me, line); - on message Speak($who, $what) send S.Push(id, `${who}: ${what}\n`, null); + on message S.Stream(id, S.Line($line)) send Speak(me, line); + on message Speak($who, $what) send S.Stream(id, S.Push(`${who}: ${what}\n`, null)); } } } diff --git a/packages/syntax-playground/src/ncchatclient.js b/packages/syntax-playground/src/ncchatclient.js index bfed1e6..ef3bb18 100644 --- a/packages/syntax-playground/src/ncchatclient.js +++ b/packages/syntax-playground/src/ncchatclient.js @@ -23,8 +23,10 @@ const S = activate require("@syndicate-lang/driver-streams-node"); const stdin = genUuid('stdin'); const stdout = genUuid('stdout'); spawn named 'stdioServer' { - during Observe(S.Readable(stdin)) spawn S.readableStreamBehaviour(stdin, process.stdin); - during Observe(S.Writable(stdout)) spawn S.writableStreamBehaviour(stdout, process.stdout); + during Observe(S.Stream(stdin, S.Readable())) + spawn S.readableStreamBehaviour(stdin, process.stdin); + during Observe(S.Stream(stdout, S.Writable())) + spawn S.writableStreamBehaviour(stdout, process.stdout); } spawn named 'chatclient-via-nc' { @@ -33,20 +35,22 @@ spawn named 'chatclient-via-nc' { stop on message S.SubprocessError(id, $err) { console.error("Couldn't start subprocess", err); } - stop on retracted S.Readable(stdin); - stop on retracted S.Writable(stdout); + stop on retracted S.Stream(stdin, S.Readable()); + stop on retracted S.Stream(stdout, S.Writable()); on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { react { - on message S.Line(stdin, $line) { + on message S.Stream(stdin, S.Line($line)) { console.log('INPUT:', line); - send S.Push(i, line.toString('utf-8') + '\n', null); + send S.Stream(i, S.Push(line.toString('utf-8') + '\n', null)); } - on message S.End(stdin) { + on message S.Stream(stdin, S.End()) { console.log('INPUT EOF'); - send S.Close(i, null); + send S.Stream(i, S.Close(null)); } - on message S.Line(o, $line) send S.Push(stdout, line.toString('utf-8') + '\n', null); + on message S.Stream(o, S.Line($line)) { + send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null)); + } } } stop on asserted S.SubprocessExit(id, $code, $signal) { diff --git a/packages/syntax-playground/src/socks.js b/packages/syntax-playground/src/socks.js index 736c630..1092731 100644 --- a/packages/syntax-playground/src/socks.js +++ b/packages/syntax-playground/src/socks.js @@ -6,7 +6,7 @@ const S = activate require("@syndicate-lang/driver-streams-node"); assertion type VirtualTcpAddress(host, port); spawn named 'socks-server' { - on asserted S.IncomingConnection($conn, S.TcpListener(1080)) { + on asserted S.Stream($conn, S.Incoming(S.TcpListener(1080))) { spawn named ['socksconn', conn] { const self = this; @@ -15,32 +15,32 @@ spawn named 'socks-server' { const rootFacet = currentFacet(); - stop on retracted S.Duplex(conn); + stop on retracted S.Stream(conn, S.Duplex()); const buf = S.onStartSpawnBufferStream(); field this.bufferWanted = true; on start react { stop on (!this.bufferWanted); - assert Observe(S.Duplex(buf)); - on message S.Data(conn, $chunk) send S.Push(buf, chunk, null); + assert Observe(S.Stream(buf, S.Duplex())); + on message S.Stream(conn, S.Data($chunk)) send S.Stream(buf, S.Push(chunk, null)); } on start selectAuthenticationMethod(); function readChunk(size, k) { react { - on start send S.PacketRequest(buf, size); - stop on message S.Data(buf, $chunk) { + on start send S.Stream(buf, S.PacketRequest(size)); + stop on message S.Stream(buf, S.Data($chunk)) { k(chunk); } } } function sendReply(replyCode, addrTypeAddrPort) { - send S.Push(conn, Bytes.concat([ + send S.Stream(conn, S.Push(Bytes.concat([ Bytes.from([5, replyCode, 0]), (addrTypeAddrPort || Bytes.from([1, 0,0,0,0, 0,0])) - ]), null); + ]), null)); } function dieOnBadVersion(packet) { @@ -54,10 +54,10 @@ spawn named 'socks-server' { readChunk(nMethods, (methods) => { if (!methods.includes(0)) { console.error('Client will not accept no-authentication'); - send S.Push(conn, Bytes.from([5, 255]), null); + send S.Stream(conn, S.Push(Bytes.from([5, 255]), null)); rootFacet.stop(); } else { - send S.Push(conn, Bytes.from([5, 0]), null); // select no-authentication + send S.Stream(conn, S.Push(Bytes.from([5, 0]), null)); // select no-authentication readSocksRequest(); } }); @@ -123,8 +123,8 @@ spawn named 'socks-server' { react { console.log(conn, 'CONNECT', addr, port); const out = genUuid('out'); - assert S.OutgoingConnection(out, VirtualTcpAddress(addr, port)); - stop on message S.ConnectionRejected(out, $err) { + assert S.Stream(out, S.Outgoing(VirtualTcpAddress(addr, port))); + stop on message S.Stream(out, S.Rejected($err)) { console.error('Could not connect outgoing', addr, port, err); switch (err.code) { case 'ENETUNREACH': @@ -149,10 +149,10 @@ spawn named 'socks-server' { break; } } - stop on message S.ConnectionAccepted(out) { + stop on message S.Stream(out, S.Accepted()) { react { - on retracted S.Duplex(out) rootFacet.stop(); - on asserted S.StreamInfo(out, _, $handle) { + on retracted S.Stream(out, S.Duplex()) rootFacet.stop(); + on asserted S.Stream(out, S.Info(_, $handle)) { const localAddrStr = handle.localAddress || '127.255.255.254'; const localPort = handle.localPort || 0; let localAddr = null; @@ -175,12 +175,16 @@ spawn named 'socks-server' { sendReply(0 /* success */, localEnd); readChunk(0, (firstChunk) => { self.bufferWanted = false; - send S.Push(out, firstChunk, null); + send S.Stream(out, S.Push(firstChunk, null)); react { - assert S.BackPressure(conn, out); - assert S.BackPressure(out, conn); - on message S.Data(conn, $chunk) send S.Push(out, chunk, null); - on message S.Data(out, $chunk) send S.Push(conn, chunk, null); + assert S.Stream(conn, S.BackPressure(out)); + assert S.Stream(out, S.BackPressure(conn)); + on message S.Stream(conn, S.Data($chunk)) { + send S.Stream(out, S.Push(chunk, null)); + } + on message S.Stream(out, S.Data($chunk)) { + send S.Stream(conn, S.Push(chunk, null)); + } } }); } @@ -195,14 +199,14 @@ spawn named 'socks-server' { spawn named 'remap-service' { field this.mapped = Set(); - on asserted Observe(S.OutgoingConnection(_, $a(VirtualTcpAddress(_, _)))) { + on asserted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) { this.mapped = this.mapped.add(a); } - on retracted Observe(S.OutgoingConnection(_, $a(VirtualTcpAddress(_, _)))) { + on retracted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) { this.mapped = this.mapped.remove(a); } - during S.OutgoingConnection($id, $a(VirtualTcpAddress($host, $port))) { + during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) { if (host.endsWith('.fruit')) { if (!this.mapped.includes(a)) { console.error("No virtual mapping for", a.toString()); @@ -210,24 +214,24 @@ spawn named 'remap-service' { err.errno = err.code = 'ENOTFOUND'; err.hostname = err.host = host; err.port = port; - on start send S.ConnectionRejected(id, err); + on start send S.Stream(id, S.Rejected(err)); } } else { - assert S.OutgoingConnection(id, S.TcpAddress(host, port)); + assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port))); } } } spawn named 'test-remap' { - during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9999)) { - assert S.OutgoingConnection(id, S.TcpAddress('steam.eighty-twenty.org', 22)); + during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9999))) { + assert S.Stream(id, S.Outgoing(S.TcpAddress('steam.eighty-twenty.org', 22))); } - during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9998)) { - assert S.OutgoingConnection(id, S.SubprocessAddress('/bin/sh', [], {})); + during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9998))) { + assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/sh', [], {}))); } - during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9997)) { - assert S.OutgoingConnection(id, S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {})); + during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9997))) { + assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {}))); } } diff --git a/packages/syntax-playground/src/ssh-relay.js b/packages/syntax-playground/src/ssh-relay.js index 6835dc6..e564133 100644 --- a/packages/syntax-playground/src/ssh-relay.js +++ b/packages/syntax-playground/src/ssh-relay.js @@ -2,23 +2,23 @@ const { currentFacet, genUuid } = require("@syndicate-lang/core"); const S = activate require("@syndicate-lang/driver-streams-node"); spawn named 'ssh-relay-server' { - on asserted S.IncomingConnection($conn, S.TcpListener(2022)) { + on asserted S.Stream($conn, S.Incoming(S.TcpListener(2022))) { spawn named ['sshconn', conn] { - stop on retracted S.Duplex(conn); + stop on retracted S.Stream(conn, S.Duplex()); const daemon = genUuid('daemon'); - assert S.OutgoingConnection(daemon, S.SubprocessAddress('/usr/sbin/sshd', ['-dei'], {})); - stop on message S.ConnectionRejected(daemon, $err) { + assert S.Stream(daemon, S.Outgoing(S.SubprocessAddress('/usr/sbin/sshd', ['-dei'], {}))); + stop on message S.Stream(daemon, S.Rejected($err)) { console.error("Couldn't start sshd", err); } - stop on message S.ConnectionAccepted(daemon) { + stop on message S.Stream(daemon, S.Accepted()) { react { - stop on retracted S.Duplex(conn); - stop on retracted S.Duplex(daemon); - assert S.BackPressure(conn, daemon); - assert S.BackPressure(daemon, conn); - on message S.Data(conn, $chunk) send S.Push(daemon, chunk, null); - on message S.Data(daemon, $chunk) send S.Push(conn, chunk, null); + stop on retracted S.Stream(conn, S.Duplex()); + stop on retracted S.Stream(daemon, S.Duplex()); + assert S.Stream(conn, S.BackPressure(daemon)); + assert S.Stream(daemon, S.BackPressure(conn)); + on message S.Stream(conn, S.Data($chunk)) send S.Stream(daemon, S.Push(chunk, null)); + on message S.Stream(daemon, S.Data($chunk)) send S.Stream(conn, S.Push(chunk, null)); } } } diff --git a/packages/syntax-playground/src/subprocess.js b/packages/syntax-playground/src/subprocess.js index 1591166..38e0351 100644 --- a/packages/syntax-playground/src/subprocess.js +++ b/packages/syntax-playground/src/subprocess.js @@ -29,11 +29,11 @@ spawn named 'lister' { stop on message S.SubprocessError(id, $err) console.error("Couldn't start subprocess", err); on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { - send S.Push(i, "GET / HTTP/1.0\r\n\r\n", null); - send S.Close(i, null); + send S.Stream(i, S.Push("GET / HTTP/1.0\r\n\r\n", null)); + send S.Stream(i, S.Close(null)); react { - on message S.Data(o, $chunk) console.log(chunk); - on asserted S.End(o) console.log('DONE!'); + on message S.Stream(o, S.Data($chunk)) console.log(chunk); + on asserted S.Stream(o, S.End()) console.log('DONE!'); } }