diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index fc8a2b063ebafb..8feb2f63732e37 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -25,52 +25,23 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function isIncoming(stream) { - return ( - stream.socket && - typeof stream.complete === 'boolean' && - ArrayIsArray(stream.rawTrailers) && - ArrayIsArray(stream.rawHeaders) - ); -} - -function isOutgoing(stream) { - return ( - stream.socket && - typeof stream.setHeader === 'function' - ); -} - -function destroyer(stream, reading, writing, final, callback) { +function destroyer(stream, reading, writing, callback) { callback = once(callback); - let destroyed = false; + + let finished = false; + stream.on('close', () => { + finished = true; + }); if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { - if (destroyed) return; - destroyed = true; - - if (!err && (isIncoming(stream) || isOutgoing(stream))) { - // http/1 request objects have a coupling to their response and should - // not be prematurely destroyed. Assume they will handle their own - // lifecycle. - return callback(); - } - - if (!err && reading && !writing && stream.writable) { - return callback(); - } - - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); - } - + finished = !err; callback(err); }); return (err) => { - if (destroyed) return; - destroyed = true; + if (finished) return; + finished = true; destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; @@ -192,7 +163,7 @@ function pipeline(...streams) { if (isStream(stream)) { finishCount++; - destroys.push(destroyer(stream, reading, writing, !reading, finish)); + destroys.push(destroyer(stream, reading, writing, finish)); } if (i === 0) { @@ -250,7 +221,7 @@ function pipeline(...streams) { ret = pt; finishCount++; - destroys.push(destroyer(ret, false, true, true, finish)); + destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index f3923bc794fc2f..f698b86cf698df 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -7,11 +7,13 @@ const { Readable, Transform, pipeline, - PassThrough + PassThrough, + Duplex } = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -917,7 +919,7 @@ const { promisify } = require('util'); const src = new PassThrough({ autoDestroy: false }); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { - assert.strictEqual(src.destroyed, true); + assert.strictEqual(src.destroyed, false); assert.strictEqual(dst.destroyed, false); })); src.end(); @@ -959,8 +961,8 @@ const { promisify } = require('util'); } { - const src = new PassThrough(); - const dst = new PassThrough(); + const src = new PassThrough({ autoDestroy: true }); + const dst = new PassThrough({ autoDestroy: true }); dst.readable = false; pipeline(src, dst, common.mustCall((err) => { assert(!err); @@ -1061,3 +1063,135 @@ const { promisify } = require('util'); assert.ifError(err); })); } + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Writable({ + write(chunk, encoding, callback) { + callback(); + } + }); + src.on('close', () => { + closed = true; + }); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Duplex({}); + src.on('close', common.mustCall(() => { + closed = true; + })); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +} + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +}