Readable.pipe() behaves inconsistently resuming (or not) the source #41785

Closed
tufosa opened this issue Jan 31, 2022 · 11 comments · Fixed by #41848
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.

Comments

@tufosa

tufosa commented Jan 31, 2022

Version

14.17.0

Platform

Linux tufopad 5.4.0-91-generic #102-Ubuntu SMP Fri Nov 5 16:31:28 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Subsystem

stream

What steps will reproduce the bug?

Prior to v14.17.0 (that is, in versions <= v14.16.1), calling Readable.pipe() would always resume the source if it had previously been paused. This behaviour was not documented, but it was consistent (it always happened). Since v14.17.0 this behaviour has changed, and Readable.pipe() now resumes the source only in some cases. If you run the following code

const { PassThrough } = require('stream');

// FIRST EXPERIMENT
console.info('********** FIRST EXPERIMENT **********');
const source1 = new PassThrough();
const target1 = new PassThrough();

// `Readable.pipe()` resumes the source if it was previously paused
source1.pause();
console.info(`source1 before pipe. Paused: ${source1.isPaused()}`);
source1.pipe(target1);
console.info(`source1 after pipe. Paused: ${source1.isPaused()}`);

// SECOND EXPERIMENT
console.info('\n********** SECOND EXPERIMENT **********');
const source2 = new PassThrough();
const target2 = new PassThrough();

// stall target2
const chunk = Buffer.allocUnsafe(1000);
let chunks = 1;
while (target2.write(chunk)) chunks++;
console.info(`${chunks} chunks of ${chunk.length} bytes to stall target2`);

// `Readable.pipe()` DOES NOT resume the source if it was previously paused, but
// the target needs drain (only in version >= v14.17.0)
source2.pause();
console.info(`source2 before pipe. Paused: ${source2.isPaused()}`);
source2.pipe(target2);
console.info(`source2 after pipe. Paused: ${source2.isPaused()}`);
target2.on('drain', () => {   
  console.info('target2 drained');
  console.info(`source2 after drain. Paused: ${source2.isPaused()}`);
});
target2.on('data', () => {});

with version v14.16.1 you will get the following output

********** FIRST EXPERIMENT **********
source1 before pipe. Paused: true
source1 after pipe. Paused: false

********** SECOND EXPERIMENT **********
34 chunks of 1000 bytes to stall target2
source2 before pipe. Paused: true
source2 after pipe. Paused: false
target2 drained
source2 after drain. Paused: false

whereas with version v14.17.0 you get

********** FIRST EXPERIMENT **********
source1 before pipe. Paused: true
source1 after pipe. Paused: false

********** SECOND EXPERIMENT **********
34 chunks of 1000 bytes to stall target2
source2 before pipe. Paused: true
source2 after pipe. Paused: true
target2 drained
source2 after drain. Paused: true

In v14.17.0 and later, the source is NOT resumed if the target needs to drain, and it is not resumed even when the target drains. Furthermore, if the source wasn't paused, Readable.pipe() will pause it if the piped target needs to drain. This did not happen in versions <= v14.16.1.

The fact that Readable.pipe() decides if it should pause or resume the source depending on the status of the target looks a bit inconsistent to me, and if not fixed, I believe that at least it would need to be documented.

How often does it reproduce? Is there a required condition?

It always happens

What is the expected behavior?

It's a change to undocumented behaviour, so it's hard for me to say what the expected behaviour is. The previous behaviour (always resuming the source) seemed a bit more consistent than the current one.

What do you see instead?

Read the experiment described in the "What steps will reproduce the bug?" section

Additional information

No response

@benjamingr
Member

Probably 4793f165dc from #36563 to fix #36544

@benjamingr
Member

cc @ronag

benjamingr added the stream label (Issues and PRs related to the stream subsystem.) on Jan 31, 2022
@benjamingr
Member

> The fact that Readable.pipe() decides if it should pause or resume the source depending on the status of the target looks a bit inconsistent to me, and if not fixed, I believe that at least it would need to be documented.

I think it makes sense not to resume a source if you currently can't (conceptually) take more data. Showing interest in data you can't consume is probably not good?

I think the issue here is that even after the data is drained the source isn't resumed?

@benjamingr
Member

I might be sleepy but I think it's a bug:

const { Readable, Writable } = require('stream');

const readable = new Readable({
    read() {
        this.push('hello');
        this.push('world');
        this.push(null);
    },
    objectMode: true
});

let cb;
const writable = new Writable({
    write(chunk, encoding, callback) {
        // don't call callback, save for later
        cb = callback;
    },
    highWaterMark: 1,
    objectMode: true
});
writable.write('a');
console.log(writable.writableNeedDrain); // true
console.log(readable.isPaused()); // false
readable.pipe(writable);
console.log(readable.isPaused()); // true
cb();
console.log(readable.isPaused()); // true, but I'd expect it to be false

@tufosa
Author

tufosa commented Jan 31, 2022

Thanks for your prompt response and your input, @benjamingr

> I think it makes sense not to resume a source if you currently can't (conceptually) take more data. Showing interest in data you can't consume is probably not good?

Yep, that might make sense, although the previous behaviour (resuming it) would make sense as well. Piping a source is somewhat like adding a data listener, and it makes sense to resume a source when a data listener is added.

And what about pausing a source when you pipe it to a target that needs to drain? I would say that in this case, Readable.pipe() is overdoing things a bit. Anyway, both things should be documented somewhere, because they are far from intuitive.

> I think the issue here is that even after the data is drained the source isn't resumed?

Definitely. I would expect pipe to handle that.

@ronag
Member

ronag commented Jan 31, 2022

I'm not sure I see a problem here.

@benjamingr regarding your example, 'drain' is emitted in the same tick as the callback invocation, hence the readable is read immediately and fills the writable again, hence it is paused again. Whether the 'drain' event should be emitted in the same tick as the callback invocation is something we could think about.
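To illustrate the timing point above, here is a minimal sketch. It assumes the write callback is invoked some time after the `write()` implementation has returned, as in the example being discussed. The names `pendingCallback` and `events` are made up for this illustration, and the snippet does not involve pipe() at all; it only records when 'drain' fires relative to the callback.

```javascript
const { Writable } = require('stream');

// Hypothetical holder for the deferred write callback.
let pendingCallback;

const writable = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    // Do not complete the write yet, so the stream stays full.
    pendingCallback = callback;
  },
});

const events = [];
writable.on('drain', () => events.push('drain'));

// One object already reaches the high-water mark, so write() returns false.
const accepted = writable.write('a');

events.push('before callback');
pendingCallback(); // completes the write outside of the write() call
events.push('after callback');

console.log(accepted); // false
console.log(events);   // [ 'before callback', 'drain', 'after callback' ]
```

Since 'drain' is recorded between the two markers, any 'drain' listener (including the one pipe() installs) runs before the callback invocation returns. That would be consistent with the explanation that, in the versions discussed here, the source is read and paused again before `cb()` returns.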

> Yep, that might make sense, although the previous behaviour (resuming it) would make sense as well. Piping a source is somewhat like adding a data listener, and it makes sense to resume a source when a data listener is added.

I don't think it makes sense, and it also leads to potential memory leaks. The whole "'data' resumes the stream" behaviour is there for compatibility reasons, and I would consider it a mistake from long ago that we can't fix.
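For readers unfamiliar with the "'data' resumes the stream" behaviour mentioned here, a small sketch of it follows. It uses only `PassThrough`, `on('data')`, `pause()` and the `readableFlowing` property; the variable names are invented for the illustration.

```javascript
const { PassThrough } = require('stream');

// Case 1: a fresh stream. Attaching a 'data' listener switches it
// into flowing mode, even though resume() was never called explicitly.
const fresh = new PassThrough();
fresh.on('data', () => {});
const freshFlowing = fresh.readableFlowing;

// Case 2: a stream that was explicitly paused first. Attaching a
// 'data' listener does not override the explicit pause().
const paused = new PassThrough();
paused.pause();
paused.on('data', () => {});
const pausedFlowing = paused.readableFlowing;

console.log(freshFlowing);  // true
console.log(pausedFlowing); // false
```

So the listener only starts the flow when nobody has explicitly paused the stream, which is the part of the legacy behaviour that the comparison with pipe() refers to.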

> Anyway, both things should be documented somewhere, because they are far from being intuitive.

PR welcome!

@tufosa
Author

tufosa commented Jan 31, 2022

And what about the fact that pipe might decide to pause your source because the target needs to drain and won't resume it when the target drains? Does it make sense to you, @ronag? If it does, then I think it should be documented in block capital letters. I am happy to send a PR myself.

@ronag
Member

ronag commented Jan 31, 2022

> won't resume it when the target drains?

Do you have an example of this?

@benjamingr
Member

> @benjamingr regarding your example, 'drain' is emitted in the same tick as the callback invocation, hence the readable is read immediately and again fills the writable, hence it is paused again.

Oh yeah that makes sense. I was sleepy after all :]

@tufosa
Author

tufosa commented Feb 1, 2022

> Do you have an example of this?

Yep, @ronag, try the following code:

const { PassThrough } = require('stream');

// THIRD EXPERIMENT
console.info('\n********** THIRD EXPERIMENT **********');
const source3 = new PassThrough();
const target3 = new PassThrough();

// stall target3
const chunk = Buffer.allocUnsafe(1000);
let chunks = 1;
while (target3.write(chunk)) chunks++;
console.info(`${chunks} chunks of ${chunk.length} bytes to stall target3`);

// `Readable.pipe()` PAUSES the source if the target needs drain (only in
// version >= v14.17.0) and it does not resume it after drain
console.info(`source3 before pipe. Paused: ${source3.isPaused()}`);
source3.pipe(target3);
console.info(`source3 after pipe. Paused: ${source3.isPaused()}`);
target3.on('drain', () => { 
  console.info('target3 drained');
  console.info(`source3 after drain. Paused: ${source3.isPaused()}`);
});                          
target3.on('data', () => {});

If you run this with Node.js >= v14.17.0, you will get something like

********** THIRD EXPERIMENT **********
34 chunks of 1000 bytes to stall target3
source3 before pipe. Paused: false
source3 after pipe. Paused: true
target3 drained
source3 after drain. Paused: true

It looks like Readable.pipe() pauses the source and does not resume it when the target drains.

@ronag
Member

ronag commented Feb 2, 2022

The flowing part of things works. It's just that isPaused returns true even though it kind of shouldn't here.

Looks like a bug with the flowing/paused state of Readable.
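As background for the flowing/paused distinction, the sketch below shows the two values a caller can observe through the public API: the `readableFlowing` property and the result of `isPaused()`. It only uses explicit `pause()` and `resume()` calls, so it shows the normal, consistent case rather than the bug itself. The `snapshot` helper and the `history` array are hypothetical names for this illustration.

```javascript
const { PassThrough } = require('stream');

const stream = new PassThrough();

// Hypothetical helper: capture both observable state indicators at once.
const snapshot = (label) => ({
  label,
  flowing: stream.readableFlowing,
  paused: stream.isPaused(),
});

const history = [];
history.push(snapshot('new'));            // flowing: null,  paused: false
stream.pause();
history.push(snapshot('after pause()'));  // flowing: false, paused: true
stream.resume();
history.push(snapshot('after resume()')); // flowing: true,  paused: false

console.log(history);
```

Logging both values inside the 'drain' handler of the third experiment is one way to check whether the two have drifted apart on a given Node.js version.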

ronag added the confirmed-bug label (Issues with confirmed bugs.) on Feb 2, 2022
ronag added a commit to nxtedition/node that referenced this issue Feb 4, 2022
Previously we would just resume "flowing" the stream without
resetting the "paused" state. Fixes this by properly using
pause/resume methods for .pipe.

Fixes: nodejs#41785
nodejs-github-bot pushed a commit that referenced this issue Feb 6, 2022
Previously we would just resume "flowing" the stream without
resetting the "paused" state. Fixes this by properly using
pause/resume methods for .pipe.

Fixes: #41785

PR-URL: #41848
Reviewed-By: Benjamin Gruenbaum
Reviewed-By: Matteo Collina
Reviewed-By: Luigi Pinca
Reviewed-By: James M Snell
ruyadorno pushed a commit that referenced this issue Feb 8, 2022
danielleadams pushed a commit that referenced this issue Mar 2, 2022
danielleadams pushed a commit that referenced this issue Mar 3, 2022
danielleadams pushed a commit to danielleadams/node that referenced this issue Mar 4, 2022
danielleadams pushed a commit that referenced this issue Mar 8, 2022
danielleadams pushed a commit that referenced this issue Mar 14, 2022
juanarbol pushed a commit that referenced this issue Apr 28, 2022