3.x: groupBy may never cancel the upstream if the group is dropped #6596

Closed
akarnokd opened this issue Jul 29, 2019 · 7 comments · Fixed by #6642

Comments

@akarnokd
Member

akarnokd commented Jul 29, 2019

Summary

The design decision to allow a delayed subscribe() to a group emitted by groupBy (e.g., via subscribeOn) creates a window in which, if the consumer ignores the group, groupBy may never cancel its source.

Problem

In order to support taking a limited number of groups (e.g., source.groupBy(i -> i % 10).take(2)), the groupBy operator can't cancel its source just because the downstream cancelled the flow of groups. Instead, a reference counting scheme is used so that the upstream is cancelled only once all groups have been cancelled as well (e.g., source.groupBy(i -> i % 2).take(2).flatMap(g -> g.take(2))).
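
The reference counting described above can be sketched with plain JDK classes. This is a minimal illustration, not RxJava's actual implementation: the class `GroupRefCount` and its method names are hypothetical, and the upstream is modelled as a `Runnable` that is run once on cancellation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for groupBy's cancellation bookkeeping; not RxJava code. */
final class GroupRefCount {
    // Starts at 1: the consumer of the flow of groups holds one reference.
    private final AtomicInteger active = new AtomicInteger(1);
    // Guards against the outer consumer releasing its reference twice.
    private final AtomicBoolean outerDone = new AtomicBoolean();
    // Assumed hook that cancels the main source.
    private final Runnable cancelUpstream;

    GroupRefCount(Runnable cancelUpstream) {
        this.cancelUpstream = cancelUpstream;
    }

    /** Called when a new group is emitted downstream. */
    void groupCreated() {
        active.incrementAndGet();
    }

    /** Called when the consumer of a group cancels it. */
    void cancelGroup() {
        release();
    }

    /** Called when downstream cancels the flow of groups, e.g. through take(2). */
    void cancelOuter() {
        if (outerDone.compareAndSet(false, true)) {
            release();
        }
    }

    private void release() {
        // Only the last reference to go away cancels the upstream.
        if (active.decrementAndGet() == 0) {
            cancelUpstream.run();
        }
    }
}
```

In this model, a group that is created but never subscribed to never calls `cancelGroup()`, so the count never reaches zero and the upstream is never cancelled, which is the failure mode this issue describes.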

The documentation states that one should not ignore a group:

Note: A GroupedFlowable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedPublishers that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like ignoreElements() to them.

However, some operators may ignore their input: they see only an object, not a group, so the group is never consumed and the source is never cancelled.

Unfortunately, flatMap is one such operator: once cancelled, it ignores incoming values, and thus incoming groups. Add an asynchronous cancellation race and the problem manifests sooner or later (source.groupBy.take.flatMap.takeUntil(cancelSignal)).

In the original Rx.NET implementation, groups are reference counted and if the consumer doesn't subscribe to a group immediately, the group is discarded right then. However, Rx.NET groups don't cache items, so a delayed subscription results in the data loss that RxJava set out to avoid.

Proposition

I propose a change to the groupBy logic to solve this cancellation problem as well as not lose data. The solution requires multiple considerations.

First, we need to detect if there was a subscribe call when a fresh group was emitted. If not, the group is discarded after the single value and a completion signal is emitted to the group. This way, the groupBy is not held back by a potentially unconsumed group and if the group is eventually consumed, the value is not lost.

The drawback is that this scheme may lead to group recreation over and over even if the group is actually subscribed to in a delayed fashion. Note, however, that since a group is practically a hot subject, using subscribeOn has generally no practical benefit and consumers should apply observeOn to shift the emission to the desired thread anyway.
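
The first consideration can be modelled as follows. This is a deliberately single-threaded sketch under stated assumptions: `CachedGroup` and its methods are hypothetical names, not RxJava's GroupedFlowable, and the race with an asynchronous subscriber is ignored here.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical, single-threaded model of a group; not RxJava's GroupedFlowable. */
final class CachedGroup<T> {
    private final Queue<T> cache = new ArrayDeque<>();
    private Consumer<T> onNext;   // non-null once a consumer has subscribed
    private boolean done;         // set when the operator gives up on the group

    /** Operator side: deliver directly if subscribed, otherwise cache the item. */
    void offer(T item) {
        if (onNext != null) {
            onNext.accept(item);
        } else {
            cache.add(item);
        }
    }

    /**
     * Operator side, called right after the group was emitted downstream.
     * Returns true if nobody subscribed during the emission, in which case
     * the group is completed and can be dropped from the operator's map.
     */
    boolean completeIfUnsubscribed() {
        if (onNext != null) {
            return false;
        }
        done = true;
        return true;
    }

    /** Consumer side: replay cached items, then complete if the group was dropped. */
    void subscribe(Consumer<T> onNext, Runnable onComplete) {
        this.onNext = onNext;
        T item;
        while ((item = cache.poll()) != null) {
            onNext.accept(item);
        }
        if (done) {
            onComplete.run();
        }
    }
}
```

A late subscriber to a dropped group still receives the single cached value followed by completion, so the value is not lost, while the operator no longer waits on that group.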

Second, there is an inherent race possible between an async subscriber and deciding if the group has been subscribed to just in time. Therefore, an atomic state transition has to be implemented to declare a group live or dead on arrival. In addition, the completion of the dead group and a possible cancellation by its consumer should not trigger multiple cancellations/group removal, especially the removal of a newer group with the same key.
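
One way to picture the atomic transition and the guarded removal, assuming a compare-and-set on a small state machine and a key-plus-instance removal from the group map. `GroupState` and `GroupRegistry` are hypothetical names for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical state holder for one group; not RxJava code. */
final class GroupState {
    static final int FRESH = 0;      // emitted, fate not yet decided
    static final int LIVE = 1;       // a consumer subscribed in time
    static final int ABANDONED = 2;  // declared dead on arrival

    private final AtomicInteger state = new AtomicInteger(FRESH);

    /** Consumer side: true if the subscription won the race. */
    boolean trySubscribe() {
        return state.compareAndSet(FRESH, LIVE);
    }

    /** Operator side: true if nobody subscribed before the check. */
    boolean tryAbandon() {
        return state.compareAndSet(FRESH, ABANDONED);
    }
}

/** Hypothetical key-to-group map with instance-checked removal. */
final class GroupRegistry<K> {
    private final ConcurrentMap<K, GroupState> groups = new ConcurrentHashMap<>();

    GroupState getOrCreate(K key) {
        return groups.computeIfAbsent(key, k -> new GroupState());
    }

    /** Removes the entry only if it still maps to this exact instance. */
    boolean remove(K key, GroupState group) {
        // GroupState does not override equals, so this compares by identity.
        return groups.remove(key, group);
    }
}
```

Because exactly one of `trySubscribe()` and `tryAbandon()` can succeed, the group ends up either live or dead, never both. The two-argument `remove` means a stale cancellation from an old group cannot evict a newer group registered under the same key.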

Third, when a GroupedFlowable is declared dead on arrival and then consumed later, the consumption of the cached item should not trigger a request to the main source, as it would for a live group. In contrast, declaring a group dead should ask for replenishment from the main source, since at that point we can't know if the group will ever be consumed.
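
The request bookkeeping in the third point might look like the sketch below. It assumes one replenishment request per item; `ReplenishAccounting` and its method names are hypothetical, and the counter merely stands in for calls to the upstream's request(1).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of when groupBy would ask the main source for more; not RxJava code. */
final class ReplenishAccounting {
    // Stands in for the number of request(1) calls made to the main source.
    private final AtomicLong requestedFromSource = new AtomicLong();

    /** A live group's consumer took an item: replenish from the main source. */
    void onLiveGroupItemConsumed() {
        requestedFromSource.incrementAndGet();
    }

    /** A group was declared dead on arrival: replenish now, as it may never be consumed. */
    void onGroupDeclaredDead() {
        requestedFromSource.incrementAndGet();
    }

    /** A late consumer drained a dead group's cached item: already replenished, so no request. */
    void onDeadGroupItemConsumed() {
        // Intentionally empty; requesting here would count the same item twice.
    }

    long requested() {
        return requestedFromSource.get();
    }
}
```

Each item is thus accounted for exactly once: either when its live group consumes it or when its group is declared dead.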

@akarnokd akarnokd added this to the 3.0 milestone Jul 29, 2019
@davidmoten
Collaborator

davidmoten commented Jul 29, 2019

Thanks for raising this @akarnokd. Reminds me of #5839 where you used the term resource-aware streams. Making RxJava resource-aware might be an option so that queueing operators applied some function (global? local? interface defined?) to unused buffered elements.

I've hundreds of uses of groupBy across work and other codebases and I always use it with flatMap straight after (the only exception is the occasional use of groupBy.map.flatMap which is trivially transformable to groupBy.flatMap). My use cases are not that interesting perhaps but I do wonder if people do use groupBy with cancelling operators like take before flatMap. I don't like losing flexibility in general but perhaps the root of our problem is that the groupBy operator should always flatten the grouped streams as well so that we can internalize the cancellation problem.

Has anyone got a use case where they call flowable.groupBy.take.flatMap or similar cancelling operator being applied to the groupBy before the flatMap?

@smaldini

We use something like a hook for unconsumed data (discarded). The problem is still that sometimes we cannot guarantee the parent is not emitting some extra onNext. We had that problem with MonoCollectList in particular but it can apply to other operators.

One thing I was wondering was if we could make cancel propagate back an onComplete to delimit the onNext sequence consistently.

Regarding your case @davidmoten, I've mostly seen cases where we do flux.groupBy.flatMap[take.reduce] or similar. I've also had a case where the groups could be pre-determined, so we just forwarded to processors we could create eagerly instead of waiting for an onNext to create the groupedFlux.

@davidmoten
Collaborator

Thanks @smaldini.

My question is not great at the moment. Really I want to establish if any user has a use case to insert a non-trivial operator between groupBy and flatMap. If not that means that internalization of the flatMap is a possibility.

@davidmoten
Collaborator

We use something like a hook for unconsumed data (discarded). The problem is still that sometimes we cannot guarantee the parent is not emitting some extra onNext. We had that problem with MonoCollectList in particular but it can apply to other operators.

That's interesting and certainly makes resource-aware streams more difficult.

@akarnokd
Member Author

I don't think resource awareness would help in this situation. When the group is composed over (flatMap( group.map )), it stops being a GroupedFlowable thus any Disposable option would be lost.

Resource-awareness is its own complicated thing and requires drastic changes to the architecture. I'd like someday to explore it but outside of RxJava.

@davidmoten
Collaborator

I don't think resource awareness would help in this situation. When the group is composed over (flatMap( group.map )), it stops being a GroupedFlowable thus any Disposable option would be lost.

Yep, indeed.

What about the internalization of flatMap into groupBy?

@akarnokd
Member Author

Like with publish(Function)? I'm not sure; it would prevent open composition as you'd have to put everything inside. I don't think you could hand out Flowable<GroupedFlowable<T, K>> reliably either way.


3 participants