Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: writableNeedDrain #35348

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,15 @@ This property contains the number of bytes (or objects) in the queue
ready to be written. The value provides introspection data regarding
the status of the `highWaterMark`.

##### `writable.writableNeedDrain`
<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` if the stream's buffer has been full and stream will emit `'drain'`.

##### `writable.writableObjectMode`
<!-- YAML
added: v12.3.0
Expand Down
5 changes: 5 additions & 0 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,11 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
get: function() { return this.finished; }
});

ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
get: function() {
return !this.destroyed && !this.finished && this[kNeedDrain];
}
});

const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ ObjectDefineProperties(Duplex.prototype, {
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
writableEnded:
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
writableNeedDrain:
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),

destroyed: {
get() {
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ async function pump(iterable, writable, finish) {
}
let error;
try {
if (writable.writableNeedDrain === true) {
await EE.once(writable, 'drain');
ronag marked this conversation as resolved.
Show resolved Hide resolved
}

for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.emit('pipe', src);

// Start the flow if it hasn't been started already.
if (!state.flowing) {

if (dest.writableNeedDrain === true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: why not:

Suggested change
if (dest.writableNeedDrain === true) {
if (dest.writableNeedDrain) {

if (state.flowing) {
src.pause();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little unsure how this affects the case where src is already piped to other destinations?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 what about adding the stream to the state.awaitDrainWriters (refs pipeOnDrain + ondata(chunk)). Though that may require copying https://github.com/nodejs/node/pull/35348/files#diff-0117344ddd481d021ad96b9c8eea78a5R741-R747 from ondata...

Copy link
Member Author

@ronag ronag Oct 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea how that works or what it is intended for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, same 😄, based on the initial look it is a list of writables to wait for a drain event so this case seems fitting.
Let's ping @addaleax @BridgeAR @mscdex as most "recent" collaborators according to git-blame.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this can be reverted or done some other way? It seems to cause issues with the source getting stuck in a paused state if you previously fed the output buffer with a lot of data, see eg, electron/asar#210 That module basically just appends a bunch of files to a single binary, but after a random amount of files it gets stuck due to the source being in a paused state.
This could, of course, be easily worked around by adding the writableNeedDrain loop as in the issue above, or adding a resume call to the stream, but I feel like that shouldn't really be necessary.

} else if (!state.flowing) {
debug('pipe resume');
src.resume();
}
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,14 @@ ObjectDefineProperties(Writable.prototype, {
}
},

writableNeedDrain: {
get() {
const wState = this._writableState;
if (!wState) return false;
return !wState.destroyed && !wState.ending && wState.needDrain;
}
},

writableHighWaterMark: {
get() {
return this._writableState && this._writableState.highWaterMark;
Expand Down
29 changes: 29 additions & 0 deletions test/parallel/test-stream-pipe-needDrain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');

// Pipe should not continue writing if writable needs drain.
{
const w = new Writable({
write(buf, encoding, callback) {

}
});

while (w.write('asd'));

assert.strictEqual(w.writableNeedDrain, true);

const r = new Readable({
read() {
this.push('asd');
}
});

w.write = common.mustNotCall();

r.pipe(w);
}