Skip to content

Commit

Permalink
fixup: NodeStream
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 9, 2021
1 parent fb49c78 commit 7a19a7a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 34 deletions.
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
}
};

function isStream(obj) {
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
Expand Down
12 changes: 6 additions & 6 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ const {
const {
isClosed,
isReadable,
isReadableStream,
isReadableNodeStream,
isReadableFinished,
isWritable,
isWritableStream,
isWritableNodeStream,
isWritableFinished,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');
Expand All @@ -49,9 +49,9 @@ function eos(stream, options, callback) {
callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadableStream(stream));
(options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
(options.writable !== false && isWritableStream(stream));
(options.writable !== false && isWritableNodeStream(stream));

const wState = stream._writableState;
const rState = stream._readableState;
Expand All @@ -65,8 +65,8 @@ function eos(stream, options, callback) {
// this generic check.
let willEmitClose = (
_willEmitClose(stream) &&
isReadableStream(stream) === readable &&
isWritableStream(stream) === writable
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);

let writableFinished = isWritableFinished(stream, false);
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadableStream,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadableStream(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -271,8 +271,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadableStream(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
34 changes: 17 additions & 17 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const {

const kDestroyed = Symbol('kDestroyed');

function isReadableStream(obj) {
function isReadableNodeStream(obj) {
return !!(
obj &&
typeof obj.pipe === 'function' &&
Expand All @@ -18,7 +18,7 @@ function isReadableStream(obj) {
);
}

function isWritableStream(obj) {
function isWritableNodeStream(obj) {
return !!(
obj &&
typeof obj.write === 'function' &&
Expand All @@ -27,8 +27,8 @@ function isWritableStream(obj) {
);
}

function isStream(obj) {
return isReadableStream(obj) || isWritableStream(obj);
function isNodeStream(obj) {
return isReadableNodeStream(obj) || isWritableNodeStream(obj);
}

function isIterable(obj, isAsync) {
Expand All @@ -40,7 +40,7 @@ function isIterable(obj, isAsync) {
}

function isDestroyed(stream) {
if (!isStream(stream)) return null;
if (!isNodeStream(stream)) return null;
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
Expand All @@ -49,7 +49,7 @@ function isDestroyed(stream) {

// Have been end():d.
function isWritableEnded(stream) {
if (!isWritableStream(stream)) return null;
if (!isWritableNodeStream(stream)) return null;
if (stream.writableEnded === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
Expand All @@ -59,7 +59,7 @@ function isWritableEnded(stream) {

// Have emitted 'finish'.
function isWritableFinished(stream, strict) {
if (!isWritableStream(stream)) return null;
if (!isWritableNodeStream(stream)) return null;
if (stream.writableFinished === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
Expand All @@ -72,7 +72,7 @@ function isWritableFinished(stream, strict) {

// Have been push(null):d.
function isReadableEnded(stream) {
if (!isReadableStream(stream)) return null;
if (!isReadableNodeStream(stream)) return null;
if (stream.readableEnded === true) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
Expand All @@ -82,7 +82,7 @@ function isReadableEnded(stream) {

// Have emitted 'end'.
function isReadableFinished(stream, strict) {
if (!isReadableStream(stream)) return null;
if (!isReadableNodeStream(stream)) return null;
const rState = stream._readableState;
if (rState?.errored) return false;
if (typeof rState?.endEmitted !== 'boolean') return null;
Expand All @@ -93,21 +93,21 @@ function isReadableFinished(stream, strict) {
}

function isReadable(stream) {
const r = isReadableStream(stream);
const r = isReadableNodeStream(stream);
if (r === null || typeof stream.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableStream(stream);
const r = isWritableNodeStream(stream);
if (r === null || typeof stream.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

function isFinished(stream, opts) {
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
return null;
}

Expand All @@ -127,7 +127,7 @@ function isFinished(stream, opts) {
}

function isClosed(stream) {
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
return null;
}

Expand Down Expand Up @@ -173,7 +173,7 @@ function isServerRequest(stream) {
}

function willEmitClose(stream) {
if (!isStream(stream)) return null;
if (!isNodeStream(stream)) return null;

const wState = stream._writableState;
const rState = stream._readableState;
Expand All @@ -194,12 +194,12 @@ module.exports = {
isFinished,
isIterable,
isReadable,
isReadableStream,
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isStream,
isNodeStream,
isWritable,
isWritableStream,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
isServerRequest,
Expand Down
4 changes: 2 additions & 2 deletions lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const {

const {
isIterable,
isStream,
isNodeStream,
} = require('internal/streams/utils');

const pl = require('internal/streams/pipeline');
Expand All @@ -26,7 +26,7 @@ function pipeline(...streams) {
let signal;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isStream(lastArg) && !isIterable(lastArg)) {
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
validateAbortSignal(signal, 'options.signal');
Expand Down

0 comments on commit 7a19a7a

Please sign in to comment.