Skip to content

Commit

Permalink
fix: undefined stream.destroy call
Browse files Browse the repository at this point in the history
To end streaming in case of error, we were calling an unofficial method of the stream API which was removed and does not exist in the version of node we use. The method is re-added officially in node v.8 but until we upgrade we need to destroy the streams manually, by pushing null for readables and calling stream.end() for writables.
  • Loading branch information
electrachong committed Nov 6, 2017
1 parent 1270412 commit 71db931
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 6 deletions.
29 changes: 24 additions & 5 deletions lib/s3middleware/azureHelpers/SubStreamInterface.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
const stream = require('stream');

class SubStream extends stream.PassThrough {
constructor(options) {
super(options);

this.on('stopStreamingToAzure', function stopStreamingToAzure() {
this._abortStreaming();
});
}

_abortStreaming() {
this.push(null);
this.end();
}
}

/**
* Interface for streaming subparts.
* @class SubStreamInterface
Expand All @@ -14,7 +29,8 @@ class SubStreamInterface {
this._totalLengthCounter = 0;
this._lengthCounter = 0;
this._subPartIndex = 0;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this._streamingAborted = false;
}

/**
Expand Down Expand Up @@ -51,12 +67,11 @@ class SubStreamInterface {
* @return {undefined}
*/
stopStreaming(piper) {
this._streamingAborted = true;
if (piper) {
piper.unpipe();
piper.destroy();
}
this._sourceStream.destroy();
this._currentStream.destroy();
this._currentStream.emit('stopStreamingToAzure');
}

/**
Expand Down Expand Up @@ -97,7 +112,7 @@ class SubStreamInterface {
this._totalLengthCounter += this._lengthCounter;
this._lengthCounter = 0;
this._subPartIndex++;
this._currentStream = new stream.PassThrough();
this._currentStream = new SubStream();
this.resumeStreaming();
return {
nextStream: this._currentStream,
Expand All @@ -111,6 +126,10 @@ class SubStreamInterface {
* @return {undefined}
*/
write(chunk) {
if (this._streamingAborted) {
// don't write
return;
}
const ready = this._currentStream.write(chunk);

if (!ready) {
Expand Down
2 changes: 1 addition & 1 deletion lib/s3middleware/azureHelpers/mpuUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
'putting multiple parts');

resultsCollector.on('error', (err, subPartIndex) => {
streamInterface.stopStreaming(request);
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
{ error: err.message, dataStoreName });
streamInterface.stopStreaming(request);
if (err.code === 'ContainerNotFound') {
return cb(errors.NoSuchBucket);
}
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const assert = require('assert');
const stream = require('stream');
const SubStreamInterface =
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');

describe('s3middleware SubStreamInterface.stopStreaming()', () => {
const eventsEmitted = {
sourceStreamUnpiped: false,
currentStreamStopStreamingToAzure: false,
currentStreamEnded: false,
};
const expectedSequence = {
sourceStreamUnpiped: 0,
currentStreamStopStreamingToAzure: 1,
currentStreamEnded: 2,
};
const data = Buffer.alloc(100);
let dataMarker = 0;
let eventSequence = 0;
const mockRequest = new stream.Readable({
read: () => {
if (dataMarker >= data.length) {
return mockRequest.push(null);
}
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
dataMarker += 1;
return undefined;
},
});
const sourceStream = new stream.PassThrough();
const subStreamInterface = new SubStreamInterface(sourceStream);
sourceStream.on('unpipe', () => {
eventsEmitted.sourceStreamUnpiped = eventSequence++;
});
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
});
subStreamInterface._currentStream.on('finish', () => {
eventsEmitted.currentStreamEnded = eventSequence++;
});
it('should stop streaming data and end current stream', done => {
sourceStream.on('data', chunk => {
const currentLength = subStreamInterface.getLengthCounter();
if (currentLength === 10) {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], false);
});
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
return subStreamInterface.stopStreaming(mockRequest);
}
return subStreamInterface.write(chunk);
});
mockRequest.pipe(sourceStream);
setTimeout(() => {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
});
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
return done();
}, 1000);
});
});

0 comments on commit 71db931

Please sign in to comment.