diff --git a/lib/s3middleware/azureHelpers/SubStreamInterface.js b/lib/s3middleware/azureHelpers/SubStreamInterface.js index caaa9bc15..40e50af3f 100644 --- a/lib/s3middleware/azureHelpers/SubStreamInterface.js +++ b/lib/s3middleware/azureHelpers/SubStreamInterface.js @@ -1,5 +1,20 @@ const stream = require('stream'); +class SubStream extends stream.PassThrough { + constructor(options) { + super(options); + + this.on('stopStreamingToAzure', function stopStreamingToAzure() { + this._abortStreaming(); + }); + } + + _abortStreaming() { + this.push(null); + this.end(); + } +} + /** * Interface for streaming subparts. * @class SubStreamInterface @@ -14,7 +29,8 @@ class SubStreamInterface { this._totalLengthCounter = 0; this._lengthCounter = 0; this._subPartIndex = 0; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); + this._streamingAborted = false; } /** @@ -51,12 +67,11 @@ class SubStreamInterface { * @return {undefined} */ stopStreaming(piper) { + this._streamingAborted = true; if (piper) { piper.unpipe(); - piper.destroy(); } - this._sourceStream.destroy(); - this._currentStream.destroy(); + this._currentStream.emit('stopStreamingToAzure'); } /** @@ -97,7 +112,7 @@ class SubStreamInterface { this._totalLengthCounter += this._lengthCounter; this._lengthCounter = 0; this._subPartIndex++; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); this.resumeStreaming(); return { nextStream: this._currentStream, @@ -111,6 +126,10 @@ class SubStreamInterface { * @return {undefined} */ write(chunk) { + if (this._streamingAborted) { + // don't write + return; + } const ready = this._currentStream.write(chunk); if (!ready) { diff --git a/lib/s3middleware/azureHelpers/mpuUtils.js b/lib/s3middleware/azureHelpers/mpuUtils.js index 440b1163d..f2d899558 100644 --- a/lib/s3middleware/azureHelpers/mpuUtils.js +++ b/lib/s3middleware/azureHelpers/mpuUtils.js @@ -151,9 +151,9 @@ dataStoreName, log, cb) => { 'putting multiple parts'); resultsCollector.on('error', (err, subPartIndex) => { - streamInterface.stopStreaming(request); log.error(`Error putting subpart to Azure: ${subPartIndex}`, { error: err.message, dataStoreName }); + streamInterface.stopStreaming(request); if (err.code === 'ContainerNotFound') { return cb(errors.NoSuchBucket); } diff --git a/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js new file mode 100644 index 000000000..e63f522ea --- /dev/null +++ b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js @@ -0,0 +1,63 @@ +const assert = require('assert'); +const stream = require('stream'); +const SubStreamInterface = + require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface'); + +describe('s3middleware SubStreamInterface.stopStreaming()', () => { + const eventsEmitted = { + sourceStreamUnpiped: false, + currentStreamStopStreamingToAzure: false, + currentStreamEnded: false, + }; + const expectedSequence = { + sourceStreamUnpiped: 0, + currentStreamStopStreamingToAzure: 1, + currentStreamEnded: 2, + }; + const data = Buffer.alloc(100); + let dataMarker = 0; + let eventSequence = 0; + const mockRequest = new stream.Readable({ + read: () => { + if (dataMarker >= data.length) { + return mockRequest.push(null); + } + mockRequest.push(data.slice(dataMarker, dataMarker + 1)); + dataMarker += 1; + return undefined; + }, + }); + const sourceStream = new stream.PassThrough(); + const subStreamInterface = new SubStreamInterface(sourceStream); + sourceStream.on('unpipe', () => { + eventsEmitted.sourceStreamUnpiped = eventSequence++; + }); + subStreamInterface._currentStream.on('stopStreamingToAzure', () => { + eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++; + }); + subStreamInterface._currentStream.on('finish', () => { + eventsEmitted.currentStreamEnded = eventSequence++; + }); + it('should stop streaming data and end current stream', done => { + sourceStream.on('data', chunk => { + const currentLength = subStreamInterface.getLengthCounter(); + if (currentLength === 10) { + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], false); + }); + assert.strictEqual(mockRequest._readableState.pipesCount, 1); + return subStreamInterface.stopStreaming(mockRequest); + } + return subStreamInterface.write(chunk); + }); + mockRequest.pipe(sourceStream); + setTimeout(() => { + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], expectedSequence[key]); + }); + assert.strictEqual(subStreamInterface.getLengthCounter(), 10); + assert.strictEqual(mockRequest._readableState.pipesCount, 0); + return done(); + }, 1000); + }); +});