diff --git a/TODO.md b/TODO.md index 9230efe..bb7d9ca 100644 --- a/TODO.md +++ b/TODO.md @@ -32,3 +32,6 @@ some formulations of the game-restart logic in the flappy bird demo. + - driver-streams-node/src/subprocess.js: perhaps figure out some way + of blocking SIGQUIT, which I'm currently using to get debug output, + in children, so they don't terminate too? diff --git a/packages/driver-streams-node/src/index.js b/packages/driver-streams-node/src/index.js index 8d80bfd..34af563 100644 --- a/packages/driver-streams-node/src/index.js +++ b/packages/driver-streams-node/src/index.js @@ -19,3 +19,4 @@ Object.assign(module.exports, activate require('./streams.js')); Object.assign(module.exports, activate require('./net.js')); +Object.assign(module.exports, activate require('./subprocess.js')); diff --git a/packages/driver-streams-node/src/subprocess.js b/packages/driver-streams-node/src/subprocess.js new file mode 100644 index 0000000..49f23dc --- /dev/null +++ b/packages/driver-streams-node/src/subprocess.js @@ -0,0 +1,87 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/driver-streams-node, Stream support for Syndicate/js +// Copyright (C) 2016-2018 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +import { currentFacet, Observe, Dataspace, genUuid, Bytes } from "@syndicate-lang/core"; +const S = activate require("./streams"); +const child_process = require('child_process'); + +assertion type Subprocess(id, command, args, options); +message type SubprocessError(id, err); + +assertion type SubprocessRunning(id, pid, stdio); +assertion type SubprocessExit(id, code, signal); + +message type SubprocessKill(id, signal); // also on frame teardown + +export { + Subprocess, SubprocessError, + SubprocessRunning, SubprocessExit, + SubprocessKill, +}; + +spawn named 'driver/Subprocess' { + during Subprocess($id, $command, $args, $options) spawn named ['Subprocess', id] { + const sp = child_process.spawn(command, args.toJS(), options ? options.toJS() : void 0); + + const stdio = sp.stdio.map((s, i) => { + if (s !== null) { + const fd = genUuid('fd'); + if (s.readable && s.writable) { + on start react S.duplexStreamBehaviour(fd, s); + } else if (s.readable) { + on start react S.readableStreamBehaviour(fd, s); + } else if (s.writable) { + on start react S.writableStreamBehaviour(fd, s); + } + return fd; + } else { + return null; + } + }); + + field this.isRunning = null; + + on stop if (this.isRunning !== false) sp.kill('SIGKILL'); + + assert SubprocessRunning(id, sp.pid, stdio) when (this.isRunning === true); + + sp.on('exit', Dataspace.wrapExternal((code, signal) => { + this.isRunning = false; + react assert SubprocessExit(id, code, signal); + })); + + sp.on('error', Dataspace.wrapExternal((err) => { + this.isRunning = false; + send SubprocessError(id, err); + currentFacet().stop(); + })); + + process.nextTick(Dataspace.wrapExternal(() => { + if (this.isRunning === null) { + this.isRunning = true; + } + })); + + on message SubprocessKill(id, $signal) { + if (this.isRunning !== false) { + this.isRunning = false; + sp.kill(signal); + } + } + } +} diff --git a/packages/syntax-playground/src/ncchatclient.js b/packages/syntax-playground/src/ncchatclient.js new file mode 100644 index 0000000..bfed1e6 --- /dev/null +++ b/packages/syntax-playground/src/ncchatclient.js @@ -0,0 +1,57 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/syntax-test, a demo of Syndicate extensions to JS. +// Copyright (C) 2016-2018 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +const { Observe, currentFacet, genUuid } = require("@syndicate-lang/core"); +const { sleep } = activate require("@syndicate-lang/driver-timer"); +const S = activate require("@syndicate-lang/driver-streams-node"); + +const stdin = genUuid('stdin'); +const stdout = genUuid('stdout'); +spawn named 'stdioServer' { + during Observe(S.Readable(stdin)) spawn S.readableStreamBehaviour(stdin, process.stdin); + during Observe(S.Writable(stdout)) spawn S.writableStreamBehaviour(stdout, process.stdout); +} + +spawn named 'chatclient-via-nc' { + const id = genUuid('p'); + assert S.Subprocess(id, 'nc', ['localhost', '5999'], {stdio: ['pipe', 'pipe', 'ignore']}); + stop on message S.SubprocessError(id, $err) { + console.error("Couldn't start subprocess", err); + } + stop on retracted S.Readable(stdin); + stop on retracted S.Writable(stdout); + on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { + react { + on message S.Line(stdin, $line) { + console.log('INPUT:', line); + send S.Push(i, line.toString('utf-8') + '\n', null); + } + on message S.End(stdin) { + console.log('INPUT EOF'); + send S.Close(i, null); + } + + on message S.Line(o, $line) send S.Push(stdout, line.toString('utf-8') + '\n', null); + } + } + stop on asserted S.SubprocessExit(id, $code, $signal) { + if (code !== 0) { + console.error('Subprocess exited with code', code, 'signal', signal); + } + } +} diff --git a/packages/syntax-playground/src/subprocess.js b/packages/syntax-playground/src/subprocess.js new file mode 100644 index 0000000..dd175d8 --- /dev/null +++ b/packages/syntax-playground/src/subprocess.js @@ -0,0 +1,46 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/syntax-test, a demo of Syndicate extensions to JS. +// Copyright (C) 2016-2018 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +const { currentFacet, genUuid } = require("@syndicate-lang/core"); +const { sleep } = activate require("@syndicate-lang/driver-timer"); +const S = activate require("@syndicate-lang/driver-streams-node"); + +spawn named 'lister' { + const go = () => { + react { + const id = genUuid('p'); + assert S.Subprocess(id, 'nc', ['localhost', '80'], {stdio: ['pipe', 'pipe', 'ignore']}); + stop on message S.SubprocessError(id, $err) { + console.error("Couldn't start subprocess", err); + } + on asserted S.SubprocessRunning(id, _, [$i, $o, _]) { + send S.Push(i, "GET / HTTP/1.0\r\n\r\n", null); + send S.Close(i, null); + react { + on message S.Data(o, $chunk) console.log(chunk); + on asserted S.End(o) console.log('DONE!'); + } + } + stop on asserted S.SubprocessExit(id, $code, $signal) { + console.log('No longer running', new Date(), code, signal); + sleep(1000, go); + } + } + }; + on start go(); +}