Use `false` for acks, since `null` doesn't travel well
This commit is contained in:
parent
2e9c5ddc33
commit
b6017f1501
|
@ -52,7 +52,7 @@ function _spawnBufferStream(id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
stop on message S.Stream(id, S.Close($ack)) {
|
stop on message S.Stream(id, S.Close($ack)) {
|
||||||
if (ack !== null) send ack;
|
if (ack !== false) send ack;
|
||||||
}
|
}
|
||||||
|
|
||||||
on message S.Stream(id, PacketRequest($size)) {
|
on message S.Stream(id, PacketRequest($size)) {
|
||||||
|
|
|
@ -41,8 +41,8 @@ message type Rejected(err); // for both incoming and outgoing connections
|
||||||
// Each `chunk` to/from a stream in OBJECT mode may be any value
|
// Each `chunk` to/from a stream in OBJECT mode may be any value
|
||||||
// except `null`.
|
// except `null`.
|
||||||
//
|
//
|
||||||
// Each `ack`, if non-`null`, is an acknowledgement MESSAGE to be sent
|
// Each `ack`, if non-`false`, is an acknowledgement MESSAGE to be
|
||||||
// when the corresponding chunk is completely processed.
|
// sent when the corresponding chunk is completely processed.
|
||||||
|
|
||||||
// Interest in StreamInfo is non-creative
|
// Interest in StreamInfo is non-creative
|
||||||
assertion type Info(kind, stream); // kind ∈ Readable, Writable, Duplex
|
assertion type Info(kind, stream); // kind ∈ Readable, Writable, Duplex
|
||||||
|
@ -189,7 +189,7 @@ function _writableStreamBehaviour(id, s) {
|
||||||
|
|
||||||
s.on('drain', Dataspace.wrapExternal(() => { this.inboundWindow = refreshWindow(); }));
|
s.on('drain', Dataspace.wrapExternal(() => { this.inboundWindow = refreshWindow(); }));
|
||||||
|
|
||||||
const callbackFor = (k) => (k === null ? void 0 : Dataspace.wrapExternal(() => { send k; }));
|
const callbackFor = (k) => (k === false ? void 0 : Dataspace.wrapExternal(() => { send k; }));
|
||||||
|
|
||||||
on message Stream(id, Push($chunk, $ack)) {
|
on message Stream(id, Push($chunk, $ack)) {
|
||||||
s.write(objectMode ? chunk : Bytes.toIO(chunk), callbackFor(ack));
|
s.write(objectMode ? chunk : Bytes.toIO(chunk), callbackFor(ack));
|
||||||
|
|
|
@ -37,7 +37,7 @@ export function streamServerFacet(id) {
|
||||||
let v;
|
let v;
|
||||||
while ((v = decoder.try_next())) send P.FromPOA(id, v);
|
while ((v = decoder.try_next())) send P.FromPOA(id, v);
|
||||||
});
|
});
|
||||||
on message P.ToPOA(id, $resp) send S.Stream(id, S.Push(new Encoder().push(resp).contents(), null));
|
on message P.ToPOA(id, $resp) send S.Stream(id, S.Push(new Encoder().push(resp).contents(), false));
|
||||||
stop on message P.Disconnect(id);
|
stop on message P.Disconnect(id);
|
||||||
stop on retracted P.POAReady(id);
|
stop on retracted P.POAReady(id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,10 +45,10 @@ spawn named 'chatclient' {
|
||||||
assert S.Stream(id, S.BackPressure(stdout));
|
assert S.Stream(id, S.BackPressure(stdout));
|
||||||
|
|
||||||
on message S.Stream(stdin, S.Line($line)) {
|
on message S.Stream(stdin, S.Line($line)) {
|
||||||
send S.Stream(id, S.Push(line.toString('utf-8') + '\n', null));
|
send S.Stream(id, S.Push(line.toString('utf-8') + '\n', false));
|
||||||
}
|
}
|
||||||
on message S.Stream(id, S.Line($line)) {
|
on message S.Stream(id, S.Line($line)) {
|
||||||
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null));
|
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,11 +29,11 @@ spawn named 'chatserver' {
|
||||||
stop on retracted S.Stream(id, S.Duplex());
|
stop on retracted S.Stream(id, S.Duplex());
|
||||||
|
|
||||||
assert Present(me);
|
assert Present(me);
|
||||||
on asserted Present($who) send S.Stream(id, S.Push(`${who} arrived.\n`, null));
|
on asserted Present($who) send S.Stream(id, S.Push(`${who} arrived.\n`, false));
|
||||||
on retracted Present($who) send S.Stream(id, S.Push(`${who} departed.\n`, null));
|
on retracted Present($who) send S.Stream(id, S.Push(`${who} departed.\n`, false));
|
||||||
|
|
||||||
on message S.Stream(id, S.Line($line)) send Speak(me, line);
|
on message S.Stream(id, S.Line($line)) send Speak(me, line);
|
||||||
on message Speak($who, $what) send S.Stream(id, S.Push(`${who}: ${what}\n`, null));
|
on message Speak($who, $what) send S.Stream(id, S.Push(`${who}: ${what}\n`, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,15 +41,15 @@ spawn named 'chatclient-via-nc' {
|
||||||
react {
|
react {
|
||||||
on message S.Stream(stdin, S.Line($line)) {
|
on message S.Stream(stdin, S.Line($line)) {
|
||||||
console.log('INPUT:', line);
|
console.log('INPUT:', line);
|
||||||
send S.Stream(i, S.Push(line.toString('utf-8') + '\n', null));
|
send S.Stream(i, S.Push(line.toString('utf-8') + '\n', false));
|
||||||
}
|
}
|
||||||
on message S.Stream(stdin, S.End()) {
|
on message S.Stream(stdin, S.End()) {
|
||||||
console.log('INPUT EOF');
|
console.log('INPUT EOF');
|
||||||
send S.Stream(i, S.Close(null));
|
send S.Stream(i, S.Close(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
on message S.Stream(o, S.Line($line)) {
|
on message S.Stream(o, S.Line($line)) {
|
||||||
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', null));
|
send S.Stream(stdout, S.Push(line.toString('utf-8') + '\n', false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ spawn named 'socks-server' {
|
||||||
on start react {
|
on start react {
|
||||||
stop on (!this.bufferWanted);
|
stop on (!this.bufferWanted);
|
||||||
assert Observe(S.Stream(buf, S.Duplex()));
|
assert Observe(S.Stream(buf, S.Duplex()));
|
||||||
on message S.Stream(conn, S.Data($chunk)) send S.Stream(buf, S.Push(chunk, null));
|
on message S.Stream(conn, S.Data($chunk)) send S.Stream(buf, S.Push(chunk, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
on start selectAuthenticationMethod();
|
on start selectAuthenticationMethod();
|
||||||
|
@ -40,7 +40,7 @@ spawn named 'socks-server' {
|
||||||
send S.Stream(conn, S.Push(Bytes.concat([
|
send S.Stream(conn, S.Push(Bytes.concat([
|
||||||
Bytes.from([5, replyCode, 0]),
|
Bytes.from([5, replyCode, 0]),
|
||||||
(addrTypeAddrPort || Bytes.from([1, 0,0,0,0, 0,0]))
|
(addrTypeAddrPort || Bytes.from([1, 0,0,0,0, 0,0]))
|
||||||
]), null));
|
]), false));
|
||||||
}
|
}
|
||||||
|
|
||||||
function dieOnBadVersion(packet) {
|
function dieOnBadVersion(packet) {
|
||||||
|
@ -54,10 +54,10 @@ spawn named 'socks-server' {
|
||||||
readChunk(nMethods, (methods) => {
|
readChunk(nMethods, (methods) => {
|
||||||
if (!methods.includes(0)) {
|
if (!methods.includes(0)) {
|
||||||
console.error('Client will not accept no-authentication');
|
console.error('Client will not accept no-authentication');
|
||||||
send S.Stream(conn, S.Push(Bytes.from([5, 255]), null));
|
send S.Stream(conn, S.Push(Bytes.from([5, 255]), false));
|
||||||
rootFacet.stop();
|
rootFacet.stop();
|
||||||
} else {
|
} else {
|
||||||
send S.Stream(conn, S.Push(Bytes.from([5, 0]), null)); // select no-authentication
|
send S.Stream(conn, S.Push(Bytes.from([5, 0]), false)); // select no-authentication
|
||||||
readSocksRequest();
|
readSocksRequest();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -175,15 +175,15 @@ spawn named 'socks-server' {
|
||||||
sendReply(0 /* success */, localEnd);
|
sendReply(0 /* success */, localEnd);
|
||||||
readChunk(0, (firstChunk) => {
|
readChunk(0, (firstChunk) => {
|
||||||
self.bufferWanted = false;
|
self.bufferWanted = false;
|
||||||
send S.Stream(out, S.Push(firstChunk, null));
|
send S.Stream(out, S.Push(firstChunk, false));
|
||||||
react {
|
react {
|
||||||
assert S.Stream(conn, S.BackPressure(out));
|
assert S.Stream(conn, S.BackPressure(out));
|
||||||
assert S.Stream(out, S.BackPressure(conn));
|
assert S.Stream(out, S.BackPressure(conn));
|
||||||
on message S.Stream(conn, S.Data($chunk)) {
|
on message S.Stream(conn, S.Data($chunk)) {
|
||||||
send S.Stream(out, S.Push(chunk, null));
|
send S.Stream(out, S.Push(chunk, false));
|
||||||
}
|
}
|
||||||
on message S.Stream(out, S.Data($chunk)) {
|
on message S.Stream(out, S.Data($chunk)) {
|
||||||
send S.Stream(conn, S.Push(chunk, null));
|
send S.Stream(conn, S.Push(chunk, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -17,8 +17,8 @@ spawn named 'ssh-relay-server' {
|
||||||
stop on retracted S.Stream(daemon, S.Duplex());
|
stop on retracted S.Stream(daemon, S.Duplex());
|
||||||
assert S.Stream(conn, S.BackPressure(daemon));
|
assert S.Stream(conn, S.BackPressure(daemon));
|
||||||
assert S.Stream(daemon, S.BackPressure(conn));
|
assert S.Stream(daemon, S.BackPressure(conn));
|
||||||
on message S.Stream(conn, S.Data($chunk)) send S.Stream(daemon, S.Push(chunk, null));
|
on message S.Stream(conn, S.Data($chunk)) send S.Stream(daemon, S.Push(chunk, false));
|
||||||
on message S.Stream(daemon, S.Data($chunk)) send S.Stream(conn, S.Push(chunk, null));
|
on message S.Stream(daemon, S.Data($chunk)) send S.Stream(conn, S.Push(chunk, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,8 @@ spawn named 'lister' {
|
||||||
stop on message S.SubprocessError(id, $err) console.error("Couldn't start subprocess", err);
|
stop on message S.SubprocessError(id, $err) console.error("Couldn't start subprocess", err);
|
||||||
|
|
||||||
on asserted S.SubprocessRunning(id, _, [$i, $o, _]) {
|
on asserted S.SubprocessRunning(id, _, [$i, $o, _]) {
|
||||||
send S.Stream(i, S.Push("GET / HTTP/1.0\r\n\r\n", null));
|
send S.Stream(i, S.Push("GET / HTTP/1.0\r\n\r\n", false));
|
||||||
send S.Stream(i, S.Close(null));
|
send S.Stream(i, S.Close(false));
|
||||||
react {
|
react {
|
||||||
on message S.Stream(o, S.Data($chunk)) console.log(chunk);
|
on message S.Stream(o, S.Data($chunk)) console.log(chunk);
|
||||||
on asserted S.Stream(o, S.End()) console.log('DONE!');
|
on asserted S.Stream(o, S.End()) console.log('DONE!');
|
||||||
|
|
Loading…
Reference in New Issue