Skip to content

Commit

Permalink
stream: refactor streams
Browse files Browse the repository at this point in the history
- Remove unnecessary scope.
- Refactor to use more validators.
- Avoid using deprecated APIs.
  • Loading branch information
VoltrexKeyva committed Feb 6, 2022
1 parent 365e5f5 commit bc719a3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 43 deletions.
10 changes: 4 additions & 6 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ const Writable = require('internal/streams/writable');
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
ObjectSetPrototypeOf(Duplex, Readable);

{
// Allow the keys array to be GC'ed.
for (const method of ObjectKeys(Writable.prototype)) {
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
// Allow the keys array to be GC'ed.
for (const method of ObjectKeys(Writable.prototype)) {
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}

function Duplex(options) {
Expand Down
56 changes: 21 additions & 35 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ const { AbortController } = require('internal/abort_controller');

const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
const {
validateAbortSignal,
validateFunction,
validateInteger,
validateObject,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
Expand All @@ -32,12 +33,9 @@ const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
validateFunction(fn, 'fn');
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand Down Expand Up @@ -167,8 +165,8 @@ function map(fn, options) {
}

function asIndexedPairs(options = undefined) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand All @@ -186,8 +184,8 @@ function asIndexedPairs(options = undefined) {
}

async function some(fn, options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand Down Expand Up @@ -216,21 +214,15 @@ async function some(fn, options) {
}

async function every(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
validateFunction(fn, 'fn');
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
validateFunction(fn, 'fn');
async function forEachFn(value, options) {
await fn(value, options);
return kEmpty;
Expand All @@ -240,10 +232,7 @@ async function forEach(fn, options) {
}

function filter(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
validateFunction(fn, 'fn');
async function filterFn(value, options) {
if (await fn(value, options)) {
return value;
Expand All @@ -263,12 +252,9 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
}

async function reduce(reducer, initialValue, options) {
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'reducer', ['Function', 'AsyncFunction'], reducer);
}
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
validateFunction(reducer, 'reducer');
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand Down Expand Up @@ -311,8 +297,8 @@ async function reduce(reducer, initialValue, options) {
}

async function toArray(options) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand Down Expand Up @@ -351,8 +337,8 @@ function toIntegerOrInfinity(number) {
}

function drop(number, options = undefined) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand All @@ -375,8 +361,8 @@ function drop(number, options = undefined) {
}

function take(number, options = undefined) {
if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0) {
if (dest.listenerCount('error') === 0) {
const s = dest._writableState || dest._readableState;
if (s && !s.errorEmitted) {
// User incorrectly emitted 'error' directly on the stream.
Expand Down Expand Up @@ -852,7 +852,7 @@ function pipeOnDrain(src, dest) {
}

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
src.listenerCount('data')) {
state.flowing = true;
flow(src);
}
Expand Down

0 comments on commit bc719a3

Please sign in to comment.