diff --git a/benchmark/readline/readline-iterable.js b/benchmark/readline/readline-iterable.js new file mode 100644 index 00000000000000..e706c7f2f1506e --- /dev/null +++ b/benchmark/readline/readline-iterable.js @@ -0,0 +1,48 @@ +'use strict'; +const common = require('../common.js'); +const readline = require('readline'); +const { Readable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [1e1, 1e2, 1e3, 1e4, 1e5, 1e6], +}); + +const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed +do eiusmod tempor incididunt ut labore et dolore magna aliqua. +Dui accumsan sit amet nulla facilisi morbi tempus iaculis urna. +Eget dolor morbi non arcu risus quis varius quam quisque. +Lacus viverra vitae congue eu consequat ac felis donec. +Amet porttitor eget dolor morbi non arcu. +Velit ut tortor pretium viverra suspendisse. +Mauris nunc congue nisi vitae suscipit tellus. +Amet nisl suscipit adipiscing bibendum est ultricies integer. +Sit amet dictum sit amet justo donec enim diam. +Condimentum mattis pellentesque id nibh tortor id aliquet lectus proin. +Diam in arcu cursus euismod quis viverra nibh. +Rest of line`; + +function getLoremIpsumStream(repetitions) { + const readable = Readable({ + objectMode: true, + }); + let i = 0; + readable._read = () => readable.push( + i++ >= repetitions ? null : loremIpsum + ); + return readable; +} + +async function main({ n }) { + bench.start(); + let lineCount = 0; + + const iterable = readline.createInterface({ + input: getLoremIpsumStream(n), + }); + + // eslint-disable-next-line no-unused-vars + for await (const _ of iterable) { + lineCount++; + } + bench.end(lineCount); +} diff --git a/lib/events.js b/lib/events.js index 7abf18f42c0064..e05a3bc6b8b168 100644 --- a/lib/events.js +++ b/lib/events.js @@ -23,7 +23,8 @@ const { ArrayPrototypeJoin, - ArrayPrototypeShift, + ArrayPrototypePop, + ArrayPrototypePush, ArrayPrototypeSlice, ArrayPrototypeSplice, ArrayPrototypeUnshift, @@ -33,6 +34,7 @@ const { FunctionPrototypeBind, FunctionPrototypeCall, NumberIsNaN, + NumberMAX_SAFE_INTEGER, ObjectCreate, ObjectDefineProperty, ObjectDefineProperties, @@ -59,6 +61,8 @@ const { } = require('internal/util/inspect'); let spliceOne; +let FixedQueue; +let kFirstEventParam; const { AbortError, @@ -73,6 +77,7 @@ const { } = require('internal/errors'); const { + validateInteger, validateAbortSignal, validateBoolean, validateFunction, @@ -84,6 +89,7 @@ const kErrorMonitor = Symbol('events.errorMonitor'); const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners'); const kMaxEventTargetListenersWarned = Symbol('events.maxEventTargetListenersWarned'); +const kWatermarkData = SymbolFor('nodejs.watermarkData'); let EventEmitterAsyncResource; // The EventEmitterAsyncResource has to be initialized lazily because event.js @@ -999,25 +1005,44 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) { * Returns an `AsyncIterator` that iterates `event` events. * @param {EventEmitter} emitter * @param {string | symbol} event - * @param {{ signal: AbortSignal; }} [options] + * @param {{ + * signal: AbortSignal; + * close?: string[]; + * highWatermark?: number, + * lowWatermark?: number + * }} [options] * @returns {AsyncIterator} */ -function on(emitter, event, options) { - const signal = options?.signal; +function on(emitter, event, options = {}) { + // Parameters validation + const signal = options.signal; validateAbortSignal(signal, 'options.signal'); if (signal?.aborted) throw new AbortError(undefined, { cause: signal?.reason }); - - const unconsumedEvents = []; - const unconsumedPromises = []; + const highWatermark = options.highWatermark ?? NumberMAX_SAFE_INTEGER; + validateInteger(highWatermark, 'options.highWatermark', 1); + const lowWatermark = options.lowWatermark ?? 1; + validateInteger(lowWatermark, 'options.lowWatermark', 1); + + // Preparing controlling queues and variables + FixedQueue ??= require('internal/fixed_queue'); + const unconsumedEvents = new FixedQueue(); + const unconsumedPromises = new FixedQueue(); + let paused = false; let error = null; let finished = false; + let size = 0; const iterator = ObjectSetPrototypeOf({ next() { // First, we consume all unread events - const value = unconsumedEvents.shift(); - if (value) { + if (size) { + const value = unconsumedEvents.shift(); + size--; + if (paused && size < lowWatermark) { + emitter.resume(); + paused = false; + } return PromiseResolve(createIterResult(value, false)); } @@ -1032,9 +1057,7 @@ function on(emitter, event, options) { } // If the iterator is finished, resolve to done - if (finished) { - return PromiseResolve(createIterResult(undefined, true)); - } + if (finished) return closeHandler(); // Wait until an event happens return new Promise(function(resolve, reject) { @@ -1043,24 +1066,7 @@ function on(emitter, event, options) { }, return() { - eventTargetAgnosticRemoveListener(emitter, event, eventHandler); - eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); - - if (signal) { - eventTargetAgnosticRemoveListener( - signal, - 'abort', - abortListener, - { once: true }); - } - - finished = true; - - for (const promise of unconsumedPromises) { - promise.resolve(createIterResult(undefined, true)); - } - - return PromiseResolve(createIterResult(undefined, true)); + return closeHandler(); }, throw(err) { @@ -1068,21 +1074,54 @@ function on(emitter, event, options) { throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', 'Error', err); } - error = err; - eventTargetAgnosticRemoveListener(emitter, event, eventHandler); - eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); + errorHandler(err); }, - [SymbolAsyncIterator]() { return this; - } + }, + [kWatermarkData]: { + /** + * The current queue size + */ + get size() { + return size; + }, + /** + * The low watermark. The emitter is resumed every time size is lower than it + */ + get low() { + return lowWatermark; + }, + /** + * The high watermark. The emitter is paused every time size is higher than it + */ + get high() { + return highWatermark; + }, + /** + * It checks wether the emitter is paused by the watermark controller or not + */ + get isPaused() { + return paused; + } + }, }, AsyncIteratorPrototype); - eventTargetAgnosticAddListener(emitter, event, eventHandler); + // Adding event handlers + const { addEventListener, removeAll } = listenersController(); + kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam; + addEventListener(emitter, event, options[kFirstEventParam] ? eventHandler : function(...args) { + return eventHandler(args); + }); if (event !== 'error' && typeof emitter.on === 'function') { - emitter.on('error', errorHandler); + addEventListener(emitter, 'error', errorHandler); + } + const closeEvents = options?.close; + if (closeEvents?.length) { + for (let i = 0; i < closeEvents.length; i++) { + addEventListener(emitter, closeEvents[i], closeHandler); + } } - if (signal) { eventTargetAgnosticAddListener( signal, @@ -1097,27 +1136,48 @@ function on(emitter, event, options) { errorHandler(new AbortError(undefined, { cause: signal?.reason })); } - function eventHandler(...args) { - const promise = ArrayPrototypeShift(unconsumedPromises); - if (promise) { - promise.resolve(createIterResult(args, false)); - } else { - unconsumedEvents.push(args); - } + function eventHandler(value) { + if (unconsumedPromises.isEmpty()) { + size++; + if (!paused && size > highWatermark) { + paused = true; + emitter.pause(); + } + unconsumedEvents.push(value); + } else unconsumedPromises.shift().resolve(createIterResult(value, false)); } function errorHandler(err) { - finished = true; + if (unconsumedPromises.isEmpty()) error = err; + else unconsumedPromises.shift().reject(err); - const toError = ArrayPrototypeShift(unconsumedPromises); + closeHandler(); + } - if (toError) { - toError.reject(err); - } else { - // The next time we call next() - error = err; + function closeHandler() { + removeAll(); + finished = true; + const doneResult = createIterResult(undefined, true); + while (!unconsumedPromises.isEmpty()) { + unconsumedPromises.shift().resolve(doneResult); } - iterator.return(); + return PromiseResolve(doneResult); } } + +function listenersController() { + const listeners = []; + + return { + addEventListener(emitter, event, handler, flags) { + eventTargetAgnosticAddListener(emitter, event, handler, flags); + ArrayPrototypePush(listeners, [emitter, event, handler, flags]); + }, + removeAll() { + while (listeners.length > 0) { + ReflectApply(eventTargetAgnosticRemoveListener, undefined, ArrayPrototypePop(listeners)); + } + } + }; +} diff --git a/lib/internal/events/symbols.js b/lib/internal/events/symbols.js new file mode 100644 index 00000000000000..b1b89ddb8f0a4d --- /dev/null +++ b/lib/internal/events/symbols.js @@ -0,0 +1,11 @@ +'use strict'; + +const { + Symbol, +} = primordials; + +const kFirstEventParam = Symbol('nodejs.kFirstEventParam'); + +module.exports = { + kFirstEventParam, +}; diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index 694f4462ceea7a..5de6a8fe03da13 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -62,6 +62,7 @@ const { kSubstringSearch, } = require('internal/readline/utils'); let emitKeypressEvents; +let kFirstEventParam; const { clearScreenDown, cursorTo, @@ -70,9 +71,6 @@ const { const { StringDecoder } = require('string_decoder'); -// Lazy load Readable for startup performance. -let Readable; - const kHistorySize = 30; const kMaxUndoRedoStackSize = 2048; const kMincrlfDelay = 100; @@ -1346,40 +1344,15 @@ class Interface extends InterfaceConstructor { */ [SymbolAsyncIterator]() { if (this[kLineObjectStream] === undefined) { - if (Readable === undefined) { - Readable = require('stream').Readable; - } - const readable = new Readable({ - objectMode: true, - read: () => { - this.resume(); - }, - destroy: (err, cb) => { - this.off('line', lineListener); - this.off('close', closeListener); - this.close(); - cb(err); - }, - }); - const lineListener = (input) => { - if (!readable.push(input)) { - // TODO(rexagod): drain to resume flow - this.pause(); - } - }; - const closeListener = () => { - readable.push(null); - }; - const errorListener = (err) => { - readable.destroy(err); - }; - this.on('error', errorListener); - this.on('line', lineListener); - this.on('close', closeListener); - this[kLineObjectStream] = readable; + kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam; + this[kLineObjectStream] = EventEmitter.on( + this, 'line', { + close: ['close'], + highWatermark: 1024, + [kFirstEventParam]: true, + }); } - - return this[kLineObjectStream][SymbolAsyncIterator](); + return this[kLineObjectStream]; } } diff --git a/test/benchmark/test-bechmark-readline.js b/test/benchmark/test-bechmark-readline.js new file mode 100644 index 00000000000000..eaa9fd08ed487e --- /dev/null +++ b/test/benchmark/test-bechmark-readline.js @@ -0,0 +1,7 @@ +'use strict'; + +require('../common'); + +const runBenchmark = require('../common/benchmark'); + +runBenchmark('readline', { NODEJS_BENCHMARK_ZERO_ALLOWED: 1 }); diff --git a/test/parallel/test-readline-async-iterators-backpressure.js b/test/parallel/test-readline-async-iterators-backpressure.js index 2ca124dde5b890..e8f443f4383346 100644 --- a/test/parallel/test-readline-async-iterators-backpressure.js +++ b/test/parallel/test-readline-async-iterators-backpressure.js @@ -6,11 +6,17 @@ const { Readable } = require('stream'); const readline = require('readline'); const CONTENT = 'content'; -const TOTAL_LINES = 18; +const LINES_PER_PUSH = 2051; +const REPETITIONS = 3; (async () => { const readable = new Readable({ read() {} }); - readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES)); + let salt = 0; + for (let i = 0; i < REPETITIONS; i++) { + readable.push(`${CONTENT}\n`.repeat(LINES_PER_PUSH + i)); + salt += i; + } + const TOTAL_LINES = LINES_PER_PUSH * REPETITIONS + salt; const rli = readline.createInterface({ input: readable, @@ -18,21 +24,24 @@ const TOTAL_LINES = 18; }); const it = rli[Symbol.asyncIterator](); - const highWaterMark = it.stream.readableHighWaterMark; + const watermarkData = it[Symbol.for('nodejs.watermarkData')]; + const highWaterMark = watermarkData.high; // For this test to work, we have to queue up more than the number of // highWaterMark items in rli. Make sure that is the case. - assert(TOTAL_LINES > highWaterMark); + assert(TOTAL_LINES > highWaterMark, `TOTAL_LINES (${TOTAL_LINES}) isn't greater than highWaterMark (${highWaterMark})`); let iterations = 0; let readableEnded = false; + let notPaused = 0; for await (const line of it) { assert.strictEqual(readableEnded, false); - assert.strictEqual(line, CONTENT); - - const expectedPaused = TOTAL_LINES - iterations > highWaterMark; - assert.strictEqual(readable.isPaused(), expectedPaused); + assert.ok(watermarkData.size <= TOTAL_LINES); + assert.strictEqual(readable.isPaused(), watermarkData.size >= 1); + if (!readable.isPaused()) { + notPaused++; + } iterations += 1; @@ -45,4 +54,5 @@ const TOTAL_LINES = 18; } assert.strictEqual(iterations, TOTAL_LINES); + assert.strictEqual(notPaused, REPETITIONS); })().then(common.mustCall()); diff --git a/test/parallel/test-readline-async-iterators.js b/test/parallel/test-readline-async-iterators.js index 2aa557a3363486..57c74f69d587d8 100644 --- a/test/parallel/test-readline-async-iterators.js +++ b/test/parallel/test-readline-async-iterators.js @@ -4,6 +4,7 @@ const common = require('../common'); const fs = require('fs'); const { join } = require('path'); const readline = require('readline'); +const { Readable } = require('stream'); const assert = require('assert'); const tmpdir = require('../common/tmpdir'); @@ -63,7 +64,6 @@ async function testMutual() { // This outer loop should only iterate once. assert.strictEqual(iterated, false); iterated = true; - iteratedLines.push(k); for await (const l of rli) { iteratedLines.push(l); @@ -74,4 +74,158 @@ async function testMutual() { } } -testSimple().then(testMutual).then(common.mustCall()); +async function testPerformance() { + const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, +sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. +Dui accumsan sit amet nulla facilisi morbi tempus iaculis urna. +Eget dolor morbi non arcu risus quis varius quam quisque. +Lacus viverra vitae congue eu consequat ac felis donec. +Amet porttitor eget dolor morbi non arcu. +Velit ut tortor pretium viverra suspendisse. +Mauris nunc congue nisi vitae suscipit tellus. +Amet nisl suscipit adipiscing bibendum est ultricies integer. +Sit amet dictum sit amet justo donec enim diam. +Condimentum mattis pellentesque id nibh tortor id aliquet lectus proin. +Diam in arcu cursus euismod quis viverra nibh. +`; + + const REPETITIONS = 10000; + const SAMPLE = 100; + const THRESHOLD = 81; + + function getLoremIpsumStream() { + const readable = Readable({ + objectMode: true, + }); + let i = 0; + readable._read = () => readable.push( + i++ >= REPETITIONS ? null : loremIpsum + ); + return readable; + } + + function oldWay() { + const readable = new Readable({ + objectMode: true, + read: () => { + this.resume(); + }, + destroy: (err, cb) => { + this.off('line', lineListener); + this.off('close', closeListener); + this.close(); + cb(err); + }, + }); + const lineListener = (input) => { + if (!readable.push(input)) { + // TODO(rexagod): drain to resume flow + this.pause(); + } + }; + const closeListener = () => { + readable.push(null); + }; + const errorListener = (err) => { + readable.destroy(err); + }; + this.on('error', errorListener); + this.on('line', lineListener); + this.on('close', closeListener); + return readable[Symbol.asyncIterator](); + } + + function getAvg(mean, x, n) { + return (mean * n + x) / (n + 1); + } + + let totalTimeOldWay = 0; + let totalTimeNewWay = 0; + let totalCharsOldWay = 0; + let totalCharsNewWay = 0; + const linesOldWay = []; + const linesNewWay = []; + + for (let time = 0; time < SAMPLE; time++) { + const rlOldWay = readline.createInterface({ + input: getLoremIpsumStream(), + }); + let currenttotalTimeOldWay = Date.now(); + for await (const line of oldWay.call(rlOldWay)) { + totalCharsOldWay += line.length; + if (time === 0) { + linesOldWay.push(line); + } + } + currenttotalTimeOldWay = Date.now() - currenttotalTimeOldWay; + totalTimeOldWay = getAvg(totalTimeOldWay, currenttotalTimeOldWay, SAMPLE); + + const rlNewWay = readline.createInterface({ + input: getLoremIpsumStream(), + }); + let currentTotalTimeNewWay = Date.now(); + for await (const line of rlNewWay) { + totalCharsNewWay += line.length; + if (time === 0) { + linesNewWay.push(line); + } + } + currentTotalTimeNewWay = Date.now() - currentTotalTimeNewWay; + totalTimeNewWay = getAvg(totalTimeNewWay, currentTotalTimeNewWay, SAMPLE); + } + + assert.strictEqual(totalCharsOldWay, totalCharsNewWay); + assert.strictEqual(linesOldWay.length, linesNewWay.length); + linesOldWay.forEach((line, index) => + assert.strictEqual(line, linesNewWay[index]) + ); + const percentage = totalTimeNewWay * 100 / totalTimeOldWay; + assert.ok(percentage <= THRESHOLD, `Failed: ${totalTimeNewWay} isn't lesser than ${THRESHOLD}% of ${totalTimeOldWay}. Actual percentage: ${percentage.toFixed(2)}%`); +} + +async function testSlowStreamForLeaks() { + const message = 'a\nb\nc\n'; + const DELAY = 1; + const REPETITIONS = 100; + const warningCallback = common.mustNotCall(); + process.on('warning', warningCallback); + + function getStream() { + const readable = Readable({ + objectMode: true, + }); + readable._read = () => {}; + let i = REPETITIONS; + function schedule() { + setTimeout(() => { + i--; + if (i < 0) { + readable.push(null); + } else { + readable.push(message); + schedule(); + } + }, DELAY); + } + schedule(); + return readable; + } + const iterable = readline.createInterface({ + input: getStream(), + }); + + let lines = 0; + // eslint-disable-next-line no-unused-vars + for await (const _ of iterable) { + lines++; + } + + assert.strictEqual(lines, 3 * REPETITIONS); + process.off('warning', warningCallback); +} + +testSimple() + .then(testMutual) + .then(testPerformance) + .then(testSlowStreamForLeaks) + .then(common.mustCall());