diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js index bc92e9021ab57c..79f91fa0866e25 100644 --- a/lib/internal/streams/duplex.js +++ b/lib/internal/streams/duplex.js @@ -42,8 +42,10 @@ ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); ObjectSetPrototypeOf(Duplex, Readable); { + const keys = ObjectKeys(Writable.prototype); // Allow the keys array to be GC'ed. - for (const method of ObjectKeys(Writable.prototype)) { + for (let i = 0; i < keys.length; i++) { + const method = keys[i]; if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; } diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index fe77bf91e94885..c48a058303bb9a 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -13,6 +13,7 @@ const { const { validateAbortSignal, validateInteger, + validateObject, } = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); @@ -36,8 +37,8 @@ function map(fn, options) { throw new ERR_INVALID_ARG_TYPE( 'fn', ['Function', 'AsyncFunction'], fn); } - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -167,8 +168,8 @@ function map(fn, options) { } function asIndexedPairs(options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -252,8 +253,8 @@ async function reduce(reducer, initialValue, options) { throw new ERR_INVALID_ARG_TYPE( 'reducer', ['Function', 'AsyncFunction'], reducer); } - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -296,8 +297,8 @@ async function reduce(reducer, initialValue, options) { } async function toArray(options) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -336,8 +337,8 @@ function toIntegerOrInfinity(number) { } function drop(number, options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -360,8 +361,8 @@ function drop(number, options = undefined) { } function take(number, options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7dc2e5346afad9..3fcd373cb1c708 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -787,7 +787,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('onerror', er); unpipe(); dest.removeListener('error', onerror); - if (EE.listenerCount(dest, 'error') === 0) { + if (dest.listenerCount('error') === 0) { const s = dest._writableState || dest._readableState; if (s && !s.errorEmitted) { // User incorrectly emitted 'error' directly on the stream. @@ -852,7 +852,7 @@ function pipeOnDrain(src, dest) { } if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && - EE.listenerCount(src, 'data')) { + src.listenerCount('data')) { src.resume(); } };