Skip to content

Commit

Permalink
[s] address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
electrachong committed Nov 6, 2017
1 parent a39057f commit f3d7c05
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
26 changes: 18 additions & 8 deletions lib/s3middleware/azureHelpers/SubStreamInterface.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
const stream = require('stream');

class SubStream extends stream.PassThrough {
constructor(options) {
super(options);

this.on('stopStreamingToAzure', function stopStreamingToAzure() {
this._abortStreaming();
});
}

_abortStreaming() {
this.push(null);
this.end();
}
}

/**
* Interface for streaming subparts.
* @class SubStreamInterface
Expand All @@ -14,7 +29,7 @@ class SubStreamInterface {
this._totalLengthCounter = 0;
this._lengthCounter = 0;
this._subPartIndex = 0;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this._streamingAborted = false;
}

Expand Down Expand Up @@ -55,13 +70,8 @@ class SubStreamInterface {
this._streamingAborted = true;
if (piper) {
piper.unpipe();
piper.push(null);
}
this._sourceStream.pause();
this._sourceStream.push(null);
this._sourceStream.end();
this._currentStream.push(null);
this._currentStream.end();
this._currentStream.emit('stopStreamingToAzure');
}

/**
Expand Down Expand Up @@ -102,7 +112,7 @@ class SubStreamInterface {
this._totalLengthCounter += this._lengthCounter;
this._lengthCounter = 0;
this._subPartIndex++;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this.resumeStreaming();
return {
nextStream: this._currentStream,
Expand Down
32 changes: 21 additions & 11 deletions tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ const SubStreamInterface =
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');

describe('s3middleware SubStreamInterface.stopStreaming()', () => {
const emittedFinished = {
sourceStream: false,
currentStream: false,
const eventsEmitted = {
sourceStreamUnpiped: false,
currentStreamStopStreamingToAzure: false,
currentStreamEnded: false,
};
const expectedSequence = {
sourceStreamUnpiped: 0,
currentStreamStopStreamingToAzure: 1,
currentStreamEnded: 2,
};
const data = Buffer.alloc(100);
let dataMarker = 0;
let eventSequence = 0;
const mockRequest = new stream.Readable({
read: () => {
if (dataMarker >= data.length) {
Expand All @@ -22,18 +29,21 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => {
});
const sourceStream = new stream.PassThrough();
const subStreamInterface = new SubStreamInterface(sourceStream);
sourceStream.on('finish', () => {
emittedFinished.sourceStream = true;
sourceStream.on('unpipe', () => {
eventsEmitted.sourceStreamUnpiped = eventSequence++;
});
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
});
subStreamInterface._currentStream.on('finish', () => {
emittedFinished.currentStream = true;
eventsEmitted.currentStreamEnded = eventSequence++;
});
it('should stop streaming data and end all streams', done => {
it('should stop streaming data and end current stream', done => {
sourceStream.on('data', chunk => {
const currentLength = subStreamInterface.getLengthCounter();
if (currentLength === 10) {
Object.keys(emittedFinished).forEach(key => {
assert.strictEqual(emittedFinished[key], false);
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], false);
});
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
return subStreamInterface.stopStreaming(mockRequest);
Expand All @@ -42,8 +52,8 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => {
});
mockRequest.pipe(sourceStream);
setTimeout(() => {
Object.keys(emittedFinished).forEach(key => {
assert.strictEqual(emittedFinished[key], true);
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
});
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
Expand Down

0 comments on commit f3d7c05

Please sign in to comment.