diff --git a/src/utils/index.js b/src/utils/index.js
index 504a74923046c..ba237b909b3be 100644
--- a/src/utils/index.js
+++ b/src/utils/index.js
@@ -4,3 +4,14 @@ export deepCloneWithBuffers from './deep_clone_with_buffers';
 export fromRoot from './from_root';
 export pkg from './package_json';
 export unset from './unset';
+
+export {
+  createConcatStream,
+  createIntersperseStream,
+  createJsonParseStream,
+  createJsonStringifyStream,
+  createListStream,
+  createPromiseFromStreams,
+  createReduceStream,
+  createSplitStream,
+} from './streams';
diff --git a/src/utils/streams/__tests__/concat_stream.js b/src/utils/streams/__tests__/concat_stream.js
new file mode 100644
index 0000000000000..44c7d9317b98a
--- /dev/null
+++ b/src/utils/streams/__tests__/concat_stream.js
@@ -0,0 +1,79 @@
+import expect from 'expect.js';
+
+import {
+  createListStream,
+  createPromiseFromStreams,
+  createConcatStream
+} from '../';
+
+describe('concatStream', () => {
+  it('accepts an initial value', async () => {
+    const output = await createPromiseFromStreams([
+      createListStream([1,2,3]),
+      createConcatStream([0])
+    ]);
+
+    expect(output).to.eql([0,1,2,3]);
+  });
+
+  describe(`combines using the previous value's concat method`, () => {
+    it('works with strings', async () => {
+      const output = await createPromiseFromStreams([
+        createListStream([
+          'a',
+          'b',
+          'c'
+        ]),
+        createConcatStream()
+      ]);
+      expect(output).to.eql('abc');
+    });
+
+    it('works with arrays', async () => {
+      const output = await createPromiseFromStreams([
+        createListStream([
+          [1],
+          [2,3,4],
+          [10]
+        ]),
+        createConcatStream()
+      ]);
+      expect(output).to.eql([1,2,3,4,10]);
+    });
+
+    it('works with a mixture, starting with array', async () => {
+      const output = await createPromiseFromStreams([
+        createListStream([
+          [],
+          1,
+          2,
+          3,
+          4,
+          [5,6,7]
+        ]),
+        createConcatStream()
+      ]);
+      expect(output).to.eql([1,2,3,4,5,6,7]);
+    });
+
+    it('fails when the value does not have a concat method', async () => {
+      let promise;
+      try {
+        promise = createPromiseFromStreams([
+          createListStream([1, '1']),
+          createConcatStream()
+        ]);
+      } catch (err) {
+        expect.fail('createPromiseFromStreams() should not fail synchronously');
+      }
+
+      try {
+        await promise;
+        expect.fail('Promise should have rejected');
+      } catch (err) {
+        expect(err).to.be.an(Error);
+        expect(err.message).to.contain('concat');
+      }
+    });
+  });
+});
diff --git a/src/utils/streams/__tests__/intersperse_stream.js b/src/utils/streams/__tests__/intersperse_stream.js
new file mode 100644
index 0000000000000..f0638be121d8a
--- /dev/null
+++ b/src/utils/streams/__tests__/intersperse_stream.js
@@ -0,0 +1,38 @@
+import expect from 'expect.js';
+import sinon from 'sinon';
+
+import {
+  createPromiseFromStreams,
+  createListStream,
+  createIntersperseStream,
+  createConcatStream
+} from '../';
+
+describe('intersperseStream', () => {
+  it('places the intersperse value between each provided value', async () => {
+    expect(
+      await createPromiseFromStreams([
+        createListStream(['to', 'be', 'or', 'not', 'to', 'be']),
+        createIntersperseStream(' '),
+        createConcatStream()
+      ])
+    ).to.be('to be or not to be');
+  });
+
+  it('emits values as soon as possible, does not needlessly buffer', async () => {
+    const str = createIntersperseStream('y');
+    const stub = sinon.stub();
+    str.on('data', stub);
+
+    str.write('a');
+    sinon.assert.calledOnce(stub);
+    expect(stub.firstCall.args).to.eql(['a']);
+    stub.reset();
+
+    str.write('b');
+    sinon.assert.calledTwice(stub);
+    expect(stub.firstCall.args).to.eql(['y']);
+    sinon.assert.calledTwice(stub);
+    expect(stub.secondCall.args).to.eql(['b']);
+  });
+});
diff --git a/src/utils/streams/__tests__/json_parse_stream.js b/src/utils/streams/__tests__/json_parse_stream.js
new file mode 100644
index 0000000000000..cdce8c9e065ab
--- /dev/null
+++ b/src/utils/streams/__tests__/json_parse_stream.js
@@ -0,0 +1,72 @@
+import expect from 'expect.js';
+
+import {
+  createPromiseFromStreams,
+  createListStream,
+  createConcatStream,
+  createJsonParseStream
+} from '../';
+
+describe('jsonParseStream', () => {
+  describe('standard usage', () => {
+    it('parses json strings', async () => {
+      const str = createJsonParseStream();
+      const dataPromise = new Promise((resolve, reject) => {
+        str.on('data', resolve);
+        str.on('error', reject);
+      });
+      str.write('{ "foo": "bar" }');
+
+      expect(await dataPromise).to.eql({
+        foo: 'bar'
+      });
+    });
+
+    it('parses json values passed to it from a list stream', async () => {
+      expect(await createPromiseFromStreams([
+        createListStream([
+          '"foo"',
+          '1'
+        ]),
+        createJsonParseStream(),
+        createConcatStream([])
+      ]))
+        .to.eql(['foo', 1]);
+    });
+  });
+
+  describe('error handling', () => {
+    it('emits an error when there is a parse failure', async () => {
+      const str = createJsonParseStream();
+      const errorPromise = new Promise(resolve => str.once('error', resolve));
+      str.write('{"partial');
+      const err = await errorPromise;
+      expect(err).to.be.an(Error);
+      expect(err).to.have.property('name', 'SyntaxError');
+    });
+
+    it('continues parsing after an error', async () => {
+      const str = createJsonParseStream();
+
+      const firstEmitPromise = new Promise(resolve => {
+        str.once('error', v => resolve({ name: 'error', value: v }));
+        str.once('data', v => resolve({ name: 'data', value: v }));
+      });
+
+      str.write('{"partial');
+      const firstEmit = await firstEmitPromise;
+      expect(firstEmit).to.have.property('name', 'error');
+      expect(firstEmit.value).to.be.an(Error);
+
+      const secondEmitPromise = new Promise(resolve => {
+        str.once('error', v => resolve({ name: 'error', value: v }));
+        str.once('data', v => resolve({ name: 'data', value: v }));
+      });
+
+      str.write('42');
+      const secondEmit = await secondEmitPromise;
+      expect(secondEmit).to.have.property('name', 'data');
+      expect(secondEmit).to.have.property('value', 42);
+    });
+  });
+});
diff --git a/src/utils/streams/__tests__/json_stringify_stream.js b/src/utils/streams/__tests__/json_stringify_stream.js
new file mode 100644
index 0000000000000..5f694567bbba4
--- /dev/null
+++ b/src/utils/streams/__tests__/json_stringify_stream.js
@@ -0,0 +1,79 @@
+import expect from 'expect.js';
+
+import {
+  createPromiseFromStreams,
+  createListStream,
+  createConcatStream,
+  createJsonStringifyStream
+} from '../';
+
+function createCircularStructure() {
+  const obj = {};
+  obj.obj = obj; // create circular reference
+  return obj;
+}
+
+describe('jsonStringifyStream', () => {
+  describe('standard usage', () => {
+    it('stringifies js values', async () => {
+      const str = createJsonStringifyStream();
+      const dataPromise = new Promise((resolve, reject) => {
+        str.on('data', resolve);
+        str.on('error', reject);
+      });
+      str.write({ foo: 'bar' });
+
+      expect(await dataPromise).to.be('{"foo":"bar"}');
+    });
+
+    it('stringifies js values passed from a list stream', async () => {
+      const all = await createPromiseFromStreams([
+        createListStream([
+          'foo',
+          1
+        ]),
+        createJsonStringifyStream(),
+        createConcatStream([])
+      ]);
+
+      expect(all).to.eql(['"foo"', '1']);
+    });
+  });
+
+  describe('error handling', () => {
+    it('emits an error when there is a stringify failure', async () => {
+      const str = createJsonStringifyStream();
+      const errorPromise = new Promise(resolve => str.once('error', resolve));
+      str.write(createCircularStructure());
+      const err = await errorPromise;
+      expect(err).to.be.an(Error);
+      expect(err).to.have.property('name', 'TypeError');
+      expect(err.message).to.contain('circular');
+    });
+
+    it('continues stringifying after an error', async () => {
+      const str = createJsonStringifyStream();
+
+      const firstEmitPromise = new Promise(resolve => {
+        str.once('error', v => resolve({ name: 'error', value: v }));
+        str.once('data', v => resolve({ name: 'data', value: v }));
+      });
+
+      str.write(createCircularStructure());
+
+      const firstEmit = await firstEmitPromise;
+      expect(firstEmit).to.have.property('name', 'error');
+      expect(firstEmit.value).to.be.an(Error);
+
+      const secondEmitPromise = new Promise(resolve => {
+        str.once('error', v => resolve({ name: 'error', value: v }));
+        str.once('data', v => resolve({ name: 'data', value: v }));
+      });
+
+      str.write('foo');
+      const secondEmit = await secondEmitPromise;
+      expect(secondEmit).to.have.property('name', 'data');
+      expect(secondEmit).to.have.property('value', '"foo"');
+    });
+  });
+});
diff --git a/src/utils/streams/__tests__/list_stream.js b/src/utils/streams/__tests__/list_stream.js
new file mode 100644
index 0000000000000..83042e117aaa4
--- /dev/null
+++ b/src/utils/streams/__tests__/list_stream.js
@@ -0,0 +1,28 @@
+import expect from 'expect.js';
+import sinon from 'sinon';
+
+import { createListStream } from '../';
+
+describe('listStream', () => {
+  it('provides the values in the initial list', async () => {
+    const str = createListStream([1,2,3,4]);
+    const stub = sinon.stub();
+    str.on('data', stub);
+
+    await new Promise(resolve => str.on('end', resolve));
+
+    sinon.assert.callCount(stub, 4);
+    expect(stub.getCall(0).args).to.eql([1]);
+    expect(stub.getCall(1).args).to.eql([2]);
+    expect(stub.getCall(2).args).to.eql([3]);
+    expect(stub.getCall(3).args).to.eql([4]);
+  });
+
+  it('does not modify the list passed', async () => {
+    const list = [1,2,3,4];
+    const str = createListStream(list);
+    str.resume();
+    await new Promise(resolve => str.on('end', resolve));
+    expect(list).to.eql([1,2,3,4]);
+  });
+});
diff --git a/src/utils/streams/__tests__/promise_from_streams.js b/src/utils/streams/__tests__/promise_from_streams.js
new file mode 100644
index 0000000000000..642898c92b70f
--- /dev/null
+++ b/src/utils/streams/__tests__/promise_from_streams.js
@@ -0,0 +1,90 @@
+import { PassThrough, Writable, Duplex } from 'stream';
+
+import Bluebird from 'bluebird';
+import expect from 'expect.js';
+
+import {
+  createListStream,
+  createPromiseFromStreams,
+  createReduceStream
+} from '../';
+
+describe('promiseFromStreams', () => {
+  it('pipes together an array of streams', async () => {
+    const str1 = createListStream([1,2,3]);
+    const str2 = createReduceStream((acc, n) => acc + n, 0);
+    const sumPromise = new Promise(resolve => str2.once('data', resolve));
+    createPromiseFromStreams([str1, str2]);
+    await new Promise(resolve => str2.once('end', resolve));
+    expect(await sumPromise).to.be(6);
+  });
+
+  context('last stream is writable', () => {
+    it('waits for the last stream to finish writing', async () => {
+      let written = '';
+
+      await createPromiseFromStreams([
+        createListStream(['a']),
+        new Writable({
+          write(chunk, enc, cb) {
+            setTimeout(() => {
+              written += chunk;
+              cb();
+            }, 100);
+          }
+        })
+      ]);
+
+      expect(written).to.be('a');
+    });
+
+    it('resolves to undefined', async () => {
+      const result = await createPromiseFromStreams([
+        createListStream(['a']),
+        new Writable({
+          write(chunk, enc, cb) {
+            cb();
+          }
+        })
+      ]);
+
+      expect(result).to.be(undefined);
+    });
+  });
+
+  context('last stream is readable', () => {
+    it(`resolves to its final value`, async () => {
+      const result = await createPromiseFromStreams([
+        createListStream(['a', 'b', 'c'])
+      ]);
+
+      expect(result).to.be('c');
+    });
+  });
+
+  context('last stream is duplex', () => {
+    it('waits for writing and resolves to final value', async () => {
+      let written = '';
+      const result = await createPromiseFromStreams([
+        createListStream(['a', 'b', 'c']),
+        new Duplex({
+          read() {
+            this.push('foo');
+            this.push('bar');
+            this.push(null);
+          },
+
+          write(chunk, enc, cb) {
+            setTimeout(() => {
+              written += chunk;
+              cb();
+            }, 50);
+          }
+        }).setEncoding('utf8')
+      ]);
+
+      expect(written).to.eql('abc');
+      expect(result).to.be('bar');
+    });
+  });
+});
diff --git a/src/utils/streams/__tests__/reduce_stream.js b/src/utils/streams/__tests__/reduce_stream.js
new file mode 100644
index 0000000000000..25556b7161910
--- /dev/null
+++ b/src/utils/streams/__tests__/reduce_stream.js
@@ -0,0 +1,69 @@
+import sinon from 'sinon';
+import expect from 'expect.js';
+
+import {
+  createReduceStream,
+  createPromiseFromStreams,
+  createListStream,
+} from '../';
+
+const promiseFromEvent = (name, emitter) =>
+  new Promise(resolve => emitter.on(name, () => resolve(name)));
+
+describe('reduceStream', () => {
+  it('calls the reducer for each item provided', async () => {
+    const stub = sinon.stub();
+    await createPromiseFromStreams([
+      createListStream([1,2,3]),
+      createReduceStream(stub.returnsArg(1), 0)
+    ]);
+    sinon.assert.calledThrice(stub);
+    expect(stub.firstCall.args).to.eql([0, 1, 'utf8']);
+    expect(stub.secondCall.args).to.eql([1, 2, 'utf8']);
+    expect(stub.thirdCall.args).to.eql([2, 3, 'utf8']);
+  });
+
+  it('provides the return value of the last iteration of the reducer', async () => {
+    const result = await createPromiseFromStreams([
+      createListStream('abcdefg'.split('')),
+      createReduceStream((acc) => acc + 1, 0)
+    ]);
+    expect(result).to.be(7);
+  });
+
+  it('emits an error if an iteration fails', async () => {
+    const reduce = createReduceStream((acc, i) => expect(i).to.be(1), 0);
+    const errorEvent = promiseFromEvent('error', reduce);
+
+    reduce.write(1);
+    reduce.write(2);
+    reduce.resume();
+    await errorEvent;
+  });
+
+  it('stops calling the reducer if an iteration fails, emits no data', async () => {
+    const reducer = sinon.spy((acc, i) => {
+      if (i < 100) return acc + i;
+      else throw new Error(i);
+    });
+    const reduce$ = createReduceStream(reducer, 0);
+
+    const dataStub = sinon.stub();
+    const errorStub = sinon.stub();
+    reduce$.on('data', dataStub);
+    reduce$.on('error', errorStub);
+    const endEvent = promiseFromEvent('end', reduce$);
+
+    reduce$.write(1);
+    reduce$.write(2);
+    reduce$.write(300);
+    reduce$.write(400);
+    reduce$.write(1000);
+    reduce$.end();
+
+    await endEvent;
+    sinon.assert.calledThrice(reducer);
+    sinon.assert.notCalled(dataStub);
+    sinon.assert.calledOnce(errorStub);
+  });
+});
diff --git a/src/utils/streams/__tests__/split_stream.js b/src/utils/streams/__tests__/split_stream.js
new file mode 100644
index 0000000000000..6f720458afc43
--- /dev/null
+++ b/src/utils/streams/__tests__/split_stream.js
@@ -0,0 +1,58 @@
+import expect from 'expect.js';
+
+import {
+  createSplitStream,
+  createConcatStream,
+  createPromiseFromStreams,
+} from '../';
+
+async function split(stream, input) {
+  const concat = createConcatStream();
+  concat.write([]);
+  stream.pipe(concat);
+  const output = createPromiseFromStreams([concat]);
+
+  input.forEach(i => {
+    stream.write(i);
+  });
+  stream.end();
+
+  return await output;
+}
+
+describe('splitStream', () => {
+  it('splits buffers, produces strings', async () => {
+    const output = await split(createSplitStream('&'), [Buffer.from('foo&bar')]);
+    expect(output).to.eql(['foo', 'bar']);
+  });
+
+  it('supports mixed input', async () => {
+    const output = await split(createSplitStream('&'), [Buffer.from('foo&b'), 'ar']);
+    expect(output).to.eql(['foo', 'bar']);
+  });
+
+  it('supports buffer split chunks', async () => {
+    const output = await split(createSplitStream(Buffer.from('&')), ['foo&b', 'ar']);
+    expect(output).to.eql(['foo', 'bar']);
+  });
+
+  it('splits provided values by a delimiter', async () => {
+    const output = await split(createSplitStream('&'), ['foo&b', 'ar']);
+    expect(output).to.eql(['foo', 'bar']);
+  });
+
+  it('handles multi-character delimiters', async () => {
+    const output = await split(createSplitStream('oo'), ['foo&b', 'ar']);
+    expect(output).to.eql(['f', '&bar']);
+  });
+
+  it('handles delimiters that span multiple chunks', async () => {
+    const output = await split(createSplitStream('ba'), ['foo&b', 'ar']);
+    expect(output).to.eql(['foo&', 'r']);
+  });
+
+  it('produces an empty chunk if the split char is at the end of the input', async () => {
+    const output = await split(createSplitStream('&bar'), ['foo&b', 'ar']);
+    expect(output).to.eql(['foo', '']);
+  });
+});
diff --git a/src/utils/streams/concat_stream.js b/src/utils/streams/concat_stream.js
new file mode 100644
index 0000000000000..c425142bcf13a
--- /dev/null
+++ b/src/utils/streams/concat_stream.js
@@ -0,0 +1,28 @@
+import { createReduceStream } from './';
+
+/**
+ *  Creates a Transform stream that consumes all provided
+ *  values and concatenates them using each value's `concat`
+ *  method.
+ *
+ *  Concatenate strings:
+ *    createListStream(['f', 'o', 'o'])
+ *      .pipe(createConcatStream())
+ *      .on('data', console.log)
+ *      // logs "foo"
+ *
+ *  Concatenate values into an array:
+ *    createListStream([1,2,3])
+ *      .pipe(createConcatStream([]))
+ *      .pipe(createJsonStringifyStream())
+ *      .on('data', console.log)
+ *      // logs "[1,2,3]"
+ *
+ *
+ *  @param {any} initial The initial value that subsequent
+ *                       items will concat with
+ *  @return {Transform}
+ */
+export function createConcatStream(initial) {
+  return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
+}
diff --git a/src/utils/streams/index.js b/src/utils/streams/index.js
new file mode 100644
index 0000000000000..cec45e702fc95
--- /dev/null
+++ b/src/utils/streams/index.js
@@ -0,0 +1,7 @@
+export { createIntersperseStream } from './intersperse_stream';
+export { createSplitStream } from './split_stream';
+export { createListStream } from './list_stream';
+export { createReduceStream } from './reduce_stream';
+export { createJsonParseStream, createJsonStringifyStream } from './json_streams';
+export { createPromiseFromStreams } from './promise_from_streams';
+export { createConcatStream } from './concat_stream';
diff --git a/src/utils/streams/intersperse_stream.js b/src/utils/streams/intersperse_stream.js
new file mode 100644
index 0000000000000..7b8d84ec0408f
--- /dev/null
+++ b/src/utils/streams/intersperse_stream.js
@@ -0,0 +1,45 @@
+import { Transform } from 'stream';
+
+/**
+ *  Create a Transform stream that receives values in object mode,
+ *  and intersperses a chunk between each object received.
+ *
+ *  This is useful for writing lists:
+ *
+ *    createListStream(['foo', 'bar'])
+ *      .pipe(createIntersperseStream('\n'))
+ *      .pipe(process.stdout) // outputs "foo\nbar"
+ *
+ *  Combine with a concat stream to get "join" like functionality:
+ *
+ *    await createPromiseFromStreams([
+ *      createListStream(['foo', 'bar']),
+ *      createIntersperseStream(' '),
+ *      createConcatStream()
+ *    ]) // produces a single value "foo bar"
+ *
+ *  @param {String|Buffer} intersperseChunk
+ *  @return {Transform}
+ */
+export function createIntersperseStream(intersperseChunk) {
+  let first = true;
+
+  return new Transform({
+    writableObjectMode: true,
+    readableObjectMode: true,
+    transform(chunk, enc, callback) {
+      try {
+        if (first) {
+          first = false;
+        } else {
+          this.push(intersperseChunk);
+        }
+
+        this.push(chunk);
+        callback(null);
+      } catch (err) {
+        callback(err);
+      }
+    }
+  });
+}
diff --git a/src/utils/streams/json_streams.js b/src/utils/streams/json_streams.js
new file mode 100644
index 0000000000000..ab1035f79d4a8
--- /dev/null
+++ b/src/utils/streams/json_streams.js
@@ -0,0 +1,53 @@
+import { Transform } from 'stream';
+
+/**
+ *  Create a Transform stream that accepts strings (in
+ *  object mode) and parses those strings to provide their
+ *  JavaScript values.
+ *
+ *  Parse errors are emitted with the "error" event, and
+ *  if not caught will cause the process to crash. When caught
+ *  the stream will continue to parse subsequent values.
+ *
+ *  @return {Transform}
+ */
+export function createJsonParseStream() {
+  return new Transform({
+    writableObjectMode: true,
+    readableObjectMode: true,
+    transform(json, enc, callback) {
+      try {
+        callback(null, JSON.parse(json));
+      } catch (err) {
+        callback(err);
+      }
+    }
+  });
+}
+
+/**
+ *  Create a Transform stream that accepts arbitrary JavaScript
+ *  values, stringifies them, and provides the output in object
+ *  mode to consumers.
+ *
+ *  Serialization errors are emitted with the "error" event, and
+ *  if not caught will cause the process to crash. When caught
+ *  the stream will continue to stringify subsequent values.
+ *
+ *  @param {Object} options
+ *  @property {Boolean} options.pretty
+ *  @return {Transform}
+ */
+export function createJsonStringifyStream({ pretty = false } = {}) {
+  return new Transform({
+    writableObjectMode: true,
+    readableObjectMode: true,
+    transform(json, enc, callback) {
+      try {
+        callback(null, JSON.stringify(json, null, pretty ? 2 : 0));
+      } catch (err) {
+        callback(err);
+      }
+    }
+  });
+}
diff --git a/src/utils/streams/list_stream.js b/src/utils/streams/list_stream.js
new file mode 100644
index 0000000000000..0f0d7f295f1ce
--- /dev/null
+++ b/src/utils/streams/list_stream.js
@@ -0,0 +1,25 @@
+import { Readable } from 'stream';
+
+/**
+ *  Create a Readable stream that provides the items
+ *  from a list as objects to subscribers
+ *
+ *  @param {Array} items - the list of items to provide
+ *  @return {Readable}
+ */
+export function createListStream(items = []) {
+  const queue = [].concat(items);
+
+  return new Readable({
+    objectMode: true,
+    read(size) {
+      queue.splice(0, size).forEach(item => {
+        this.push(item);
+      });
+
+      if (!queue.length) {
+        this.push(null);
+      }
+    }
+  });
+}
diff --git a/src/utils/streams/promise_from_streams.js b/src/utils/streams/promise_from_streams.js
new file mode 100644
index 0000000000000..3dcbcf21171b0
--- /dev/null
+++ b/src/utils/streams/promise_from_streams.js
@@ -0,0 +1,67 @@
+/**
+ *  Take an array of streams, pipe the output
+ *  from each one into the next, listening for
+ *  errors from any of the streams, and then resolve
+ *  the promise once the final stream has finished
+ *  writing/reading.
+ *
+ *  If the last stream is readable, its final value
+ *  will be provided as the promise value.
+ *
+ *  Errors emitted from any stream will cause
+ *  the promise to be rejected with that error.
+ *
+ *  @param {Array} streams
+ *  @return {Promise}
+ */
+export async function createPromiseFromStreams(streams) {
+  const last = streams[streams.length - 1];
+
+  // reject if any of the streams emits an error
+  const anyStreamFailure = new Promise((resolve, reject) => {
+    streams.forEach((stream, i) => {
+      if (i > 0) streams[i - 1].pipe(stream);
+      stream.on('error', reject);
+      return stream;
+    });
+  });
+
+  // resolve when the last stream has finished writing, or
+  // immediately if the last stream is not writable
+  const lastFinishedWriting = new Promise(resolve => {
+    if (typeof last.write !== 'function') {
+      resolve();
+      return;
+    }
+
+    last.on('finish', resolve);
+  });
+
+  // resolve with the final value provided by the last stream
+  // after the last stream has provided it, or immediately if the
+  // stream is not readable
+  const lastFinishedReading = new Promise(resolve => {
+    if (typeof last.read !== 'function') {
+      resolve();
+      return;
+    }
+
+    let finalChunk;
+    last.on('data', (chunk) => {
+      finalChunk = chunk;
+    });
+    last.on('end', () => {
+      resolve(finalChunk);
+    });
+  });
+
+  // wait for (and rethrow) the first error, or for the last stream
+  // to both finish writing and providing values to read
+  await Promise.race([
+    anyStreamFailure,
+    Promise.all([lastFinishedWriting, lastFinishedReading])
+  ]);
+
+  // return the final chunk read from the last stream
+  return await lastFinishedReading;
+}
diff --git a/src/utils/streams/reduce_stream.js b/src/utils/streams/reduce_stream.js
new file mode 100644
index 0000000000000..230ee8aaa87d0
--- /dev/null
+++ b/src/utils/streams/reduce_stream.js
@@ -0,0 +1,61 @@
+import { Transform } from 'stream';
+
+/**
+ *  Create a transform stream that consumes each chunk it receives
+ *  and passes it to the reducer, which will return the new value
+ *  for the stream. Once all chunks have been received the reduce
+ *  stream provides the result of the final call to the reducer to
+ *  subscribers.
+ *
+ *  @param {Function} reducer
+ *  @param {any} initial Initial value for the stream, if undefined
+ *                       then the first chunk provided is used as the
+ *                       initial value.
+ *  @return {Transform}
+ */
+export function createReduceStream(reducer, initial) {
+  let i = -1;
+  let value = initial;
+
+  // if the reducer throws an error then the value is
+  // considered invalid and the stream will never provide
+  // it to subscribers. We will also stop calling the
+  // reducer for any new data that is provided to us
+  let failed = false;
+
+  if (typeof reducer !== 'function') {
+    throw new TypeError('reducer must be a function');
+  }
+
+  return new Transform({
+    readableObjectMode: true,
+    writableObjectMode: true,
+    async transform(chunk, enc, callback) {
+      try {
+        if (failed) {
+          return callback();
+        }
+
+        i += 1;
+        if (i === 0 && initial === undefined) {
+          value = chunk;
+        } else {
+          value = await reducer(value, chunk, enc);
+        }
+
+        callback();
+      } catch (err) {
+        failed = true;
+        callback(err);
+      }
+    },
+
+    flush(callback) {
+      if (!failed) {
+        this.push(value);
+      }
+
+      callback();
+    }
+  });
+}
diff --git a/src/utils/streams/split_stream.js b/src/utils/streams/split_stream.js
new file mode 100644
index 0000000000000..6b9cccb081331
--- /dev/null
+++ b/src/utils/streams/split_stream.js
@@ -0,0 +1,55 @@
+import { Transform } from 'stream';
+
+/**
+ *  Creates a Transform stream that consumes a stream of Buffers
+ *  and produces a stream of strings (in object mode) by splitting
+ *  the received bytes using the splitChunk.
+ *
+ *  Ways this behaves like String#split:
+ *    - instances of splitChunk are removed from the input
+ *    - splitChunk can be of any size
+ *    - if there are no bytes found after the last splitChunk
+ *      a final empty chunk is emitted
+ *
+ *  Ways this deviates from String#split:
+ *    - splitChunk cannot be a regexp
+ *    - an empty string or Buffer will not produce a stream of individual
+ *      bytes like `string.split('')` would
+ *
+ *  @param {String} splitChunk
+ *  @return {Transform}
+ */
+export function createSplitStream(splitChunk) {
+  let unsplitBuffer = Buffer.alloc(0);
+
+  return new Transform({
+    writableObjectMode: false,
+    readableObjectMode: true,
+    transform(chunk, enc, callback) {
+      try {
+        let i;
+        let toSplit = Buffer.concat([unsplitBuffer, chunk]);
+        while ((i = toSplit.indexOf(splitChunk)) !== -1) {
+          const slice = toSplit.slice(0, i);
+          toSplit = toSplit.slice(i + splitChunk.length);
+          this.push(slice.toString('utf8'));
+        }
+
+        unsplitBuffer = toSplit;
+        callback(null);
+      } catch (err) {
+        callback(err);
+      }
+    },
+
+    flush(callback) {
+      try {
+        this.push(unsplitBuffer.toString('utf8'));
+
+        callback(null);
+      } catch (err) {
+        callback(err);
+      }
+    }
+  });
+}
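Taken together, the exported helpers compose into small pipelines. A minimal usage sketch follows; the import path and the sample data are illustrative assumptions, while the helper names and their behavior are those defined in the files above.

// Split a raw buffer on newlines, parse each line as JSON,
// and collect the parsed values into a single array.
import {
  createListStream,
  createSplitStream,
  createJsonParseStream,
  createConcatStream,
  createPromiseFromStreams,
} from './src/utils'; // hypothetical import path for this sketch

async function parseNdjsonExample() {
  const docs = await createPromiseFromStreams([
    createListStream([Buffer.from('{"a":1}\n{"b":2}')]), // illustrative input
    createSplitStream('\n'),
    createJsonParseStream(),
    createConcatStream([]),
  ]);

  // the promise resolves with the final value of the last stream,
  // here the array built by the concat stream
  console.log(docs); // [ { a: 1 }, { b: 2 } ]
}

parseNdjsonExample().catch(console.error);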