
Memory is coupled to group by cardinality, even when the aggregate output is truncated by a limit clause #7191

Closed
avantgardnerio opened this issue Aug 4, 2023 · 6 comments · Fixed by #7192
Labels
enhancement New feature or request

Comments

@avantgardnerio
Contributor

Is your feature request related to a problem or challenge?

Currently, there is only one aggregation implementation: GroupedHashAggregateStream. It does a lovely job, but it allocates memory for every unique group-by value.

For large datasets, this can cause OOM errors, even if the very next operation is a sort by max(x) limit y.

Describe the solution you'd like

I would like to add a GroupedAggregateStream based on a PriorityQueue of grouped values that can be used instead of GroupedHashAggregateStream under the specific conditions above, so that Top K queries work even on datasets with cardinality larger than available memory.
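A minimal sketch of the idea, with names and types that are mine rather than DataFusion's: pair a hash map from group to its current aggregate with an ordered structure from aggregate back to group, so at most k groups are ever resident no matter how many distinct groups the input contains.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical sketch (not DataFusion's API): compute the top `k` groups by
/// max(value), holding at most `k` groups in memory regardless of cardinality.
fn topk_max(rows: impl IntoIterator<Item = (String, i64)>, k: usize) -> Vec<(String, i64)> {
    assert!(k > 0);
    // group -> current max; never holds more than k entries
    let mut maxes: HashMap<String, i64> = HashMap::new();
    // ordered (max, group) pairs, acting as the priority queue
    let mut heap: BTreeSet<(i64, String)> = BTreeSet::new();

    for (group, value) in rows {
        if let Some(&cur) = maxes.get(&group) {
            // existing group: only reorder if its max improved
            if value > cur {
                heap.remove(&(cur, group.clone()));
                heap.insert((value, group.clone()));
                maxes.insert(group, value);
            }
        } else {
            if heap.len() == k {
                let min = heap.iter().next().unwrap().0;
                // value can't beat the current k-th best: a no-op, nothing
                // allocated (ties are dropped here; a real implementation
                // would need to define tie behavior)
                if value <= min {
                    continue;
                }
                // evict the smallest group to make room
                let evicted = heap.iter().next().unwrap().clone();
                heap.remove(&evicted);
                maxes.remove(&evicted.1);
            }
            heap.insert((value, group.clone()));
            maxes.insert(group, value);
        }
    }
    // emit best-first, like ORDER BY max(value) DESC LIMIT k
    heap.into_iter().rev().map(|(v, g)| (g, v)).collect()
}
```

Feeding five rows across four groups through `topk_max(rows, 2)` keeps only two groups live at any point, which is the memory bound the issue is after.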

Describe alternatives you've considered

A more generalized implementation where we:

  1. sort by group_val
  2. aggregate by group_val, emitting rows in a stream as the aggregate for each group is computed
  3. feed that into a (new) generalized TopKExec node that is only responsible for doing the top K operation

Unfortunately, despite being more general, I'm told that this approach will still OOM in our case.
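For reference, step 2 of that alternative can be sketched as a constant-state pass over input already sorted by group (the function name and shapes here are mine, not an existing DataFusion API); the memory pressure moves into the sort in step 1, which is presumably why it can still OOM:

```rust
/// Hypothetical sketch of step 2: with input sorted by group, each group's
/// aggregate (here max) can be emitted as soon as the group changes, so only
/// one group's state is held at a time.
fn sorted_group_max(sorted: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut out = Vec::new();
    let mut cur: Option<(&str, i64)> = None;
    for &(group, value) in sorted {
        cur = match cur {
            Some((g, v)) if g == group => Some((g, v.max(value))),
            Some((g, v)) => {
                // group boundary: stream out the finished aggregate
                out.push((g.to_string(), v));
                Some((group, value))
            }
            None => Some((group, value)),
        };
    }
    if let Some((g, v)) = cur {
        out.push((g.to_string(), v));
    }
    out
}
```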

Additional context

Please see the following similar (but not identical) tickets for related top K issues:

  1. Top-K query optimization in sort uses substantial memory  #7149
  2. Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates #6937
  3. Improve aggregate performance with specialized groups accumulator for single string group by #7064
  4. Optimize "per partition" top-k : ROW_NUMBER < 5 / TopK #6899
@tustvold
Contributor

tustvold commented Aug 4, 2023

Perhaps we could add a redact group API to the new row accumulators; this would allow using them for this as well as for window functions.

@avantgardnerio
Contributor Author

Perhaps we could add a redact group API to the new row accumulators; this would allow using them for this as well as for window functions.

I struggled with this for a bit. Originally I rejected using GroupValuesRows because it had a hash table in it, whereas this needed to be ordered. Eventually I realized I needed a BiMap and ended up with both a hash table (group to value) and a priority queue (value to group). I think this means we could merge the two implementations by passing an optional limit to GroupValuesRows.

I don't think we'd want to always evict groups, because we might not even need to add them in the first place if the value being aggregated is less/greater than the min/max of the priority queue - so it would be a no-op.

@avantgardnerio
Contributor Author

Also, I think this optimization would usually be applied for a single term like max(x), not max(x), min(y), so we would probably want this to apply to PrimitiveGroupsAccumulator if anything.

@tustvold
Contributor

tustvold commented Aug 4, 2023

I needed a BiMap

Yeah, I think you need both a priority queue to work out which groups to keep and a HashMap to work out which rows belong to which groups. I can't think of an obvious way to avoid this.

I don't think we'd want to always evict groups, because we might not even need to add them in the first place if the value being aggregated is less/greater than the min/max of the priority queue - so it would be a no-op.

I was envisaging something like adding support to GroupsAccumulator::inter to optionally return a list of groups to redact, possibly as a BooleanBuffer. This would effectively be the groups from previous calls to GroupsAccumulator::inter that are no longer needed. These would then be fed to a new GroupValues::evict method to clear them out of the various aggregators, possibly using something relatively cheap like Vec::retain.

Or something to that effect, just spitballing here. I really want to get Window functions using GroupsAccumulator so that we can get rid of the old scalar accumulators (#7112), and the GroupValues::evict API would enable this.
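A rough sketch of what such an evict path could look like, with a plain Vec<bool> standing in for the BooleanBuffer and made-up per-group state (GroupValues::evict does not exist yet, and these struct and field names are illustrative only):

```rust
/// Hypothetical per-group state, standing in for GroupValues plus one
/// accumulator. Group ids are implicit indices into the parallel Vecs.
struct GroupState {
    groups: Vec<String>, // group values, indexed by group id
    maxes: Vec<i64>,     // one accumulated value per group
}

impl GroupState {
    /// Drop every group whose bit in `redact` is set, compacting the
    /// remaining state with Vec::retain. Surviving groups get new,
    /// contiguous group ids (their new index).
    fn evict(&mut self, redact: &[bool]) {
        assert_eq!(redact.len(), self.groups.len());
        let mut bits = redact.iter();
        self.groups.retain(|_| !*bits.next().unwrap());
        let mut bits = redact.iter();
        self.maxes.retain(|_| !*bits.next().unwrap());
    }
}
```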

@avantgardnerio
Contributor Author

get rid of the old scalar accumulators

Interesting... I thought we were going the other way, due to this comment.

@tustvold
Contributor

tustvold commented Aug 4, 2023

By scalar, I am referring to the ones based around ScalarValue, i.e. https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html
