Avoid dropping of messages after breaking from Select blocks #42

Merged

@shsms shsms commented Nov 7, 2022

The current implementation of Select.ready() does more than check whether any of the receivers have messages: it also fetches them.

When multiple receivers have messages waiting to be read, Select.ready() consumes the latest message from each of them. If subsequent user code then drops the Select object and recreates it for some reason, the unprocessed messages that Select.ready() had already fetched are lost.
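
The failure mode described above can be reproduced with a toy model. This is only a sketch: `EagerSelect`, `take()` and the use of `asyncio.Queue` as a receiver are hypothetical stand-ins, not the library's classes, and `ready()` here does not block the way the real one does.

```python
import asyncio
from typing import Dict, Optional


class EagerSelect:
    """Hypothetical stand-in for the old behaviour: ready() also fetches."""

    def __init__(self, **queues: "asyncio.Queue[int]") -> None:
        self._queues = queues
        self._fetched: Dict[str, int] = {}

    async def ready(self) -> bool:
        # Fetch from every receiver that has a message waiting.
        for name, queue in self._queues.items():
            if name not in self._fetched and not queue.empty():
                self._fetched[name] = queue.get_nowait()
        return bool(self._fetched)

    def take(self, name: str) -> Optional[int]:
        # Hand a previously fetched message to user code, if there is one.
        return self._fetched.pop(name, None)


async def demo() -> list:
    a: "asyncio.Queue[int]" = asyncio.Queue()
    b: "asyncio.Queue[int]" = asyncio.Queue()
    a.put_nowait(1)
    b.put_nowait(2)

    seen = []
    select = EagerSelect(a=a, b=b)
    await select.ready()           # fetches from both a and b
    seen.append(select.take("a"))  # user handles a's message, then "breaks"

    # User code drops the Select object and builds a new one.
    select = EagerSelect(a=a, b=b)
    await select.ready()           # both queues are already empty
    seen.append(select.take("b"))  # b's message went away with the old object
    return seen


print(asyncio.run(demo()))  # [1, None]
```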

This is incorrect usage of Select, but the implementation should still try to ensure messages don't get dropped in such cases. This PR addresses this issue as follows:

  1. The Receiver interface now requires implementations to provide two private methods, _ready and _get, which are used to implement __anext__ on the interface. Receiver objects retain the original interface with __anext__ and receive, but now also provide a method that only checks whether a message is ready (or waits until one is), and a separate method to get the message once it is ready.
  2. Select.ready() now uses Receiver._ready() to know when a message is ready, and fetches the message with _get() only when user code tries to fetch it.
  3. This means the Select implementation will no longer be able to operate on arbitrary async iterators like it used to, but that's probably a good thing, as it takes away some of its power.
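
A minimal sketch of the split described in the list above, assuming the method names `_ready` and `_get` from this description. The base class shape, `QueueReceiver` and `push()` are illustrative assumptions, not the code merged in this PR.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, Generic, TypeVar

T = TypeVar("T")


class Receiver(ABC, Generic[T]):
    """Hypothetical base class: a readiness check plus a separate getter."""

    @abstractmethod
    async def _ready(self) -> None:
        """Wait until a message is available, without consuming it."""

    @abstractmethod
    def _get(self) -> T:
        """Return the message that `_ready()` made available."""

    async def receive(self) -> T:
        # The public interface is built on top of the two primitives.
        await self._ready()
        return self._get()

    def __aiter__(self) -> "Receiver[T]":
        return self

    async def __anext__(self) -> T:
        return await self.receive()


class QueueReceiver(Receiver[T]):
    """Toy receiver backed by a deque (illustrative only)."""

    def __init__(self) -> None:
        self._queue: Deque[T] = deque()
        self._has_data = asyncio.Event()

    def push(self, msg: T) -> None:
        self._queue.append(msg)
        self._has_data.set()

    async def _ready(self) -> None:
        await self._has_data.wait()

    def _get(self) -> T:
        # Must only be called after `_ready()` has returned.
        msg = self._queue.popleft()
        if not self._queue:
            self._has_data.clear()
        return msg


async def demo() -> list:
    recv: QueueReceiver[int] = QueueReceiver()
    recv.push(1)
    recv.push(2)
    await recv._ready()
    await recv._ready()  # checking again consumes nothing
    return [recv._get(), await recv.receive()]


print(asyncio.run(demo()))  # [1, 2]
```

Because `_ready()` never removes anything from the receiver, a Select built on it can be dropped at any point without taking pending messages with it.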

@github-actions github-actions bot added part:channels Affects channels implementation part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests labels Nov 7, 2022
@shsms shsms force-pushed the select-improvements branch from d8fdef3 to 206bbd7 Compare November 7, 2022 13:07
@shsms shsms marked this pull request as ready for review November 7, 2022 13:14
@shsms shsms requested a review from a team as a code owner November 7, 2022 13:14
leandro-lucarella-frequenz pushed a commit to llucax/frequenz-channels-python that referenced this pull request Nov 14, 2022
Alternative approach to frequenz-floss#42.

Signed-off-by: Leandro Lucarella <[email protected]>
@leandro-lucarella-frequenz

OK, I think I can conclude that as_completed() does exactly what we want, except that it doesn't return the finished task but its .result(). So we have no way to know which of the tasks finished, and we can't re-schedule it (unless we start wrapping the tasks so that the .result() also identifies which one finished).

Another option would be to implement our own as_completed() that returns the task instead of the .result(); the code is not that complicated: https://github.com/python/cpython/blob/db115682bd639a2642c617f0b7d5b30cd7d7f472/Lib/asyncio/tasks.py#L561-L615
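
One way such a variant could look is sketched here on top of `asyncio.wait()` rather than by adapting the CPython code linked above. `as_completed_tasks()` and the receiver names are hypothetical and only meant to show that yielding the task itself lets the caller map it back to its source.

```python
import asyncio
from typing import AsyncIterator, Iterable, Set


async def as_completed_tasks(
    tasks: Iterable["asyncio.Task[object]"],
) -> AsyncIterator["asyncio.Task[object]"]:
    """Yield tasks as they finish (the task itself, not its result)."""
    pending: Set["asyncio.Task[object]"] = set(tasks)
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            yield task


async def demo() -> list:
    async def produce(value: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return value

    # Map each task back to the (hypothetical) receiver it belongs to.
    tasks = {
        asyncio.create_task(produce("slow", 0.05)): "receiver-1",
        asyncio.create_task(produce("fast", 0.0)): "receiver-2",
    }
    order = []
    async for task in as_completed_tasks(tasks):
        # Having the task object means we know which source finished
        # and could re-schedule a new task for it here.
        order.append((tasks[task], task.result()))
    return order


print(asyncio.run(demo()))  # [('receiver-2', 'fast'), ('receiver-1', 'slow')]
```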

This alternative doesn't make the code simpler, but its advantage over the approach in this PR, IMHO, is that we keep the complexity in select() only, and receivers can still be simple async iterators. That makes a lot of sense to me, and it spares people who want to provide new channels from having to implement readiness primitives.

I gave it a shot by mapping the done coroutine returned by as_completed() to our pending items. The code is greatly simplified, but for some reason it is not working completely as expected.

#46

@leandro-lucarella-frequenz

> yes, but you are operating on the same tasks a1 and a2 after you break out of the loop. But in the select implementation, a1 and a2 will be owned by select, and when we break out of the loop, the task objects get lost and hence the values they hold.

Not if we break the loop but keep the Select object alive. But yeah, I see your point: that would lead to a select() implementation that is not correct. I now see we have another layer of complexity: we are consuming from the async iterators in a function running inside a task, so if the task has already consumed the value and we cancel the task for some reason, the value will be lost too. The alternative I presented above suffers from this issue as well.
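
The cancellation problem can be shown in isolation. This is a sketch with an `asyncio.Queue` standing in for a receiver; `fetch()` is a hypothetical wrapper, not code from this PR.

```python
import asyncio


async def fetch(queue: "asyncio.Queue[str]") -> str:
    msg = await queue.get()  # the value leaves the queue here
    await asyncio.sleep(0)   # any later await is a cancellation point
    return msg


async def demo() -> int:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    queue.put_nowait("msg")

    task = asyncio.create_task(fetch(queue))
    await asyncio.sleep(0)   # let the task run past queue.get()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The task consumed the message but never returned it.
    return queue.qsize()


print(asyncio.run(demo()))  # 0
```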

So I think you finally convinced me that we can't have a nice, simple and safe async iterator interface without introducing the readiness test to receivers.

I really hate moving the complexity to channel implementors, but I guess the only way around it is moving the complexity to users of select(), which is even worse.

This looks good. I like it. It's also not that bad in terms of complexity as it only splits the work that needed to be done by receive(), so I'm less worried about complexity for channels implementors too.

Before this PR we also had the issue about potential messages being lost when breaking in Merge/MergeNamed, right?

This also needs a RELEASE_NOTES entry!

In a followup PR I think we should definitely go for the async iterable interface for Select.ready(), it will simplify the implementation too (as we won't need the extra class for lazy evaluation for example).

... instead of using `None`

Signed-off-by: Sahas Subramanian <[email protected]>
.. and not in calls to `Select.ready()`, like it was earlier.

Signed-off-by: Sahas Subramanian <[email protected]>
@shsms shsms force-pushed the select-improvements branch from 206bbd7 to 6f4576b Compare November 18, 2022 16:36
@leandro-lucarella-frequenz leandro-lucarella-frequenz added this to the v0.11.0 milestone Nov 21, 2022
@leandro-lucarella-frequenz
Copy link
Contributor

Needs a couple of documentation fixes and it is ready to go! Feel free to merge after those fixes.

@shsms shsms force-pushed the select-improvements branch from 6f4576b to 64e715d Compare November 21, 2022 11:32
... instead of `StopAsyncIteration`

Signed-off-by: Sahas Subramanian <[email protected]>
@shsms shsms force-pushed the select-improvements branch from 64e715d to 6138ee0 Compare November 21, 2022 11:44
@sahas-subramanian-frequenz sahas-subramanian-frequenz merged commit 5b47b03 into frequenz-floss:v0.x.x Nov 21, 2022
@shsms shsms deleted the select-improvements branch November 21, 2022 11:53