Skip to content

Commit

Permalink
merge #367
Browse files Browse the repository at this point in the history
  • Loading branch information
ironman-machine authored and Cloud User committed Nov 6, 2017
2 parents 1270412 + 71db931 commit 2eee4fb
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 6 deletions.
29 changes: 24 additions & 5 deletions lib/s3middleware/azureHelpers/SubStreamInterface.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
const stream = require('stream');

class SubStream extends stream.PassThrough {
constructor(options) {
super(options);

this.on('stopStreamingToAzure', function stopStreamingToAzure() {
this._abortStreaming();
});
}

_abortStreaming() {
this.push(null);
this.end();
}
}

/**
* Interface for streaming subparts.
* @class SubStreamInterface
Expand All @@ -14,7 +29,8 @@ class SubStreamInterface {
this._totalLengthCounter = 0;
this._lengthCounter = 0;
this._subPartIndex = 0;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this._streamingAborted = false;
}

/**
Expand Down Expand Up @@ -51,12 +67,11 @@ class SubStreamInterface {
* @return {undefined}
*/
stopStreaming(piper) {
this._streamingAborted = true;
if (piper) {
piper.unpipe();
piper.destroy();
}
this._sourceStream.destroy();
this._currentStream.destroy();
this._currentStream.emit('stopStreamingToAzure');
}

/**
Expand Down Expand Up @@ -97,7 +112,7 @@ class SubStreamInterface {
this._totalLengthCounter += this._lengthCounter;
this._lengthCounter = 0;
this._subPartIndex++;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this.resumeStreaming();
return {
nextStream: this._currentStream,
Expand All @@ -111,6 +126,10 @@ class SubStreamInterface {
* @return {undefined}
*/
write(chunk) {
if (this._streamingAborted) {
// don't write
return;
}
const ready = this._currentStream.write(chunk);

if (!ready) {
Expand Down
2 changes: 1 addition & 1 deletion lib/s3middleware/azureHelpers/mpuUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
'putting multiple parts');

resultsCollector.on('error', (err, subPartIndex) => {
streamInterface.stopStreaming(request);
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
{ error: err.message, dataStoreName });
streamInterface.stopStreaming(request);
if (err.code === 'ContainerNotFound') {
return cb(errors.NoSuchBucket);
}
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const assert = require('assert');
const stream = require('stream');
const SubStreamInterface =
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');

describe('s3middleware SubStreamInterface.stopStreaming()', () => {
const eventsEmitted = {
sourceStreamUnpiped: false,
currentStreamStopStreamingToAzure: false,
currentStreamEnded: false,
};
const expectedSequence = {
sourceStreamUnpiped: 0,
currentStreamStopStreamingToAzure: 1,
currentStreamEnded: 2,
};
const data = Buffer.alloc(100);
let dataMarker = 0;
let eventSequence = 0;
const mockRequest = new stream.Readable({
read: () => {
if (dataMarker >= data.length) {
return mockRequest.push(null);
}
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
dataMarker += 1;
return undefined;
},
});
const sourceStream = new stream.PassThrough();
const subStreamInterface = new SubStreamInterface(sourceStream);
sourceStream.on('unpipe', () => {
eventsEmitted.sourceStreamUnpiped = eventSequence++;
});
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
});
subStreamInterface._currentStream.on('finish', () => {
eventsEmitted.currentStreamEnded = eventSequence++;
});
it('should stop streaming data and end current stream', done => {
sourceStream.on('data', chunk => {
const currentLength = subStreamInterface.getLengthCounter();
if (currentLength === 10) {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], false);
});
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
return subStreamInterface.stopStreaming(mockRequest);
}
return subStreamInterface.write(chunk);
});
mockRequest.pipe(sourceStream);
setTimeout(() => {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
});
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
return done();
}, 1000);
});
});

0 comments on commit 2eee4fb

Please sign in to comment.