diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js index 799eb5a32022da..eb43a329e5a92c 100644 --- a/lib/internal/streams/duplex.js +++ b/lib/internal/streams/duplex.js @@ -35,9 +35,12 @@ const { module.exports = Duplex; +const Stream = require('internal/streams/legacy').Stream; const Readable = require('internal/streams/readable'); const Writable = require('internal/streams/writable'); +const { kOnConstructed } = require('internal/streams/utils'); + ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); ObjectSetPrototypeOf(Duplex, Readable); @@ -55,8 +58,8 @@ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); - Readable.call(this, options); - Writable.call(this, options); + this._readableState = new Readable.ReadableState(options, this, true); + this._writableState = new Writable.WritableState(options, this, true); if (options) { this.allowHalfOpen = options.allowHalfOpen !== false; @@ -73,9 +76,39 @@ function Duplex(options) { this._writableState.ended = true; this._writableState.finished = true; } + + if (typeof options.read === 'function') + this._read = options.read; + + if (typeof options.write === 'function') + this._write = options.write; + + if (typeof options.writev === 'function') + this._writev = options.writev; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; + + if (typeof options.final === 'function') + this._final = options.final; + + if (typeof options.construct === 'function') + this._construct = options.construct; + + if (options.signal) + addAbortSignal(options.signal, this); } else { this.allowHalfOpen = true; } + + Stream.call(this, options); + + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._readableState[kOnConstructed](this); + this._writableState[kOnConstructed](this); + }); + } } ObjectDefineProperties(Duplex.prototype, { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..0d65371095db49 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -71,6 +71,7 @@ const { AbortError, } = require('internal/errors'); const { validateObject } = require('internal/validators'); +const { kOnConstructed } = require('internal/streams/utils'); const kState = Symbol('kState'); @@ -251,14 +252,6 @@ ObjectDefineProperties(ReadableState.prototype, { function ReadableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync; @@ -310,16 +303,19 @@ function ReadableState(options, stream, isDuplex) { } } +// TODO (fix): Make this fully private with symbol +ReadableState.prototype[kOnConstructed] = function onConstructed (stream) { + if ((this.state & kNeedReadable) !== 0) { + maybeReadMore(stream, this); + } +} + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); - // Checking for a Stream.Duplex instance is faster here instead of inside - // the ReadableState constructor, at least with V8 6.5. - const isDuplex = this instanceof Stream.Duplex; - - this._readableState = new ReadableState(options, this, isDuplex); + this._readableState = new ReadableState(options, this, false); if (options) { if (typeof options.read === 'function') @@ -331,17 +327,17 @@ function Readable(options) { if (typeof options.construct === 'function') this._construct = options.construct; - if (options.signal && !isDuplex) + if (options.signal) addAbortSignal(options.signal, this); } Stream.call(this, options); - destroyImpl.construct(this, () => { - if (this._readableState.needReadable) { - maybeReadMore(this, this._readableState); - } - }); + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._readableState[kOnConstructed](this); + }); + } } Readable.prototype.destroy = destroyImpl.destroy; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 1b2a6c0fbf6a05..f8e5dd5b9b4802 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -16,6 +16,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable'); const kIsWritable = SymbolFor('nodejs.stream.writable'); const kIsDisturbed = SymbolFor('nodejs.stream.disturbed'); +const kOnConstructed = Symbol('kOnConstructed') + const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); @@ -303,6 +305,7 @@ function isErrored(stream) { } module.exports = { + kOnConstructed, isDestroyed, kIsDestroyed, isDisturbed, diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index f75d2ac5dc7015..b120d1f375d571 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -44,6 +44,7 @@ const EE = require('events'); const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); +const { kOnConstructed } = require('internal/streams/utils'); const { addAbortSignal, @@ -290,14 +291,6 @@ ObjectDefineProperties(WritableState.prototype, { }); function WritableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream, - // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - // Bit map field to store WritableState more effciently with 1 bit per field // instead of a V8 slot per field. this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy; @@ -372,23 +365,22 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { }, }); -function Writable(options) { - // Writable ctor is applied to Duplexes, too. - // `realHasInstance` is necessary because using plain `instanceof` - // would return false, as no `_writableState` property is attached. - - // Trying to use the custom `instanceof` for Writable here will also break the - // Node.js LazyTransform implementation, which has a non-trivial getter for - // `_writableState` that would lead to infinite recursion. +// TODO (fix): Make this fully private with symbol +WritableState.prototype[kOnConstructed] = function onConstructed(stream) { + if ((this[kState] & kWriting) === 0) { + clearBuffer(stream, this); + } - // Checking for a Stream.Duplex instance is faster here instead of inside - // the WritableState constructor, at least with V8 6.5. - const isDuplex = (this instanceof Stream.Duplex); + if ((this[kState] & kEnding) !== 0) { + finishMaybe(stream, this); + } +} - if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) +function Writable(options) { + if (!(this instanceof Writable)) return new Writable(options); - this._writableState = new WritableState(options, this, isDuplex); + this._writableState = new WritableState(options, this, false); if (options) { if (typeof options.write === 'function') @@ -412,17 +404,11 @@ function Writable(options) { Stream.call(this, options); - destroyImpl.construct(this, () => { - const state = this._writableState; - - if ((state[kState] & kWriting) === 0) { - clearBuffer(this, state); - } - - if ((state[kState] & kEnding) !== 0) { - finishMaybe(this, state); - } - }); + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._writableState[kOnConstructed](this); + }); + } } ObjectDefineProperty(Writable, SymbolHasInstance, {