Skip to content

Commit

Permalink
stream: don't destroy final readable stream in pipeline
Browse files Browse the repository at this point in the history
If the last stream in a pipeline is still usable/readable
don't destroy it to allow further composition.

Fixes: #32105
  • Loading branch information
ronag committed Mar 5, 2020
1 parent b1d4c13 commit 5fefa4f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
13 changes: 10 additions & 3 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
function destroyer(stream, reading, writing, final, callback) {
const _destroy = once((err) => {
destroyImpl.destroyer(stream, err);
const readable = stream.readable || isRequest(stream);
if (err || !final || !readable) {
destroyImpl.destroyer(stream, err);
}
callback(err);
});

Expand Down Expand Up @@ -68,6 +71,10 @@ function popCallback(streams) {
return streams.pop();
}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isPromise(obj) {
return !!(obj && typeof obj.then === 'function');
}
Expand Down Expand Up @@ -159,7 +166,7 @@ function pipeline(...streams) {
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, (err) => {
destroys.push(destroyer(stream, reading, writing, final, (err) => {
finish(err, final);
}));
}
Expand Down
48 changes: 47 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ const { promisify } = require('util');
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(dst.destroyed, true);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}
Expand All @@ -938,3 +938,49 @@ const { promisify } = require('util');
r.push(null);
r.emit('close');
}

{
const server = http.createServer((req, res) => {
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

const body = new PassThrough();
pipeline(
body,
req,
common.mustCall((err) => {
assert(!err);
assert(!req.res);
assert(!req.aborted);
req.abort();
server.close();
})
);
body.end();
});
}

{
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}

{
const src = new PassThrough();
const dst = new PassThrough();
dst.readable = false;
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}

0 comments on commit 5fefa4f

Please sign in to comment.