From b862a0c6d69bac6e7deaf7c53802a5d3bc05099e Mon Sep 17 00:00:00 2001 From: Brian White Date: Mon, 3 Feb 2020 16:18:29 -0500 Subject: [PATCH 1/2] benchmark: check for and fix multiple end() PR-URL: https://github.com/nodejs/node/pull/31624 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Luigi Pinca --- benchmark/common.js | 5 +++++ benchmark/streams/writable-manywrites.js | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/benchmark/common.js b/benchmark/common.js index 6a3be4fc376920..c5791c2bacfd5d 100644 --- a/benchmark/common.js +++ b/benchmark/common.js @@ -33,6 +33,7 @@ function Benchmark(fn, configs, options) { this._time = [0, 0]; // Used to make sure a benchmark only start a timer once this._started = false; + this._ended = false; // this._run will use fork() to create a new process for each configuration // combination. @@ -197,6 +198,9 @@ Benchmark.prototype.end = function(operations) { if (!this._started) { throw new Error('called end without start'); } + if (this._ended) { + throw new Error('called end multiple times'); + } if (typeof operations !== 'number') { throw new Error('called end() without specifying operation count'); } @@ -210,6 +214,7 @@ Benchmark.prototype.end = function(operations) { elapsed[1] = 1; } + this._ended = true; const time = elapsed[0] + elapsed[1] / 1e9; const rate = operations / time; this.report(rate, elapsed); diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index 049bf8eb281db2..e4ae9ab91e5f4a 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -35,8 +35,10 @@ function main({ n, sync, writev, callback }) { let k = 0; function run() { while (k++ < n && s.write(b, cb)); - if (k >= n) + if (k >= n) { bench.end(n); + s.removeListener('drain', run); + } } s.on('drain', run); run(); From 43783b5b3fa0a0d477ae34aeb0405956fb534dc4 Mon Sep 17 00:00:00 2001 From: Brian White Date: Mon, 3 Feb 2020 16:18:57 -0500 Subject: [PATCH 2/2] stream: improve writable.write() performance PR-URL: https://github.com/nodejs/node/pull/31624 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Luigi Pinca --- lib/_stream_writable.js | 45 +++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c42760c2b25e63..9c38da3f107ba0 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -261,7 +261,6 @@ Writable.prototype.pipe = function() { Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; - var ret = false; const isBuf = !state.objectMode && Stream._isUint8Array(chunk); // Do not use Object.getPrototypeOf as it is slower since V8 7.3. @@ -271,16 +270,16 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding; - encoding = null; + encoding = state.defaultEncoding; + } else { + if (!encoding) + encoding = state.defaultEncoding; + if (typeof cb !== 'function') + cb = nop; } if (isBuf) encoding = 'buffer'; - else if (!encoding) - encoding = state.defaultEncoding; - - if (typeof cb !== 'function') - cb = nop; let err; if (state.ending) { @@ -289,19 +288,24 @@ Writable.prototype.write = function(chunk, encoding, cb) { err = new ERR_STREAM_DESTROYED('write'); } else if (chunk === null) { err = new ERR_STREAM_NULL_VALUES(); - } else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) { - err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); } else { - state.pendingcb++; - ret = writeOrBuffer(this, state, chunk, encoding, cb); - } - - if (err) { - process.nextTick(cb, err); - errorOrDestroy(this, err, true); + if (!isBuf && !state.objectMode) { + if (typeof chunk !== 'string') { + err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); + } else if (encoding !== 'buffer' && state.decodeStrings !== false) { + chunk = Buffer.from(chunk, encoding); + encoding = 'buffer'; + } + } + if (err === undefined) { + state.pendingcb++; + return writeOrBuffer(this, state, chunk, encoding, cb); + } } - return ret; + process.nextTick(cb, err); + errorOrDestroy(this, err, true); + return false; }; Writable.prototype.cork = function() { @@ -376,13 +380,6 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, cb) { - if (!state.objectMode && - state.decodeStrings !== false && - encoding !== 'buffer' && - typeof chunk === 'string') { - chunk = Buffer.from(chunk, encoding); - encoding = 'buffer'; - } const len = state.objectMode ? 1 : chunk.length; state.length += len;