From bc719a355c198e568eeb1675762d99dda7f3e62c Mon Sep 17 00:00:00 2001 From: Mohammed Keyvanzadeh Date: Sun, 6 Feb 2022 08:06:03 +0330 Subject: [PATCH] stream: refactor streams - Remove unnecessary scope. - Refactor to use more validators. - Avoid using deprecated APIs. --- lib/internal/streams/duplex.js | 10 +++--- lib/internal/streams/operators.js | 56 ++++++++++++------------------- lib/internal/streams/readable.js | 4 +-- 3 files changed, 27 insertions(+), 43 deletions(-) diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js index bc92e9021ab57c..295d5c7cf156e8 100644 --- a/lib/internal/streams/duplex.js +++ b/lib/internal/streams/duplex.js @@ -41,12 +41,10 @@ const Writable = require('internal/streams/writable'); ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); ObjectSetPrototypeOf(Duplex, Readable); -{ - // Allow the keys array to be GC'ed. - for (const method of ObjectKeys(Writable.prototype)) { - if (!Duplex.prototype[method]) - Duplex.prototype[method] = Writable.prototype[method]; - } +// Allow the keys array to be GC'ed. +for (const method of ObjectKeys(Writable.prototype)) { + if (!Duplex.prototype[method]) + Duplex.prototype[method] = Writable.prototype[method]; } function Duplex(options) { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index a6efa92afb7025..6ac2d131147e24 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -4,7 +4,6 @@ const { AbortController } = require('internal/abort_controller'); const { codes: { - ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE, }, @@ -12,7 +11,9 @@ const { } = require('internal/errors'); const { validateAbortSignal, + validateFunction, validateInteger, + validateObject, } = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); @@ -32,12 +33,9 @@ const kEmpty = Symbol('kEmpty'); const kEof = Symbol('kEof'); function map(fn, options) { - if (typeof fn !== 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], fn); - } - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + validateFunction(fn, 'fn'); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -167,8 +165,8 @@ function map(fn, options) { } function asIndexedPairs(options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -186,8 +184,8 @@ function asIndexedPairs(options = undefined) { } async function some(fn, options) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -216,10 +214,7 @@ async function some(fn, options) { } async function every(fn, options) { - if (typeof fn !== 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], fn); - } + validateFunction(fn, 'fn'); // https://en.wikipedia.org/wiki/De_Morgan%27s_laws return !(await some.call(this, async (...args) => { return !(await fn(...args)); @@ -227,10 +222,7 @@ async function every(fn, options) { } async function forEach(fn, options) { - if (typeof fn !== 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], fn); - } + validateFunction(fn, 'fn'); async function forEachFn(value, options) { await fn(value, options); return kEmpty; @@ -240,10 +232,7 @@ async function forEach(fn, options) { } function filter(fn, options) { - if (typeof fn !== 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], fn); - } + validateFunction(fn, 'fn'); async function filterFn(value, options) { if (await fn(value, options)) { return value; @@ -263,12 +252,9 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { } async function reduce(reducer, initialValue, options) { - if (typeof reducer !== 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'reducer', ['Function', 'AsyncFunction'], reducer); - } - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + validateFunction(reducer, 'reducer'); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -311,8 +297,8 @@ async function reduce(reducer, initialValue, options) { } async function toArray(options) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -351,8 +337,8 @@ function toIntegerOrInfinity(number) { } function drop(number, options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); @@ -375,8 +361,8 @@ function drop(number, options = undefined) { } function take(number, options = undefined) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); + if (options != null) { + validateObject(options, 'options'); } if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index d0125386c8ae8e..eaa3121ef6f278 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -787,7 +787,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('onerror', er); unpipe(); dest.removeListener('error', onerror); - if (EE.listenerCount(dest, 'error') === 0) { + if (dest.listenerCount('error') === 0) { const s = dest._writableState || dest._readableState; if (s && !s.errorEmitted) { // User incorrectly emitted 'error' directly on the stream. @@ -852,7 +852,7 @@ function pipeOnDrain(src, dest) { } if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && - EE.listenerCount(src, 'data')) { + src.listenerCount('data')) { state.flowing = true; flow(src); }