Skip to content

Commit

Permalink
fix: undefined stream.destroy call
Browse files Browse the repository at this point in the history
To end streaming in case of error, we were calling an unofficial method of the stream API which was removed and does not exist in the version of node we use. The method is re-added officially in node v.8 but until we upgrade we need to destroy the streams manually, by pushing null for readables and calling stream.end() for writables.
  • Loading branch information
electrachong committed Nov 6, 2017
1 parent 1270412 commit a39057f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
15 changes: 12 additions & 3 deletions lib/s3middleware/azureHelpers/SubStreamInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class SubStreamInterface {
this._lengthCounter = 0;
this._subPartIndex = 0;
this._currentStream = new stream.PassThrough();
this._streamingAborted = false;
}

/**
Expand Down Expand Up @@ -51,12 +52,16 @@ class SubStreamInterface {
* @return {undefined}
*/
stopStreaming(piper) {
this._streamingAborted = true;
if (piper) {
piper.unpipe();
piper.destroy();
piper.push(null);
}
this._sourceStream.destroy();
this._currentStream.destroy();
this._sourceStream.pause();
this._sourceStream.push(null);
this._sourceStream.end();
this._currentStream.push(null);
this._currentStream.end();
}

/**
Expand Down Expand Up @@ -111,6 +116,10 @@ class SubStreamInterface {
* @return {undefined}
*/
write(chunk) {
if (this._streamingAborted) {
// don't write
return;
}
const ready = this._currentStream.write(chunk);

if (!ready) {
Expand Down
2 changes: 1 addition & 1 deletion lib/s3middleware/azureHelpers/mpuUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
'putting multiple parts');

resultsCollector.on('error', (err, subPartIndex) => {
streamInterface.stopStreaming(request);
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
{ error: err.message, dataStoreName });
streamInterface.stopStreaming(request);
if (err.code === 'ContainerNotFound') {
return cb(errors.NoSuchBucket);
}
Expand Down
53 changes: 53 additions & 0 deletions tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const assert = require('assert');
const stream = require('stream');
const SubStreamInterface =
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');

describe('s3middleware SubStreamInterface.stopStreaming()', () => {
const emittedFinished = {
sourceStream: false,
currentStream: false,
};
const data = Buffer.alloc(100);
let dataMarker = 0;
const mockRequest = new stream.Readable({
read: () => {
if (dataMarker >= data.length) {
return mockRequest.push(null);
}
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
dataMarker += 1;
return undefined;
},
});
const sourceStream = new stream.PassThrough();
const subStreamInterface = new SubStreamInterface(sourceStream);
sourceStream.on('finish', () => {
emittedFinished.sourceStream = true;
});
subStreamInterface._currentStream.on('finish', () => {
emittedFinished.currentStream = true;
});
it('should stop streaming data and end all streams', done => {
sourceStream.on('data', chunk => {
const currentLength = subStreamInterface.getLengthCounter();
if (currentLength === 10) {
Object.keys(emittedFinished).forEach(key => {
assert.strictEqual(emittedFinished[key], false);
});
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
return subStreamInterface.stopStreaming(mockRequest);
}
return subStreamInterface.write(chunk);
});
mockRequest.pipe(sourceStream);
setTimeout(() => {
Object.keys(emittedFinished).forEach(key => {
assert.strictEqual(emittedFinished[key], true);
});
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
return done();
}, 1000);
});
});

0 comments on commit a39057f

Please sign in to comment.