From e49f28a555c9135ea54ab42a8c49418c66e23da6 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 16 Sep 2020 19:05:06 +0200 Subject: [PATCH] stream: move to internal/streams Move all the streams constructors to internal/streams and avoid a circular dependencies between the modules. See: https://github.com/nodejs/readable-stream/issues/348 PR-URL: https://github.com/nodejs/node/pull/35239 Reviewed-By: Robert Nagy Reviewed-By: Luigi Pinca Reviewed-By: Daijiro Wachi --- lib/_stream_duplex.js | 106 +- lib/_stream_passthrough.js | 45 +- lib/_stream_readable.js | 1322 +--------------------- lib/_stream_transform.js | 244 +---- lib/_stream_writable.js | 849 +-------------- lib/internal/streams/duplex.js | 108 ++ lib/internal/streams/passthrough.js | 47 + lib/internal/streams/pipeline.js | 2 +- lib/internal/streams/readable.js | 1324 +++++++++++++++++++++++ lib/internal/streams/transform.js | 246 +++++ lib/internal/streams/writable.js | 851 +++++++++++++++ lib/stream.js | 14 +- node.gyp | 5 + test/message/stdin_messages.out | 8 +- test/parallel/test-bootstrap-modules.js | 10 +- 15 files changed, 2606 insertions(+), 2575 deletions(-) create mode 100644 lib/internal/streams/duplex.js create mode 100644 lib/internal/streams/passthrough.js create mode 100644 lib/internal/streams/readable.js create mode 100644 lib/internal/streams/transform.js create mode 100644 lib/internal/streams/writable.js diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index b7a55524dc4f9a..9e46d02ddcb3d3 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -1,108 +1,6 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// a duplex stream is just a stream that is both readable and writable. -// Since JS doesn't have multiple prototype inheritance, this class -// prototypically inherits from Readable, and then parasitically from -// Writable. - 'use strict'; -const { - ObjectDefineProperties, - ObjectGetOwnPropertyDescriptor, - ObjectKeys, - ObjectSetPrototypeOf, -} = primordials; +// TODO(mcollina): deprecate this file +const Duplex = require('internal/streams/duplex'); module.exports = Duplex; - -const Readable = require('_stream_readable'); -const Writable = require('_stream_writable'); - -ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); -ObjectSetPrototypeOf(Duplex, Readable); - -{ - // Allow the keys array to be GC'ed. - for (const method of ObjectKeys(Writable.prototype)) { - if (!Duplex.prototype[method]) - Duplex.prototype[method] = Writable.prototype[method]; - } -} - -function Duplex(options) { - if (!(this instanceof Duplex)) - return new Duplex(options); - - Readable.call(this, options); - Writable.call(this, options); - this.allowHalfOpen = true; - - if (options) { - if (options.readable === false) - this.readable = false; - - if (options.writable === false) - this.writable = false; - - if (options.allowHalfOpen === false) { - this.allowHalfOpen = false; - } - } -} - -ObjectDefineProperties(Duplex.prototype, { - writable: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'), - writableHighWaterMark: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'), - writableObjectMode: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'), - writableBuffer: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'), - writableLength: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'), - writableFinished: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'), - writableCorked: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'), - writableEnded: - ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'), - - destroyed: { - get() { - if (this._readableState === undefined || - this._writableState === undefined) { - return false; - } - return this._readableState.destroyed && this._writableState.destroyed; - }, - set(value) { - // Backward compatibility, the user is explicitly - // managing destroyed. - if (this._readableState && this._writableState) { - this._readableState.destroyed = value; - this._writableState.destroyed = value; - } - } - } -}); diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js index 279df5ac2d68ca..dbf8646e009931 100644 --- a/lib/_stream_passthrough.js +++ b/lib/_stream_passthrough.js @@ -1,47 +1,6 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// a passthrough stream. -// basically just the most minimal sort of Transform stream. -// Every written chunk gets output as-is. - 'use strict'; -const { - ObjectSetPrototypeOf, -} = primordials; +// TODO(mcollina): deprecate this file +const PassThrough = require('internal/streams/passthrough'); module.exports = PassThrough; - -const Transform = require('_stream_transform'); -ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype); -ObjectSetPrototypeOf(PassThrough, Transform); - -function PassThrough(options) { - if (!(this instanceof PassThrough)) - return new PassThrough(options); - - Transform.call(this, options); -} - -PassThrough.prototype._transform = function(chunk, encoding, cb) { - cb(null, chunk); -}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 53549668717e09..c31dde30645726 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1,1324 +1,6 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - 'use strict'; -const { - ArrayIsArray, - NumberIsInteger, - NumberIsNaN, - ObjectDefineProperties, - ObjectKeys, - ObjectSetPrototypeOf, - Promise, - Set, - SymbolAsyncIterator, - Symbol -} = primordials; +// TODO(mcollina): deprecate this file +const Readable = require('internal/streams/readable'); module.exports = Readable; -Readable.ReadableState = ReadableState; - -const EE = require('events'); -const Stream = require('stream'); -const { Buffer } = require('buffer'); - -let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { - debug = fn; -}); -const BufferList = require('internal/streams/buffer_list'); -const destroyImpl = require('internal/streams/destroy'); -const { - getHighWaterMark, - getDefaultHighWaterMark -} = require('internal/streams/state'); -const { - ERR_INVALID_ARG_TYPE, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT -} = require('internal/errors').codes; - -const kPaused = Symbol('kPaused'); - -// Lazy loaded to improve the startup performance. -let StringDecoder; -let from; - -ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); -ObjectSetPrototypeOf(Readable, Stream); -function nop() {} - -const { errorOrDestroy } = destroyImpl; - -function prependListener(emitter, event, fn) { - // Sadly this is not cacheable as some libraries bundle their own - // event emitter implementation with them. - if (typeof emitter.prependListener === 'function') - return emitter.prependListener(event, fn); - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - // the prependListener() method. The goal is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) - emitter.on(event, fn); - else if (ArrayIsArray(emitter._events[event])) - emitter._events[event].unshift(fn); - else - emitter._events[event] = [fn, emitter._events[event]]; -} - -function ReadableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - - // Object stream flag. Used to make read(n) ignore n and to - // make all the buffer merging and length checks go away. - this.objectMode = !!(options && options.objectMode); - - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.readableObjectMode); - - // The point at which it stops calling _read() to fill the buffer - // Note: 0 is a valid value, means "don't call _read preemptively ever" - this.highWaterMark = options ? - getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : - getDefaultHighWaterMark(false); - - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift(). - this.buffer = new BufferList(); - this.length = 0; - this.pipes = []; - this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true; - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; - this[kPaused] = null; - - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; - - // Should .destroy() be called after 'end' (and potentially 'finish'). - this.autoDestroy = !options || options.autoDestroy !== false; - - // Has it been destroyed. - this.destroyed = false; - - // Indicates whether the stream has errored. When true no further - // _read calls, 'data' or 'readable' events should occur. This is needed - // since when autoDestroy is disabled we need a way to tell whether the - // stream has failed. - this.errored = null; - - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; - - // Ref the piped dest which we need a drain event on it - // type: null | Writable | Set. - this.awaitDrainWriters = null; - this.multiAwaitDrain = false; - - // If true, a maybeReadMore has been scheduled. - this.readingMore = false; - - this.decoder = null; - this.encoding = null; - if (options && options.encoding) { - if (!StringDecoder) - StringDecoder = require('string_decoder').StringDecoder; - this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; - } -} - - -function Readable(options) { - if (!(this instanceof Readable)) - return new Readable(options); - - // Checking for a Stream.Duplex instance is faster here instead of inside - // the ReadableState constructor, at least with V8 6.5. - const isDuplex = this instanceof Stream.Duplex; - - this._readableState = new ReadableState(options, this, isDuplex); - - if (options) { - if (typeof options.read === 'function') - this._read = options.read; - - if (typeof options.destroy === 'function') - this._destroy = options.destroy; - - if (typeof options.construct === 'function') - this._construct = options.construct; - } - - Stream.call(this, options); - - destroyImpl.construct(this, () => { - maybeReadMore(this, this._readableState); - }); -} - -Readable.prototype.destroy = destroyImpl.destroy; -Readable.prototype._undestroy = destroyImpl.undestroy; -Readable.prototype._destroy = function(err, cb) { - cb(err); -}; - -Readable.prototype[EE.captureRejectionSymbol] = function(err) { - this.destroy(err); -}; - -// Manually shove something into the read() buffer. -// This returns true if the highWaterMark has not been hit yet, -// similar to how Writable.write() returns true if you should -// write() some more. -Readable.prototype.push = function(chunk, encoding) { - return readableAddChunk(this, chunk, encoding, false); -}; - -// Unshift should *always* be something directly out of read(). -Readable.prototype.unshift = function(chunk, encoding) { - return readableAddChunk(this, chunk, encoding, true); -}; - -function readableAddChunk(stream, chunk, encoding, addToFront) { - debug('readableAddChunk', chunk); - const state = stream._readableState; - - let err; - if (!state.objectMode) { - if (typeof chunk === 'string') { - encoding = encoding || state.defaultEncoding; - if (state.encoding !== encoding) { - if (addToFront && state.encoding) { - // When unshifting, if state.encoding is set, we have to save - // the string in the BufferList with the state encoding. - chunk = Buffer.from(chunk, encoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, encoding); - encoding = ''; - } - } - } else if (chunk instanceof Buffer) { - encoding = ''; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = ''; - } else if (chunk != null) { - err = new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk && chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - else - addChunk(stream, state, chunk, true); - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !encoding) { - chunk = state.decoder.write(chunk); - if (state.objectMode || chunk.length !== 0) - addChunk(stream, state, chunk, false); - else - maybeReadMore(stream, state); - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - // We can push more data if we are below the highWaterMark. - // Also, if we have no data yet, we can stand some more bytes. - // This is to work around cases where hwm=0, such as the repl. - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} - -function addChunk(stream, state, chunk, addToFront) { - if (state.flowing && state.length === 0 && !state.sync && - stream.listenerCount('data') > 0) { - // Use the guard to avoid creating `Set()` repeatedly - // when we have multiple pipes. - if (state.multiAwaitDrain) { - state.awaitDrainWriters.clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit('data', chunk); - } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) - state.buffer.unshift(chunk); - else - state.buffer.push(chunk); - - if (state.needReadable) - emitReadable(stream); - } - maybeReadMore(stream, state); -} - -Readable.prototype.isPaused = function() { - const state = this._readableState; - return state[kPaused] === true || state.flowing === false; -}; - -// Backwards compatibility. -Readable.prototype.setEncoding = function(enc) { - if (!StringDecoder) - StringDecoder = require('string_decoder').StringDecoder; - const decoder = new StringDecoder(enc); - this._readableState.decoder = decoder; - // If setEncoding(null), decoder.encoding equals utf8. - this._readableState.encoding = this._readableState.decoder.encoding; - - const buffer = this._readableState.buffer; - // Iterate over current buffer to convert already stored Buffers: - let content = ''; - for (const data of buffer) { - content += decoder.write(data); - } - buffer.clear(); - if (content !== '') - buffer.push(content); - this._readableState.length = content.length; - return this; -}; - -// Don't raise the hwm > 1GB. -const MAX_HWM = 0x40000000; -function computeNewHighWaterMark(n) { - if (n >= MAX_HWM) { - // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE. - n = MAX_HWM; - } else { - // Get the next highest power of 2 to prevent increasing hwm excessively in - // tiny amounts. - n--; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - n++; - } - return n; -} - -// This function is designed to be inlinable, so please take care when making -// changes to the function body. -function howMuchToRead(n, state) { - if (n <= 0 || (state.length === 0 && state.ended)) - return 0; - if (state.objectMode) - return 1; - if (NumberIsNaN(n)) { - // Only flow one buffer at a time. - if (state.flowing && state.length) - return state.buffer.first().length; - return state.length; - } - if (n <= state.length) - return n; - return state.ended ? state.length : 0; -} - -// You can override either this method, or the async _read(n) below. -Readable.prototype.read = function(n) { - debug('read', n); - // Same as parseInt(undefined, 10), however V8 7.3 performance regressed - // in this scenario, so we are doing it manually. - if (n === undefined) { - n = NaN; - } else if (!NumberIsInteger(n)) { - n = parseInt(n, 10); - } - const state = this._readableState; - const nOrig = n; - - // If we're asking for more than the current hwm, then raise the hwm. - if (n > state.highWaterMark) - state.highWaterMark = computeNewHighWaterMark(n); - - if (n !== 0) - state.emittedReadable = false; - - // If we're doing read(0) to trigger a readable event, but we - // already have a bunch of data in the buffer, then just trigger - // the 'readable' event and move on. - if (n === 0 && - state.needReadable && - ((state.highWaterMark !== 0 ? - state.length >= state.highWaterMark : - state.length > 0) || - state.ended)) { - debug('read: emitReadable', state.length, state.ended); - if (state.length === 0 && state.ended) - endReadable(this); - else - emitReadable(this); - return null; - } - - n = howMuchToRead(n, state); - - // If we've ended, and we're now clear, then finish it up. - if (n === 0 && state.ended) { - if (state.length === 0) - endReadable(this); - return null; - } - - // All the actual chunk generation logic needs to be - // *below* the call to _read. The reason is that in certain - // synthetic stream cases, such as passthrough streams, _read - // may be a completely synchronous operation which may change - // the state of the read buffer, providing enough data when - // before there was *not* enough. - // - // So, the steps are: - // 1. Figure out what the state of things will be after we do - // a read from the buffer. - // - // 2. If that resulting state will trigger a _read, then call _read. - // Note that this may be asynchronous, or synchronous. Yes, it is - // deeply ugly to write APIs this way, but that still doesn't mean - // that the Readable class should behave improperly, as streams are - // designed to be sync/async agnostic. - // Take note if the _read call is sync or async (ie, if the read call - // has returned yet), so that we know whether or not it's safe to emit - // 'readable' etc. - // - // 3. Actually pull the requested chunks out of the buffer and return. - - // if we need a readable event, then we need to do some reading. - let doRead = state.needReadable; - debug('need readable', doRead); - - // If we currently have less than the highWaterMark, then also read some. - if (state.length === 0 || state.length - n < state.highWaterMark) { - doRead = true; - debug('length less than watermark', doRead); - } - - // However, if we've ended, then there's no point, if we're already - // reading, then it's unnecessary, if we're constructing we have to wait, - // and if we're destroyed or errored, then it's not allowed, - if (state.ended || state.reading || state.destroyed || state.errored || - !state.constructed) { - doRead = false; - debug('reading, ended or constructing', doRead); - } else if (doRead) { - debug('do read'); - state.reading = true; - state.sync = true; - // If the length is currently zero, then we *need* a readable event. - if (state.length === 0) - state.needReadable = true; - // Call internal read method - this._read(state.highWaterMark); - state.sync = false; - // If _read pushed data synchronously, then `reading` will be false, - // and we need to re-evaluate how much data we can return to the user. - if (!state.reading) - n = howMuchToRead(nOrig, state); - } - - let ret; - if (n > 0) - ret = fromList(n, state); - else - ret = null; - - if (ret === null) { - state.needReadable = state.length <= state.highWaterMark; - n = 0; - } else { - state.length -= n; - if (state.multiAwaitDrain) { - state.awaitDrainWriters.clear(); - } else { - state.awaitDrainWriters = null; - } - } - - if (state.length === 0) { - // If we have nothing in the buffer, then we want to know - // as soon as we *do* get something into the buffer. - if (!state.ended) - state.needReadable = true; - - // If we tried to read() past the EOF, then emit end on the next tick. - if (nOrig !== n && state.ended) - endReadable(this); - } - - if (ret !== null) - this.emit('data', ret); - - return ret; -}; - -function onEofChunk(stream, state) { - debug('onEofChunk'); - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - - if (state.sync) { - // If we are sync, wait until next tick to emit the data. - // Otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call. - emitReadable(stream); - } else { - // Emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - state.emittedReadable = true; - // We have to emit readable now that we are EOF. Modules - // in the ecosystem (e.g. dicer) rely on this event being sync. - emitReadable_(stream); - } -} - -// Don't emit readable right away in sync mode, because this can trigger -// another read() call => stack overflow. This way, it might trigger -// a nextTick recursion warning, but that's not so bad. -function emitReadable(stream) { - const state = stream._readableState; - debug('emitReadable', state.needReadable, state.emittedReadable); - state.needReadable = false; - if (!state.emittedReadable) { - debug('emitReadable', state.flowing); - state.emittedReadable = true; - process.nextTick(emitReadable_, stream); - } -} - -function emitReadable_(stream) { - const state = stream._readableState; - debug('emitReadable_', state.destroyed, state.length, state.ended); - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit('readable'); - state.emittedReadable = false; - } - - // The stream needs another readable event if: - // 1. It is not flowing, as the flow mechanism will take - // care of it. - // 2. It is not ended. - // 3. It is below the highWaterMark, so we can schedule - // another readable later. - state.needReadable = - !state.flowing && - !state.ended && - state.length <= state.highWaterMark; - flow(stream); -} - - -// At this point, the user has presumably seen the 'readable' event, -// and called read() to consume some data. that may have triggered -// in turn another _read(n) call, in which case reading = true if -// it's in progress. -// However, if we're not ended, or reading, and the length < hwm, -// then go ahead and try to read some more preemptively. -function maybeReadMore(stream, state) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - process.nextTick(maybeReadMore_, stream, state); - } -} - -function maybeReadMore_(stream, state) { - // Attempt to read more data if we should. - // - // The conditions for reading more data are (one of): - // - Not enough data buffered (state.length < state.highWaterMark). The loop - // is responsible for filling the buffer with enough data if such data - // is available. If highWaterMark is 0 and we are not in the flowing mode - // we should _not_ attempt to buffer any extra data. We'll get more data - // when the stream consumer calls read() instead. - // - No data in the buffer, and the stream is in flowing mode. In this mode - // the loop below is responsible for ensuring read() is called. Failing to - // call read here would abort the flow and there's no other mechanism for - // continuing the flow if the stream consumer has just subscribed to the - // 'data' event. - // - // In addition to the above conditions to keep reading data, the following - // conditions prevent the data from being read: - // - The stream has ended (state.ended). - // - There is already a pending 'read' operation (state.reading). This is a - // case where the the stream has called the implementation defined _read() - // method, but they are processing the call asynchronously and have _not_ - // called push() with new data. In this case we skip performing more - // read()s. The execution ends in this method again after the _read() ends - // up calling push() with more data. - while (!state.reading && !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0))) { - const len = state.length; - debug('maybeReadMore read 0'); - stream.read(0); - if (len === state.length) - // Didn't get any data, stop spinning. - break; - } - state.readingMore = false; -} - -// Abstract method. to be overridden in specific implementation classes. -// call cb(er, data) where data is <= n in length. -// for virtual (non-string, non-buffer) streams, "length" is somewhat -// arbitrary, and perhaps not very meaningful. -Readable.prototype._read = function(n) { - throw new ERR_METHOD_NOT_IMPLEMENTED('_read()'); -}; - -Readable.prototype.pipe = function(dest, pipeOpts) { - const src = this; - const state = this._readableState; - - if (state.pipes.length === 1) { - if (!state.multiAwaitDrain) { - state.multiAwaitDrain = true; - state.awaitDrainWriters = new Set( - state.awaitDrainWriters ? [state.awaitDrainWriters] : [] - ); - } - } - - state.pipes.push(dest); - debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); - - const doEnd = (!pipeOpts || pipeOpts.end !== false) && - dest !== process.stdout && - dest !== process.stderr; - - const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) - process.nextTick(endFn); - else - src.once('end', endFn); - - dest.on('unpipe', onunpipe); - function onunpipe(readable, unpipeInfo) { - debug('onunpipe'); - if (readable === src) { - if (unpipeInfo && unpipeInfo.hasUnpiped === false) { - unpipeInfo.hasUnpiped = true; - cleanup(); - } - } - } - - function onend() { - debug('onend'); - dest.end(); - } - - let ondrain; - - let cleanedUp = false; - function cleanup() { - debug('cleanup'); - // Cleanup event handlers once the pipe is broken. - dest.removeListener('close', onclose); - dest.removeListener('finish', onfinish); - if (ondrain) { - dest.removeListener('drain', ondrain); - } - dest.removeListener('error', onerror); - dest.removeListener('unpipe', onunpipe); - src.removeListener('end', onend); - src.removeListener('end', unpipe); - src.removeListener('data', ondata); - - cleanedUp = true; - - // If the reader is waiting for a drain event from this - // specific writer, then it would cause it to never start - // flowing again. - // So, if this is awaiting a drain, then we just call it now. - // If we don't know, then assume that we are waiting for one. - if (ondrain && state.awaitDrainWriters && - (!dest._writableState || dest._writableState.needDrain)) - ondrain(); - } - - src.on('data', ondata); - function ondata(chunk) { - debug('ondata'); - const ret = dest.write(chunk); - debug('dest.write', ret); - if (ret === false) { - // If the user unpiped during `dest.write()`, it is possible - // to get stuck in a permanently paused state if that write - // also returned false. - // => Check whether `dest` is still a piping destination. - if (!cleanedUp) { - if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug('false write response, pause', 0); - state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; - } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug('false write response, pause', state.awaitDrainWriters.size); - state.awaitDrainWriters.add(dest); - } - src.pause(); - } - if (!ondrain) { - // When the dest drains, it reduces the awaitDrain counter - // on the source. This would be more elegant with a .once() - // handler in flow(), but adding and removing repeatedly is - // too slow. - ondrain = pipeOnDrain(src, dest); - dest.on('drain', ondrain); - } - } - } - - // If the dest has an error, then stop piping into it. - // However, don't suppress the throwing behavior for this. - function onerror(er) { - debug('onerror', er); - unpipe(); - dest.removeListener('error', onerror); - if (EE.listenerCount(dest, 'error') === 0) { - const s = dest._writableState || dest._readableState; - if (s && !s.errorEmitted) { - // User incorrectly emitted 'error' directly on the stream. - errorOrDestroy(dest, er); - } else { - dest.emit('error', er); - } - } - } - - // Make sure our error handler is attached before userland ones. - prependListener(dest, 'error', onerror); - - // Both close and finish should trigger unpipe, but only once. - function onclose() { - dest.removeListener('finish', onfinish); - unpipe(); - } - dest.once('close', onclose); - function onfinish() { - debug('onfinish'); - dest.removeListener('close', onclose); - unpipe(); - } - dest.once('finish', onfinish); - - function unpipe() { - debug('unpipe'); - src.unpipe(dest); - } - - // Tell the dest that it's being piped to. - dest.emit('pipe', src); - - // Start the flow if it hasn't been started already. - if (!state.flowing) { - debug('pipe resume'); - src.resume(); - } - - return dest; -}; - -function pipeOnDrain(src, dest) { - return function pipeOnDrainFunctionResult() { - const state = src._readableState; - - // `ondrain` will call directly, - // `this` maybe not a reference to dest, - // so we use the real dest here. - if (state.awaitDrainWriters === dest) { - debug('pipeOnDrain', 1); - state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { - debug('pipeOnDrain', state.awaitDrainWriters.size); - state.awaitDrainWriters.delete(dest); - } - - if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && - EE.listenerCount(src, 'data')) { - state.flowing = true; - flow(src); - } - }; -} - - -Readable.prototype.unpipe = function(dest) { - const state = this._readableState; - const unpipeInfo = { hasUnpiped: false }; - - // If we're not piping anywhere, then do nothing. - if (state.pipes.length === 0) - return this; - - if (!dest) { - // remove all. - const dests = state.pipes; - state.pipes = []; - this.pause(); - - for (const dest of dests) - dest.emit('unpipe', this, { hasUnpiped: false }); - return this; - } - - // Try to find the right one. - const index = state.pipes.indexOf(dest); - if (index === -1) - return this; - - state.pipes.splice(index, 1); - if (state.pipes.length === 0) - this.pause(); - - dest.emit('unpipe', this, unpipeInfo); - - return this; -}; - -// Set up data events if they are asked for -// Ensure readable listeners eventually get something. -Readable.prototype.on = function(ev, fn) { - const res = Stream.prototype.on.call(this, ev, fn); - const state = this._readableState; - - if (ev === 'data') { - // Update readableListening so that resume() may be a no-op - // a few lines down. This is needed to support once('readable'). - state.readableListening = this.listenerCount('readable') > 0; - - // Try start flowing on next tick if stream isn't explicitly paused. - if (state.flowing !== false) - this.resume(); - } else if (ev === 'readable') { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - debug('on readable', state.length, state.reading); - if (state.length) { - emitReadable(this); - } else if (!state.reading) { - process.nextTick(nReadingNextTick, this); - } - } - } - - return res; -}; -Readable.prototype.addListener = Readable.prototype.on; - -Readable.prototype.removeListener = function(ev, fn) { - const res = Stream.prototype.removeListener.call(this, ev, fn); - - if (ev === 'readable') { - // We need to check if there is someone still listening to - // readable and reset the state. However this needs to happen - // after readable has been emitted but before I/O (nextTick) to - // support once('readable', fn) cycles. This means that calling - // resume within the same tick will have no - // effect. - process.nextTick(updateReadableListening, this); - } - - return res; -}; -Readable.prototype.off = Readable.prototype.removeListener; - -Readable.prototype.removeAllListeners = function(ev) { - const res = Stream.prototype.removeAllListeners.apply(this, arguments); - - if (ev === 'readable' || ev === undefined) { - // We need to check if there is someone still listening to - // readable and reset the state. However this needs to happen - // after readable has been emitted but before I/O (nextTick) to - // support once('readable', fn) cycles. This means that calling - // resume within the same tick will have no - // effect. - process.nextTick(updateReadableListening, this); - } - - return res; -}; - -function updateReadableListening(self) { - const state = self._readableState; - state.readableListening = self.listenerCount('readable') > 0; - - if (state.resumeScheduled && state[kPaused] === false) { - // Flowing needs to be set to true now, otherwise - // the upcoming resume will not flow. - state.flowing = true; - - // Crude way to check if we should resume. - } else if (self.listenerCount('data') > 0) { - self.resume(); - } else if (!state.readableListening) { - state.flowing = null; - } -} - -function nReadingNextTick(self) { - debug('readable nexttick read 0'); - self.read(0); -} - -// pause() and resume() are remnants of the legacy readable stream API -// If the user uses them, then switch into old mode. -Readable.prototype.resume = function() { - const state = this._readableState; - if (!state.flowing) { - debug('resume'); - // We flow only if there is no one listening - // for readable, but we still have to call - // resume(). - state.flowing = !state.readableListening; - resume(this, state); - } - state[kPaused] = false; - return this; -}; - -function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - process.nextTick(resume_, stream, state); - } -} - -function resume_(stream, state) { - debug('resume', state.reading); - if (!state.reading) { - stream.read(0); - } - - state.resumeScheduled = false; - stream.emit('resume'); - flow(stream); - if (state.flowing && !state.reading) - stream.read(0); -} - -Readable.prototype.pause = function() { - debug('call pause flowing=%j', this._readableState.flowing); - if (this._readableState.flowing !== false) { - debug('pause'); - this._readableState.flowing = false; - this.emit('pause'); - } - this._readableState[kPaused] = true; - return this; -}; - -function flow(stream) { - const state = stream._readableState; - debug('flow', state.flowing); - while (state.flowing && stream.read() !== null); -} - -// Wrap an old-style stream as the async data source. -// This is *not* part of the readable stream interface. -// It is an ugly unfortunate mess of history. -Readable.prototype.wrap = function(stream) { - let paused = false; - - // TODO (ronag): Should this.destroy(err) emit - // 'error' on the wrapped stream? Would require - // a static factory method, e.g. Readable.wrap(stream). - - stream.on('data', (chunk) => { - if (!this.push(chunk) && stream.pause) { - paused = true; - stream.pause(); - } - }); - - stream.on('end', () => { - this.push(null); - }); - - stream.on('error', (err) => { - errorOrDestroy(this, err); - }); - - stream.on('close', () => { - this.destroy(); - }); - - stream.on('destroy', () => { - this.destroy(); - }); - - this._read = () => { - if (paused && stream.resume) { - paused = false; - stream.resume(); - } - }; - - // Proxy all the other methods. Important when wrapping filters and duplexes. - for (const i of ObjectKeys(stream)) { - if (this[i] === undefined && typeof stream[i] === 'function') { - this[i] = stream[i].bind(stream); - } - } - - return this; -}; - -Readable.prototype[SymbolAsyncIterator] = function() { - let stream = this; - - if (typeof stream.read !== 'function') { - // v1 stream - const src = stream; - stream = new Readable({ - objectMode: true, - destroy(err, callback) { - destroyImpl.destroyer(src, err); - callback(); - } - }).wrap(src); - } - - const iter = createAsyncIterator(stream); - iter.stream = stream; - return iter; -}; - -async function* createAsyncIterator(stream) { - let callback = nop; - - function next(resolve) { - if (this === stream) { - callback(); - callback = nop; - } else { - callback = resolve; - } - } - - stream - .on('readable', next) - .on('error', next) - .on('end', next) - .on('close', next); - - try { - const state = stream._readableState; - while (true) { - const chunk = stream.read(); - if (chunk !== null) { - yield chunk; - } else if (state.errored) { - throw state.errored; - } else if (state.ended) { - break; - } else if (state.closed) { - // TODO(ronag): ERR_PREMATURE_CLOSE? - break; - } else { - await new Promise(next); - } - } - } catch (err) { - destroyImpl.destroyer(stream, err); - throw err; - } finally { - destroyImpl.destroyer(stream, null); - } -} - -// Making it explicit these properties are not enumerable -// because otherwise some prototype manipulation in -// userland will fail. -ObjectDefineProperties(Readable.prototype, { - readable: { - get() { - const r = this._readableState; - // r.readable === false means that this is part of a Duplex stream - // where the readable side was disabled upon construction. - // Compat. The user might manually disable readable side through - // deprecated setter. - return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && - !r.endEmitted; - }, - set(val) { - // Backwards compat. - if (this._readableState) { - this._readableState.readable = !!val; - } - } - }, - - readableHighWaterMark: { - enumerable: false, - get: function() { - return this._readableState.highWaterMark; - } - }, - - readableBuffer: { - enumerable: false, - get: function() { - return this._readableState && this._readableState.buffer; - } - }, - - readableFlowing: { - enumerable: false, - get: function() { - return this._readableState.flowing; - }, - set: function(state) { - if (this._readableState) { - this._readableState.flowing = state; - } - } - }, - - readableLength: { - enumerable: false, - get() { - return this._readableState.length; - } - }, - - readableObjectMode: { - enumerable: false, - get() { - return this._readableState ? this._readableState.objectMode : false; - } - }, - - readableEncoding: { - enumerable: false, - get() { - return this._readableState ? this._readableState.encoding : null; - } - }, - - destroyed: { - enumerable: false, - get() { - if (this._readableState === undefined) { - return false; - } - return this._readableState.destroyed; - }, - set(value) { - // We ignore the value if the stream - // has not been initialized yet. - if (!this._readableState) { - return; - } - - // Backward compatibility, the user is explicitly - // managing destroyed. - this._readableState.destroyed = value; - } - }, - - readableEnded: { - enumerable: false, - get() { - return this._readableState ? this._readableState.endEmitted : false; - } - }, - -}); - -ObjectDefineProperties(ReadableState.prototype, { - // Legacy getter for `pipesCount`. - pipesCount: { - get() { - return this.pipes.length; - } - }, - - // Legacy property for `paused`. - paused: { - get() { - return this[kPaused] !== false; - }, - set(value) { - this[kPaused] = !!value; - } - } -}); - -// Exposed for testing purposes only. -Readable._fromList = fromList; - -// Pluck off n bytes from an array of buffers. -// Length is the combined lengths of all the buffers in the list. -// This function is designed to be inlinable, so please take care when making -// changes to the function body. -function fromList(n, state) { - // nothing buffered. - if (state.length === 0) - return null; - - let ret; - if (state.objectMode) - ret = state.buffer.shift(); - else if (!n || n >= state.length) { - // Read it all, truncate the list. - if (state.decoder) - ret = state.buffer.join(''); - else if (state.buffer.length === 1) - ret = state.buffer.first(); - else - ret = state.buffer.concat(state.length); - state.buffer.clear(); - } else { - // read part of list. - ret = state.buffer.consume(n, state.decoder); - } - - return ret; -} - -function endReadable(stream) { - const state = stream._readableState; - - debug('endReadable', state.endEmitted); - if (!state.endEmitted) { - state.ended = true; - process.nextTick(endReadableNT, state, stream); - } -} - -function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); - - // Check that we didn't get one last unshift. - if (!state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0) { - state.endEmitted = true; - stream.emit('end'); - - if (stream.writable && stream.allowHalfOpen === false) { - process.nextTick(endWritableNT, state, stream); - } else if (state.autoDestroy) { - // In case of duplex streams we need a way to detect - // if the writable side is ready for autoDestroy as well. - const wState = stream._writableState; - const autoDestroy = !wState || ( - wState.autoDestroy && - // We don't expect the writable to ever 'finish' - // if writable is explicitly set to false. - (wState.finished || wState.writable === false) - ); - - if (autoDestroy) { - stream.destroy(); - } - } - } -} - -function endWritableNT(state, stream) { - const writable = stream.writable && !stream.writableEnded && - !stream.destroyed; - if (writable) { - stream.end(); - } -} - -Readable.from = function(iterable, opts) { - if (from === undefined) { - from = require('internal/streams/from'); - } - return from(Readable, iterable, opts); -}; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 2fdd0d721b63be..50150638d9db8c 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -1,246 +1,6 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// a transform stream is a readable/writable stream where you do -// something with the data. Sometimes it's called a "filter", -// but that's not a great name for it, since that implies a thing where -// some bits pass through, and others are simply ignored. (That would -// be a valid example of a transform, of course.) -// -// While the output is causally related to the input, it's not a -// necessarily symmetric or synchronous transformation. For example, -// a zlib stream might take multiple plain-text writes(), and then -// emit a single compressed chunk some time in the future. -// -// Here's how this works: -// -// The Transform stream has all the aspects of the readable and writable -// stream classes. When you write(chunk), that calls _write(chunk,cb) -// internally, and returns false if there's a lot of pending writes -// buffered up. When you call read(), that calls _read(n) until -// there's enough pending readable data buffered up. -// -// In a transform stream, the written data is placed in a buffer. When -// _read(n) is called, it transforms the queued up data, calling the -// buffered _write cb's as it consumes chunks. If consuming a single -// written chunk would result in multiple output chunks, then the first -// outputted bit calls the readcb, and subsequent chunks just go into -// the read buffer, and will cause it to emit 'readable' if necessary. -// -// This way, back-pressure is actually determined by the reading side, -// since _read has to be called to start processing a new chunk. However, -// a pathological inflate type of transform can cause excessive buffering -// here. For example, imagine a stream where every byte of input is -// interpreted as an integer from 0-255, and then results in that many -// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in -// 1kb of data being output. In this case, you could write a very small -// amount of input, and end up with a very large amount of output. In -// such a pathological inflating mechanism, there'd be no way to tell -// the system to stop doing the transform. A single 4MB write could -// cause the system to run out of memory. -// -// However, even in such a pathological case, only a single written chunk -// would be consumed, and then the rest would wait (un-transformed) until -// the results of the previous transformed chunk were consumed. - 'use strict'; -const { - ObjectSetPrototypeOf, - Symbol -} = primordials; +// TODO(mcollina): deprecate this file +const Transform = require('internal/streams/transform'); module.exports = Transform; -const { - ERR_METHOD_NOT_IMPLEMENTED -} = require('internal/errors').codes; -const Duplex = require('_stream_duplex'); -ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); -ObjectSetPrototypeOf(Transform, Duplex); - -const kCallback = Symbol('kCallback'); - -function Transform(options) { - if (!(this instanceof Transform)) - return new Transform(options); - - Duplex.call(this, options); - - // We have implemented the _read method, and done the other things - // that Readable wants before the first _read call, so unset the - // sync guard flag. - this._readableState.sync = false; - - this[kCallback] = null; - - if (options) { - if (typeof options.transform === 'function') - this._transform = options.transform; - - if (typeof options.flush === 'function') - this._flush = options.flush; - } - - // When the writable side finishes, then flush out anything remaining. - // Backwards compat. Some Transform streams incorrectly implement _final - // instead of or in addition to _flush. By using 'prefinish' instead of - // implementing _final we continue supporting this unfortunate use case. - this.on('prefinish', prefinish); -} - -function final(cb) { - let called = false; - if (typeof this._flush === 'function' && !this.destroyed) { - const result = this._flush((er, data) => { - called = true; - if (er) { - if (cb) { - cb(er); - } else { - this.destroy(er); - } - return; - } - - if (data != null) { - this.push(data); - } - this.push(null); - if (cb) { - cb(); - } - }); - if (result !== undefined && result !== null) { - try { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - (data) => { - if (called) - return; - if (data != null) - this.push(data); - this.push(null); - if (cb) - process.nextTick(cb); - }, - (err) => { - if (cb) { - process.nextTick(cb, err); - } else { - process.nextTick(() => this.destroy(err)); - } - }); - } - } catch (err) { - process.nextTick(() => this.destroy(err)); - } - } - } else { - this.push(null); - if (cb) { - cb(); - } - } -} - -function prefinish() { - if (this._final !== final) { - final.call(this); - } -} - -Transform.prototype._final = final; - -Transform.prototype._transform = function(chunk, encoding, callback) { - throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); -}; - -Transform.prototype._write = function(chunk, encoding, callback) { - const rState = this._readableState; - const wState = this._writableState; - const length = rState.length; - - let called = false; - const result = this._transform(chunk, encoding, (err, val) => { - called = true; - if (err) { - callback(err); - return; - } - - if (val != null) { - this.push(val); - } - - if ( - wState.ended || // Backwards compat. - length === rState.length || // Backwards compat. - rState.length < rState.highWaterMark || - rState.length === 0 - ) { - callback(); - } else { - this[kCallback] = callback; - } - }); - if (result !== undefined && result != null) { - try { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - (val) => { - if (called) - return; - - if (val != null) { - this.push(val); - } - - if ( - wState.ended || - length === rState.length || - rState.length < rState.highWaterMark || - rState.length === 0) { - process.nextTick(callback); - } else { - this[kCallback] = callback; - } - }, - (err) => { - process.nextTick(callback, err); - }); - } - } catch (err) { - process.nextTick(callback, err); - } - } -}; - -Transform.prototype._read = function() { - if (this[kCallback]) { - const callback = this[kCallback]; - this[kCallback] = null; - callback(); - } -}; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2343704b0cdf85..e328a9434c85a9 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -1,851 +1,6 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// A bit simpler than readable streams. -// Implement an async ._write(chunk, encoding, cb), and it'll handle all -// the drain event emission and buffering. - 'use strict'; -const { - FunctionPrototype, - ObjectDefineProperty, - ObjectDefineProperties, - ObjectSetPrototypeOf, - Symbol, - SymbolHasInstance, -} = primordials; +// TODO(mcollina): deprecate this file +const Writable = require('internal/streams/writable'); module.exports = Writable; -Writable.WritableState = WritableState; - -const EE = require('events'); -const Stream = require('stream'); -const { Buffer } = require('buffer'); -const destroyImpl = require('internal/streams/destroy'); -const { - getHighWaterMark, - getDefaultHighWaterMark -} = require('internal/streams/state'); -const assert = require('internal/assert'); -const { - ERR_INVALID_ARG_TYPE, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_CANNOT_PIPE, - ERR_STREAM_DESTROYED, - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_NULL_VALUES, - ERR_STREAM_WRITE_AFTER_END, - ERR_UNKNOWN_ENCODING -} = require('internal/errors').codes; - -const { errorOrDestroy } = destroyImpl; - -ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); -ObjectSetPrototypeOf(Writable, Stream); - -function nop() {} - -const kOnFinished = Symbol('kOnFinished'); - -function WritableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream, - // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - - // Object stream flag to indicate whether or not this stream - // contains buffers or objects. - this.objectMode = !!(options && options.objectMode); - - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.writableObjectMode); - - // The point at which write() starts returning false - // Note: 0 is a valid value, means that we always return false if - // the entire buffer is not flushed immediately on write(). - this.highWaterMark = options ? - getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : - getDefaultHighWaterMark(false); - - // if _final has been called. - this.finalCalled = false; - - // drain event flag. - this.needDrain = false; - // At the start of calling end() - this.ending = false; - // When end() has been called, and returned. - this.ended = false; - // When 'finish' is emitted. - this.finished = false; - - // Has it been destroyed - this.destroyed = false; - - // Should we decode strings into buffers before passing to _write? - // this is here so that some node-core streams can optimize string - // handling at a lower level. - const noDecode = !!(options && options.decodeStrings === false); - this.decodeStrings = !noDecode; - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; - - // Not an actual buffer we keep track of, but a measurement - // of how much we're waiting to get pushed to some underlying - // socket or file. - this.length = 0; - - // A flag to see when we're in the middle of a write. - this.writing = false; - - // When true all writes will be buffered until .uncork() call. - this.corked = 0; - - // A flag to be able to tell if the onwrite cb is called immediately, - // or on a later tick. We set this to true at first, because any - // actions that shouldn't happen until "later" should generally also - // not happen before the first write call. - this.sync = true; - - // A flag to know if we're processing previously buffered items, which - // may call the _write() callback in the same tick, so that we don't - // end up in an overlapped onwrite situation. - this.bufferProcessing = false; - - // The callback that's passed to _write(chunk, cb). - this.onwrite = onwrite.bind(undefined, stream); - - // The callback that the user supplies to write(chunk, encoding, cb). - this.writecb = null; - - // The amount that is being written when _write is called. - this.writelen = 0; - - // Storage for data passed to the afterWrite() callback in case of - // synchronous _write() completion. - this.afterWriteTickInfo = null; - - resetBuffer(this); - - // Number of pending user-supplied write callbacks - // this must be 0 before 'finish' can be emitted. - this.pendingcb = 0; - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // Emit prefinish if the only thing we're waiting for is _write cbs - // This is relevant for synchronous Transform streams. - this.prefinished = false; - - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; - - // Should .destroy() be called after 'finish' (and potentially 'end'). - this.autoDestroy = !options || options.autoDestroy !== false; - - // Indicates whether the stream has errored. When true all write() calls - // should return false. This is needed since when autoDestroy - // is disabled we need a way to tell whether the stream has failed. - this.errored = null; - - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; - - this[kOnFinished] = []; -} - -function resetBuffer(state) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; -} - -WritableState.prototype.getBuffer = function getBuffer() { - return this.buffered.slice(this.bufferedIndex); -}; - -ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { - get() { - return this.buffered.length - this.bufferedIndex; - } -}); - -// Test _writableState for inheritance to account for Duplex streams, -// whose prototype chain only points to Readable. -let realHasInstance; -if (typeof Symbol === 'function' && SymbolHasInstance) { - realHasInstance = FunctionPrototype[SymbolHasInstance]; - ObjectDefineProperty(Writable, SymbolHasInstance, { - value: function(object) { - if (realHasInstance.call(this, object)) - return true; - if (this !== Writable) - return false; - - return object && object._writableState instanceof WritableState; - } - }); -} else { - realHasInstance = function(object) { - return object instanceof this; - }; -} - -function Writable(options) { - // Writable ctor is applied to Duplexes, too. - // `realHasInstance` is necessary because using plain `instanceof` - // would return false, as no `_writableState` property is attached. - - // Trying to use the custom `instanceof` for Writable here will also break the - // Node.js LazyTransform implementation, which has a non-trivial getter for - // `_writableState` that would lead to infinite recursion. - - // Checking for a Stream.Duplex instance is faster here instead of inside - // the WritableState constructor, at least with V8 6.5. - const isDuplex = (this instanceof Stream.Duplex); - - if (!isDuplex && !realHasInstance.call(Writable, this)) - return new Writable(options); - - this._writableState = new WritableState(options, this, isDuplex); - - if (options) { - if (typeof options.write === 'function') - this._write = options.write; - - if (typeof options.writev === 'function') - this._writev = options.writev; - - if (typeof options.destroy === 'function') - this._destroy = options.destroy; - - if (typeof options.final === 'function') - this._final = options.final; - - if (typeof options.construct === 'function') - this._construct = options.construct; - } - - Stream.call(this, options); - - destroyImpl.construct(this, () => { - const state = this._writableState; - - if (!state.writing) { - clearBuffer(this, state); - } - - finishMaybe(this, state); - }); -} - -// Otherwise people can pipe Writable streams, which is just wrong. -Writable.prototype.pipe = function() { - errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); -}; - -Writable.prototype.write = function(chunk, encoding, cb) { - const state = this._writableState; - - if (typeof encoding === 'function') { - cb = encoding; - encoding = state.defaultEncoding; - } else { - if (!encoding) - encoding = state.defaultEncoding; - else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - if (typeof cb !== 'function') - cb = nop; - } - - if (chunk === null) { - throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { - if (typeof chunk === 'string') { - if (state.decodeStrings !== false) { - chunk = Buffer.from(chunk, encoding); - encoding = 'buffer'; - } - } else if (chunk instanceof Buffer) { - encoding = 'buffer'; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = 'buffer'; - } else { - throw new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); - } - } - - let err; - if (state.ending) { - err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED('write'); - } - - if (err) { - process.nextTick(cb, err); - errorOrDestroy(this, err, true); - return false; - } - state.pendingcb++; - return writeOrBuffer(this, state, chunk, encoding, cb); -}; - -Writable.prototype.cork = function() { - this._writableState.corked++; -}; - -Writable.prototype.uncork = function() { - const state = this._writableState; - - if (state.corked) { - state.corked--; - - if (!state.writing) - clearBuffer(this, state); - } -}; - -Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { - // node::ParseEncoding() requires lower case. - if (typeof encoding === 'string') - encoding = encoding.toLowerCase(); - if (!Buffer.isEncoding(encoding)) - throw new ERR_UNKNOWN_ENCODING(encoding); - this._writableState.defaultEncoding = encoding; - return this; -}; - -// If we're already writing something, then just put this -// in the queue, and wait our turn. Otherwise, call _write -// If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(stream, state, chunk, encoding, callback) { - const len = state.objectMode ? 1 : chunk.length; - - state.length += len; - - if (state.writing || state.corked || state.errored || !state.constructed) { - state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== 'buffer') { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - - const ret = state.length < state.highWaterMark; - - // We must ensure that previous needDrain will not be reset to false. - if (!ret) - state.needDrain = true; - - // Return false if errored or destroyed in order to break - // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && !state.destroyed; -} - -function doWrite(stream, state, writev, len, chunk, encoding, cb) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) - state.onwrite(new ERR_STREAM_DESTROYED('write')); - else if (writev) - stream._writev(chunk, state.onwrite); - else - stream._write(chunk, encoding, state.onwrite); - state.sync = false; -} - -function onwriteError(stream, state, er, cb) { - --state.pendingcb; - - cb(er); - // Ensure callbacks are invoked even when autoDestroy is - // not enabled. Passing `er` here doesn't make sense since - // it's related to one specific write, not to the buffered - // writes. - errorBuffer(state); - // This can emit error, but error must always follow cb. - errorOrDestroy(stream, er); -} - -function onwrite(stream, er) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== 'function') { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - // In case of duplex streams we need to notify the readable side of the - // error. - if (stream._readableState && !stream._readableState.errored) { - stream._readableState.errored = er; - } - - if (sync) { - process.nextTick(onwriteError, stream, state, er, cb); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - // It is a common case that the callback passed to .write() is always - // the same. In that case, we do not schedule a new nextTick(), but - // rather just increase a counter, to improve performance and avoid - // memory allocations. - if (state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { count: 1, cb, stream, state }; - process.nextTick(afterWriteTick, state.afterWriteTickInfo); - } - } else { - afterWrite(stream, state, 1, cb); - } - } -} - -function afterWriteTick({ stream, state, count, cb }) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); -} - -function afterWrite(stream, state, count, cb) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit('drain'); - } - - while (count-- > 0) { - state.pendingcb--; - cb(); - } - - if (state.destroyed) { - errorBuffer(state); - } - - finishMaybe(stream, state); -} - -// If there's something in the buffer waiting, then invoke callbacks. -function errorBuffer(state) { - if (state.writing) { - return; - } - - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback(new ERR_STREAM_DESTROYED('write')); - } - - for (const callback of state[kOnFinished].splice(0)) { - callback(new ERR_STREAM_DESTROYED('end')); - } - - resetBuffer(state); -} - -// If there's something in the buffer waiting, then process it. -function clearBuffer(stream, state) { - if (state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed) { - return; - } - - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - - if (!bufferedLength) { - return; - } - - let i = bufferedIndex; - - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - - const callback = state.allNoop ? nop : (err) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - // Make a copy of `buffered` if it's going to be used by `callback` above, - // since `doWrite` will mutate the array. - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); - chunks.allBuffers = state.allBuffers; - - doWrite(stream, state, true, state.length, chunks, '', callback); - - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - buffered[i++] = null; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; -} - -Writable.prototype._write = function(chunk, encoding, cb) { - if (this._writev) { - this._writev([{ chunk, encoding }], cb); - } else { - throw new ERR_METHOD_NOT_IMPLEMENTED('_write()'); - } -}; - -Writable.prototype._writev = null; - -Writable.prototype.end = function(chunk, encoding, cb) { - const state = this._writableState; - - if (typeof chunk === 'function') { - cb = chunk; - chunk = null; - encoding = null; - } else if (typeof encoding === 'function') { - cb = encoding; - encoding = null; - } - - if (chunk !== null && chunk !== undefined) - this.write(chunk, encoding); - - // .end() fully uncorks. - if (state.corked) { - state.corked = 1; - this.uncork(); - } - - // This is forgiving in terms of unnecessary calls to end() and can hide - // logic errors. However, usually such errors are harmless and causing a - // hard error can be disproportionately destructive. It is not always - // trivial for the user to determine whether end() needs to be called or not. - let err; - if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED('end'); - } - - if (typeof cb === 'function') { - if (err || state.finished) { - process.nextTick(cb, err); - } else { - state[kOnFinished].push(cb); - } - } - - return this; -}; - -function needFinish(state) { - return (state.ending && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing); -} - -function callFinal(stream, state) { - state.sync = true; - state.pendingcb++; - const result = stream._final((err) => { - state.pendingcb--; - if (err) { - for (const callback of state[kOnFinished].splice(0)) { - callback(err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit('prefinish'); - // Backwards compat. Don't check state.sync here. - // Some streams assume 'finish' will be emitted - // asynchronously relative to _final callback. - state.pendingcb++; - process.nextTick(finish, stream, state); - } - }); - if (result !== undefined && result !== null) { - try { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - function() { - if (state.prefinished) - return; - state.prefinish = true; - process.nextTick(() => stream.emit('prefinish')); - state.pendingcb++; - process.nextTick(finish, stream, state); - }, - function(err) { - for (const callback of state[kOnFinished].splice(0)) { - process.nextTick(callback, err); - } - process.nextTick(errorOrDestroy, stream, err, state.sync); - }); - } - } catch (err) { - process.nextTick(errorOrDestroy, stream, err, state.sync); - } - } - state.sync = false; -} - -function prefinish(stream, state) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function' && !state.destroyed) { - state.finalCalled = true; - callFinal(stream, state); - } else { - state.prefinished = true; - stream.emit('prefinish'); - } - } -} - -function finishMaybe(stream, state, sync) { - if (needFinish(state)) { - prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - process.nextTick(finish, stream, state); - } else { - finish(stream, state); - } - } - } -} - -function finish(stream, state) { - state.pendingcb--; - // TODO (ronag): Unify with needFinish. - if (state.errorEmitted || state.closeEmitted) - return; - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit('finish'); - - if (state.autoDestroy) { - // In case of duplex streams we need a way to detect - // if the readable side is ready for autoDestroy as well. - const rState = stream._readableState; - const autoDestroy = !rState || ( - rState.autoDestroy && - // We don't expect the readable to ever 'end' - // if readable is explicitly set to false. - (rState.endEmitted || rState.readable === false) - ); - if (autoDestroy) { - stream.destroy(); - } - } -} - -ObjectDefineProperties(Writable.prototype, { - - destroyed: { - get() { - return this._writableState ? this._writableState.destroyed : false; - }, - set(value) { - // Backward compatibility, the user is explicitly managing destroyed. - if (this._writableState) { - this._writableState.destroyed = value; - } - } - }, - - writable: { - get() { - const w = this._writableState; - // w.writable === false means that this is part of a Duplex stream - // where the writable side was disabled upon construction. - // Compat. The user might manually disable writable side through - // deprecated setter. - return !!w && w.writable !== false && !w.destroyed && !w.errored && - !w.ending && !w.ended; - }, - set(val) { - // Backwards compatible. - if (this._writableState) { - this._writableState.writable = !!val; - } - } - }, - - writableFinished: { - get() { - return this._writableState ? this._writableState.finished : false; - } - }, - - writableObjectMode: { - get() { - return this._writableState ? this._writableState.objectMode : false; - } - }, - - writableBuffer: { - get() { - return this._writableState && this._writableState.getBuffer(); - } - }, - - writableEnded: { - get() { - return this._writableState ? this._writableState.ending : false; - } - }, - - writableHighWaterMark: { - get() { - return this._writableState && this._writableState.highWaterMark; - } - }, - - writableCorked: { - get() { - return this._writableState ? this._writableState.corked : 0; - } - }, - - writableLength: { - get() { - return this._writableState && this._writableState.length; - } - } -}); - -const destroy = destroyImpl.destroy; -Writable.prototype.destroy = function(err, cb) { - const state = this._writableState; - - // Invoke pending callbacks. - if ( - state.bufferedIndex < state.buffered.length || - state[kOnFinished].length - ) { - assert(!state.destroyed); - process.nextTick(errorBuffer, state); - } - - destroy.call(this, err, cb); - return this; -}; - -Writable.prototype._undestroy = destroyImpl.undestroy; -Writable.prototype._destroy = function(err, cb) { - cb(err); -}; - -Writable.prototype[EE.captureRejectionSymbol] = function(err) { - this.destroy(err); -}; diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js new file mode 100644 index 00000000000000..3bab26d9cef934 --- /dev/null +++ b/lib/internal/streams/duplex.js @@ -0,0 +1,108 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a duplex stream is just a stream that is both readable and writable. +// Since JS doesn't have multiple prototype inheritance, this class +// prototypically inherits from Readable, and then parasitically from +// Writable. + +'use strict'; + +const { + ObjectDefineProperties, + ObjectGetOwnPropertyDescriptor, + ObjectKeys, + ObjectSetPrototypeOf, +} = primordials; + +module.exports = Duplex; + +const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); + +ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); +ObjectSetPrototypeOf(Duplex, Readable); + +{ + // Allow the keys array to be GC'ed. + for (const method of ObjectKeys(Writable.prototype)) { + if (!Duplex.prototype[method]) + Duplex.prototype[method] = Writable.prototype[method]; + } +} + +function Duplex(options) { + if (!(this instanceof Duplex)) + return new Duplex(options); + + Readable.call(this, options); + Writable.call(this, options); + this.allowHalfOpen = true; + + if (options) { + if (options.readable === false) + this.readable = false; + + if (options.writable === false) + this.writable = false; + + if (options.allowHalfOpen === false) { + this.allowHalfOpen = false; + } + } +} + +ObjectDefineProperties(Duplex.prototype, { + writable: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'), + writableHighWaterMark: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'), + writableObjectMode: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'), + writableBuffer: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'), + writableLength: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'), + writableFinished: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'), + writableCorked: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'), + writableEnded: + ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'), + + destroyed: { + get() { + if (this._readableState === undefined || + this._writableState === undefined) { + return false; + } + return this._readableState.destroyed && this._writableState.destroyed; + }, + set(value) { + // Backward compatibility, the user is explicitly + // managing destroyed. + if (this._readableState && this._writableState) { + this._readableState.destroyed = value; + this._writableState.destroyed = value; + } + } + } +}); diff --git a/lib/internal/streams/passthrough.js b/lib/internal/streams/passthrough.js new file mode 100644 index 00000000000000..d37f9caf0116b5 --- /dev/null +++ b/lib/internal/streams/passthrough.js @@ -0,0 +1,47 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a passthrough stream. +// basically just the most minimal sort of Transform stream. +// Every written chunk gets output as-is. + +'use strict'; + +const { + ObjectSetPrototypeOf, +} = primordials; + +module.exports = PassThrough; + +const Transform = require('internal/streams/transform'); +ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype); +ObjectSetPrototypeOf(PassThrough, Transform); + +function PassThrough(options) { + if (!(this instanceof PassThrough)) + return new PassThrough(options); + + Transform.call(this, options); +} + +PassThrough.prototype._transform = function(chunk, encoding, cb) { + cb(null, chunk); +}; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index aaee0d24992af8..c662bce8db1ab3 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -114,7 +114,7 @@ function makeAsyncIterable(val) { async function* fromReadable(val) { if (!Readable) { - Readable = require('_stream_readable'); + Readable = require('internal/streams/readable'); } yield* Readable.prototype[SymbolAsyncIterator].call(val); diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js new file mode 100644 index 00000000000000..373e42907bf0ca --- /dev/null +++ b/lib/internal/streams/readable.js @@ -0,0 +1,1324 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +const { + ArrayIsArray, + NumberIsInteger, + NumberIsNaN, + ObjectDefineProperties, + ObjectKeys, + ObjectSetPrototypeOf, + Promise, + Set, + SymbolAsyncIterator, + Symbol +} = primordials; + +module.exports = Readable; +Readable.ReadableState = ReadableState; + +const EE = require('events'); +const Stream = require('internal/streams/legacy'); +const { Buffer } = require('buffer'); + +let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { + debug = fn; +}); +const BufferList = require('internal/streams/buffer_list'); +const destroyImpl = require('internal/streams/destroy'); +const { + getHighWaterMark, + getDefaultHighWaterMark +} = require('internal/streams/state'); +const { + ERR_INVALID_ARG_TYPE, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT +} = require('internal/errors').codes; + +const kPaused = Symbol('kPaused'); + +// Lazy loaded to improve the startup performance. +let StringDecoder; +let from; + +ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); +ObjectSetPrototypeOf(Readable, Stream); +function nop() {} + +const { errorOrDestroy } = destroyImpl; + +function prependListener(emitter, event, fn) { + // Sadly this is not cacheable as some libraries bundle their own + // event emitter implementation with them. + if (typeof emitter.prependListener === 'function') + return emitter.prependListener(event, fn); + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) + emitter.on(event, fn); + else if (ArrayIsArray(emitter._events[event])) + emitter._events[event].unshift(fn); + else + emitter._events[event] = [fn, emitter._events[event]]; +} + +function ReadableState(options, stream, isDuplex) { + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + if (typeof isDuplex !== 'boolean') + isDuplex = stream instanceof Stream.Duplex; + + // Object stream flag. Used to make read(n) ignore n and to + // make all the buffer merging and length checks go away. + this.objectMode = !!(options && options.objectMode); + + if (isDuplex) + this.objectMode = this.objectMode || + !!(options && options.readableObjectMode); + + // The point at which it stops calling _read() to fill the buffer + // Note: 0 is a valid value, means "don't call _read preemptively ever" + this.highWaterMark = options ? + getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : + getDefaultHighWaterMark(false); + + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift(). + this.buffer = new BufferList(); + this.length = 0; + this.pipes = []; + this.flowing = null; + this.ended = false; + this.endEmitted = false; + this.reading = false; + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + this.constructed = true; + + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + this.sync = true; + + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + this.needReadable = false; + this.emittedReadable = false; + this.readableListening = false; + this.resumeScheduled = false; + this[kPaused] = null; + + // True if the error was already emitted and should not be thrown again. + this.errorEmitted = false; + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = !options || options.emitClose !== false; + + // Should .destroy() be called after 'end' (and potentially 'finish'). + this.autoDestroy = !options || options.autoDestroy !== false; + + // Has it been destroyed. + this.destroyed = false; + + // Indicates whether the stream has errored. When true no further + // _read calls, 'data' or 'readable' events should occur. This is needed + // since when autoDestroy is disabled we need a way to tell whether the + // stream has failed. + this.errored = null; + + // Indicates whether the stream has finished destroying. + this.closed = false; + + // True if close has been emitted or would have been emitted + // depending on emitClose. + this.closeEmitted = false; + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; + + // Ref the piped dest which we need a drain event on it + // type: null | Writable | Set. + this.awaitDrainWriters = null; + this.multiAwaitDrain = false; + + // If true, a maybeReadMore has been scheduled. + this.readingMore = false; + + this.decoder = null; + this.encoding = null; + if (options && options.encoding) { + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this.decoder = new StringDecoder(options.encoding); + this.encoding = options.encoding; + } +} + + +function Readable(options) { + if (!(this instanceof Readable)) + return new Readable(options); + + // Checking for a Stream.Duplex instance is faster here instead of inside + // the ReadableState constructor, at least with V8 6.5. + const isDuplex = this instanceof Stream.Duplex; + + this._readableState = new ReadableState(options, this, isDuplex); + + if (options) { + if (typeof options.read === 'function') + this._read = options.read; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; + + if (typeof options.construct === 'function') + this._construct = options.construct; + } + + Stream.call(this, options); + + destroyImpl.construct(this, () => { + maybeReadMore(this, this._readableState); + }); +} + +Readable.prototype.destroy = destroyImpl.destroy; +Readable.prototype._undestroy = destroyImpl.undestroy; +Readable.prototype._destroy = function(err, cb) { + cb(err); +}; + +Readable.prototype[EE.captureRejectionSymbol] = function(err) { + this.destroy(err); +}; + +// Manually shove something into the read() buffer. +// This returns true if the highWaterMark has not been hit yet, +// similar to how Writable.write() returns true if you should +// write() some more. +Readable.prototype.push = function(chunk, encoding) { + return readableAddChunk(this, chunk, encoding, false); +}; + +// Unshift should *always* be something directly out of read(). +Readable.prototype.unshift = function(chunk, encoding) { + return readableAddChunk(this, chunk, encoding, true); +}; + +function readableAddChunk(stream, chunk, encoding, addToFront) { + debug('readableAddChunk', chunk); + const state = stream._readableState; + + let err; + if (!state.objectMode) { + if (typeof chunk === 'string') { + encoding = encoding || state.defaultEncoding; + if (state.encoding !== encoding) { + if (addToFront && state.encoding) { + // When unshifting, if state.encoding is set, we have to save + // the string in the BufferList with the state encoding. + chunk = Buffer.from(chunk, encoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, encoding); + encoding = ''; + } + } + } else if (chunk instanceof Buffer) { + encoding = ''; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = ''; + } else if (chunk != null) { + err = new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + } + } + + if (err) { + errorOrDestroy(stream, err); + } else if (chunk === null) { + state.reading = false; + onEofChunk(stream, state); + } else if (state.objectMode || (chunk && chunk.length > 0)) { + if (addToFront) { + if (state.endEmitted) + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + else + addChunk(stream, state, chunk, true); + } else if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + } else if (state.destroyed || state.errored) { + return false; + } else { + state.reading = false; + if (state.decoder && !encoding) { + chunk = state.decoder.write(chunk); + if (state.objectMode || chunk.length !== 0) + addChunk(stream, state, chunk, false); + else + maybeReadMore(stream, state); + } else { + addChunk(stream, state, chunk, false); + } + } + } else if (!addToFront) { + state.reading = false; + maybeReadMore(stream, state); + } + + // We can push more data if we are below the highWaterMark. + // Also, if we have no data yet, we can stand some more bytes. + // This is to work around cases where hwm=0, such as the repl. + return !state.ended && + (state.length < state.highWaterMark || state.length === 0); +} + +function addChunk(stream, state, chunk, addToFront) { + if (state.flowing && state.length === 0 && !state.sync && + stream.listenerCount('data') > 0) { + // Use the guard to avoid creating `Set()` repeatedly + // when we have multiple pipes. + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; + } + stream.emit('data', chunk); + } else { + // Update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + } + maybeReadMore(stream, state); +} + +Readable.prototype.isPaused = function() { + const state = this._readableState; + return state[kPaused] === true || state.flowing === false; +}; + +// Backwards compatibility. +Readable.prototype.setEncoding = function(enc) { + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + const decoder = new StringDecoder(enc); + this._readableState.decoder = decoder; + // If setEncoding(null), decoder.encoding equals utf8. + this._readableState.encoding = this._readableState.decoder.encoding; + + const buffer = this._readableState.buffer; + // Iterate over current buffer to convert already stored Buffers: + let content = ''; + for (const data of buffer) { + content += decoder.write(data); + } + buffer.clear(); + if (content !== '') + buffer.push(content); + this._readableState.length = content.length; + return this; +}; + +// Don't raise the hwm > 1GB. +const MAX_HWM = 0x40000000; +function computeNewHighWaterMark(n) { + if (n >= MAX_HWM) { + // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE. + n = MAX_HWM; + } else { + // Get the next highest power of 2 to prevent increasing hwm excessively in + // tiny amounts. + n--; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + n++; + } + return n; +} + +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function howMuchToRead(n, state) { + if (n <= 0 || (state.length === 0 && state.ended)) + return 0; + if (state.objectMode) + return 1; + if (NumberIsNaN(n)) { + // Only flow one buffer at a time. + if (state.flowing && state.length) + return state.buffer.first().length; + return state.length; + } + if (n <= state.length) + return n; + return state.ended ? state.length : 0; +} + +// You can override either this method, or the async _read(n) below. +Readable.prototype.read = function(n) { + debug('read', n); + // Same as parseInt(undefined, 10), however V8 7.3 performance regressed + // in this scenario, so we are doing it manually. + if (n === undefined) { + n = NaN; + } else if (!NumberIsInteger(n)) { + n = parseInt(n, 10); + } + const state = this._readableState; + const nOrig = n; + + // If we're asking for more than the current hwm, then raise the hwm. + if (n > state.highWaterMark) + state.highWaterMark = computeNewHighWaterMark(n); + + if (n !== 0) + state.emittedReadable = false; + + // If we're doing read(0) to trigger a readable event, but we + // already have a bunch of data in the buffer, then just trigger + // the 'readable' event and move on. + if (n === 0 && + state.needReadable && + ((state.highWaterMark !== 0 ? + state.length >= state.highWaterMark : + state.length > 0) || + state.ended)) { + debug('read: emitReadable', state.length, state.ended); + if (state.length === 0 && state.ended) + endReadable(this); + else + emitReadable(this); + return null; + } + + n = howMuchToRead(n, state); + + // If we've ended, and we're now clear, then finish it up. + if (n === 0 && state.ended) { + if (state.length === 0) + endReadable(this); + return null; + } + + // All the actual chunk generation logic needs to be + // *below* the call to _read. The reason is that in certain + // synthetic stream cases, such as passthrough streams, _read + // may be a completely synchronous operation which may change + // the state of the read buffer, providing enough data when + // before there was *not* enough. + // + // So, the steps are: + // 1. Figure out what the state of things will be after we do + // a read from the buffer. + // + // 2. If that resulting state will trigger a _read, then call _read. + // Note that this may be asynchronous, or synchronous. Yes, it is + // deeply ugly to write APIs this way, but that still doesn't mean + // that the Readable class should behave improperly, as streams are + // designed to be sync/async agnostic. + // Take note if the _read call is sync or async (ie, if the read call + // has returned yet), so that we know whether or not it's safe to emit + // 'readable' etc. + // + // 3. Actually pull the requested chunks out of the buffer and return. + + // if we need a readable event, then we need to do some reading. + let doRead = state.needReadable; + debug('need readable', doRead); + + // If we currently have less than the highWaterMark, then also read some. + if (state.length === 0 || state.length - n < state.highWaterMark) { + doRead = true; + debug('length less than watermark', doRead); + } + + // However, if we've ended, then there's no point, if we're already + // reading, then it's unnecessary, if we're constructing we have to wait, + // and if we're destroyed or errored, then it's not allowed, + if (state.ended || state.reading || state.destroyed || state.errored || + !state.constructed) { + doRead = false; + debug('reading, ended or constructing', doRead); + } else if (doRead) { + debug('do read'); + state.reading = true; + state.sync = true; + // If the length is currently zero, then we *need* a readable event. + if (state.length === 0) + state.needReadable = true; + // Call internal read method + this._read(state.highWaterMark); + state.sync = false; + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. + if (!state.reading) + n = howMuchToRead(nOrig, state); + } + + let ret; + if (n > 0) + ret = fromList(n, state); + else + ret = null; + + if (ret === null) { + state.needReadable = state.length <= state.highWaterMark; + n = 0; + } else { + state.length -= n; + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; + } + } + + if (state.length === 0) { + // If we have nothing in the buffer, then we want to know + // as soon as we *do* get something into the buffer. + if (!state.ended) + state.needReadable = true; + + // If we tried to read() past the EOF, then emit end on the next tick. + if (nOrig !== n && state.ended) + endReadable(this); + } + + if (ret !== null) + this.emit('data', ret); + + return ret; +}; + +function onEofChunk(stream, state) { + debug('onEofChunk'); + if (state.ended) return; + if (state.decoder) { + const chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; + } + } + state.ended = true; + + if (state.sync) { + // If we are sync, wait until next tick to emit the data. + // Otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call. + emitReadable(stream); + } else { + // Emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + state.emittedReadable = true; + // We have to emit readable now that we are EOF. Modules + // in the ecosystem (e.g. dicer) rely on this event being sync. + emitReadable_(stream); + } +} + +// Don't emit readable right away in sync mode, because this can trigger +// another read() call => stack overflow. This way, it might trigger +// a nextTick recursion warning, but that's not so bad. +function emitReadable(stream) { + const state = stream._readableState; + debug('emitReadable', state.needReadable, state.emittedReadable); + state.needReadable = false; + if (!state.emittedReadable) { + debug('emitReadable', state.flowing); + state.emittedReadable = true; + process.nextTick(emitReadable_, stream); + } +} + +function emitReadable_(stream) { + const state = stream._readableState; + debug('emitReadable_', state.destroyed, state.length, state.ended); + if (!state.destroyed && !state.errored && (state.length || state.ended)) { + stream.emit('readable'); + state.emittedReadable = false; + } + + // The stream needs another readable event if: + // 1. It is not flowing, as the flow mechanism will take + // care of it. + // 2. It is not ended. + // 3. It is below the highWaterMark, so we can schedule + // another readable later. + state.needReadable = + !state.flowing && + !state.ended && + state.length <= state.highWaterMark; + flow(stream); +} + + +// At this point, the user has presumably seen the 'readable' event, +// and called read() to consume some data. that may have triggered +// in turn another _read(n) call, in which case reading = true if +// it's in progress. +// However, if we're not ended, or reading, and the length < hwm, +// then go ahead and try to read some more preemptively. +function maybeReadMore(stream, state) { + if (!state.readingMore && state.constructed) { + state.readingMore = true; + process.nextTick(maybeReadMore_, stream, state); + } +} + +function maybeReadMore_(stream, state) { + // Attempt to read more data if we should. + // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. + while (!state.reading && !state.ended && + (state.length < state.highWaterMark || + (state.flowing && state.length === 0))) { + const len = state.length; + debug('maybeReadMore read 0'); + stream.read(0); + if (len === state.length) + // Didn't get any data, stop spinning. + break; + } + state.readingMore = false; +} + +// Abstract method. to be overridden in specific implementation classes. +// call cb(er, data) where data is <= n in length. +// for virtual (non-string, non-buffer) streams, "length" is somewhat +// arbitrary, and perhaps not very meaningful. +Readable.prototype._read = function(n) { + throw new ERR_METHOD_NOT_IMPLEMENTED('_read()'); +}; + +Readable.prototype.pipe = function(dest, pipeOpts) { + const src = this; + const state = this._readableState; + + if (state.pipes.length === 1) { + if (!state.multiAwaitDrain) { + state.multiAwaitDrain = true; + state.awaitDrainWriters = new Set( + state.awaitDrainWriters ? [state.awaitDrainWriters] : [] + ); + } + } + + state.pipes.push(dest); + debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); + + const doEnd = (!pipeOpts || pipeOpts.end !== false) && + dest !== process.stdout && + dest !== process.stderr; + + const endFn = doEnd ? onend : unpipe; + if (state.endEmitted) + process.nextTick(endFn); + else + src.once('end', endFn); + + dest.on('unpipe', onunpipe); + function onunpipe(readable, unpipeInfo) { + debug('onunpipe'); + if (readable === src) { + if (unpipeInfo && unpipeInfo.hasUnpiped === false) { + unpipeInfo.hasUnpiped = true; + cleanup(); + } + } + } + + function onend() { + debug('onend'); + dest.end(); + } + + let ondrain; + + let cleanedUp = false; + function cleanup() { + debug('cleanup'); + // Cleanup event handlers once the pipe is broken. + dest.removeListener('close', onclose); + dest.removeListener('finish', onfinish); + if (ondrain) { + dest.removeListener('drain', ondrain); + } + dest.removeListener('error', onerror); + dest.removeListener('unpipe', onunpipe); + src.removeListener('end', onend); + src.removeListener('end', unpipe); + src.removeListener('data', ondata); + + cleanedUp = true; + + // If the reader is waiting for a drain event from this + // specific writer, then it would cause it to never start + // flowing again. + // So, if this is awaiting a drain, then we just call it now. + // If we don't know, then assume that we are waiting for one. + if (ondrain && state.awaitDrainWriters && + (!dest._writableState || dest._writableState.needDrain)) + ondrain(); + } + + src.on('data', ondata); + function ondata(chunk) { + debug('ondata'); + const ret = dest.write(chunk); + debug('dest.write', ret); + if (ret === false) { + // If the user unpiped during `dest.write()`, it is possible + // to get stuck in a permanently paused state if that write + // also returned false. + // => Check whether `dest` is still a piping destination. + if (!cleanedUp) { + if (state.pipes.length === 1 && state.pipes[0] === dest) { + debug('false write response, pause', 0); + state.awaitDrainWriters = dest; + state.multiAwaitDrain = false; + } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { + debug('false write response, pause', state.awaitDrainWriters.size); + state.awaitDrainWriters.add(dest); + } + src.pause(); + } + if (!ondrain) { + // When the dest drains, it reduces the awaitDrain counter + // on the source. This would be more elegant with a .once() + // handler in flow(), but adding and removing repeatedly is + // too slow. + ondrain = pipeOnDrain(src, dest); + dest.on('drain', ondrain); + } + } + } + + // If the dest has an error, then stop piping into it. + // However, don't suppress the throwing behavior for this. + function onerror(er) { + debug('onerror', er); + unpipe(); + dest.removeListener('error', onerror); + if (EE.listenerCount(dest, 'error') === 0) { + const s = dest._writableState || dest._readableState; + if (s && !s.errorEmitted) { + // User incorrectly emitted 'error' directly on the stream. + errorOrDestroy(dest, er); + } else { + dest.emit('error', er); + } + } + } + + // Make sure our error handler is attached before userland ones. + prependListener(dest, 'error', onerror); + + // Both close and finish should trigger unpipe, but only once. + function onclose() { + dest.removeListener('finish', onfinish); + unpipe(); + } + dest.once('close', onclose); + function onfinish() { + debug('onfinish'); + dest.removeListener('close', onclose); + unpipe(); + } + dest.once('finish', onfinish); + + function unpipe() { + debug('unpipe'); + src.unpipe(dest); + } + + // Tell the dest that it's being piped to. + dest.emit('pipe', src); + + // Start the flow if it hasn't been started already. + if (!state.flowing) { + debug('pipe resume'); + src.resume(); + } + + return dest; +}; + +function pipeOnDrain(src, dest) { + return function pipeOnDrainFunctionResult() { + const state = src._readableState; + + // `ondrain` will call directly, + // `this` maybe not a reference to dest, + // so we use the real dest here. + if (state.awaitDrainWriters === dest) { + debug('pipeOnDrain', 1); + state.awaitDrainWriters = null; + } else if (state.multiAwaitDrain) { + debug('pipeOnDrain', state.awaitDrainWriters.size); + state.awaitDrainWriters.delete(dest); + } + + if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && + EE.listenerCount(src, 'data')) { + state.flowing = true; + flow(src); + } + }; +} + + +Readable.prototype.unpipe = function(dest) { + const state = this._readableState; + const unpipeInfo = { hasUnpiped: false }; + + // If we're not piping anywhere, then do nothing. + if (state.pipes.length === 0) + return this; + + if (!dest) { + // remove all. + const dests = state.pipes; + state.pipes = []; + this.pause(); + + for (const dest of dests) + dest.emit('unpipe', this, { hasUnpiped: false }); + return this; + } + + // Try to find the right one. + const index = state.pipes.indexOf(dest); + if (index === -1) + return this; + + state.pipes.splice(index, 1); + if (state.pipes.length === 0) + this.pause(); + + dest.emit('unpipe', this, unpipeInfo); + + return this; +}; + +// Set up data events if they are asked for +// Ensure readable listeners eventually get something. +Readable.prototype.on = function(ev, fn) { + const res = Stream.prototype.on.call(this, ev, fn); + const state = this._readableState; + + if (ev === 'data') { + // Update readableListening so that resume() may be a no-op + // a few lines down. This is needed to support once('readable'). + state.readableListening = this.listenerCount('readable') > 0; + + // Try start flowing on next tick if stream isn't explicitly paused. + if (state.flowing !== false) + this.resume(); + } else if (ev === 'readable') { + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + debug('on readable', state.length, state.reading); + if (state.length) { + emitReadable(this); + } else if (!state.reading) { + process.nextTick(nReadingNextTick, this); + } + } + } + + return res; +}; +Readable.prototype.addListener = Readable.prototype.on; + +Readable.prototype.removeListener = function(ev, fn) { + const res = Stream.prototype.removeListener.call(this, ev, fn); + + if (ev === 'readable') { + // We need to check if there is someone still listening to + // readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; +Readable.prototype.off = Readable.prototype.removeListener; + +Readable.prototype.removeAllListeners = function(ev) { + const res = Stream.prototype.removeAllListeners.apply(this, arguments); + + if (ev === 'readable' || ev === undefined) { + // We need to check if there is someone still listening to + // readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; + +function updateReadableListening(self) { + const state = self._readableState; + state.readableListening = self.listenerCount('readable') > 0; + + if (state.resumeScheduled && state[kPaused] === false) { + // Flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; + + // Crude way to check if we should resume. + } else if (self.listenerCount('data') > 0) { + self.resume(); + } else if (!state.readableListening) { + state.flowing = null; + } +} + +function nReadingNextTick(self) { + debug('readable nexttick read 0'); + self.read(0); +} + +// pause() and resume() are remnants of the legacy readable stream API +// If the user uses them, then switch into old mode. +Readable.prototype.resume = function() { + const state = this._readableState; + if (!state.flowing) { + debug('resume'); + // We flow only if there is no one listening + // for readable, but we still have to call + // resume(). + state.flowing = !state.readableListening; + resume(this, state); + } + state[kPaused] = false; + return this; +}; + +function resume(stream, state) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + process.nextTick(resume_, stream, state); + } +} + +function resume_(stream, state) { + debug('resume', state.reading); + if (!state.reading) { + stream.read(0); + } + + state.resumeScheduled = false; + stream.emit('resume'); + flow(stream); + if (state.flowing && !state.reading) + stream.read(0); +} + +Readable.prototype.pause = function() { + debug('call pause flowing=%j', this._readableState.flowing); + if (this._readableState.flowing !== false) { + debug('pause'); + this._readableState.flowing = false; + this.emit('pause'); + } + this._readableState[kPaused] = true; + return this; +}; + +function flow(stream) { + const state = stream._readableState; + debug('flow', state.flowing); + while (state.flowing && stream.read() !== null); +} + +// Wrap an old-style stream as the async data source. +// This is *not* part of the readable stream interface. +// It is an ugly unfortunate mess of history. +Readable.prototype.wrap = function(stream) { + let paused = false; + + // TODO (ronag): Should this.destroy(err) emit + // 'error' on the wrapped stream? Would require + // a static factory method, e.g. Readable.wrap(stream). + + stream.on('data', (chunk) => { + if (!this.push(chunk) && stream.pause) { + paused = true; + stream.pause(); + } + }); + + stream.on('end', () => { + this.push(null); + }); + + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + this.destroy(); + }); + + stream.on('destroy', () => { + this.destroy(); + }); + + this._read = () => { + if (paused && stream.resume) { + paused = false; + stream.resume(); + } + }; + + // Proxy all the other methods. Important when wrapping filters and duplexes. + for (const i of ObjectKeys(stream)) { + if (this[i] === undefined && typeof stream[i] === 'function') { + this[i] = stream[i].bind(stream); + } + } + + return this; +}; + +Readable.prototype[SymbolAsyncIterator] = function() { + let stream = this; + + if (typeof stream.read !== 'function') { + // v1 stream + const src = stream; + stream = new Readable({ + objectMode: true, + destroy(err, callback) { + destroyImpl.destroyer(src, err); + callback(); + } + }).wrap(src); + } + + const iter = createAsyncIterator(stream); + iter.stream = stream; + return iter; +}; + +async function* createAsyncIterator(stream) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream + .on('readable', next) + .on('error', next) + .on('end', next) + .on('close', next); + + try { + const state = stream._readableState; + while (true) { + const chunk = stream.read(); + if (chunk !== null) { + yield chunk; + } else if (state.errored) { + throw state.errored; + } else if (state.ended) { + break; + } else if (state.closed) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + break; + } else { + await new Promise(next); + } + } + } catch (err) { + destroyImpl.destroyer(stream, err); + throw err; + } finally { + destroyImpl.destroyer(stream, null); + } +} + +// Making it explicit these properties are not enumerable +// because otherwise some prototype manipulation in +// userland will fail. +ObjectDefineProperties(Readable.prototype, { + readable: { + get() { + const r = this._readableState; + // r.readable === false means that this is part of a Duplex stream + // where the readable side was disabled upon construction. + // Compat. The user might manually disable readable side through + // deprecated setter. + return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && + !r.endEmitted; + }, + set(val) { + // Backwards compat. + if (this._readableState) { + this._readableState.readable = !!val; + } + } + }, + + readableHighWaterMark: { + enumerable: false, + get: function() { + return this._readableState.highWaterMark; + } + }, + + readableBuffer: { + enumerable: false, + get: function() { + return this._readableState && this._readableState.buffer; + } + }, + + readableFlowing: { + enumerable: false, + get: function() { + return this._readableState.flowing; + }, + set: function(state) { + if (this._readableState) { + this._readableState.flowing = state; + } + } + }, + + readableLength: { + enumerable: false, + get() { + return this._readableState.length; + } + }, + + readableObjectMode: { + enumerable: false, + get() { + return this._readableState ? this._readableState.objectMode : false; + } + }, + + readableEncoding: { + enumerable: false, + get() { + return this._readableState ? this._readableState.encoding : null; + } + }, + + destroyed: { + enumerable: false, + get() { + if (this._readableState === undefined) { + return false; + } + return this._readableState.destroyed; + }, + set(value) { + // We ignore the value if the stream + // has not been initialized yet. + if (!this._readableState) { + return; + } + + // Backward compatibility, the user is explicitly + // managing destroyed. + this._readableState.destroyed = value; + } + }, + + readableEnded: { + enumerable: false, + get() { + return this._readableState ? this._readableState.endEmitted : false; + } + }, + +}); + +ObjectDefineProperties(ReadableState.prototype, { + // Legacy getter for `pipesCount`. + pipesCount: { + get() { + return this.pipes.length; + } + }, + + // Legacy property for `paused`. + paused: { + get() { + return this[kPaused] !== false; + }, + set(value) { + this[kPaused] = !!value; + } + } +}); + +// Exposed for testing purposes only. +Readable._fromList = fromList; + +// Pluck off n bytes from an array of buffers. +// Length is the combined lengths of all the buffers in the list. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function fromList(n, state) { + // nothing buffered. + if (state.length === 0) + return null; + + let ret; + if (state.objectMode) + ret = state.buffer.shift(); + else if (!n || n >= state.length) { + // Read it all, truncate the list. + if (state.decoder) + ret = state.buffer.join(''); + else if (state.buffer.length === 1) + ret = state.buffer.first(); + else + ret = state.buffer.concat(state.length); + state.buffer.clear(); + } else { + // read part of list. + ret = state.buffer.consume(n, state.decoder); + } + + return ret; +} + +function endReadable(stream) { + const state = stream._readableState; + + debug('endReadable', state.endEmitted); + if (!state.endEmitted) { + state.ended = true; + process.nextTick(endReadableNT, state, stream); + } +} + +function endReadableNT(state, stream) { + debug('endReadableNT', state.endEmitted, state.length); + + // Check that we didn't get one last unshift. + if (!state.errorEmitted && !state.closeEmitted && + !state.endEmitted && state.length === 0) { + state.endEmitted = true; + stream.emit('end'); + + if (stream.writable && stream.allowHalfOpen === false) { + process.nextTick(endWritableNT, state, stream); + } else if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well. + const wState = stream._writableState; + const autoDestroy = !wState || ( + wState.autoDestroy && + // We don't expect the writable to ever 'finish' + // if writable is explicitly set to false. + (wState.finished || wState.writable === false) + ); + + if (autoDestroy) { + stream.destroy(); + } + } + } +} + +function endWritableNT(state, stream) { + const writable = stream.writable && !stream.writableEnded && + !stream.destroyed; + if (writable) { + stream.end(); + } +} + +Readable.from = function(iterable, opts) { + if (from === undefined) { + from = require('internal/streams/from'); + } + return from(Readable, iterable, opts); +}; diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js new file mode 100644 index 00000000000000..26e0b07c2956c8 --- /dev/null +++ b/lib/internal/streams/transform.js @@ -0,0 +1,246 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a transform stream is a readable/writable stream where you do +// something with the data. Sometimes it's called a "filter", +// but that's not a great name for it, since that implies a thing where +// some bits pass through, and others are simply ignored. (That would +// be a valid example of a transform, of course.) +// +// While the output is causally related to the input, it's not a +// necessarily symmetric or synchronous transformation. For example, +// a zlib stream might take multiple plain-text writes(), and then +// emit a single compressed chunk some time in the future. +// +// Here's how this works: +// +// The Transform stream has all the aspects of the readable and writable +// stream classes. When you write(chunk), that calls _write(chunk,cb) +// internally, and returns false if there's a lot of pending writes +// buffered up. When you call read(), that calls _read(n) until +// there's enough pending readable data buffered up. +// +// In a transform stream, the written data is placed in a buffer. When +// _read(n) is called, it transforms the queued up data, calling the +// buffered _write cb's as it consumes chunks. If consuming a single +// written chunk would result in multiple output chunks, then the first +// outputted bit calls the readcb, and subsequent chunks just go into +// the read buffer, and will cause it to emit 'readable' if necessary. +// +// This way, back-pressure is actually determined by the reading side, +// since _read has to be called to start processing a new chunk. However, +// a pathological inflate type of transform can cause excessive buffering +// here. For example, imagine a stream where every byte of input is +// interpreted as an integer from 0-255, and then results in that many +// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in +// 1kb of data being output. In this case, you could write a very small +// amount of input, and end up with a very large amount of output. In +// such a pathological inflating mechanism, there'd be no way to tell +// the system to stop doing the transform. A single 4MB write could +// cause the system to run out of memory. +// +// However, even in such a pathological case, only a single written chunk +// would be consumed, and then the rest would wait (un-transformed) until +// the results of the previous transformed chunk were consumed. + +'use strict'; + +const { + ObjectSetPrototypeOf, + Symbol +} = primordials; + +module.exports = Transform; +const { + ERR_METHOD_NOT_IMPLEMENTED +} = require('internal/errors').codes; +const Duplex = require('internal/streams/duplex'); +ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); +ObjectSetPrototypeOf(Transform, Duplex); + +const kCallback = Symbol('kCallback'); + +function Transform(options) { + if (!(this instanceof Transform)) + return new Transform(options); + + Duplex.call(this, options); + + // We have implemented the _read method, and done the other things + // that Readable wants before the first _read call, so unset the + // sync guard flag. + this._readableState.sync = false; + + this[kCallback] = null; + + if (options) { + if (typeof options.transform === 'function') + this._transform = options.transform; + + if (typeof options.flush === 'function') + this._flush = options.flush; + } + + // When the writable side finishes, then flush out anything remaining. + // Backwards compat. Some Transform streams incorrectly implement _final + // instead of or in addition to _flush. By using 'prefinish' instead of + // implementing _final we continue supporting this unfortunate use case. + this.on('prefinish', prefinish); +} + +function final(cb) { + let called = false; + if (typeof this._flush === 'function' && !this.destroyed) { + const result = this._flush((er, data) => { + called = true; + if (er) { + if (cb) { + cb(er); + } else { + this.destroy(er); + } + return; + } + + if (data != null) { + this.push(data); + } + this.push(null); + if (cb) { + cb(); + } + }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + (data) => { + if (called) + return; + if (data != null) + this.push(data); + this.push(null); + if (cb) + process.nextTick(cb); + }, + (err) => { + if (cb) { + process.nextTick(cb, err); + } else { + process.nextTick(() => this.destroy(err)); + } + }); + } + } catch (err) { + process.nextTick(() => this.destroy(err)); + } + } + } else { + this.push(null); + if (cb) { + cb(); + } + } +} + +function prefinish() { + if (this._final !== final) { + final.call(this); + } +} + +Transform.prototype._final = final; + +Transform.prototype._transform = function(chunk, encoding, callback) { + throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); +}; + +Transform.prototype._write = function(chunk, encoding, callback) { + const rState = this._readableState; + const wState = this._writableState; + const length = rState.length; + + let called = false; + const result = this._transform(chunk, encoding, (err, val) => { + called = true; + if (err) { + callback(err); + return; + } + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || // Backwards compat. + length === rState.length || // Backwards compat. + rState.length < rState.highWaterMark || + rState.length === 0 + ) { + callback(); + } else { + this[kCallback] = callback; + } + }); + if (result !== undefined && result != null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + (val) => { + if (called) + return; + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || + length === rState.length || + rState.length < rState.highWaterMark || + rState.length === 0) { + process.nextTick(callback); + } else { + this[kCallback] = callback; + } + }, + (err) => { + process.nextTick(callback, err); + }); + } + } catch (err) { + process.nextTick(callback, err); + } + } +}; + +Transform.prototype._read = function() { + if (this[kCallback]) { + const callback = this[kCallback]; + this[kCallback] = null; + callback(); + } +}; diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js new file mode 100644 index 00000000000000..e3d13dc1bfc675 --- /dev/null +++ b/lib/internal/streams/writable.js @@ -0,0 +1,851 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// A bit simpler than readable streams. +// Implement an async ._write(chunk, encoding, cb), and it'll handle all +// the drain event emission and buffering. + +'use strict'; + +const { + FunctionPrototype, + ObjectDefineProperty, + ObjectDefineProperties, + ObjectSetPrototypeOf, + Symbol, + SymbolHasInstance, +} = primordials; + +module.exports = Writable; +Writable.WritableState = WritableState; + +const EE = require('events'); +const Stream = require('internal/streams/legacy'); +const { Buffer } = require('buffer'); +const destroyImpl = require('internal/streams/destroy'); +const { + getHighWaterMark, + getDefaultHighWaterMark +} = require('internal/streams/state'); +const assert = require('internal/assert'); +const { + ERR_INVALID_ARG_TYPE, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_CANNOT_PIPE, + ERR_STREAM_DESTROYED, + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_NULL_VALUES, + ERR_STREAM_WRITE_AFTER_END, + ERR_UNKNOWN_ENCODING +} = require('internal/errors').codes; + +const { errorOrDestroy } = destroyImpl; + +ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); +ObjectSetPrototypeOf(Writable, Stream); + +function nop() {} + +const kOnFinished = Symbol('kOnFinished'); + +function WritableState(options, stream, isDuplex) { + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream, + // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. + if (typeof isDuplex !== 'boolean') + isDuplex = stream instanceof Stream.Duplex; + + // Object stream flag to indicate whether or not this stream + // contains buffers or objects. + this.objectMode = !!(options && options.objectMode); + + if (isDuplex) + this.objectMode = this.objectMode || + !!(options && options.writableObjectMode); + + // The point at which write() starts returning false + // Note: 0 is a valid value, means that we always return false if + // the entire buffer is not flushed immediately on write(). + this.highWaterMark = options ? + getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : + getDefaultHighWaterMark(false); + + // if _final has been called. + this.finalCalled = false; + + // drain event flag. + this.needDrain = false; + // At the start of calling end() + this.ending = false; + // When end() has been called, and returned. + this.ended = false; + // When 'finish' is emitted. + this.finished = false; + + // Has it been destroyed + this.destroyed = false; + + // Should we decode strings into buffers before passing to _write? + // this is here so that some node-core streams can optimize string + // handling at a lower level. + const noDecode = !!(options && options.decodeStrings === false); + this.decodeStrings = !noDecode; + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; + + // Not an actual buffer we keep track of, but a measurement + // of how much we're waiting to get pushed to some underlying + // socket or file. + this.length = 0; + + // A flag to see when we're in the middle of a write. + this.writing = false; + + // When true all writes will be buffered until .uncork() call. + this.corked = 0; + + // A flag to be able to tell if the onwrite cb is called immediately, + // or on a later tick. We set this to true at first, because any + // actions that shouldn't happen until "later" should generally also + // not happen before the first write call. + this.sync = true; + + // A flag to know if we're processing previously buffered items, which + // may call the _write() callback in the same tick, so that we don't + // end up in an overlapped onwrite situation. + this.bufferProcessing = false; + + // The callback that's passed to _write(chunk, cb). + this.onwrite = onwrite.bind(undefined, stream); + + // The callback that the user supplies to write(chunk, encoding, cb). + this.writecb = null; + + // The amount that is being written when _write is called. + this.writelen = 0; + + // Storage for data passed to the afterWrite() callback in case of + // synchronous _write() completion. + this.afterWriteTickInfo = null; + + resetBuffer(this); + + // Number of pending user-supplied write callbacks + // this must be 0 before 'finish' can be emitted. + this.pendingcb = 0; + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + this.constructed = true; + + // Emit prefinish if the only thing we're waiting for is _write cbs + // This is relevant for synchronous Transform streams. + this.prefinished = false; + + // True if the error was already emitted and should not be thrown again. + this.errorEmitted = false; + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = !options || options.emitClose !== false; + + // Should .destroy() be called after 'finish' (and potentially 'end'). + this.autoDestroy = !options || options.autoDestroy !== false; + + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + this.errored = null; + + // Indicates whether the stream has finished destroying. + this.closed = false; + + // True if close has been emitted or would have been emitted + // depending on emitClose. + this.closeEmitted = false; + + this[kOnFinished] = []; +} + +function resetBuffer(state) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; +} + +WritableState.prototype.getBuffer = function getBuffer() { + return this.buffered.slice(this.bufferedIndex); +}; + +ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { + get() { + return this.buffered.length - this.bufferedIndex; + } +}); + +// Test _writableState for inheritance to account for Duplex streams, +// whose prototype chain only points to Readable. +let realHasInstance; +if (typeof Symbol === 'function' && SymbolHasInstance) { + realHasInstance = FunctionPrototype[SymbolHasInstance]; + ObjectDefineProperty(Writable, SymbolHasInstance, { + value: function(object) { + if (realHasInstance.call(this, object)) + return true; + if (this !== Writable) + return false; + + return object && object._writableState instanceof WritableState; + } + }); +} else { + realHasInstance = function(object) { + return object instanceof this; + }; +} + +function Writable(options) { + // Writable ctor is applied to Duplexes, too. + // `realHasInstance` is necessary because using plain `instanceof` + // would return false, as no `_writableState` property is attached. + + // Trying to use the custom `instanceof` for Writable here will also break the + // Node.js LazyTransform implementation, which has a non-trivial getter for + // `_writableState` that would lead to infinite recursion. + + // Checking for a Stream.Duplex instance is faster here instead of inside + // the WritableState constructor, at least with V8 6.5. + const isDuplex = (this instanceof Stream.Duplex); + + if (!isDuplex && !realHasInstance.call(Writable, this)) + return new Writable(options); + + this._writableState = new WritableState(options, this, isDuplex); + + if (options) { + if (typeof options.write === 'function') + this._write = options.write; + + if (typeof options.writev === 'function') + this._writev = options.writev; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; + + if (typeof options.final === 'function') + this._final = options.final; + + if (typeof options.construct === 'function') + this._construct = options.construct; + } + + Stream.call(this, options); + + destroyImpl.construct(this, () => { + const state = this._writableState; + + if (!state.writing) { + clearBuffer(this, state); + } + + finishMaybe(this, state); + }); +} + +// Otherwise people can pipe Writable streams, which is just wrong. +Writable.prototype.pipe = function() { + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); +}; + +Writable.prototype.write = function(chunk, encoding, cb) { + const state = this._writableState; + + if (typeof encoding === 'function') { + cb = encoding; + encoding = state.defaultEncoding; + } else { + if (!encoding) + encoding = state.defaultEncoding; + else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) + throw new ERR_UNKNOWN_ENCODING(encoding); + if (typeof cb !== 'function') + cb = nop; + } + + if (chunk === null) { + throw new ERR_STREAM_NULL_VALUES(); + } else if (!state.objectMode) { + if (typeof chunk === 'string') { + if (state.decodeStrings !== false) { + chunk = Buffer.from(chunk, encoding); + encoding = 'buffer'; + } + } else if (chunk instanceof Buffer) { + encoding = 'buffer'; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = 'buffer'; + } else { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + } + } + + let err; + if (state.ending) { + err = new ERR_STREAM_WRITE_AFTER_END(); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED('write'); + } + + if (err) { + process.nextTick(cb, err); + errorOrDestroy(this, err, true); + return false; + } + state.pendingcb++; + return writeOrBuffer(this, state, chunk, encoding, cb); +}; + +Writable.prototype.cork = function() { + this._writableState.corked++; +}; + +Writable.prototype.uncork = function() { + const state = this._writableState; + + if (state.corked) { + state.corked--; + + if (!state.writing) + clearBuffer(this, state); + } +}; + +Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === 'string') + encoding = encoding.toLowerCase(); + if (!Buffer.isEncoding(encoding)) + throw new ERR_UNKNOWN_ENCODING(encoding); + this._writableState.defaultEncoding = encoding; + return this; +}; + +// If we're already writing something, then just put this +// in the queue, and wait our turn. Otherwise, call _write +// If we return false, then we need a drain event, so set that flag. +function writeOrBuffer(stream, state, chunk, encoding, callback) { + const len = state.objectMode ? 1 : chunk.length; + + state.length += len; + + if (state.writing || state.corked || state.errored || !state.constructed) { + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== 'buffer') { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; + } + } else { + state.writelen = len; + state.writecb = callback; + state.writing = true; + state.sync = true; + stream._write(chunk, encoding, state.onwrite); + state.sync = false; + } + + const ret = state.length < state.highWaterMark; + + // We must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; + + // Return false if errored or destroyed in order to break + // any synchronous while(stream.write(data)) loops. + return ret && !state.errored && !state.destroyed; +} + +function doWrite(stream, state, writev, len, chunk, encoding, cb) { + state.writelen = len; + state.writecb = cb; + state.writing = true; + state.sync = true; + if (state.destroyed) + state.onwrite(new ERR_STREAM_DESTROYED('write')); + else if (writev) + stream._writev(chunk, state.onwrite); + else + stream._write(chunk, encoding, state.onwrite); + state.sync = false; +} + +function onwriteError(stream, state, er, cb) { + --state.pendingcb; + + cb(er); + // Ensure callbacks are invoked even when autoDestroy is + // not enabled. Passing `er` here doesn't make sense since + // it's related to one specific write, not to the buffered + // writes. + errorBuffer(state); + // This can emit error, but error must always follow cb. + errorOrDestroy(stream, er); +} + +function onwrite(stream, er) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== 'function') { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + // In case of duplex streams we need to notify the readable side of the + // error. + if (stream._readableState && !stream._readableState.errored) { + stream._readableState.errored = er; + } + + if (sync) { + process.nextTick(onwriteError, stream, state, er, cb); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + // It is a common case that the callback passed to .write() is always + // the same. In that case, we do not schedule a new nextTick(), but + // rather just increase a counter, to improve performance and avoid + // memory allocations. + if (state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { count: 1, cb, stream, state }; + process.nextTick(afterWriteTick, state.afterWriteTickInfo); + } + } else { + afterWrite(stream, state, 1, cb); + } + } +} + +function afterWriteTick({ stream, state, count, cb }) { + state.afterWriteTickInfo = null; + return afterWrite(stream, state, count, cb); +} + +function afterWrite(stream, state, count, cb) { + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && + state.needDrain; + if (needDrain) { + state.needDrain = false; + stream.emit('drain'); + } + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + + if (state.destroyed) { + errorBuffer(state); + } + + finishMaybe(stream, state); +} + +// If there's something in the buffer waiting, then invoke callbacks. +function errorBuffer(state) { + if (state.writing) { + return; + } + + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; + state.length -= len; + callback(new ERR_STREAM_DESTROYED('write')); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED('end')); + } + + resetBuffer(state); +} + +// If there's something in the buffer waiting, then process it. +function clearBuffer(stream, state) { + if (state.corked || + state.bufferProcessing || + state.destroyed || + !state.constructed) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + let i = bufferedIndex; + + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + // Make a copy of `buffered` if it's going to be used by `callback` above, + // since `doWrite` will mutate the array. + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + chunks.allBuffers = state.allBuffers; + + doWrite(stream, state, true, state.length, chunks, '', callback); + + resetBuffer(state); + } else { + do { + const { chunk, encoding, callback } = buffered[i]; + buffered[i++] = null; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; + } + } + state.bufferProcessing = false; +} + +Writable.prototype._write = function(chunk, encoding, cb) { + if (this._writev) { + this._writev([{ chunk, encoding }], cb); + } else { + throw new ERR_METHOD_NOT_IMPLEMENTED('_write()'); + } +}; + +Writable.prototype._writev = null; + +Writable.prototype.end = function(chunk, encoding, cb) { + const state = this._writableState; + + if (typeof chunk === 'function') { + cb = chunk; + chunk = null; + encoding = null; + } else if (typeof encoding === 'function') { + cb = encoding; + encoding = null; + } + + if (chunk !== null && chunk !== undefined) + this.write(chunk, encoding); + + // .end() fully uncorks. + if (state.corked) { + state.corked = 1; + this.uncork(); + } + + // This is forgiving in terms of unnecessary calls to end() and can hide + // logic errors. However, usually such errors are harmless and causing a + // hard error can be disproportionately destructive. It is not always + // trivial for the user to determine whether end() needs to be called or not. + let err; + if (!state.errored && !state.ending) { + state.ending = true; + finishMaybe(this, state, true); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED('end'); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED('end'); + } + + if (typeof cb === 'function') { + if (err || state.finished) { + process.nextTick(cb, err); + } else { + state[kOnFinished].push(cb); + } + } + + return this; +}; + +function needFinish(state) { + return (state.ending && + state.constructed && + state.length === 0 && + !state.errored && + state.buffered.length === 0 && + !state.finished && + !state.writing); +} + +function callFinal(stream, state) { + state.sync = true; + state.pendingcb++; + const result = stream._final((err) => { + state.pendingcb--; + if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { + state.prefinished = true; + stream.emit('prefinish'); + // Backwards compat. Don't check state.sync here. + // Some streams assume 'finish' will be emitted + // asynchronously relative to _final callback. + state.pendingcb++; + process.nextTick(finish, stream, state); + } + }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + function() { + if (state.prefinished) + return; + state.prefinish = true; + process.nextTick(() => stream.emit('prefinish')); + state.pendingcb++; + process.nextTick(finish, stream, state); + }, + function(err) { + for (const callback of state[kOnFinished].splice(0)) { + process.nextTick(callback, err); + } + process.nextTick(errorOrDestroy, stream, err, state.sync); + }); + } + } catch (err) { + process.nextTick(errorOrDestroy, stream, err, state.sync); + } + } + state.sync = false; +} + +function prefinish(stream, state) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === 'function' && !state.destroyed) { + state.finalCalled = true; + callFinal(stream, state); + } else { + state.prefinished = true; + stream.emit('prefinish'); + } + } +} + +function finishMaybe(stream, state, sync) { + if (needFinish(state)) { + prefinish(stream, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + process.nextTick(finish, stream, state); + } else { + finish(stream, state); + } + } + } +} + +function finish(stream, state) { + state.pendingcb--; + // TODO (ronag): Unify with needFinish. + if (state.errorEmitted || state.closeEmitted) + return; + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit('finish'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well. + const rState = stream._readableState; + const autoDestroy = !rState || ( + rState.autoDestroy && + // We don't expect the readable to ever 'end' + // if readable is explicitly set to false. + (rState.endEmitted || rState.readable === false) + ); + if (autoDestroy) { + stream.destroy(); + } + } +} + +ObjectDefineProperties(Writable.prototype, { + + destroyed: { + get() { + return this._writableState ? this._writableState.destroyed : false; + }, + set(value) { + // Backward compatibility, the user is explicitly managing destroyed. + if (this._writableState) { + this._writableState.destroyed = value; + } + } + }, + + writable: { + get() { + const w = this._writableState; + // w.writable === false means that this is part of a Duplex stream + // where the writable side was disabled upon construction. + // Compat. The user might manually disable writable side through + // deprecated setter. + return !!w && w.writable !== false && !w.destroyed && !w.errored && + !w.ending && !w.ended; + }, + set(val) { + // Backwards compatible. + if (this._writableState) { + this._writableState.writable = !!val; + } + } + }, + + writableFinished: { + get() { + return this._writableState ? this._writableState.finished : false; + } + }, + + writableObjectMode: { + get() { + return this._writableState ? this._writableState.objectMode : false; + } + }, + + writableBuffer: { + get() { + return this._writableState && this._writableState.getBuffer(); + } + }, + + writableEnded: { + get() { + return this._writableState ? this._writableState.ending : false; + } + }, + + writableHighWaterMark: { + get() { + return this._writableState && this._writableState.highWaterMark; + } + }, + + writableCorked: { + get() { + return this._writableState ? this._writableState.corked : 0; + } + }, + + writableLength: { + get() { + return this._writableState && this._writableState.length; + } + } +}); + +const destroy = destroyImpl.destroy; +Writable.prototype.destroy = function(err, cb) { + const state = this._writableState; + + // Invoke pending callbacks. + if ( + state.bufferedIndex < state.buffered.length || + state[kOnFinished].length + ) { + assert(!state.destroyed); + process.nextTick(errorBuffer, state); + } + + destroy.call(this, err, cb); + return this; +}; + +Writable.prototype._undestroy = destroyImpl.undestroy; +Writable.prototype._destroy = function(err, cb) { + cb(err); +}; + +Writable.prototype[EE.captureRejectionSymbol] = function(err) { + this.destroy(err); +}; diff --git a/lib/stream.js b/lib/stream.js index 9ea9a0ceeebbab..9fff2e9d5c29c3 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -36,16 +36,12 @@ const internalBuffer = require('internal/buffer'); // Lazy loaded let promises = null; -// Note: export Stream before Readable/Writable/Duplex/... -// to avoid a cross-reference(require) issues const Stream = module.exports = require('internal/streams/legacy'); - -Stream.Readable = require('_stream_readable'); -Stream.Writable = require('_stream_writable'); -Stream.Duplex = require('_stream_duplex'); -Stream.Transform = require('_stream_transform'); -Stream.PassThrough = require('_stream_passthrough'); - +Stream.Readable = require('internal/streams/readable'); +Stream.Writable = require('internal/streams/writable'); +Stream.Duplex = require('internal/streams/duplex'); +Stream.Transform = require('internal/streams/transform'); +Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; diff --git a/node.gyp b/node.gyp index e9ed129743dfe1..54ee9b433bedc0 100644 --- a/node.gyp +++ b/node.gyp @@ -237,6 +237,11 @@ 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', 'lib/internal/streams/legacy.js', + 'lib/internal/streams/readable.js', + 'lib/internal/streams/writable.js', + 'lib/internal/streams/duplex.js', + 'lib/internal/streams/passthrough.js', + 'lib/internal/streams/transform.js', 'lib/internal/streams/destroy.js', 'lib/internal/streams/state.js', 'lib/internal/streams/pipeline.js', diff --git a/test/message/stdin_messages.out b/test/message/stdin_messages.out index 3570047012c219..0994aaef3e2ca2 100644 --- a/test/message/stdin_messages.out +++ b/test/message/stdin_messages.out @@ -13,7 +13,7 @@ SyntaxError: Strict mode code may not include a with statement at internal/main/eval_stdin.js:*:* at Socket. (internal/process/execution.js:*:*) at Socket.emit (events.js:*:*) - at endReadableNT (_stream_readable.js:*:*) + at endReadableNT (internal/streams/readable.js:*:*) 42 42 [stdin]:1 @@ -30,7 +30,7 @@ Error: hello at internal/main/eval_stdin.js:*:* at Socket. (internal/process/execution.js:*:*) at Socket.emit (events.js:*:*) - at endReadableNT (_stream_readable.js:*:*) + at endReadableNT (internal/streams/readable.js:*:*) [stdin]:1 throw new Error("hello") ^ @@ -45,7 +45,7 @@ Error: hello at internal/main/eval_stdin.js:*:* at Socket. (internal/process/execution.js:*:*) at Socket.emit (events.js:*:*) - at endReadableNT (_stream_readable.js:*:*) + at endReadableNT (internal/streams/readable.js:*:*) 100 [stdin]:1 let x = 100; y = x; @@ -61,7 +61,7 @@ ReferenceError: y is not defined at internal/main/eval_stdin.js:*:* at Socket. (internal/process/execution.js:*:*) at Socket.emit (events.js:*:*) - at endReadableNT (_stream_readable.js:*:*) + at endReadableNT (internal/streams/readable.js:*:*) [stdin]:1 let ______________________________________________; throw 10 diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 3eed9e3a3aca94..6dbe871225ae3c 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -97,11 +97,11 @@ if (!common.isMainThread) { 'Internal Binding messaging', 'Internal Binding symbols', 'Internal Binding worker', - 'NativeModule _stream_duplex', - 'NativeModule _stream_passthrough', - 'NativeModule _stream_readable', - 'NativeModule _stream_transform', - 'NativeModule _stream_writable', + 'NativeModule internal/streams/duplex', + 'NativeModule internal/streams/passthrough', + 'NativeModule internal/streams/readable', + 'NativeModule internal/streams/transform', + 'NativeModule internal/streams/writable', 'NativeModule internal/error_serdes', 'NativeModule internal/process/worker_thread_only', 'NativeModule internal/streams/buffer_list',