Skip to content

Commit

Permalink
stream: fix pipeline callback not called on ended stream
Browse files Browse the repository at this point in the history
Fixes: nodejs#46595
PR-URL: nodejs#46600
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
  • Loading branch information
debadree25 committed Feb 27, 2023
1 parent 53345df commit 09f78e0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
12 changes: 10 additions & 2 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const {
isTransformStream,
isWebStream,
isReadableStream,
isReadableEnded,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

Expand Down Expand Up @@ -417,10 +418,17 @@ function pipe(src, dst, finish, { end }) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => {

function endFn() {
ended = true;
dst.end();
});
}

if (isReadableEnded(src)) { // End the destination if the source has already ended.
process.nextTick(endFn);
} else {
src.once('end', endFn);
}
} else {
finish();
}
Expand Down
25 changes: 25 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1591,3 +1591,28 @@ const tsp = require('timers/promises');
assert.strictEqual(writable.endCount, 1);
}));
}

{
const readable = new Readable({
read() {}
});
readable.on('end', common.mustCall(() => {
pipeline(readable, new PassThrough(), common.mustSucceed());
}));
readable.push(null);
readable.read();
}

{
const dup = new Duplex({
read() {},
write(chunk, enc, cb) {
cb();
}
});
dup.on('end', common.mustCall(() => {
pipeline(dup, new PassThrough(), common.mustSucceed());
}));
dup.push(null);
dup.read();
}
12 changes: 11 additions & 1 deletion test/parallel/test-webstreams-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const common = require('../common');
const assert = require('assert');
const { Readable, Writable, Transform, pipeline } = require('stream');
const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
const http = require('http');
Expand Down Expand Up @@ -410,3 +410,13 @@ const http = require('http');
}
c.close();
}

{
const rs = new ReadableStream({
start(controller) {
controller.close();
}
});

pipeline(rs, new PassThrough(), common.mustSucceed());
}

0 comments on commit 09f78e0

Please sign in to comment.