diff --git a/packages/driver-streams-node/src/buffer.js b/packages/driver-streams-node/src/buffer.js new file mode 100644 index 0000000..60447d7 --- /dev/null +++ b/packages/driver-streams-node/src/buffer.js @@ -0,0 +1,68 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/driver-streams-node, Stream support for Syndicate/js +// Copyright (C) 2016-2018 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +import { currentFacet, Observe, genUuid, Bytes, List } from "@syndicate-lang/core"; +const S = activate require("./streams"); + +message type PacketRequest(id, size); + +export { + PacketRequest, +}; + +export function spawnBufferStream() { + const id = genUuid('buffer-stream'); + spawn named id { + stop on retracted Observe(S.Duplex(id)); + assert S.Duplex(id); + assert S.StreamInfo(id, 'Duplex', null); + + field this.buffer = Bytes(); + field this.queue = List(); + + on message S.Push(id, $chunk, $ack) { + this.buffer = Bytes.concat([this.buffer, chunk]); + if (ack !== null) send ack; + } + + stop on message S.Close(id, $ack) { + if (ack !== null) send ack; + } + + on message PacketRequest(id, $size) { + if (size === 0) { + // Signal to terminate. + currentFacet().stop(() => { send S.Data(id, this.buffer); }); + } else { + this.queue = this.queue.push(size); + } + } + + dataflow { + if (!this.queue.isEmpty()) { + const expected = this.queue.first(); + if (this.buffer.size >= expected) { + send S.Data(id, this.buffer.slice(0, expected)); + this.buffer = this.buffer.slice(expected); + this.queue = this.queue.shift(); + } + } + } + } + return id; +} diff --git a/packages/driver-streams-node/src/index.js b/packages/driver-streams-node/src/index.js index 34af563..6ee1167 100644 --- a/packages/driver-streams-node/src/index.js +++ b/packages/driver-streams-node/src/index.js @@ -20,3 +20,4 @@ Object.assign(module.exports, activate require('./streams.js')); Object.assign(module.exports, activate require('./net.js')); Object.assign(module.exports, activate require('./subprocess.js')); +Object.assign(module.exports, activate require('./buffer.js'));