Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: mutable highwatermark #33346

36 changes: 36 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,24 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### `writable.updateWritableHighwaterMark(highwaterMark)`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can actually see a case for this part and updating the watermarks. I even needed to use the lowWatermark at some point for a specific use case at my old job and was sad to find out we don't have one anymore :]

I think this sort of change would need to come with a concrete use case of why I'd update the watermarks.

<!-- YAML
added: v9.3.0
0807Jpatel marked this conversation as resolved.
Show resolved Hide resolved
-->

* `highwaterMark` {number} new highwaterMark

Update the value of `highWaterMark` of a given `Writable` stream.

##### `writable.updateWritableObjectMode(objectMode)`
<!-- YAML
added: v12.3.0
-->

* `objectMode` {boolean} new objectMode

Update the value of `objectMode` of a given `Writable` stream.

##### `writable.write(chunk[, encoding][, callback])`
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -1211,6 +1229,24 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Readable` stream.

##### `readable.updateReadableHighWaterMark(highwaterMark)`
<!-- YAML
added: v9.3.0
-->

* `highwaterMark` {number} new highwaterMark

Update the value of `highWaterMark` of a given `Readable` stream.

##### `readable.updateReadableObjectMode(objectMode)`
0807Jpatel marked this conversation as resolved.
Show resolved Hide resolved
<!-- YAML
added: v12.3.0
-->

* `objectMode` {boolean} new objectMode

Update the value of `objectMode` of a given `Readable` stream.

##### `readable.resume()`
<!-- YAML
added: v0.9.4
Expand Down
35 changes: 33 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ function ReadableState(options, stream, isDuplex) {
// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

// Flags to update highwatermark and objectMode
this.updateHighwaterMark = null;
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
this.updateObjectMode = null;
0807Jpatel marked this conversation as resolved.
Show resolved Hide resolved

this.decoder = null;
this.encoding = null;
if (options && options.encoding) {
Expand Down Expand Up @@ -396,6 +400,17 @@ Readable.prototype.read = function(n) {
const state = this._readableState;
const nOrig = n;

// If they called updateHighwaterMark since last read
if (state.updateHighwaterMark !== null) {
state.highWaterMark = state.updateHighwaterMark;
state.updateHighwaterMark = null;
}

if (state.updateObjectMode !== null && state.length === 0) {
state.objectMode = state.updateObjectMode;
state.updateObjectMode = null;
}

// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
Expand Down Expand Up @@ -461,12 +476,18 @@ Readable.prototype.read = function(n) {
debug('length less than watermark', doRead);
}

const objectModeUpdateRequest = state.updateObjectMode !== null;

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
if (state.ended ||
state.reading ||
state.destroyed ||
state.errored ||
objectModeUpdateRequest) {
doRead = false;
debug('reading or ended', doRead);
debug('reading or ended or objectMode update', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
Expand Down Expand Up @@ -936,6 +957,16 @@ function nReadingNextTick(self) {
self.read(0);
}

Readable.prototype.updateReadableHighwaterMark = function(newHighwaterMark) {
this._readableState.updateHighwaterMark = newHighwaterMark;
};

Readable.prototype.updateReadableObjectMode = function(objectMode) {
objectMode = !!objectMode;
if (objectMode !== this._readableState.objectMode)
this._readableState.updateObjectMode = objectMode;
};

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
Expand Down
28 changes: 27 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ function WritableState(options, stream, isDuplex) {
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

// Flags to update highwatermark and objectMode
this.updateHighwaterMark = null;
this.updateObjectMode = null;
}

function resetBuffer(state) {
Expand Down Expand Up @@ -262,6 +266,11 @@ Writable.prototype.pipe = function() {
Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;

if (state.updateHighwaterMark !== null) {
state.highWaterMark = state.updateHighwaterMark;
state.updateHighwaterMark = null;
}

if (typeof encoding === 'function') {
cb = encoding;
encoding = state.defaultEncoding;
Expand Down Expand Up @@ -339,6 +348,13 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
let needObjectModeUpdate = state.updateObjectMode !== null;
if (needObjectModeUpdate && state.length === 0) {
state.objectMode = state.updateObjectMode;
state.updateObjectMode = null;
needObjectModeUpdate = false;
}

const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand Down Expand Up @@ -368,7 +384,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
return ret && !needObjectModeUpdate && !state.errored && !state.destroyed;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand Down Expand Up @@ -553,6 +569,16 @@ Writable.prototype._write = function(chunk, encoding, cb) {

Writable.prototype._writev = null;

Writable.prototype.updateWritableHighwaterMark = function(newHighwaterMark) {
0807Jpatel marked this conversation as resolved.
Show resolved Hide resolved
this._writableState.updateHighwaterMark = newHighwaterMark;
};

Writable.prototype.updateWritableObjectMode = function(objectMode) {
objectMode = !!objectMode;
if (objectMode !== this._writableState.objectMode)
this._writableState.updateObjectMode = objectMode;
};

Writable.prototype.end = function(chunk, encoding, cb) {
const state = this._writableState;

Expand Down
45 changes: 45 additions & 0 deletions test/parallel/test-stream-readable-hwm-mutable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

let currHighWaterMark = 20;
let currObjectMode = true;
let pushes = 0;
const rs = new stream.Readable({
highWaterMark: currHighWaterMark,
objectMode: currObjectMode,
read: common.mustCall(function() {
if (pushes++ === 100) {
this.push(null);
return;
}

const highWaterMark = this._readableState.highWaterMark;
const objectMode = this._readableState.objectMode;

// Both highwatermark and objectmode should update
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
// before it makes next _read request.
assert.strictEqual(highWaterMark, currHighWaterMark);
assert.strictEqual(objectMode, currObjectMode);

this.push(Buffer.alloc(1024));

if (pushes === 30) {
this.updateReadableHighwaterMark(2048);
this.updateReadableObjectMode(false);
currHighWaterMark = 2048;
currObjectMode = false;
}

}, 101)
});

const ws = stream.Writable({
write: common.mustCall(function(data, enc, cb) {
setImmediate(cb);
}, 100)
});

rs.pipe(ws);
47 changes: 47 additions & 0 deletions test/parallel/test-stream-writable-hwm-mutable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

let pushes = 0;
const rs = new stream.Readable({
read: common.mustCall(function() {
if (pushes++ === 100) {
this.push(null);
return;
}

this.push(Buffer.alloc(1024));
}, 101)
});

let currHighWaterMark = 0;
let currObjectMode = true;
let read = 0;
const ws = stream.Writable({
highWaterMark: currHighWaterMark,
objectMode: currObjectMode,
write: common.mustCall(function(data, enc, cb) {

const highWaterMark = this._writableState.highWaterMark;
const objectMode = this._writableState.objectMode;

// Should update highwatermark and objectmode
// after emptying current buffer
assert.strictEqual(highWaterMark, currHighWaterMark);
assert.strictEqual(objectMode, currObjectMode);

if (read++ === 30) {
this.updateWritableHighwaterMark(2048);
this.updateWritableObjectMode(false);
currHighWaterMark = 2048;
currObjectMode = false;
}

setImmediate(cb);

}, 100)
});

rs.pipe(ws);