Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: improve writable.write() performance #31624

Merged
merged 2 commits into from
Feb 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ function Benchmark(fn, configs, options) {
this._time = [0, 0];
// Used to make sure a benchmark only start a timer once
this._started = false;
this._ended = false;

// this._run will use fork() to create a new process for each configuration
// combination.
Expand Down Expand Up @@ -197,6 +198,9 @@ Benchmark.prototype.end = function(operations) {
if (!this._started) {
throw new Error('called end without start');
}
if (this._ended) {
throw new Error('called end multiple times');
}
if (typeof operations !== 'number') {
throw new Error('called end() without specifying operation count');
}
Expand All @@ -210,6 +214,7 @@ Benchmark.prototype.end = function(operations) {
elapsed[1] = 1;
}

this._ended = true;
const time = elapsed[0] + elapsed[1] / 1e9;
const rate = operations / time;
this.report(rate, elapsed);
Expand Down
4 changes: 3 additions & 1 deletion benchmark/streams/writable-manywrites.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ function main({ n, sync, writev, callback }) {
let k = 0;
function run() {
while (k++ < n && s.write(b, cb));
if (k >= n)
if (k >= n) {
bench.end(n);
s.removeListener('drain', run);
}
}
s.on('drain', run);
run();
Expand Down
45 changes: 21 additions & 24 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ Writable.prototype.pipe = function() {

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
const isBuf = !state.objectMode && Stream._isUint8Array(chunk);

// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
Expand All @@ -271,16 +270,16 @@ Writable.prototype.write = function(chunk, encoding, cb) {

if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
encoding = state.defaultEncoding;
} else {
if (!encoding)
encoding = state.defaultEncoding;
if (typeof cb !== 'function')
cb = nop;
}

if (isBuf)
encoding = 'buffer';
else if (!encoding)
encoding = state.defaultEncoding;

if (typeof cb !== 'function')
cb = nop;

let err;
if (state.ending) {
Expand All @@ -289,19 +288,24 @@ Writable.prototype.write = function(chunk, encoding, cb) {
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}

if (err) {
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
if (!isBuf && !state.objectMode) {
if (typeof chunk !== 'string') {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else if (encoding !== 'buffer' && state.decodeStrings !== false) {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
}
if (err === undefined) {
state.pendingcb++;
return writeOrBuffer(this, state, chunk, encoding, cb);
}
}

return ret;
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
return false;
};

Writable.prototype.cork = function() {
Expand Down Expand Up @@ -376,13 +380,6 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', {
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!state.objectMode &&
state.decodeStrings !== false &&
encoding !== 'buffer' &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand Down