Change in stream behaviour in Node 4.4.5 #7278
Hi! I think that this is being fixed by #7160; at least your provided test script seems to pass with it. Can you verify that, and if so, would you mind submitting your example as an additional test case in a PR? And FYI, Node.js versions from before 4.2.0 should exhibit the 4.4.5 behaviour, too. /cc @thealphanerd
Unfortunately, a slight variation of the test still fails with #7160 applied:

```js
var stream = require('stream'),
    util = require('util'),
    Writable = stream.Writable,
    PassThrough = stream.PassThrough;

function TestStream()
{
    Writable.call(this);
}

util.inherits(TestStream, Writable);

var count = 0;

TestStream.prototype._write = function (chunk, encoding, callback)
{
    count += 1;
    console.log("_write", chunk.length);
    if (count === 1)
    {
        pthru.push(new Buffer(64 * 1024));
        process.nextTick(callback);
    }
    else
    {
        callback();
    }
};

var pthru = new PassThrough(),
    test = new TestStream();

pthru.pipe(test);

process.nextTick(function ()
{
    pthru.push(new Buffer(32 * 1024));
    pthru.push(new Buffer(128 * 1024));
});
```

This results in the following being printed:
This is because we never get a resume (apart from the initial one due to setting up the pipe), so I have a rudimentary fix, but it will probably need some work:

```diff
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index afe3b3b..5ef40e9 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -521,7 +521,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   // on the source. This would be more elegant with a .once()
   // handler in flow(), but adding and removing repeatedly is
   // too slow.
-  var ondrain = pipeOnDrain(src);
+  var ondrain = pipeOnDrain(src, dest);
   dest.on('drain', ondrain);

   var cleanedUp = false;
@@ -611,15 +611,20 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   return dest;
 };

-function pipeOnDrain(src) {
+function pipeOnDrain(src, dest) {
   return function() {
     var state = src._readableState;
     debug('pipeOnDrain', state.awaitDrain);
     if (state.awaitDrain)
       state.awaitDrain--;
-    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
-      state.flowing = true;
-      flow(src);
+    if (EE.listenerCount(src, 'data')) {
+      if (state.awaitDrain === 0) {
+        state.flowing = true;
+        flow(src);
+      } else {
+        // TODO: we need to remember awaitDrain per dest
+        dest._writableState.needDrain = true;
+      }
     }
   };
 }
@@ -740,6 +745,7 @@ function resume_(stream, state) {
   }

   state.resumeScheduled = false;
+  state.awaitDrain = 0;
   stream.emit('resume');
   flow(stream);
   if (state.flowing && !state.reading)
```

The fix results in the following being printed:
This also works:

```diff
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index afe3b3b..46a5691 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -550,10 +550,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   }

   src.on('data', ondata);
+  var increasedAwaitDrain = false;
   function ondata(chunk) {
     debug('ondata');
+    increasedAwaitDrain = false;
     var ret = dest.write(chunk);
-    if (false === ret) {
+    if (false === ret && !increasedAwaitDrain) {
       // If the user unpiped during `dest.write()`, it is possible
       // to get stuck in a permanently paused state if that write
       // also returned false.
@@ -563,6 +565,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
         !cleanedUp) {
       debug('false write response, pause', src._readableState.awaitDrain);
       src._readableState.awaitDrain++;
+      increasedAwaitDrain = true;
     }
     src.pause();
   }
```
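In isolation, the guard in this second patch is a standard re-entrancy flag: reset at the top of the handler, checked after the call that may re-enter it. A toy model (hypothetical names, not the real stream internals) shows why the outer frame skips its increment when a nested call has already counted:

```javascript
'use strict';

// Toy model of the increasedAwaitDrain guard; plain counters, not the
// actual stream machinery.
let awaitDrain = 0;
let increasedAwaitDrain = false;

function ondata(chunk, reenters) {
  increasedAwaitDrain = false;
  // Stand-in for dest.write() synchronously pushing data back onto the
  // source, which re-enters ondata before the outer call resumes.
  if (reenters) ondata(chunk, false);
  // The nested frame has already counted a pending drain and left the
  // flag set, so the outer frame skips its own increment.
  if (!increasedAwaitDrain) {
    awaitDrain++;
    increasedAwaitDrain = true;
  }
}

ondata(Buffer.alloc(1), true);
console.log(awaitDrain); // 1: counted once despite two ondata frames
```

Without the flag, both frames would increment and `awaitDrain` would end up at 2, waiting for two `'drain'` events where only one will ever fire.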
Regarding your first suggestion: you'll have to be pretty careful about accessing the destination's `_writableState`. Your second suggestion seems to make sense, and I guess there's nothing speaking against turning it into a PR.
@addaleax I assigned myself to track the issue... not 100% the best course of action
@nodejs/streams
#7292 should be a fix for this, btw |
Reopening until the fix lands on v4.x |
Guard against the call to write() inside pipe's ondata pushing more data back onto the Readable, thus causing ondata to be called again. This is fine but results in awaitDrain being increased more than once. The problem with that is when the destination does drain, only a single 'drain' event is emitted, so awaitDrain in this case will never reach zero and we end up with a permanently paused stream.

Fixes: #7278
PR-URL: #7292
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Closing this as the fix has been included in v4.4.7
replace custom backpressure mechanism that involved queueing write requests with pausing/resuming the source stream; this seems to solve incomplete/unfinished part stream issues that started appearing in Node 4.4.5 and later, probably related to nodejs/node#7278, although that is supposed to be fixed in 4.4.7
Version: v4.4.5
Platform: Linux 4.2.0-36-generic #41-Ubuntu SMP Mon Apr 18 15:47:56 UTC 2016 i686 i686 i686 GNU/Linux
Subsystem: stream
I have the following test code:
On 4.4.4, this prints:
On 4.4.5, this prints:
i.e. the final buffer gets lost. I believe this is because `awaitDrain` ends up as 2 but during `clearBuffer` only one `drain` event gets emitted.

I think the change in behaviour occurred due to #6023, which seems sensible for the issue it was fixing.
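A toy model of that bookkeeping (plain counters, hypothetical function names, not the real stream internals) makes the deadlock visible: two increments against a single `'drain'` leave the counter stuck above zero, so flowing mode never resumes:

```javascript
'use strict';

// Plain counters modelling the awaitDrain bookkeeping described above.
let awaitDrain = 0;
let flowing = true;

function writeReturnedFalse() { awaitDrain++; flowing = false; }
function drain() {
  if (awaitDrain) awaitDrain--;
  if (awaitDrain === 0) flowing = true;
}

// Bug scenario: the destination's write() re-entrantly triggers a second
// push, so the source records two pending drains for one write...
writeReturnedFalse();
writeReturnedFalse();
// ...but the destination's buffer empties once, emitting a single 'drain':
drain();
console.log({ awaitDrain, flowing }); // { awaitDrain: 1, flowing: false }
```

With the counter stuck at 1, the pipe never calls `flow()` again and the remaining buffered data is never delivered.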
My question is: which behaviour is correct? Is it supported to push onto the source stream during a write to the destination given a pipe has been set up between the two?