From 9e7f255e854e93b17ca4cf618862cc88681d603f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 13:29:43 +0200 Subject: [PATCH] stream: pipeline should only destroy un-finished streams This PR logically reverts https://github.com/nodejs/node/pull/31940 which has caused lots of unnecessary breakage in the ecosystem. This PR also aligns better with the actual documented behavior: `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. The behavior introduced in https://github.com/nodejs/node/pull/31940 was much more aggressive in terms of destroying streams. This was good for avoiding potential resource leaks however breaks some common assumptions in legacy streams. Furthermore, it makes the code simpler and removes some hacks. Fixes: https://github.com/nodejs/node/issues/32954 Fixes: https://github.com/nodejs/node/issues/32955 PR-URL: #32968 Reviewed-By: Matteo Collina Reviewed-By: Mathias Buus Backport-PR-URL: https://github.com/nodejs/node/pull/32980 --- lib/internal/streams/pipeline.js | 51 ++------- test/parallel/test-stream-pipeline.js | 142 +++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 44 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index fc8a2b063ebafb..8feb2f63732e37 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -25,52 +25,23 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function isIncoming(stream) { - return ( - stream.socket && - typeof stream.complete === 'boolean' && - ArrayIsArray(stream.rawTrailers) && - ArrayIsArray(stream.rawHeaders) - ); -} - -function isOutgoing(stream) { - return ( - stream.socket && - typeof stream.setHeader === 'function' - ); -} - -function destroyer(stream, reading, writing, final, callback) { +function destroyer(stream, reading, writing, 
callback) { callback = once(callback); - let destroyed = false; + + let finished = false; + stream.on('close', () => { + finished = true; + }); if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { - if (destroyed) return; - destroyed = true; - - if (!err && (isIncoming(stream) || isOutgoing(stream))) { - // http/1 request objects have a coupling to their response and should - // not be prematurely destroyed. Assume they will handle their own - // lifecycle. - return callback(); - } - - if (!err && reading && !writing && stream.writable) { - return callback(); - } - - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); - } - + finished = !err; callback(err); }); return (err) => { - if (destroyed) return; - destroyed = true; + if (finished) return; + finished = true; destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; @@ -192,7 +163,7 @@ function pipeline(...streams) { if (isStream(stream)) { finishCount++; - destroys.push(destroyer(stream, reading, writing, !reading, finish)); + destroys.push(destroyer(stream, reading, writing, finish)); } if (i === 0) { @@ -250,7 +221,7 @@ function pipeline(...streams) { ret = pt; finishCount++; - destroys.push(destroyer(ret, false, true, true, finish)); + destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index f3923bc794fc2f..f698b86cf698df 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -7,11 +7,13 @@ const { Readable, Transform, pipeline, - PassThrough + PassThrough, + Duplex } = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -917,7 +919,7 @@ const { 
promisify } = require('util'); const src = new PassThrough({ autoDestroy: false }); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { - assert.strictEqual(src.destroyed, true); + assert.strictEqual(src.destroyed, false); assert.strictEqual(dst.destroyed, false); })); src.end(); @@ -959,8 +961,8 @@ const { promisify } = require('util'); } { - const src = new PassThrough(); - const dst = new PassThrough(); + const src = new PassThrough({ autoDestroy: true }); + const dst = new PassThrough({ autoDestroy: true }); dst.readable = false; pipeline(src, dst, common.mustCall((err) => { assert(!err); @@ -1061,3 +1063,135 @@ const { promisify } = require('util'); assert.ifError(err); })); } + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Writable({ + write(chunk, encoding, callback) { + callback(); + } + }); + src.on('close', () => { + closed = true; + }); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Duplex({}); + src.on('close', common.mustCall(() => { + closed = true; + })); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + 
d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +} + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +}