From 71db93185f563a6307aa0b14577f7c879dd98bbf Mon Sep 17 00:00:00 2001 From: Electra Chong Date: Tue, 31 Oct 2017 17:57:49 -0700 Subject: [PATCH] fix: undefined stream.destroy call To end streaming in case of error, we were calling an unofficial method of the stream API which was removed and does not exist in the version of node we use. The method is re-added officially in node v.8 but until we upgrade we need to destroy the streams manually, by pushing null for readables and calling stream.end() for writables. --- .../azureHelpers/SubStreamInterface.js | 29 +++++++-- lib/s3middleware/azureHelpers/mpuUtils.js | 2 +- .../azureHelpers/SubStreamingInterface.js | 63 +++++++++++++++++++ 3 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js diff --git a/lib/s3middleware/azureHelpers/SubStreamInterface.js b/lib/s3middleware/azureHelpers/SubStreamInterface.js index caaa9bc15..40e50af3f 100644 --- a/lib/s3middleware/azureHelpers/SubStreamInterface.js +++ b/lib/s3middleware/azureHelpers/SubStreamInterface.js @@ -1,5 +1,20 @@ const stream = require('stream'); +class SubStream extends stream.PassThrough { + constructor(options) { + super(options); + + this.on('stopStreamingToAzure', function stopStreamingToAzure() { + this._abortStreaming(); + }); + } + + _abortStreaming() { + this.push(null); + this.end(); + } +} + /** * Interface for streaming subparts. * @class SubStreamInterface @@ -14,7 +29,8 @@ class SubStreamInterface { this._totalLengthCounter = 0; this._lengthCounter = 0; this._subPartIndex = 0; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); + this._streamingAborted = false; } /** @@ -51,12 +67,11 @@ class SubStreamInterface { * @return {undefined} */ stopStreaming(piper) { + this._streamingAborted = true; if (piper) { piper.unpipe(); - piper.destroy(); } - this._sourceStream.destroy(); - this._currentStream.destroy(); + this._currentStream.emit('stopStreamingToAzure'); } /** @@ -97,7 +112,7 @@ class SubStreamInterface { this._totalLengthCounter += this._lengthCounter; this._lengthCounter = 0; this._subPartIndex++; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); this.resumeStreaming(); return { nextStream: this._currentStream, @@ -111,6 +126,10 @@ class SubStreamInterface { * @return {undefined} */ write(chunk) { + if (this._streamingAborted) { + // don't write + return; + } const ready = this._currentStream.write(chunk); if (!ready) { diff --git a/lib/s3middleware/azureHelpers/mpuUtils.js b/lib/s3middleware/azureHelpers/mpuUtils.js index 440b1163d..f2d899558 100644 --- a/lib/s3middleware/azureHelpers/mpuUtils.js +++ b/lib/s3middleware/azureHelpers/mpuUtils.js @@ -151,9 +151,9 @@ dataStoreName, log, cb) => { 'putting multiple parts'); resultsCollector.on('error', (err, subPartIndex) => { - streamInterface.stopStreaming(request); log.error(`Error putting subpart to Azure: ${subPartIndex}`, { error: err.message, dataStoreName }); + streamInterface.stopStreaming(request); if (err.code === 'ContainerNotFound') { return cb(errors.NoSuchBucket); } diff --git a/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js new file mode 100644 index 000000000..e63f522ea --- /dev/null +++ b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js @@ -0,0 +1,63 @@ +const assert = require('assert'); +const stream = require('stream'); +const SubStreamInterface = + require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface'); + +describe('s3middleware SubStreamInterface.stopStreaming()', () => { + const eventsEmitted = { + sourceStreamUnpiped: false, + currentStreamStopStreamingToAzure: false, + currentStreamEnded: false, + }; + const expectedSequence = { + sourceStreamUnpiped: 0, + currentStreamStopStreamingToAzure: 1, + currentStreamEnded: 2, + }; + const data = Buffer.alloc(100); + let dataMarker = 0; + let eventSequence = 0; + const mockRequest = new stream.Readable({ + read: () => { + if (dataMarker >= data.length) { + return mockRequest.push(null); + } + mockRequest.push(data.slice(dataMarker, dataMarker + 1)); + dataMarker += 1; + return undefined; + }, + }); + const sourceStream = new stream.PassThrough(); + const subStreamInterface = new SubStreamInterface(sourceStream); + sourceStream.on('unpipe', () => { + eventsEmitted.sourceStreamUnpiped = eventSequence++; + }); + subStreamInterface._currentStream.on('stopStreamingToAzure', () => { + eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++; + }); + subStreamInterface._currentStream.on('finish', () => { + eventsEmitted.currentStreamEnded = eventSequence++; + }); + it('should stop streaming data and end current stream', done => { + sourceStream.on('data', chunk => { + const currentLength = subStreamInterface.getLengthCounter(); + if (currentLength === 10) { + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], false); + }); + assert.strictEqual(mockRequest._readableState.pipesCount, 1); + return subStreamInterface.stopStreaming(mockRequest); + } + return subStreamInterface.write(chunk); + }); + mockRequest.pipe(sourceStream); + setTimeout(() => { + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], expectedSequence[key]); + }); + assert.strictEqual(subStreamInterface.getLengthCounter(), 10); + assert.strictEqual(mockRequest._readableState.pipesCount, 0); + return done(); + }, 1000); + }); +});