From f3d7c05d7e6e74d82be31b8eba7c99eefb7924ec Mon Sep 17 00:00:00 2001 From: Electra Chong Date: Mon, 6 Nov 2017 10:29:36 -0800 Subject: [PATCH] [s] address comments --- .../azureHelpers/SubStreamInterface.js | 26 ++++++++++----- .../azureHelpers/SubStreamingInterface.js | 32 ++++++++++++------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/lib/s3middleware/azureHelpers/SubStreamInterface.js b/lib/s3middleware/azureHelpers/SubStreamInterface.js index 9721eb487..40e50af3f 100644 --- a/lib/s3middleware/azureHelpers/SubStreamInterface.js +++ b/lib/s3middleware/azureHelpers/SubStreamInterface.js @@ -1,5 +1,20 @@ const stream = require('stream'); +class SubStream extends stream.PassThrough { + constructor(options) { + super(options); + + this.on('stopStreamingToAzure', function stopStreamingToAzure() { + this._abortStreaming(); + }); + } + + _abortStreaming() { + this.push(null); + this.end(); + } +} + /** * Interface for streaming subparts. * @class SubStreamInterface @@ -14,7 +29,7 @@ class SubStreamInterface { this._totalLengthCounter = 0; this._lengthCounter = 0; this._subPartIndex = 0; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); this._streamingAborted = false; } @@ -55,13 +70,8 @@ class SubStreamInterface { this._streamingAborted = true; if (piper) { piper.unpipe(); - piper.push(null); } - this._sourceStream.pause(); - this._sourceStream.push(null); - this._sourceStream.end(); - this._currentStream.push(null); - this._currentStream.end(); + this._currentStream.emit('stopStreamingToAzure'); } /** @@ -102,7 +112,7 @@ class SubStreamInterface { this._totalLengthCounter += this._lengthCounter; this._lengthCounter = 0; this._subPartIndex++; - this._currentStream = new stream.PassThrough(); + this._currentStream = new SubStream(); this.resumeStreaming(); return { nextStream: this._currentStream, diff --git a/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js index 1c3439ce5..e63f522ea 100644 --- a/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js +++ b/tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js @@ -4,12 +4,19 @@ const SubStreamInterface = require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface'); describe('s3middleware SubStreamInterface.stopStreaming()', () => { - const emittedFinished = { - sourceStream: false, - currentStream: false, + const eventsEmitted = { + sourceStreamUnpiped: false, + currentStreamStopStreamingToAzure: false, + currentStreamEnded: false, + }; + const expectedSequence = { + sourceStreamUnpiped: 0, + currentStreamStopStreamingToAzure: 1, + currentStreamEnded: 2, }; const data = Buffer.alloc(100); let dataMarker = 0; + let eventSequence = 0; const mockRequest = new stream.Readable({ read: () => { if (dataMarker >= data.length) { @@ -22,18 +29,21 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => { }); const sourceStream = new stream.PassThrough(); const subStreamInterface = new SubStreamInterface(sourceStream); - sourceStream.on('finish', () => { - emittedFinished.sourceStream = true; + sourceStream.on('unpipe', () => { + eventsEmitted.sourceStreamUnpiped = eventSequence++; + }); + subStreamInterface._currentStream.on('stopStreamingToAzure', () => { + eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++; }); subStreamInterface._currentStream.on('finish', () => { - emittedFinished.currentStream = true; + eventsEmitted.currentStreamEnded = eventSequence++; }); - it('should stop streaming data and end all streams', done => { + it('should stop streaming data and end current stream', done => { sourceStream.on('data', chunk => { const currentLength = subStreamInterface.getLengthCounter(); if (currentLength === 10) { - Object.keys(emittedFinished).forEach(key => { - assert.strictEqual(emittedFinished[key], false); + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], false); }); assert.strictEqual(mockRequest._readableState.pipesCount, 1); return subStreamInterface.stopStreaming(mockRequest); @@ -42,8 +52,8 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => { }); mockRequest.pipe(sourceStream); setTimeout(() => { - Object.keys(emittedFinished).forEach(key => { - assert.strictEqual(emittedFinished[key], true); + Object.keys(eventsEmitted).forEach(key => { + assert.strictEqual(eventsEmitted[key], expectedSequence[key]); }); assert.strictEqual(subStreamInterface.getLengthCounter(), 10); assert.strictEqual(mockRequest._readableState.pipesCount, 0);