diff --git a/packages/driver-streams-node/src/duplex.js b/packages/driver-streams-node/src/duplex.js new file mode 100644 index 0000000..129e10d --- /dev/null +++ b/packages/driver-streams-node/src/duplex.js @@ -0,0 +1,72 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/driver-streams-node, Stream support for Syndicate/js +// Copyright (C) 2016-2018 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +// Just enough functionality to make a pair of Readable/Writable +// streams appear to be a Duplex stream from the POV of the Syndicate +// driver in this package. + +const util = require('util'); +const events = require('events'); + +function Duplex(r, w) { + this.r = r; + this.w = w; +} + +Duplex.prototype.on = function (evt, cb) { + switch (evt) { + case 'readable': + case 'end': + this.r.on(evt, cb); + break; + case 'drain': + this.w.on(evt, cb); + break; + case 'close': + case 'error': + this.r.on(evt, cb); + this.w.on(evt, cb); + break; + default: throw new Error("Duplex: unsupported event: " + evt); + } + return this; +}; + +function proxyProp(name, target) { + Object.defineProperty(Duplex.prototype, name, { + configurable: true, + enumerable: true, + get: function () { return this[target][name]; } + }); +} + +proxyProp('readableLength', 'r'); +proxyProp('writableLength', 'w'); +proxyProp('writableHighWaterMark', 'w'); + +Duplex.prototype.read = function (size) { + return this.r.read(size); +}; + +Duplex.prototype.write = function (chunk, cb) { + return this.w.write(chunk, cb); +}; + +util.inherits(Duplex, events.EventEmitter); + +module.exports = Duplex; diff --git a/packages/driver-streams-node/src/subprocess.js b/packages/driver-streams-node/src/subprocess.js index 49f23dc..e37bfcf 100644 --- a/packages/driver-streams-node/src/subprocess.js +++ b/packages/driver-streams-node/src/subprocess.js @@ -18,9 +18,11 @@ import { currentFacet, Observe, Dataspace, genUuid, Bytes } from "@syndicate-lang/core"; const S = activate require("./streams"); +const Duplex = require("./duplex"); const child_process = require('child_process'); assertion type Subprocess(id, command, args, options); +assertion type SubprocessAddress(command, args, options); message type SubprocessError(id, err); assertion type SubprocessRunning(id, pid, stdio); @@ -29,12 +31,36 @@ assertion type SubprocessExit(id, code, signal); message type SubprocessKill(id, signal); // also on frame teardown export { - Subprocess, SubprocessError, + Subprocess, SubprocessAddress, SubprocessError, SubprocessRunning, SubprocessExit, SubprocessKill, }; spawn named 'driver/Subprocess' { + during S.OutgoingConnection($id, SubprocessAddress($command, $args, $options)) + spawn named ['SubprocessConnection', id] { + const establishingFacet = currentFacet(); + + const sp = child_process.spawn(command, + args.toJS(), + (options || Map()).set('stdio', + ['pipe', 'pipe', 'inherit']).toJS()); + const rejecter = Dataspace.wrapExternal(() => { + send S.ConnectionRejected(id, null); + establishingFacet.stop(); + }); + sp.on('exit', rejecter); + sp.on('error', rejecter); + + process.nextTick(Dataspace.wrapExternal(() => { + sp.off('exit', rejecter); + sp.off('error', rejecter); + send S.ConnectionAccepted(id); + const s = new Duplex(sp.stdout, sp.stdin); + establishingFacet.stop(() => { react S.duplexStreamBehaviour(id, s); }); + })); + } + during Subprocess($id, $command, $args, $options) spawn named ['Subprocess', id] { const sp = child_process.spawn(command, args.toJS(), options ? options.toJS() : void 0);