Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readline: buffer line events in async iterator #41708

Closed
wants to merge 3 commits into from

Conversation

benjamingr
Copy link
Member

Beforehand the async iterator for readline was created lazily when the
stream was accessed which meant no events were buffered. In addition
stream handling was modernized and improved in the process to support
backpressure correctly. A test was added to assert this.

There is a cost here (the stream is always created) I believe it is well worth it to enable modern usage of readline without dropping lines.

There is also an unrelated bugfix here (we return the wrong thing from events.on(...).throw(...).

cc @nodejs/readline @nodejs/streams @jfriend00

Fixes: #28565
Fixes: #33463

Beforehand the async iterator for readline was created lazily when the
stream was accessed which meant no events were buffered. In addition
stream handling was modernized and improved in the process to support
backpressure correctly. A test was added to assert this.

Fixes: nodejs#28565
Fixes: nodejs#33463
@nodejs-github-bot nodejs-github-bot added events Issues and PRs related to the events subsystem / EventEmitter. needs-ci PRs that need a full CI run. readline Issues and PRs related to the built-in readline module. labels Jan 26, 2022
const onIterator = on(this, 'line');
this.once('close', () => onIterator.return());
const readable = Readable.from(onIterator).map((line) => line[0]);
this[kLineObjectStream] = readable;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the current implementation does not close the readline.Interface when the async iterable is closed. This can be easily fixed if we want by doing readable.once('close', () => this.close()) though I am not sure which behavior is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should close it. The pattern for async iterators is that the stream is fully consumed after the loop. Maybe add an option?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should close it too but was hesitant to change the existing behavior. If you agree it should be closed I'll close it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@benjamingr
Copy link
Member Author

(Note to self: This also needs a doc update to remove the sentence about not attaching the listener early enough)

@benjamingr
Copy link
Member Author

The failing for backpressure test looks kind of wrong I think?

It wraps the events with a readable settings its highWaterMark to 16 (default for objectMode) and stops asking for data once we buffered 16 items by calling pause.

This means that other consumers of the readline.Interface won't get line events they're listening to which is kind of confusing behavior I think? I think for the very least we should check no one else is listening to line before pausing.

this is like one consumer in a multicast system pausing the broadcast for everyone because it can't buffer enough.

Alternatively I'm not sure backpressure makes a ton of sense here? I'll consider

@mcollina
Copy link
Member

The failing for backpressure test looks kind of wrong I think?

I'm probably lost, could you link?

This means that other consumers of the readline.Interface won't get line events they're listening to which is kind of confusing behavior I think? I think for the very least we should check no one else is listening to line before pausing.

If you are using an async iterator you are the primary consumer of that data. This should probably documented somewhere but a lot of unwanted things happen when multiple consumers are attached.

@benjamingr
Copy link
Member Author

I'm probably lost, could you link?

There is a test that checks that we .pause the readline.Interface after the readable associated with the async iterator reaches its watermark (which is a non-configurable 16 lines since that's the default for objectMode).

You can see the test in https://github.com/nodejs/node/blob/master/test/parallel/test-readline-async-iterators-backpressure.js

It's kind of confusing since if errors happen those will be buffered too in the async iterable which is why this behavior changed in from: #29428 .

This isn't too hard to "fix" since from already sets reading = false when the readable has buffered enough - it just defaults to a highWaterMark of 1 and not 16.

@benjamingr
Copy link
Member Author

I think I'll split this PR into two - one just fixing the bug and another to discuss backpressure to make the discussion easier - hope that's ok?

@mcollina
Copy link
Member

go for it

@benjamingr benjamingr added the request-ci Add this label to start a Jenkins CI on a PR. label Jan 28, 2022
@github-actions github-actions bot removed the request-ci Add this label to start a Jenkins CI on a PR. label Jan 28, 2022
@nodejs-github-bot
Copy link
Collaborator

@benjamingr benjamingr added the request-ci Add this label to start a Jenkins CI on a PR. label Jan 29, 2022
@github-actions github-actions bot removed the request-ci Add this label to start a Jenkins CI on a PR. label Jan 29, 2022
@nodejs-github-bot
Copy link
Collaborator

@benjamingr
Copy link
Member Author

benjamingr commented Feb 2, 2022

I want to wait for #41276 to land first


await setImmediate();
const result = [];
for await (const item of rl) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it's redundant await, isn't it?

@aduh95
Copy link
Contributor

aduh95 commented May 11, 2024

This needs a rebase/

@benjamingr benjamingr closed this Jun 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
blocked PRs that are blocked by other issues or PRs. events Issues and PRs related to the events subsystem / EventEmitter. needs-ci PRs that need a full CI run. readline Issues and PRs related to the built-in readline module.
Projects
None yet
6 participants