diff --git a/benchmark/streams/readable-bigread.js b/benchmark/streams/readable-bigread.js new file mode 100644 index 00000000000000..50f10c44119141 --- /dev/null +++ b/benchmark/streams/readable-bigread.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const v8 = require('v8'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [100e1] +}); + +function main(conf) { + const n = +conf.n; + const b = new Buffer(32); + const s = new Readable(); + function noop() {} + s._read = noop; + + // Force optimization before starting the benchmark + s.push(b); + v8.setFlagsFromString('--allow_natives_syntax'); + eval('%OptimizeFunctionOnNextCall(s.read)'); + s.push(b); + while (s.read(128)); + + bench.start(); + for (var k = 0; k < n; ++k) { + for (var i = 0; i < 1e4; ++i) + s.push(b); + while (s.read(128)); + } + bench.end(n); +} diff --git a/benchmark/streams/readable-bigunevenread.js b/benchmark/streams/readable-bigunevenread.js new file mode 100644 index 00000000000000..ce6e41c5d30eac --- /dev/null +++ b/benchmark/streams/readable-bigunevenread.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const v8 = require('v8'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [100e1] +}); + +function main(conf) { + const n = +conf.n; + const b = new Buffer(32); + const s = new Readable(); + function noop() {} + s._read = noop; + + // Force optimization before starting the benchmark + s.push(b); + v8.setFlagsFromString('--allow_natives_syntax'); + eval('%OptimizeFunctionOnNextCall(s.read)'); + s.push(b); + while (s.read(106)); + + bench.start(); + for (var k = 0; k < n; ++k) { + for (var i = 0; i < 1e4; ++i) + s.push(b); + while (s.read(106)); + } + bench.end(n); +} diff --git a/benchmark/streams/readable-boundaryread.js b/benchmark/streams/readable-boundaryread.js new file mode 100644 index 00000000000000..93a6616361ba18 --- /dev/null +++ b/benchmark/streams/readable-boundaryread.js @@ -0,0 +1,33 @@ +'use strict'; + +const common = require('../common'); +const v8 = require('v8'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [200e1] +}); + +function main(conf) { + const n = +conf.n; + const b = new Buffer(32); + const s = new Readable(); + function noop() {} + s._read = noop; + + // Force optimization before starting the benchmark + s.push(b); + v8.setFlagsFromString('--allow_natives_syntax'); + eval('%OptimizeFunctionOnNextCall(s.push)'); + eval('%OptimizeFunctionOnNextCall(s.read)'); + s.push(b); + while (s.read(32)); + + bench.start(); + for (var k = 0; k < n; ++k) { + for (var i = 0; i < 1e4; ++i) + s.push(b); + while (s.read(32)); + } + bench.end(n); +} diff --git a/benchmark/streams/readable-readall.js b/benchmark/streams/readable-readall.js new file mode 100644 index 00000000000000..07626fd7978b9a --- /dev/null +++ b/benchmark/streams/readable-readall.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const v8 = require('v8'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [50e2] +}); + +function main(conf) { + const n = +conf.n; + const b = new Buffer(32); + const s = new Readable(); + function noop() {} + s._read = noop; + + // Force optimization before starting the benchmark + s.push(b); + v8.setFlagsFromString('--allow_natives_syntax'); + eval('%OptimizeFunctionOnNextCall(s.read)'); + s.push(b); + while (s.read()); + + bench.start(); + for (var k = 0; k < n; ++k) { + for (var i = 0; i < 1e4; ++i) + s.push(b); + while (s.read()); + } + bench.end(n); +} diff --git a/benchmark/streams/readable-unevenread.js b/benchmark/streams/readable-unevenread.js new file mode 100644 index 00000000000000..4a69bd97a94bcf --- /dev/null +++ b/benchmark/streams/readable-unevenread.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const v8 = require('v8'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [100e1] +}); + +function main(conf) { + const n = +conf.n; + const b = new Buffer(32); + const s = new Readable(); + function noop() {} + s._read = noop; + + // Force optimization before starting the benchmark + s.push(b); + v8.setFlagsFromString('--allow_natives_syntax'); + eval('%OptimizeFunctionOnNextCall(s.read)'); + s.push(b); + while (s.read(12)); + + bench.start(); + for (var k = 0; k < n; ++k) { + for (var i = 0; i < 1e4; ++i) + s.push(b); + while (s.read(12)); + } + bench.end(n); +} diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index fc3e32b64e0734..0b55f2ea8e4ac8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -8,27 +8,29 @@ const Stream = require('stream'); const Buffer = require('buffer').Buffer; const util = require('util'); const debug = util.debuglog('stream'); +const BufferList = require('internal/streams/BufferList'); var StringDecoder; util.inherits(Readable, Stream); -const hasPrependListener = typeof EE.prototype.prependListener === 'function'; - -function prependListener(emitter, event, fn) { - if (hasPrependListener) +var prependListener; +if (typeof EE.prototype.prependListener === 'function') { + prependListener = function prependListener(emitter, event, fn) { return emitter.prependListener(event, fn); - - // This is a brutally ugly hack to make sure that our error handler - // is attached before any userland ones. NEVER DO THIS. This is here - // only because this code needs to continue to work with older versions - // of Node.js that do not include the prependListener() method. The goal - // is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) - emitter.on(event, fn); - else if (Array.isArray(emitter._events[event])) - emitter._events[event].unshift(fn); - else - emitter._events[event] = [fn, emitter._events[event]]; + }; +} else { + prependListener = function prependListener(emitter, event, fn) { + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) + emitter.on(event, fn); + else if (Array.isArray(emitter._events[event])) + emitter._events[event].unshift(fn); + else + emitter._events[event] = [fn, emitter._events[event]]; + }; } function ReadableState(options, stream) { @@ -50,7 +52,10 @@ function ReadableState(options, stream) { // cast to ints. this.highWaterMark = ~~this.highWaterMark; - this.buffer = []; + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift() + this.buffer = new BufferList(); this.length = 0; this.pipes = null; this.pipesCount = 0; @@ -223,7 +228,8 @@ function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { n = MAX_HWM; } else { - // Get the next highest power of 2 + // Get the next highest power of 2 to prevent increasing hwm excessively in + // tiny amounts n--; n |= n >>> 1; n |= n >>> 2; @@ -235,51 +241,41 @@ function computeNewHighWaterMark(n) { return n; } +// This function is designed to be inlinable, so please take care when making +// changes to the function body. function howMuchToRead(n, state) { - if (state.length === 0 && state.ended) + if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if (state.objectMode) - return n === 0 ? 0 : 1; - - if (n === null || isNaN(n)) { - // only flow one buffer at a time - if (state.flowing && state.buffer.length) - return state.buffer[0].length; + return 1; + if (n !== n) { + // Only flow one buffer at a time + if (state.flowing && state.length) + return state.buffer.head.data.length; else return state.length; } - - if (n <= 0) - return 0; - - // If we're asking for more than the target buffer level, - // then raise the water mark. Bump up to the next highest - // power of 2, to prevent increasing it excessively in tiny - // amounts. + // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); - - // don't have that much. return null, unless we've ended. - if (n > state.length) { - if (!state.ended) { - state.needReadable = true; - return 0; - } else { - return state.length; - } + if (n <= state.length) + return n; + // Don't have enough + if (!state.ended) { + state.needReadable = true; + return 0; } - - return n; + return state.length; } // you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { debug('read', n); + n = parseInt(n, 10); var state = this._readableState; var nOrig = n; - if (typeof n !== 'number' || n > 0) + if (n !== 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we @@ -342,9 +338,7 @@ Readable.prototype.read = function(n) { if (state.ended || state.reading) { doRead = false; debug('reading or ended', doRead); - } - - if (doRead) { + } else if (doRead) { debug('do read'); state.reading = true; state.sync = true; @@ -354,13 +348,12 @@ Readable.prototype.read = function(n) { // call internal read method this._read(state.highWaterMark); state.sync = false; + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. + if (!state.reading) + n = howMuchToRead(nOrig, state); } - // If _read pushed data synchronously, then `reading` will be false, - // and we need to re-evaluate how much data we can return to the user. - if (doRead && !state.reading) - n = howMuchToRead(nOrig, state); - var ret; if (n > 0) ret = fromList(n, state); @@ -370,18 +363,20 @@ Readable.prototype.read = function(n) { if (ret === null) { state.needReadable = true; n = 0; + } else { + state.length -= n; } - state.length -= n; - - // If we have nothing in the buffer, then we want to know - // as soon as we *do* get something into the buffer. - if (state.length === 0 && !state.ended) - state.needReadable = true; + if (state.length === 0) { + // If we have nothing in the buffer, then we want to know + // as soon as we *do* get something into the buffer. + if (!state.ended) + state.needReadable = true; - // If we tried to read() past the EOF, then emit end on the next tick. - if (nOrig !== n && state.ended && state.length === 0) - endReadable(this); + // If we tried to read() past the EOF, then emit end on the next tick. + if (nOrig !== n && state.ended) + endReadable(this); + } if (ret !== null) this.emit('data', ret); @@ -690,20 +685,17 @@ Readable.prototype.unpipe = function(dest) { // set up data events if they are asked for // Ensure readable listeners eventually get something Readable.prototype.on = function(ev, fn) { - var res = Stream.prototype.on.call(this, ev, fn); - - // If listening to data, and it has not explicitly been paused, - // then call resume to start the flow of data on the next tick. - if (ev === 'data' && false !== this._readableState.flowing) { - this.resume(); - } - - if (ev === 'readable' && !this._readableState.endEmitted) { - var state = this._readableState; - if (!state.readableListening) { - state.readableListening = true; + const res = Stream.prototype.on.call(this, ev, fn); + + if (ev === 'data') { + // Start flowing on next tick if stream isn't explicitly paused + if (this._readableState.flowing !== false) + this.resume(); + } else if (ev === 'readable') { + const state = this._readableState; + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; state.emittedReadable = false; - state.needReadable = true; if (!state.reading) { process.nextTick(nReadingNextTick, this); } else if (state.length) { @@ -765,13 +757,9 @@ Readable.prototype.pause = function() { }; function flow(stream) { - var state = stream._readableState; + const state = stream._readableState; debug('flow', state.flowing); - if (state.flowing) { - do { - var chunk = stream.read(); - } while (null !== chunk && state.flowing); - } + while (state.flowing && stream.read() !== null); } // wrap an old-style stream as the async data source. @@ -846,69 +834,120 @@ Readable._fromList = fromList; // Pluck off n bytes from an array of buffers. // Length is the combined lengths of all the buffers in the list. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. function fromList(n, state) { - var list = state.buffer; - var length = state.length; - var stringMode = !!state.decoder; - var objectMode = !!state.objectMode; + // nothing buffered + if (state.length === 0) + return null; + var ret; + if (state.objectMode) + ret = state.buffer.shift(); + else if (!n || n >= state.length) { + // read it all, truncate the list + if (state.decoder) + ret = state.buffer.join(''); + else if (state.buffer.length === 1) + ret = state.buffer.head.data; + else + ret = state.buffer.concat(state.length); + state.buffer.clear(); + } else { + // read part of list + ret = fromListPartial(n, state.buffer, state.decoder); + } - // nothing in the list, definitely empty. - if (list.length === 0) - return null; + return ret; +} - if (length === 0) - ret = null; - else if (objectMode) +// Extracts only enough buffered data to satisfy the amount requested. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function fromListPartial(n, list, hasStrings) { + var ret; + if (n < list.head.data.length) { + // slice is the same for buffers and strings + ret = list.head.data.slice(0, n); + list.head.data = list.head.data.slice(n); + } else if (n === list.head.data.length) { + // first chunk is a perfect match ret = list.shift(); - else if (!n || n >= length) { - // read it all, truncate the array. - if (stringMode) - ret = list.join(''); - else if (list.length === 1) - ret = list[0]; - else - ret = Buffer.concat(list, length); - list.length = 0; } else { - // read just some of it. - if (n < list[0].length) { - // just take a part of the first list item. - // slice is the same for buffers and strings. - const buf = list[0]; - ret = buf.slice(0, n); - list[0] = buf.slice(n); - } else if (n === list[0].length) { - // first list is a perfect match - ret = list.shift(); - } else { - // complex case. - // we have enough to cover it, but it spans past the first buffer. - if (stringMode) - ret = ''; - else - ret = Buffer.allocUnsafe(n); - - var c = 0; - for (var i = 0, l = list.length; i < l && c < n; i++) { - const buf = list[0]; - var cpy = Math.min(n - c, buf.length); - - if (stringMode) - ret += buf.slice(0, cpy); - else - buf.copy(ret, c, 0, cpy); + // result spans more than one buffer + ret = (hasStrings + ? copyFromBufferString(n, list) + : copyFromBuffer(n, list)); + } + return ret; +} - if (cpy < buf.length) - list[0] = buf.slice(cpy); +// Copies a specified amount of characters from the list of buffered data +// chunks. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function copyFromBufferString(n, list) { + var p = list.head; + var c = 1; + var ret = p.data; + n -= ret.length; + while (p = p.next) { + const str = p.data; + const nb = (n > str.length ? str.length : n); + if (nb === str.length) + ret += str; + else + ret += str.slice(0, n); + n -= nb; + if (n === 0) { + if (nb === str.length) { + ++c; + if (p.next) + list.head = p.next; else - list.shift(); - - c += cpy; + list.head = list.tail = null; + } else { + list.head = p; + p.data = str.slice(nb); } + break; } + ++c; } + list.length -= c; + return ret; +} +// Copies a specified amount of bytes from the list of buffered data chunks. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function copyFromBuffer(n, list) { + const ret = Buffer.allocUnsafe(n); + var p = list.head; + var c = 1; + p.data.copy(ret); + n -= p.data.length; + while (p = p.next) { + const buf = p.data; + const nb = (n > buf.length ? buf.length : n); + buf.copy(ret, ret.length - n, 0, nb); + n -= nb; + if (n === 0) { + if (nb === buf.length) { + ++c; + if (p.next) + list.head = p.next; + else + list.head = list.tail = null; + } else { + list.head = p; + p.data = buf.slice(nb); + } + break; + } + ++c; + } + list.length -= c; return ret; } diff --git a/lib/internal/streams/BufferList.js b/lib/internal/streams/BufferList.js new file mode 100644 index 00000000000000..76da94bc83d977 --- /dev/null +++ b/lib/internal/streams/BufferList.js @@ -0,0 +1,72 @@ +'use strict'; + +const Buffer = require('buffer').Buffer; + +module.exports = BufferList; + +function BufferList() { + this.head = null; + this.tail = null; + this.length = 0; +} + +BufferList.prototype.push = function(v) { + const entry = { data: v, next: null }; + if (this.length > 0) + this.tail.next = entry; + else + this.head = entry; + this.tail = entry; + ++this.length; +}; + +BufferList.prototype.unshift = function(v) { + const entry = { data: v, next: this.head }; + if (this.length === 0) + this.tail = entry; + this.head = entry; + ++this.length; +}; + +BufferList.prototype.shift = function() { + if (this.length === 0) + return; + const ret = this.head.data; + if (this.length === 1) + this.head = this.tail = null; + else + this.head = this.head.next; + --this.length; + return ret; +}; + +BufferList.prototype.clear = function() { + this.head = this.tail = null; + this.length = 0; +}; + +BufferList.prototype.join = function(s) { + if (this.length === 0) + return ''; + var p = this.head; + var ret = '' + p.data; + while (p = p.next) + ret += s + p.data; + return ret; +}; + +BufferList.prototype.concat = function(n) { + if (this.length === 0) + return Buffer.alloc(0); + if (this.length === 1) + return this.head.data; + const ret = Buffer.allocUnsafe(n >>> 0); + var p = this.head; + var i = 0; + while (p) { + p.data.copy(ret, i); + i += p.data.length; + p = p.next; + } + return ret; +}; diff --git a/node.gyp b/node.gyp index 0d32905b6ce120..3e996f3e6b84cb 100644 --- a/node.gyp +++ b/node.gyp @@ -88,6 +88,7 @@ 'lib/internal/v8_prof_polyfill.js', 'lib/internal/v8_prof_processor.js', 'lib/internal/streams/lazy_transform.js', + 'lib/internal/streams/BufferList.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', 'deps/v8/tools/consarray.js', diff --git a/test/parallel/test-stream-push-order.js b/test/parallel/test-stream-push-order.js index 7c1b2602f6ab51..22a3156f62f502 100644 --- a/test/parallel/test-stream-push-order.js +++ b/test/parallel/test-stream-push-order.js @@ -26,7 +26,6 @@ s.read(0); // ACTUALLY [1, 3, 5, 6, 4, 2] process.on('exit', function() { - assert.deepStrictEqual(s._readableState.buffer, - ['1', '2', '3', '4', '5', '6']); + assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6'); console.log('ok'); }); diff --git a/test/parallel/test-stream2-readable-from-list.js b/test/parallel/test-stream2-readable-from-list.js index e22c7a51940966..a823bf9dc10891 100644 --- a/test/parallel/test-stream2-readable-from-list.js +++ b/test/parallel/test-stream2-readable-from-list.js @@ -1,7 +1,9 @@ +// Flags: --expose_internals 'use strict'; require('../common'); var assert = require('assert'); var fromList = require('_stream_readable')._fromList; +var BufferList = require('internal/streams/BufferList'); // tiny node-tap lookalike. var tests = []; @@ -30,6 +32,13 @@ function run() { }); } +function bufferListFromArray(arr) { + const bl = new BufferList(); + for (var i = 0; i < arr.length; ++i) + bl.push(arr[i]); + return bl; +} + // ensure all tests have run process.on('exit', function() { assert.equal(count, 0); @@ -43,6 +52,7 @@ test('buffers', function(t) { Buffer.from('bark'), Buffer.from('bazy'), Buffer.from('kuel') ]; + list = bufferListFromArray(list); // read more than the first element. var ret = fromList(6, { buffer: list, length: 16 }); @@ -61,7 +71,7 @@ test('buffers', function(t) { t.equal(ret.toString(), 'zykuel'); // all consumed. - t.same(list, []); + t.same(list, new BufferList()); t.end(); }); @@ -71,6 +81,7 @@ test('strings', function(t) { 'bark', 'bazy', 'kuel' ]; + list = bufferListFromArray(list); // read more than the first element. var ret = fromList(6, { buffer: list, length: 16, decoder: true }); @@ -89,7 +100,7 @@ test('strings', function(t) { t.equal(ret, 'zykuel'); // all consumed. - t.same(list, []); + t.same(list, new BufferList()); t.end(); });