Significant refactoring of stream protocol

This commit is contained in:
Tony Garnock-Jones 2019-05-31 13:58:04 +01:00
parent 73d55a8018
commit 9a8898e8ec
15 changed files with 176 additions and 182 deletions

View File

@ -170,7 +170,7 @@ function _server(host, port, httpsOptions) {
res.writeHead(500, "Internal server error", {}); res.writeHead(500, "Internal server error", {});
res.end(); res.end();
} }
on asserted Observe(S.Readable(id)) react S.readableStreamBehaviour(id, req); on asserted Observe(S.Stream(id, S.Readable())) react S.readableStreamBehaviour(id, req);
on asserted Response(id, $code, $message, $headers, $detail) on asserted Response(id, $code, $message, $headers, $detail)
{ {
res.writeHead(code, message, headers.toJS()); res.writeHead(code, message, headers.toJS());

View File

@ -77,9 +77,9 @@ spawn named 'driver/avahi-publish' {
field this.established = false; field this.established = false;
assert Published(svc, hostName, port, txtDataRecords) when (this.established); assert Published(svc, hostName, port, txtDataRecords) when (this.established);
on retracted S.Readable(stderr) topFacet.stop(); on retracted S.Stream(stderr, S.Readable()) topFacet.stop();
on message S.Line(stderr, $line) { on message S.Stream(stderr, S.Line($line)) {
line = line.toString('utf-8'); line = line.toString('utf-8');
if (line.startsWith('Established')) { if (line.startsWith('Established')) {
this.established = true; this.established = true;
@ -126,8 +126,8 @@ spawn named 'driver/avahi-browse' {
on asserted S.SubprocessRunning(id, _, [_, $stdout, _]) { on asserted S.SubprocessRunning(id, _, [_, $stdout, _]) {
react { react {
on retracted S.Readable(stdout) topFacet.stop(); on retracted S.Stream(stdout, S.Readable()) topFacet.stop();
on message S.Line(stdout, $line) { on message S.Stream(stdout, S.Line($line)) {
// Parsing of TXT record data (appearing after the port // Parsing of TXT record data (appearing after the port
// number in an '=' record) is unreliable given the way // number in an '=' record) is unreliable given the way
// avahi-browse formats it. // avahi-browse formats it.

View File

@ -19,7 +19,7 @@
import { currentFacet, Observe, genUuid, Bytes, List } from "@syndicate-lang/core"; import { currentFacet, Observe, genUuid, Bytes, List } from "@syndicate-lang/core";
const S = activate require("./streams"); const S = activate require("./streams");
message type PacketRequest(id, size); message type PacketRequest(size);
export { export {
PacketRequest, PacketRequest,
@ -39,26 +39,26 @@ export function spawnBufferStream() {
function _spawnBufferStream(id) { function _spawnBufferStream(id) {
spawn named id { spawn named id {
stop on retracted Observe(S.Duplex(id)); stop on retracted Observe(S.Stream(id, S.Duplex()));
assert S.Duplex(id); assert S.Stream(id, S.Duplex());
assert S.StreamInfo(id, 'Duplex', null); assert S.Stream(id, S.Info(Symbol.for('Duplex'), null));
field this.buffer = Bytes(); field this.buffer = Bytes();
field this.queue = List(); field this.queue = List();
on message S.Push(id, $chunk, $ack) { on message S.Stream(id, S.Push($chunk, $ack)) {
this.buffer = Bytes.concat([this.buffer, chunk]); this.buffer = Bytes.concat([this.buffer, chunk]);
if (ack !== null) send ack; if (ack !== null) send ack;
} }
stop on message S.Close(id, $ack) { stop on message S.Stream(id, S.Close($ack)) {
if (ack !== null) send ack; if (ack !== null) send ack;
} }
on message PacketRequest(id, $size) { on message S.Stream(id, PacketRequest($size)) {
if (size === 0) { if (size === 0) {
// Signal to terminate. // Signal to terminate.
currentFacet().stop(() => { send S.Data(id, this.buffer); }); currentFacet().stop(() => { send S.Stream(id, S.Data(this.buffer)); });
} else { } else {
this.queue = this.queue.push(size); this.queue = this.queue.push(size);
} }
@ -68,7 +68,7 @@ function _spawnBufferStream(id) {
if (!this.queue.isEmpty()) { if (!this.queue.isEmpty()) {
const expected = this.queue.first(); const expected = this.queue.first();
if (this.buffer.size >= expected) { if (this.buffer.size >= expected) {
send S.Data(id, this.buffer.slice(0, expected)); send S.Stream(id, S.Data(this.buffer.slice(0, expected)));
this.buffer = this.buffer.slice(expected); this.buffer = this.buffer.slice(expected);
this.queue = this.queue.shift(); this.queue = this.queue.shift();
} }

View File

@ -30,7 +30,7 @@ assertion type UnixSocketServer(path);
export { UnixSocketClient, UnixSocketServer }; export { UnixSocketClient, UnixSocketServer };
spawn named 'NetDriver' { spawn named 'NetDriver' {
during Observe(S.IncomingConnection(_, TcpListener($port))) spawn named ['TcpListener', port] { during Observe(S.Stream(_, S.Incoming(TcpListener($port)))) spawn named ['TcpListener', port] {
_netListener.call(this, _netListener.call(this,
() => genUuid('tcp' + port), () => genUuid('tcp' + port),
TcpListener(port), TcpListener(port),
@ -38,7 +38,7 @@ spawn named 'NetDriver' {
(server, err) => { throw err; }); (server, err) => { throw err; });
} }
during Observe(S.IncomingConnection(_, UnixSocketServer($path))) during Observe(S.Stream(_, S.Incoming(UnixSocketServer($path))))
spawn named ['UnixSocketServer', path] { spawn named ['UnixSocketServer', path] {
let retried = false; let retried = false;
_netListener.call(this, _netListener.call(this,
@ -94,14 +94,14 @@ spawn named 'NetDriver' {
on stop try { server.close() } catch (e) { console.error(e); } on stop try { server.close() } catch (e) { console.error(e); }
} }
during S.OutgoingConnection($id, TcpAddress($host, $port)) spawn named ['Tcp', id, host, port] { during S.Stream($id, S.Outgoing(TcpAddress($host, $port))) spawn named ['Tcp', id, host, port] {
_netConnector.call(this, _netConnector.call(this,
id, id,
(socket) => { socket.connect(port, host) }, (socket) => { socket.connect(port, host) },
TcpAddress(host, port)); TcpAddress(host, port));
} }
during S.OutgoingConnection($id, UnixSocketClient($path)) spawn named ['Unix', id, path] { during S.Stream($id, S.Outgoing(UnixSocketClient($path))) spawn named ['Unix', id, path] {
_netConnector.call(this, _netConnector.call(this,
id, id,
(socket) => { socket.connect(path) }, (socket) => { socket.connect(path) },
@ -118,11 +118,11 @@ spawn named 'NetDriver' {
finish(); finish();
establishingFacet.stop(() => { establishingFacet.stop(() => {
socket.destroy(); socket.destroy();
send S.ConnectionRejected(id, err); send S.Stream(id, S.Rejected(err));
}); });
}); });
on retracted S.OutgoingConnection(id, spec) { on retracted S.Stream(id, S.Outgoing(spec)) {
connectionErrorHandler(null); connectionErrorHandler(null);
} }
@ -130,7 +130,7 @@ spawn named 'NetDriver' {
const readyHandler = Dataspace.wrapExternal(() => { const readyHandler = Dataspace.wrapExternal(() => {
socket.off('error', connectionErrorHandler); socket.off('error', connectionErrorHandler);
socket.off('ready', readyHandler); socket.off('ready', readyHandler);
send S.ConnectionAccepted(id); send S.Stream(id, S.Accepted());
establishingFacet.stop(() => { establishingFacet.stop(() => {
react { react {
on stop finish(); on stop finish();

View File

@ -22,10 +22,12 @@ import {
} from "@syndicate-lang/core"; } from "@syndicate-lang/core";
const stream = require('stream'); const stream = require('stream');
assertion type IncomingConnection(id, spec); assertion type Stream(id, detail); // for assertions and messages
assertion type OutgoingConnection(id, spec);
message type ConnectionAccepted(id); // for both incoming and outgoing connections assertion type Incoming(spec);
message type ConnectionRejected(id, err); // for both incoming and outgoing connections assertion type Outgoing(spec);
message type Accepted(); // for both incoming and outgoing connections
message type Rejected(err); // for both incoming and outgoing connections
// Each `chunk` to/from a stream in BINARY mode must be either a // Each `chunk` to/from a stream in BINARY mode must be either a
// String or a Uint8Array (or Buffer). Any `chunk` may be empty // String or a Uint8Array (or Buffer). Any `chunk` may be empty
@ -38,37 +40,38 @@ message type ConnectionRejected(id, err); // for both incoming and outgoing conn
// when the corresponding chunk is completely processed. // when the corresponding chunk is completely processed.
// Interest in StreamInfo is non-creative // Interest in StreamInfo is non-creative
assertion type StreamInfo(id, kind, stream); // kind ∈ "Readable", "Writable", "Duplex" assertion type Info(kind, stream); // kind ∈ Readable, Writable, Duplex
// Framing knowledge; interest in these is creative // Framing knowledge; interest in these is creative
assertion type Readable(id) = Symbol.for('stream-readable'); assertion type Readable() = Symbol.for('stream-readable');
assertion type Writable(id) = Symbol.for('stream-writable'); assertion type Writable() = Symbol.for('stream-writable');
assertion type Duplex(id) = Symbol.for('stream-duplex'); assertion type Duplex() = Symbol.for('stream-duplex');
message type Error(id, detail) = Symbol.for('stream-error'); message type Error(detail) = Symbol.for('stream-error');
// From Readable: // From Readable:
message type Data(id, chunk) = Symbol.for('stream-data'); message type Data(chunk) = Symbol.for('stream-data');
assertion type End(id) = Symbol.for('stream-end'); // if no interest in this, frame torn down at end assertion type End() = Symbol.for('stream-end'); // if no interest in this, frame torn down at end
assertion type DataReady(id) = Symbol.for('stream-data-ready'); assertion type DataReady() = Symbol.for('stream-data-ready');
// To Writable: // To Writable:
message type Push(id, chunk, ack) = Symbol.for('stream-push'); message type Push(chunk, ack) = Symbol.for('stream-push');
assertion type Close(id, ack) = Symbol.for('stream-close'); assertion type Close(ack) = Symbol.for('stream-close');
// From Writable: // From Writable:
assertion type BackPressure(readableId, writableId) = Symbol.for('stream-back-pressure'); assertion type BackPressure(writableId) = Symbol.for('stream-back-pressure'); // readableId implicit
message type Window(writableId, seqno, amount) = Symbol.for('stream-credit'); message type Window(seqno, amount) = Symbol.for('stream-credit');
// To Readable: // To Readable:
message type Pushback(id, chunk) = Symbol.for('stream-pushback'); message type Pushback(chunk) = Symbol.for('stream-pushback');
// Readable output adapter: (TODO: move to separate module?) // Readable output adapter: (TODO: move to separate module?)
message type Line(id, line) = Symbol.for('stream-line'); message type Line(line) = Symbol.for('stream-line');
export { export {
IncomingConnection, OutgoingConnection, ConnectionAccepted, ConnectionRejected, Stream,
StreamInfo, Readable, Writable, Duplex, Incoming, Outgoing, Accepted, Rejected,
Info, Readable, Writable, Duplex,
Error, Error,
Data, End, DataReady, Data, End, DataReady,
Push, Close, Push, Close,
@ -102,7 +105,7 @@ function _readableStreamBehaviour(id, s) {
const objectMode = s.objectMode || s.readableObjectMode || false; const objectMode = s.objectMode || s.readableObjectMode || false;
field this.endMonitorExists = false; field this.endMonitorExists = false;
during Observe(End(id)) { during Observe(Stream(id, End())) {
on start this.endMonitorExists = true; on start this.endMonitorExists = true;
on stop this.endMonitorExists = false; on stop this.endMonitorExists = false;
} }
@ -110,7 +113,7 @@ function _readableStreamBehaviour(id, s) {
s.on('end', Dataspace.wrapExternal(() => { s.on('end', Dataspace.wrapExternal(() => {
if (this.endMonitorExists) { if (this.endMonitorExists) {
react { react {
assert End(id); assert Stream(id, End());
stop on (!this.endMonitorExists) { stop on (!this.endMonitorExists) {
this.stopBits |= READING_STOPPED; this.stopBits |= READING_STOPPED;
} }
@ -120,16 +123,16 @@ function _readableStreamBehaviour(id, s) {
} }
})); }));
on message Pushback(id, $chunk) s.unshift(chunk); on message Stream(id, Pushback($chunk)) s.unshift(chunk);
field this.outboundWindows = Map(); field this.outboundWindows = Map();
during BackPressure(id, $writable) { during Stream(id, BackPressure($writable)) {
on asserted Window(writable, $seqno, $amount) { on asserted Stream(writable, Window($seqno, $amount)) {
// Attend to `seqno` to allow otherwise-noop changes to // Attend to `seqno` to allow otherwise-noop changes to
// refresh the outboundWindow size. // refresh the outboundWindow size.
this.outboundWindows = this.outboundWindows.set(writable, amount); this.outboundWindows = this.outboundWindows.set(writable, amount);
} }
on retracted Window(writable, _, _) { on retracted Stream(writable, Window(_, _)) {
this.outboundWindows = this.outboundWindows.remove(writable); this.outboundWindows = this.outboundWindows.remove(writable);
} }
} }
@ -143,9 +146,9 @@ function _readableStreamBehaviour(id, s) {
field this.readable = false; field this.readable = false;
s.on('readable', Dataspace.wrapExternal(() => { this.readable = true; })); s.on('readable', Dataspace.wrapExternal(() => { this.readable = true; }));
assert DataReady(id) when (this.readable); assert Stream(id, DataReady()) when (this.readable);
during Observe(Data(id, _)) { during Observe(Stream(id, Data(_))) {
dataflow { dataflow {
while (this.readable && (this.outboundWindow === null || this.outboundWindow > 0)) { while (this.readable && (this.outboundWindow === null || this.outboundWindow > 0)) {
const maxlen = (this.outboundWindow === null) const maxlen = (this.outboundWindow === null)
@ -159,7 +162,7 @@ function _readableStreamBehaviour(id, s) {
// This is the adjustment that forces us to pay attention to seqno: // This is the adjustment that forces us to pay attention to seqno:
this.outboundWindows = this.outboundWindows.mapEntries(([t, c]) => [t, c - amount]); this.outboundWindows = this.outboundWindows.mapEntries(([t, c]) => [t, c - amount]);
if (this.outboundWindow !== null) this.outboundWindow -= amount; if (this.outboundWindow !== null) this.outboundWindow -= amount;
send Data(id, chunk); send Stream(id, Data(chunk));
} }
} }
} }
@ -175,18 +178,20 @@ function _writableStreamBehaviour(id, s) {
} }
field this.seqno = 0; field this.seqno = 0;
field this.inboundWindow = refreshWindow(); field this.inboundWindow = refreshWindow();
during Observe(Window(id, _, _)) assert Window(id, this.seqno, this.inboundWindow); during Observe(Stream(id, Window(_, _))) {
assert Stream(id, Window(this.seqno, this.inboundWindow));
}
s.on('drain', Dataspace.wrapExternal(() => { this.inboundWindow = refreshWindow(); })); s.on('drain', Dataspace.wrapExternal(() => { this.inboundWindow = refreshWindow(); }));
const callbackFor = (k) => (k === null ? void 0 : Dataspace.wrapExternal(() => { send k; })); const callbackFor = (k) => (k === null ? void 0 : Dataspace.wrapExternal(() => { send k; }));
on message Push(id, $chunk, $ack) { on message Stream(id, Push($chunk, $ack)) {
s.write(objectMode ? chunk : Bytes.toIO(chunk), callbackFor(ack)); s.write(objectMode ? chunk : Bytes.toIO(chunk), callbackFor(ack));
this.inboundWindow = refreshWindow(); this.inboundWindow = refreshWindow();
} }
on message Close(id, $ack) { on message Stream(id, Close($ack)) {
s.end(callbackFor(ack)); s.end(callbackFor(ack));
this.inboundWindow = refreshWindow(); this.inboundWindow = refreshWindow();
} }
@ -194,10 +199,10 @@ function _writableStreamBehaviour(id, s) {
export function readableStreamBehaviour(id, s) { export function readableStreamBehaviour(id, s) {
(function () { (function () {
assert StreamInfo(id, "Readable", s); assert Stream(id, Info(Symbol.for("Readable"), s));
assert Readable(id); assert Stream(id, Readable());
stop on retracted Observe(Readable(id)); stop on retracted Observe(Stream(id, Readable()));
_commonStreamBehaviour.call(this, s, WRITING_STOPPED); _commonStreamBehaviour.call(this, s, WRITING_STOPPED);
_readableStreamBehaviour.call(this, id, s); _readableStreamBehaviour.call(this, id, s);
@ -206,10 +211,10 @@ export function readableStreamBehaviour(id, s) {
export function writableStreamBehaviour(id, s) { export function writableStreamBehaviour(id, s) {
(function () { (function () {
assert StreamInfo(id, "Writable", s); assert Stream(id, Info(Symbol.for("Writable"), s));
assert Writable(id); assert Stream(id, Writable());
stop on retracted Observe(Writable(id)); stop on retracted Observe(Stream(id, Writable()));
_commonStreamBehaviour.call(this, s, READING_STOPPED); _commonStreamBehaviour.call(this, s, READING_STOPPED);
_writableStreamBehaviour.call(this, id, s); _writableStreamBehaviour.call(this, id, s);
@ -218,10 +223,10 @@ export function writableStreamBehaviour(id, s) {
export function duplexStreamBehaviour(id, s) { export function duplexStreamBehaviour(id, s) {
(function () { (function () {
assert StreamInfo(id, "Duplex", s); assert Stream(id, Info(Symbol.for("Duplex"), s));
assert Duplex(id); assert Stream(id, Duplex());
stop on retracted Observe(Duplex(id)); stop on retracted Observe(Stream(id, Duplex()));
_commonStreamBehaviour.call(this, s, 0); _commonStreamBehaviour.call(this, s, 0);
_readableStreamBehaviour.call(this, id, s); _readableStreamBehaviour.call(this, id, s);
@ -230,26 +235,26 @@ export function duplexStreamBehaviour(id, s) {
} }
spawn named 'driver/stream-line' { spawn named 'driver/stream-line' {
during Observe(Line($id, _)) spawn named ['LineReader', id] { during Observe(Stream($id, Line(_))) spawn named ['LineReader', id] {
field this.buffer = Bytes(); field this.buffer = Bytes();
on message Data(id, $data) this.buffer = Bytes.concat([this.buffer, data]); on message Stream(id, Data($data)) this.buffer = Bytes.concat([this.buffer, data]);
dataflow { dataflow {
const pos = this.buffer.indexOf(10); const pos = this.buffer.indexOf(10);
if (pos !== -1) { if (pos !== -1) {
const line = this.buffer.slice(0, pos); const line = this.buffer.slice(0, pos);
this.buffer = this.buffer.slice(pos + 1); this.buffer = this.buffer.slice(pos + 1);
send Line(id, line); send Stream(id, Line(line));
} }
} }
} }
} }
export function spawnConnection(id, spec, s) { export function spawnConnection(id, spec, s) {
spawn named ['IncomingConnection', id, spec] { spawn named ['Incoming', id, spec] {
assert IncomingConnection(id, spec); assert Stream(id, Incoming(spec));
stop on retracted Observe(IncomingConnection(_, spec)) s.destroy(); stop on retracted Observe(Stream(_, Incoming(spec))) s.destroy();
stop on message ConnectionRejected(id, $err) s.destroy(err); stop on message Stream(id, Rejected($err)) s.destroy(err);
stop on asserted Observe(Duplex(id)) react duplexStreamBehaviour(id, s); stop on asserted Observe(Stream(id, Duplex())) react duplexStreamBehaviour(id, s);
stop on message ConnectionAccepted(id) react duplexStreamBehaviour(id, s); stop on message Stream(id, Accepted()) react duplexStreamBehaviour(id, s);
} }
} }

View File

@ -37,7 +37,7 @@ export {
}; };
spawn named 'driver/Subprocess' { spawn named 'driver/Subprocess' {
during S.OutgoingConnection($id, SubprocessAddress($command, $args, $options)) during S.Stream($id, S.Outgoing(SubprocessAddress($command, $args, $options)))
spawn named ['SubprocessConnection', id] { spawn named ['SubprocessConnection', id] {
const establishingFacet = currentFacet(); const establishingFacet = currentFacet();
@ -46,7 +46,7 @@ spawn named 'driver/Subprocess' {
(options || Map()).set('stdio', (options || Map()).set('stdio',
['pipe', 'pipe', 'inherit']).toJS()); ['pipe', 'pipe', 'inherit']).toJS());
const rejecter = Dataspace.wrapExternal(() => { const rejecter = Dataspace.wrapExternal(() => {
send S.ConnectionRejected(id, null); send S.Stream(id, S.Rejected(null));
establishingFacet.stop(); establishingFacet.stop();
}); });
sp.on('exit', rejecter); sp.on('exit', rejecter);
@ -55,7 +55,7 @@ spawn named 'driver/Subprocess' {
process.nextTick(Dataspace.wrapExternal(() => { process.nextTick(Dataspace.wrapExternal(() => {
sp.off('exit', rejecter); sp.off('exit', rejecter);
sp.off('error', rejecter); sp.off('error', rejecter);
send S.ConnectionAccepted(id); send S.Stream(id, S.Accepted());
const s = new Duplex(sp.stdout, sp.stdin); const s = new Duplex(sp.stdout, sp.stdin);
establishingFacet.stop(() => { establishingFacet.stop(() => {
react { react {

View File

@ -109,7 +109,7 @@ spawn named 'server' {
function _spawnStreamServer(spec) { function _spawnStreamServer(spec) {
spawn named spec { spawn named spec {
assert D.AvailableTransport(spec); assert D.AvailableTransport(spec);
on asserted S.IncomingConnection($id, spec) Server.streamServerActor(id, [spec, id]); on asserted S.Stream($id, S.Incoming(spec)) Server.streamServerActor(id, [spec, id]);
} }
} }

View File

@ -31,20 +31,20 @@ export function streamServerFacet(id) {
assert P.POA(id); assert P.POA(id);
const decoder = W.makeDecoder(null); const decoder = W.makeDecoder(null);
const buf = B.buffer(this, 'chunks'); const buf = B.buffer(this, 'chunks');
on message S.Data(id, $data) buf.push(data); on message S.Stream(id, S.Data($data)) buf.push(data);
during P.POAReady(reqId) buf.drain((data) => { during P.POAReady(reqId) buf.drain((data) => {
decoder.write(data); decoder.write(data);
let v; let v;
while ((v = decoder.try_next())) send P.FromPOA(id, v); while ((v = decoder.try_next())) send P.FromPOA(id, v);
}); });
on message P.ToPOA(id, $resp) send S.Push(id, new Encoder().push(resp).contents(), null); on message P.ToPOA(id, $resp) send S.Stream(id, S.Push(new Encoder().push(resp).contents(), null));
stop on message P.Disconnect(id); stop on message P.Disconnect(id);
stop on retracted P.POAReady(id); stop on retracted P.POAReady(id);
} }
export function streamServerActor(id, debugLabel) { export function streamServerActor(id, debugLabel) {
spawn named [debugLabel || 'stream-poa', id] { spawn named [debugLabel || 'stream-poa', id] {
stop on retracted S.Duplex(id); stop on retracted S.Stream(id, S.Duplex());
streamServerFacet(id); streamServerFacet(id);
} }
} }

View File

@ -1,42 +1,17 @@
const { Observe, currentFacet, genUuid } = require("@syndicate-lang/core"); const { Observe, currentFacet, genUuid } = require("@syndicate-lang/core");
const S = activate require("@syndicate-lang/driver-streams-node");
const M = activate require("@syndicate-lang/driver-mdns"); const M = activate require("@syndicate-lang/driver-mdns");
spawn named 'test' { spawn named 'test' {
// const svc = M.Service((new Date()).toJSON(), '_syndicate._tcp'); const svc = M.Service((new Date()).toJSON(), '_syndicate+testing._tcp');
// assert M.Publish(svc, null, 8001, []); assert M.Publish(svc, null, 8001, []);
// during M.Discovered(M.Service($name, '_syndicate._tcp'),
// $hostName,
// $port,
// $txtData,
// $address,
// "IPv4",
// $interfaceName)
// {
// on start console.log('+', name, hostName, port, txtData, address, interfaceName);
// on stop console.log('-', name, hostName, port, txtData, address, interfaceName);
// }
field this.count = 0; field this.count = 0;
dataflow console.log('Service count:', this.count); dataflow console.log('Service count:', this.count);
during M.Discovered(M.Service($name, '_syndicate+ws._tcp'), $host, $port, $txt, $addr, "IPv4", _) during M.Discovered(M.Service($name, '_syndicate+testing._tcp'),
$host, $port, _, $addr, "IPv4", $ifName)
{ {
on start { this.count++; console.log('+ws', name, host, port, txt.get(0, 'N/A'), addr); } on start { this.count++; console.log('+', name, host, port, addr, ifName); }
on stop { this.count--; console.log('-ws', name, host, port, txt.get(0, 'N/A'), addr); } on stop { this.count--; console.log('-', name, host, port, addr, ifName); }
} }
during M.Discovered(M.Service($name, '_syndicate._tcp'), $host, $port, _, $addr, "IPv4", _)
{
on start { this.count++; console.log('+tcp', name, host, port, addr); }
on stop { this.count--; console.log('-tcp', name, host, port, addr); }
}
// during M.Discovered(M.Service($n, $t), $h, $p, $d, $a, "IPv4", $i) {
// if (t !== '_syndicate._tcp') {
// on start console.log('**', t, n, h, p, d, a, i);
// on stop console.log('==', t, n, h, p, d, a, i);
// }
// }
} }

View File

@ -23,27 +23,33 @@ const net = require('net');
const stdin = genUuid('stdin'); const stdin = genUuid('stdin');
const stdout = genUuid('stdout'); const stdout = genUuid('stdout');
spawn named 'stdioServer' { spawn named 'stdioServer' {
during Observe(S.Readable(stdin)) spawn S.readableStreamBehaviour(stdin, process.stdin); during Observe(S.Stream(stdin, S.Readable()))
during Observe(S.Writable(stdout)) spawn S.writableStreamBehaviour(stdout, process.stdout); spawn S.readableStreamBehaviour(stdin, process.stdin);
during Observe(S.Stream(stdout, S.Writable()))
spawn S.writableStreamBehaviour(stdout, process.stdout);
} }
spawn named 'chatclient' { spawn named 'chatclient' {
const id = genUuid('tcpconn'); const id = genUuid('tcpconn');
assert S.OutgoingConnection(id, S.TcpAddress('localhost', 5999)); assert S.Stream(id, S.Outgoing(S.TcpAddress('localhost', 5999)));
stop on message S.ConnectionRejected(id, $err) { stop on message S.Stream(id, S.Rejected($err)) {
console.error('Connection rejected', err); console.error('Connection rejected', err);
} }
stop on message S.ConnectionAccepted(id) { stop on message S.Stream(id, S.Accepted()) {
react { react {
stop on retracted S.Duplex(id); stop on retracted S.Stream(id, S.Duplex());
stop on retracted S.Readable(stdin); stop on retracted S.Stream(stdin, S.Readable());
stop on retracted S.Writable(stdout); stop on retracted S.Stream(stdout, S.Writable());
assert S.BackPressure(stdin, id); assert S.Stream(stdin, S.BackPressure(id));
assert S.BackPressure(id, stdout); assert S.Stream(id, S.BackPressure(stdout));
on message S.Line(stdin, $line) send S.Push(id, line.toString('utf-8') + '\n', null); on message S.Stream(stdin, S.Line($line)) {
on message S.Line(id, $line) send S.Push(stdout, line.toString('utf-8') + '\n', null); send S.Stream(id, S.Push(line.toString('utf-8') + '\n', null));
}
on message S.Stream(id, S.Line($line)) {
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null));
}
} }
} }
} }

View File

@ -23,17 +23,17 @@ message type Speak(who, what);
assertion type Present(who); assertion type Present(who);
spawn named 'chatserver' { spawn named 'chatserver' {
on asserted S.IncomingConnection($id, S.TcpListener(5999)) { on asserted S.Stream($id, S.Incoming(S.TcpListener(5999))) {
const me = genUuid('user'); const me = genUuid('user');
spawn named ['connectedUser', me] { spawn named ['connectedUser', me] {
stop on retracted S.Duplex(id); stop on retracted S.Stream(id, S.Duplex());
assert Present(me); assert Present(me);
on asserted Present($who) send S.Push(id, `${who} arrived.\n`, null); on asserted Present($who) send S.Stream(id, S.Push(`${who} arrived.\n`, null));
on retracted Present($who) send S.Push(id, `${who} departed.\n`, null); on retracted Present($who) send S.Stream(id, S.Push(`${who} departed.\n`, null));
on message S.Line(id, $line) send Speak(me, line); on message S.Stream(id, S.Line($line)) send Speak(me, line);
on message Speak($who, $what) send S.Push(id, `${who}: ${what}\n`, null); on message Speak($who, $what) send S.Stream(id, S.Push(`${who}: ${what}\n`, null));
} }
} }
} }

View File

@ -23,8 +23,10 @@ const S = activate require("@syndicate-lang/driver-streams-node");
const stdin = genUuid('stdin'); const stdin = genUuid('stdin');
const stdout = genUuid('stdout'); const stdout = genUuid('stdout');
spawn named 'stdioServer' { spawn named 'stdioServer' {
during Observe(S.Readable(stdin)) spawn S.readableStreamBehaviour(stdin, process.stdin); during Observe(S.Stream(stdin, S.Readable()))
during Observe(S.Writable(stdout)) spawn S.writableStreamBehaviour(stdout, process.stdout); spawn S.readableStreamBehaviour(stdin, process.stdin);
during Observe(S.Stream(stdout, S.Writable()))
spawn S.writableStreamBehaviour(stdout, process.stdout);
} }
spawn named 'chatclient-via-nc' { spawn named 'chatclient-via-nc' {
@ -33,20 +35,22 @@ spawn named 'chatclient-via-nc' {
stop on message S.SubprocessError(id, $err) { stop on message S.SubprocessError(id, $err) {
console.error("Couldn't start subprocess", err); console.error("Couldn't start subprocess", err);
} }
stop on retracted S.Readable(stdin); stop on retracted S.Stream(stdin, S.Readable());
stop on retracted S.Writable(stdout); stop on retracted S.Stream(stdout, S.Writable());
on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { on asserted S.SubprocessRunning(id, _, [$i, $o, _]) {
react { react {
on message S.Line(stdin, $line) { on message S.Stream(stdin, S.Line($line)) {
console.log('INPUT:', line); console.log('INPUT:', line);
send S.Push(i, line.toString('utf-8') + '\n', null); send S.Stream(i, S.Push(line.toString('utf-8') + '\n', null));
} }
on message S.End(stdin) { on message S.Stream(stdin, S.End()) {
console.log('INPUT EOF'); console.log('INPUT EOF');
send S.Close(i, null); send S.Stream(i, S.Close(null));
} }
on message S.Line(o, $line) send S.Push(stdout, line.toString('utf-8') + '\n', null); on message S.Stream(o, S.Line($line)) {
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null));
}
} }
} }
stop on asserted S.SubprocessExit(id, $code, $signal) { stop on asserted S.SubprocessExit(id, $code, $signal) {

View File

@ -6,7 +6,7 @@ const S = activate require("@syndicate-lang/driver-streams-node");
assertion type VirtualTcpAddress(host, port); assertion type VirtualTcpAddress(host, port);
spawn named 'socks-server' { spawn named 'socks-server' {
on asserted S.IncomingConnection($conn, S.TcpListener(1080)) { on asserted S.Stream($conn, S.Incoming(S.TcpListener(1080))) {
spawn named ['socksconn', conn] { spawn named ['socksconn', conn] {
const self = this; const self = this;
@ -15,32 +15,32 @@ spawn named 'socks-server' {
const rootFacet = currentFacet(); const rootFacet = currentFacet();
stop on retracted S.Duplex(conn); stop on retracted S.Stream(conn, S.Duplex());
const buf = S.onStartSpawnBufferStream(); const buf = S.onStartSpawnBufferStream();
field this.bufferWanted = true; field this.bufferWanted = true;
on start react { on start react {
stop on (!this.bufferWanted); stop on (!this.bufferWanted);
assert Observe(S.Duplex(buf)); assert Observe(S.Stream(buf, S.Duplex()));
on message S.Data(conn, $chunk) send S.Push(buf, chunk, null); on message S.Stream(conn, S.Data($chunk)) send S.Stream(buf, S.Push(chunk, null));
} }
on start selectAuthenticationMethod(); on start selectAuthenticationMethod();
function readChunk(size, k) { function readChunk(size, k) {
react { react {
on start send S.PacketRequest(buf, size); on start send S.Stream(buf, S.PacketRequest(size));
stop on message S.Data(buf, $chunk) { stop on message S.Stream(buf, S.Data($chunk)) {
k(chunk); k(chunk);
} }
} }
} }
function sendReply(replyCode, addrTypeAddrPort) { function sendReply(replyCode, addrTypeAddrPort) {
send S.Push(conn, Bytes.concat([ send S.Stream(conn, S.Push(Bytes.concat([
Bytes.from([5, replyCode, 0]), Bytes.from([5, replyCode, 0]),
(addrTypeAddrPort || Bytes.from([1, 0,0,0,0, 0,0])) (addrTypeAddrPort || Bytes.from([1, 0,0,0,0, 0,0]))
]), null); ]), null));
} }
function dieOnBadVersion(packet) { function dieOnBadVersion(packet) {
@ -54,10 +54,10 @@ spawn named 'socks-server' {
readChunk(nMethods, (methods) => { readChunk(nMethods, (methods) => {
if (!methods.includes(0)) { if (!methods.includes(0)) {
console.error('Client will not accept no-authentication'); console.error('Client will not accept no-authentication');
send S.Push(conn, Bytes.from([5, 255]), null); send S.Stream(conn, S.Push(Bytes.from([5, 255]), null));
rootFacet.stop(); rootFacet.stop();
} else { } else {
send S.Push(conn, Bytes.from([5, 0]), null); // select no-authentication send S.Stream(conn, S.Push(Bytes.from([5, 0]), null)); // select no-authentication
readSocksRequest(); readSocksRequest();
} }
}); });
@ -123,8 +123,8 @@ spawn named 'socks-server' {
react { react {
console.log(conn, 'CONNECT', addr, port); console.log(conn, 'CONNECT', addr, port);
const out = genUuid('out'); const out = genUuid('out');
assert S.OutgoingConnection(out, VirtualTcpAddress(addr, port)); assert S.Stream(out, S.Outgoing(VirtualTcpAddress(addr, port)));
stop on message S.ConnectionRejected(out, $err) { stop on message S.Stream(out, S.Rejected($err)) {
console.error('Could not connect outgoing', addr, port, err); console.error('Could not connect outgoing', addr, port, err);
switch (err.code) { switch (err.code) {
case 'ENETUNREACH': case 'ENETUNREACH':
@ -149,10 +149,10 @@ spawn named 'socks-server' {
break; break;
} }
} }
stop on message S.ConnectionAccepted(out) { stop on message S.Stream(out, S.Accepted()) {
react { react {
on retracted S.Duplex(out) rootFacet.stop(); on retracted S.Stream(out, S.Duplex()) rootFacet.stop();
on asserted S.StreamInfo(out, _, $handle) { on asserted S.Stream(out, S.Info(_, $handle)) {
const localAddrStr = handle.localAddress || '127.255.255.254'; const localAddrStr = handle.localAddress || '127.255.255.254';
const localPort = handle.localPort || 0; const localPort = handle.localPort || 0;
let localAddr = null; let localAddr = null;
@ -175,12 +175,16 @@ spawn named 'socks-server' {
sendReply(0 /* success */, localEnd); sendReply(0 /* success */, localEnd);
readChunk(0, (firstChunk) => { readChunk(0, (firstChunk) => {
self.bufferWanted = false; self.bufferWanted = false;
send S.Push(out, firstChunk, null); send S.Stream(out, S.Push(firstChunk, null));
react { react {
assert S.BackPressure(conn, out); assert S.Stream(conn, S.BackPressure(out));
assert S.BackPressure(out, conn); assert S.Stream(out, S.BackPressure(conn));
on message S.Data(conn, $chunk) send S.Push(out, chunk, null); on message S.Stream(conn, S.Data($chunk)) {
on message S.Data(out, $chunk) send S.Push(conn, chunk, null); send S.Stream(out, S.Push(chunk, null));
}
on message S.Stream(out, S.Data($chunk)) {
send S.Stream(conn, S.Push(chunk, null));
}
} }
}); });
} }
@ -195,14 +199,14 @@ spawn named 'socks-server' {
spawn named 'remap-service' { spawn named 'remap-service' {
field this.mapped = Set(); field this.mapped = Set();
on asserted Observe(S.OutgoingConnection(_, $a(VirtualTcpAddress(_, _)))) { on asserted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) {
this.mapped = this.mapped.add(a); this.mapped = this.mapped.add(a);
} }
on retracted Observe(S.OutgoingConnection(_, $a(VirtualTcpAddress(_, _)))) { on retracted Observe(S.Stream(_, S.Outgoing($a(VirtualTcpAddress(_, _))))) {
this.mapped = this.mapped.remove(a); this.mapped = this.mapped.remove(a);
} }
during S.OutgoingConnection($id, $a(VirtualTcpAddress($host, $port))) { during S.Stream($id, S.Outgoing($a(VirtualTcpAddress($host, $port)))) {
if (host.endsWith('.fruit')) { if (host.endsWith('.fruit')) {
if (!this.mapped.includes(a)) { if (!this.mapped.includes(a)) {
console.error("No virtual mapping for", a.toString()); console.error("No virtual mapping for", a.toString());
@ -210,24 +214,24 @@ spawn named 'remap-service' {
err.errno = err.code = 'ENOTFOUND'; err.errno = err.code = 'ENOTFOUND';
err.hostname = err.host = host; err.hostname = err.host = host;
err.port = port; err.port = port;
on start send S.ConnectionRejected(id, err); on start send S.Stream(id, S.Rejected(err));
} }
} else { } else {
assert S.OutgoingConnection(id, S.TcpAddress(host, port)); assert S.Stream(id, S.Outgoing(S.TcpAddress(host, port)));
} }
} }
} }
spawn named 'test-remap' { spawn named 'test-remap' {
during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9999)) { during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9999))) {
assert S.OutgoingConnection(id, S.TcpAddress('steam.eighty-twenty.org', 22)); assert S.Stream(id, S.Outgoing(S.TcpAddress('steam.eighty-twenty.org', 22)));
} }
during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9998)) { during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9998))) {
assert S.OutgoingConnection(id, S.SubprocessAddress('/bin/sh', [], {})); assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/sh', [], {})));
} }
during S.OutgoingConnection($id, VirtualTcpAddress('foobar.fruit', 9997)) { during S.Stream($id, S.Outgoing(VirtualTcpAddress('foobar.fruit', 9997))) {
assert S.OutgoingConnection(id, S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {})); assert S.Stream(id, S.Outgoing(S.SubprocessAddress('/bin/cat', ['/proc/cpuinfo'], {})));
} }
} }

View File

@ -2,23 +2,23 @@ const { currentFacet, genUuid } = require("@syndicate-lang/core");
const S = activate require("@syndicate-lang/driver-streams-node"); const S = activate require("@syndicate-lang/driver-streams-node");
spawn named 'ssh-relay-server' { spawn named 'ssh-relay-server' {
on asserted S.IncomingConnection($conn, S.TcpListener(2022)) { on asserted S.Stream($conn, S.Incoming(S.TcpListener(2022))) {
spawn named ['sshconn', conn] { spawn named ['sshconn', conn] {
stop on retracted S.Duplex(conn); stop on retracted S.Stream(conn, S.Duplex());
const daemon = genUuid('daemon'); const daemon = genUuid('daemon');
assert S.OutgoingConnection(daemon, S.SubprocessAddress('/usr/sbin/sshd', ['-dei'], {})); assert S.Stream(daemon, S.Outgoing(S.SubprocessAddress('/usr/sbin/sshd', ['-dei'], {})));
stop on message S.ConnectionRejected(daemon, $err) { stop on message S.Stream(daemon, S.Rejected($err)) {
console.error("Couldn't start sshd", err); console.error("Couldn't start sshd", err);
} }
stop on message S.ConnectionAccepted(daemon) { stop on message S.Stream(daemon, S.Accepted()) {
react { react {
stop on retracted S.Duplex(conn); stop on retracted S.Stream(conn, S.Duplex());
stop on retracted S.Duplex(daemon); stop on retracted S.Stream(daemon, S.Duplex());
assert S.BackPressure(conn, daemon); assert S.Stream(conn, S.BackPressure(daemon));
assert S.BackPressure(daemon, conn); assert S.Stream(daemon, S.BackPressure(conn));
on message S.Data(conn, $chunk) send S.Push(daemon, chunk, null); on message S.Stream(conn, S.Data($chunk)) send S.Stream(daemon, S.Push(chunk, null));
on message S.Data(daemon, $chunk) send S.Push(conn, chunk, null); on message S.Stream(daemon, S.Data($chunk)) send S.Stream(conn, S.Push(chunk, null));
} }
} }
} }

View File

@ -29,11 +29,11 @@ spawn named 'lister' {
stop on message S.SubprocessError(id, $err) console.error("Couldn't start subprocess", err); stop on message S.SubprocessError(id, $err) console.error("Couldn't start subprocess", err);
on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { on asserted S.SubprocessRunning(id, _, [$i, $o, _]) {
send S.Push(i, "GET / HTTP/1.0\r\n\r\n", null); send S.Stream(i, S.Push("GET / HTTP/1.0\r\n\r\n", null));
send S.Close(i, null); send S.Stream(i, S.Close(null));
react { react {
on message S.Data(o, $chunk) console.log(chunk); on message S.Stream(o, S.Data($chunk)) console.log(chunk);
on asserted S.End(o) console.log('DONE!'); on asserted S.Stream(o, S.End()) console.log('DONE!');
} }
} }