Skip to content

Commit ef8ac7b

Browse files
guymguymaddaleax
authored andcommitted
stream: support readable/writableHWM for Duplex
This commits adds support for readableHighWaterMark and writableHighWaterMark in Duplex stream, so that they can be set without accessing the internal state. Fixes: #14555 PR-URL: #14636 Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 2ee3320 commit ef8ac7b

File tree

4 files changed

+107
-4
lines changed

4 files changed

+107
-4
lines changed

Diff for: doc/api/stream.md

+4
Original file line numberDiff line numberDiff line change
@@ -1752,6 +1752,10 @@ constructor and implement *both* the `readable._read()` and
17521752
* `writableObjectMode` {boolean} Defaults to `false`. Sets `objectMode`
17531753
for writable side of the stream. Has no effect if `objectMode`
17541754
is `true`.
1755+
* `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side
1756+
of the stream. Has no effect if `highWaterMark` is provided.
1757+
* `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side
1758+
of the stream. Has no effect if `highWaterMark` is provided.
17551759

17561760
For example:
17571761

Diff for: lib/_stream_readable.js

+16-2
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,32 @@ function prependListener(emitter, event, fn) {
6161
function ReadableState(options, stream) {
6262
options = options || {};
6363

64+
// Duplex streams are both readable and writable, but share
65+
// the same options object.
66+
// However, some cases require setting options to different
67+
// values for the readable and the writable sides of the duplex stream.
68+
// These options can be provided separately as readableXXX and writableXXX.
69+
var isDuplex = stream instanceof Stream.Duplex;
70+
6471
// object stream flag. Used to make read(n) ignore n and to
6572
// make all the buffer merging and length checks go away
6673
this.objectMode = !!options.objectMode;
6774

68-
if (stream instanceof Stream.Duplex)
75+
if (isDuplex)
6976
this.objectMode = this.objectMode || !!options.readableObjectMode;
7077

7178
// the point at which it stops calling _read() to fill the buffer
7279
// Note: 0 is a valid value, means "don't call _read preemptively ever"
7380
var hwm = options.highWaterMark;
81+
var readableHwm = options.readableHighWaterMark;
7482
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
75-
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
83+
84+
if (hwm || hwm === 0)
85+
this.highWaterMark = hwm;
86+
else if (isDuplex && (readableHwm || readableHwm === 0))
87+
this.highWaterMark = readableHwm;
88+
else
89+
this.highWaterMark = defaultHwm;
7690

7791
// cast to ints.
7892
this.highWaterMark = Math.floor(this.highWaterMark);

Diff for: lib/_stream_writable.js

+16-2
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,33 @@ function nop() {}
4141
function WritableState(options, stream) {
4242
options = options || {};
4343

44+
// Duplex streams are both readable and writable, but share
45+
// the same options object.
46+
// However, some cases require setting options to different
47+
// values for the readable and the writable sides of the duplex stream.
48+
// These options can be provided separately as readableXXX and writableXXX.
49+
var isDuplex = stream instanceof Stream.Duplex;
50+
4451
// object stream flag to indicate whether or not this stream
4552
// contains buffers or objects.
4653
this.objectMode = !!options.objectMode;
4754

48-
if (stream instanceof Stream.Duplex)
55+
if (isDuplex)
4956
this.objectMode = this.objectMode || !!options.writableObjectMode;
5057

5158
// the point at which write() starts returning false
5259
// Note: 0 is a valid value, means that we always return false if
5360
// the entire buffer is not flushed immediately on write()
5461
var hwm = options.highWaterMark;
62+
var writableHwm = options.writableHighWaterMark;
5563
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
56-
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
64+
65+
if (hwm || hwm === 0)
66+
this.highWaterMark = hwm;
67+
else if (isDuplex && (writableHwm || writableHwm === 0))
68+
this.highWaterMark = writableHwm;
69+
else
70+
this.highWaterMark = defaultHwm;
5771

5872
// cast to ints.
5973
this.highWaterMark = Math.floor(this.highWaterMark);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
'use strict';
2+
require('../common');
3+
const assert = require('assert');
4+
5+
const { Transform, Readable, Writable } = require('stream');
6+
7+
const DEFAULT = 16 * 1024;
8+
9+
function testTransform(expectedReadableHwm, expectedWritableHwm, options) {
10+
const t = new Transform(options);
11+
assert.strictEqual(t._readableState.highWaterMark, expectedReadableHwm);
12+
assert.strictEqual(t._writableState.highWaterMark, expectedWritableHwm);
13+
}
14+
15+
// test overriding defaultHwm
16+
testTransform(666, DEFAULT, { readableHighWaterMark: 666 });
17+
testTransform(DEFAULT, 777, { writableHighWaterMark: 777 });
18+
testTransform(666, 777, {
19+
readableHighWaterMark: 666,
20+
writableHighWaterMark: 777,
21+
});
22+
23+
// test 0 overriding defaultHwm
24+
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
25+
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });
26+
27+
// test highWaterMark overriding
28+
testTransform(555, 555, {
29+
highWaterMark: 555,
30+
readableHighWaterMark: 666,
31+
});
32+
testTransform(555, 555, {
33+
highWaterMark: 555,
34+
writableHighWaterMark: 777,
35+
});
36+
testTransform(555, 555, {
37+
highWaterMark: 555,
38+
readableHighWaterMark: 666,
39+
writableHighWaterMark: 777,
40+
});
41+
42+
// test highWaterMark = 0 overriding
43+
testTransform(0, 0, {
44+
highWaterMark: 0,
45+
readableHighWaterMark: 666,
46+
});
47+
testTransform(0, 0, {
48+
highWaterMark: 0,
49+
writableHighWaterMark: 777,
50+
});
51+
testTransform(0, 0, {
52+
highWaterMark: 0,
53+
readableHighWaterMark: 666,
54+
writableHighWaterMark: 777,
55+
});
56+
57+
// test undefined, null, NaN
58+
[undefined, null, NaN].forEach((v) => {
59+
testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
60+
testTransform(DEFAULT, DEFAULT, { writableHighWaterMark: v });
61+
testTransform(666, DEFAULT, { highWaterMark: v, readableHighWaterMark: 666 });
62+
testTransform(DEFAULT, 777, { highWaterMark: v, writableHighWaterMark: 777 });
63+
});
64+
65+
// test non Duplex streams ignore the options
66+
{
67+
const r = new Readable({ readableHighWaterMark: 666 });
68+
assert.strictEqual(r._readableState.highWaterMark, DEFAULT);
69+
const w = new Writable({ writableHighWaterMark: 777 });
70+
assert.strictEqual(w._writableState.highWaterMark, DEFAULT);
71+
}

0 commit comments

Comments
 (0)