From 4b27087b30d87d590152cc177c7c4a8886e78f48 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 4 Oct 2023 12:13:06 +0200 Subject: [PATCH] stream: optimize Writable PR-URL: https://github.com/nodejs/node/pull/50012 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Yagiz Nizipli --- lib/internal/streams/writable.js | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 5800c9df171ff2..8b44731b462a95 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -108,6 +108,7 @@ const kWriteCb = 1 << 26; const kExpectWriteCb = 1 << 27; const kAfterWriteTickInfo = 1 << 28; const kAfterWritePending = 1 << 29; +const kHasBuffer = 1 << 30; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -340,6 +341,7 @@ function resetBuffer(state) { state.buffered = []; state.bufferedIndex = 0; state.state |= kAllBuffers | kAllNoop; + state.state &= ~kHasBuffer; } WritableState.prototype.getBuffer = function getBuffer() { @@ -396,11 +398,13 @@ function Writable(options) { destroyImpl.construct(this, () => { const state = this._writableState; - if (!state.writing) { + if ((state.state & kWriting) === 0) { clearBuffer(this, state); } - finishMaybe(this, state); + if ((state.state & kEnding) !== 0) { + finishMaybe(this, state); + } }); } @@ -523,6 +527,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { state.buffered.push({ chunk, encoding, callback }); + state.state |= kHasBuffer; if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { state.state &= ~kAllBuffers; } @@ -591,8 +596,9 @@ function onwrite(stream, er) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 er.stack; // eslint-disable-line no-unused-expressions - if (!state.errored) { - state.errored = er; + if ((state.state & kErrored) === 0) { + state[kErroredValue] = er; + state.state |= kErrored; } // In case of duplex streams we need to notify the readable side of the @@ -607,12 +613,12 @@ function onwrite(stream, er) { onwriteError(stream, state, er, cb); } } else { - if (state.buffered.length > state.bufferedIndex) { + if ((state.state & kHasBuffer) !== 0) { clearBuffer(stream, state); } if (sync) { - const needDrain = state.length === 0 && (state.state & kNeedDrain) !== 0; + const needDrain = (state.state & kNeedDrain) !== 0 && state.length === 0; const needTick = needDrain || (state.state & kDestroyed !== 0) || cb !== nop; // It is a common case that the callback passed to .write() is always @@ -625,7 +631,9 @@ function onwrite(stream, er) { state.state |= kAfterWritePending; } else { state.pendingcb--; - finishMaybe(stream, state, true); + if ((state.state & kEnding) !== 0) { + finishMaybe(stream, state, true); + } } } else if ((state.state & kAfterWriteTickInfo) !== 0 && state[kAfterWriteTickInfoValue].cb === cb) { @@ -636,7 +644,9 @@ function onwrite(stream, er) { state.state |= (kAfterWritePending | kAfterWriteTickInfo); } else { state.pendingcb--; - finishMaybe(stream, state, true); + if ((state.state & kEnding) !== 0) { + finishMaybe(stream, state, true); + } } } else { afterWrite(stream, state, 1, cb); @@ -668,7 +678,9 @@ function afterWrite(stream, state, count, cb) { errorBuffer(state); } - finishMaybe(stream, state); + if ((state.state & kEnding) !== 0) { + finishMaybe(stream, state, true); + } } // If there's something in the buffer waiting, then invoke callbacks. @@ -692,7 +704,7 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 || + if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer || (state.state & kConstructed) === 0) { return; }