From 03c4ff760d2f528e6f87734c1470355969b848ca Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 28 Oct 2023 15:55:34 +0200 Subject: [PATCH] stream: use bit fields for construct/destroy PR-URL: https://github.com/nodejs/node/pull/50408 Reviewed-By: Matteo Collina Reviewed-By: Raz Luvaton Reviewed-By: Benjamin Gruenbaum Reviewed-By: Yagiz Nizipli --- lib/internal/streams/destroy.js | 66 ++++++++++++++++++++++---------- lib/internal/streams/readable.js | 47 ++++++++++++----------- lib/internal/streams/utils.js | 22 +++++++++++ lib/internal/streams/writable.js | 49 +++++++++++++----------- 4 files changed, 118 insertions(+), 66 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index cfb49f2c7c7273..28802cae5eff32 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -15,6 +15,15 @@ const { isDestroyed, isFinished, isServerRequest, + kState, + kErrorEmitted, + kEmitClose, + kClosed, + kCloseEmitted, + kConstructed, + kDestroyed, + kAutoDestroy, + kErrored, } = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); @@ -42,7 +51,10 @@ function destroy(err, cb) { // With duplex streams we use the writable side for state. const s = w || r; - if (w?.destroyed || r?.destroyed) { + if ( + (w && (w[kState] & kDestroyed) !== 0) || + (r && (r[kState] & kDestroyed) !== 0) + ) { if (typeof cb === 'function') { cb(); } @@ -56,14 +68,14 @@ function destroy(err, cb) { checkError(err, w, r); if (w) { - w.destroyed = true; + w[kState] |= kDestroyed; } if (r) { - r.destroyed = true; + r[kState] |= kDestroyed; } // If still constructing then defer calling _destroy. - if (!s.constructed) { + if ((s[kState] & kConstructed) === 0) { this.once(kDestroy, function(er) { _destroy(this, aggregateTwoErrors(er, err), cb); }); @@ -89,10 +101,10 @@ function _destroy(self, err, cb) { checkError(err, w, r); if (w) { - w.closed = true; + w[kState] |= kClosed; } if (r) { - r.closed = true; + r[kState] |= kClosed; } if (typeof cb === 'function') { @@ -122,13 +134,16 @@ function emitCloseNT(self) { const w = self._writableState; if (w) { - w.closeEmitted = true; + w[kState] |= kCloseEmitted; } if (r) { - r.closeEmitted = true; + r[kState] |= kCloseEmitted; } - if (w?.emitClose || r?.emitClose) { + if ( + (w && (w[kState] & kEmitClose) !== 0) || + (r && (r[kState] & kEmitClose) !== 0) + ) { self.emit('close'); } } @@ -137,15 +152,18 @@ function emitErrorNT(self, err) { const r = self._readableState; const w = self._writableState; - if (w?.errorEmitted || r?.errorEmitted) { + if ( + (w && (w[kState] & kErrorEmitted) !== 0) || + (r && (r[kState] & kErrorEmitted) !== 0) + ) { return; } if (w) { - w.errorEmitted = true; + w[kState] |= kErrorEmitted; } if (r) { - r.errorEmitted = true; + r[kState] |= kErrorEmitted; } self.emit('error', err); @@ -192,20 +210,26 @@ function errorOrDestroy(stream, err, sync) { const r = stream._readableState; const w = stream._writableState; - if (w?.destroyed || r?.destroyed) { + if ( + (w && (w[kState] ? (w[kState] & kDestroyed) !== 0 : w.destroyed)) || + (r && (r[kState] ? (r[kState] & kDestroyed) !== 0 : r.destroyed)) + ) { return this; } - if (r?.autoDestroy || w?.autoDestroy) + if ( + (r && (r[kState] & kAutoDestroy) !== 0) || + (w && (w[kState] & kAutoDestroy) !== 0) + ) { stream.destroy(err); - else if (err) { + } else if (err) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 err.stack; // eslint-disable-line no-unused-expressions - if (w && !w.errored) { + if (w && (w[kState] & kErrored) === 0) { w.errored = err; } - if (r && !r.errored) { + if (r && (r[kState] & kErrored) === 0) { r.errored = err; } if (sync) { @@ -225,10 +249,10 @@ function construct(stream, cb) { const w = stream._writableState; if (r) { - r.constructed = false; + r[kState] &= ~kConstructed; } if (w) { - w.constructed = false; + w[kState] &= ~kConstructed; } stream.once(kConstruct, cb); @@ -256,10 +280,10 @@ function constructNT(stream) { const s = w || r; if (r) { - r.constructed = true; + r[kState] |= kConstructed; } if (w) { - w.constructed = true; + w[kState] |= kConstructed; } if (s.destroyed) { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 80798a35dcb34f..92dd7d68301400 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -58,6 +58,20 @@ const { getHighWaterMark, getDefaultHighWaterMark, } = require('internal/streams/state'); +const { + kState, + // bitfields + kObjectMode, + kErrorEmitted, + kAutoDestroy, + kEmitClose, + kDestroyed, + kClosed, + kCloseEmitted, + kErrored, + kConstructed, + kOnConstructed, +} = require('internal/streams/utils'); const { aggregateTwoErrors, @@ -72,9 +86,7 @@ const { AbortError, } = require('internal/errors'); const { validateObject } = require('internal/validators'); -const { kOnConstructed } = require('internal/streams/utils'); -const kState = Symbol('kState'); const FastBuffer = Buffer[SymbolSpecies]; const { StringDecoder } = require('string_decoder'); @@ -91,26 +103,17 @@ const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kDecoderValue = Symbol('kDecoderValue'); const kEncodingValue = Symbol('kEncodingValue'); -const kObjectMode = 1 << 0; -const kEnded = 1 << 1; -const kEndEmitted = 1 << 2; -const kReading = 1 << 3; -const kConstructed = 1 << 4; -const kSync = 1 << 5; -const kNeedReadable = 1 << 6; -const kEmittedReadable = 1 << 7; -const kReadableListening = 1 << 8; -const kResumeScheduled = 1 << 9; -const kErrorEmitted = 1 << 10; -const kEmitClose = 1 << 11; -const kAutoDestroy = 1 << 12; -const kDestroyed = 1 << 13; -const kClosed = 1 << 14; -const kCloseEmitted = 1 << 15; -const kMultiAwaitDrain = 1 << 16; -const kReadingMore = 1 << 17; -const kDataEmitted = 1 << 18; -const kErrored = 1 << 19; +const kEnded = 1 << 9; +const kEndEmitted = 1 << 10; +const kReading = 1 << 11; +const kSync = 1 << 12; +const kNeedReadable = 1 << 13; +const kEmittedReadable = 1 << 14; +const kReadableListening = 1 << 15; +const kResumeScheduled = 1 << 16; +const kMultiAwaitDrain = 1 << 17; +const kReadingMore = 1 << 18; +const kDataEmitted = 1 << 19; const kDefaultUTF8Encoding = 1 << 20; const kDecoder = 1 << 21; const kEncoding = 1 << 22; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 77efa32c71c8fe..2c6b841a89e6e5 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -22,6 +22,17 @@ const kOnConstructed = Symbol('kOnConstructed'); const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); +const kState = Symbol('kState'); +const kObjectMode = 1 << 0; +const kErrorEmitted = 1 << 1; +const kAutoDestroy = 1 << 2; +const kEmitClose = 1 << 3; +const kDestroyed = 1 << 4; +const kClosed = 1 << 5; +const kCloseEmitted = 1 << 6; +const kErrored = 1 << 7; +const kConstructed = 1 << 8; + function isReadableNodeStream(obj, strict = false) { return !!( obj && @@ -339,4 +350,15 @@ module.exports = { isServerResponse, willEmitClose, isTransformStream, + kState, + // bitfields + kObjectMode, + kErrorEmitted, + kAutoDestroy, + kEmitClose, + kDestroyed, + kClosed, + kCloseEmitted, + kErrored, + kConstructed, }; diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 74573033eaa44f..4facf8c5cd80b8 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -44,7 +44,6 @@ const EE = require('events'); const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); -const { kOnConstructed } = require('internal/streams/utils'); const { addAbortSignal, @@ -65,6 +64,20 @@ const { ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING, } = require('internal/errors').codes; +const { + kState, + // bitfields + kObjectMode, + kErrorEmitted, + kAutoDestroy, + kEmitClose, + kDestroyed, + kClosed, + kCloseEmitted, + kErrored, + kConstructed, + kOnConstructed, +} = require('internal/streams/utils'); const { errorOrDestroy } = destroyImpl; @@ -79,18 +92,8 @@ const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kWriteCbValue = Symbol('kWriteCbValue'); const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue'); const kBufferedValue = Symbol('kBufferedValue'); -const kState = Symbol('kState'); - -const kObjectMode = 1 << 0; -const kEnded = 1 << 1; -const kConstructed = 1 << 2; -const kSync = 1 << 3; -const kErrorEmitted = 1 << 4; -const kEmitClose = 1 << 5; -const kAutoDestroy = 1 << 6; -const kDestroyed = 1 << 7; -const kClosed = 1 << 8; -const kCloseEmitted = 1 << 9; + +const kSync = 1 << 9; const kFinalCalled = 1 << 10; const kNeedDrain = 1 << 11; const kEnding = 1 << 12; @@ -102,16 +105,16 @@ const kPrefinished = 1 << 17; const kAllBuffers = 1 << 18; const kAllNoop = 1 << 19; const kOnFinished = 1 << 20; -const kErrored = 1 << 21; -const kHasWritable = 1 << 22; -const kWritable = 1 << 23; -const kCorked = 1 << 24; -const kDefaultUTF8Encoding = 1 << 25; -const kWriteCb = 1 << 26; -const kExpectWriteCb = 1 << 27; -const kAfterWriteTickInfo = 1 << 28; -const kAfterWritePending = 1 << 29; -const kBuffered = 1 << 30; +const kHasWritable = 1 << 21; +const kWritable = 1 << 22; +const kCorked = 1 << 23; +const kDefaultUTF8Encoding = 1 << 24; +const kWriteCb = 1 << 25; +const kExpectWriteCb = 1 << 26; +const kAfterWriteTickInfo = 1 << 27; +const kAfterWritePending = 1 << 28; +const kBuffered = 1 << 29; +const kEnded = 1 << 30; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) {