[EPIC] Improve aggregate performance with adaptive sizing in accumulators / avoiding reallocations in accumulators #7065

Open
alamb opened this issue Jul 24, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jul 24, 2023

UPDATE: Now that @Rachelint has begun work on #11931, I have turned this ticket into an "epic" (i.e., I'll link related tasks here)

Is your feature request related to a problem or challenge?

Making aggregations fast in DataFusion helps its adoption and makes it even cooler.

As part of the new #6904 work, @yjshen had an idea #6800 (comment) that could avoid a copy in the accumulator implementations:

Describe the solution you'd like

Adaptive sizing (perhaps?): how would the hash table header and the states in each accumulator be initialized, and how would they grow afterward?

Here is the structure of the current group operator:

                                         ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
                                         │┌────────────┐│   │┌────────────┐│   │┌────────────┐│
    ┌─────┐                              ││accumulator ││   ││accumulator ││   ││accumulator ││
    │  5  │                              ││     0      ││   ││     0      ││   ││     0      ││
    ├─────┤                              ││ ┌────────┐ ││   ││ ┌────────┐ ││   ││ ┌────────┐ ││
    │  9  │                              ││ │ state  │ ││   ││ │ state  │ ││   ││ │ state  │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │  1  │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    └─────┘                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ └────────┘ ││   ││ └────────┘ ││   ││ └────────┘ ││
                                         │└────────────┘│   │└────────────┘│   │└────────────┘│
    Hash Table                           └──────────────┘   └──────────────┘   └──────────────┘
                                                                                               
                                          New GroupsAccumulator                                
                                                                                               
                                                                                               
                                                                                               
stores "group indexes"                     There is one GroupsAccumulator per aggregate           
which are indexes into                     (NOT PER GROUP). Internally, each                   
Vec<GroupState>                            GroupsAccumulator manages the state for                
                                           multiple groups                                     

The current implementation of this, for accumulators such as Average, is to use a Vec<T>. While this approach is simple to implement, it also means that as the Vec grows, the accumulated values may be copied (up to 2 times on average, given a doubling strategy).
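For intuition, here is a tiny standalone sketch (ordinary Rust, not DataFusion code) that observes the backing buffer moving as a Vec of per-group state grows:

```rust
// Minimal sketch: once `len` reaches `capacity`, the next push reallocates
// the buffer and moves every previously accumulated value into it.
fn main() {
    let mut states: Vec<u64> = Vec::new();
    let mut last_ptr = states.as_ptr();
    for group_index in 0..1_000u64 {
        // state for a newly seen group (e.g. a running sum)
        states.push(group_index);
        if states.as_ptr() != last_ptr {
            // The backing buffer moved: all existing states were copied
            // into the new allocation.
            println!("reallocated at len {}", states.len());
            last_ptr = states.as_ptr();
        }
    }
}
```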

An alternative, suggested by @yjshen, is to segment the state into fixed-size vectors: allocate one vector at a time, fill it until it is full, then create a new vector for upcoming new states.

                                         ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
                                         │┌────────────┐│   │┌────────────┐│   │┌────────────┐│
    ┌─────┐                              ││accumulator ││   ││accumulator ││   ││accumulator ││
    │  5  │                              ││     AGG    ││   ││     SUM    ││   ││     0      ││
    ├─────┤                              ││ ┌────────┐ ││   ││ ┌────────┐ ││   ││ ┌────────┐ ││
    │  9  │                              ││ │ state- │ ││   ││ │ state- │ ││   ││ │ state  │ ││
    ├─────┤                              ││ │segment-│ ││   ││ │segment-│ ││   ││ │        │ ││
    │     │                              ││ │   1    │ ││   ││ │   1    │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │  1  │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    └─────┘                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ └────────┘ ││   ││ └────────┘ ││   ││ └────────┘ ││
                                         ││            ││   ││            ││   │└────────────┘│
    Hash Table                           ││ ┌────────┐ ││   ││ ┌────────┐ ││   └──────────────┘
                                         ││ │ state- │ ││   ││ │ state- │ ││                   
                                         ││ │segment-│ ││   ││ │segment-│ ││                   
                                         ││ │   2    │ ││   ││ │   2    │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ └────────┘ ││   ││ └────────┘ ││                   
                                         │└────────────┘│   │└────────────┘│                   
                                         └──────────────┘   └──────────────┘                   
                                                                                               

Through this segmented approach, we could avoid a memory copy on each resize (the number of resizes would otherwise be large for high-cardinality aggregations), and the size grows more predictably.

But admittedly, this approach would also add complexity, both for managing the header pointers and for handling updates that span multiple vectors.
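To make this concrete, here is a minimal, hypothetical sketch of the segmented layout (the `SegmentedState` type and `SEGMENT_SIZE` constant are illustrative names, not the actual DataFusion implementation):

```rust
/// Per-group state stored in fixed-size segments: adding groups only ever
/// allocates a new segment, so existing state is never copied, at the cost
/// of a two-level index on every update.
const SEGMENT_SIZE: usize = 4096;

struct SegmentedState<T> {
    segments: Vec<Vec<T>>,
}

impl<T> SegmentedState<T> {
    fn new() -> Self {
        Self { segments: vec![] }
    }

    /// Append state for a newly seen group; when the last segment is full,
    /// allocate a fresh one instead of reallocating one big Vec.
    fn push(&mut self, value: T) {
        if self
            .segments
            .last()
            .map_or(true, |seg| seg.len() == SEGMENT_SIZE)
        {
            self.segments.push(Vec::with_capacity(SEGMENT_SIZE));
        }
        self.segments.last_mut().unwrap().push(value);
    }

    /// Updates must translate the flat group index into (segment, offset):
    /// this is the extra bookkeeping mentioned above.
    fn get_mut(&mut self, group_index: usize) -> &mut T {
        &mut self.segments[group_index / SEGMENT_SIZE][group_index % SEGMENT_SIZE]
    }
}

fn main() {
    let mut counts: SegmentedState<u64> = SegmentedState::new();
    for group_index in 0..10_000 {
        counts.push(0); // state for a newly seen group
        *counts.get_mut(group_index) += 1; // e.g. a COUNT accumulator
    }
    assert_eq!(*counts.get_mut(9_999), 1);
}
```

With a fixed segment size, the allocation cost per growth step is bounded and previously written state never moves, but every update pays the division/modulo needed to locate its segment.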

Implementation Steps:

@my-vegetable-has-exploded
Contributor

my-vegetable-has-exploded commented Nov 27, 2023

It seems like deque in C++?
But what does "adaptive" mean in "adaptive sizing"?
Thanks, @alamb

@alamb
Contributor Author

alamb commented Nov 28, 2023

But what does "adaptive" mean in "adaptive sizing"?

I think @yjshen meant something like storing the values in Vec<Vec<T>> rather than Vec<T>

And when more space was needed, create a new Vec::with_capacity rather than push to the existing Vec (which might reallocate and copy)

@avantgardnerio
Contributor

We (Coralogix) may want to tackle this, but combine it with ColumnStatistics::distinct_count so that we only allocate once.

@alamb
Contributor Author

alamb commented Aug 1, 2024

Using statistics to improve the allocation performance certainly seems like a good idea -- though I am not sure the default distinct statistics are very reliable
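A minimal sketch of that idea, assuming the planner can hand the accumulator an estimated distinct count; `new_group_states` and `DEFAULT_CAPACITY` are illustrative names, not DataFusion APIs:

```rust
/// Fallback capacity when no distinct-count estimate is available.
const DEFAULT_CAPACITY: usize = 1024;

/// Size the per-group state once up front from an estimated number of
/// distinct groups (e.g. derived from column statistics). If the estimate
/// holds, the Vec never reallocates, so accumulated state is never copied.
fn new_group_states<T>(estimated_distinct: Option<usize>) -> Vec<T> {
    Vec::with_capacity(estimated_distinct.unwrap_or(DEFAULT_CAPACITY))
}

fn main() {
    // e.g. statistics report roughly 50_000 distinct group keys
    let sums: Vec<u64> = new_group_states(Some(50_000));
    assert!(sums.capacity() >= 50_000);
}
```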

@Rachelint
Contributor

take

@Rachelint
Contributor

Rachelint commented Aug 6, 2024

Working on a POC for a performance improvement related to this.

@alamb alamb changed the title Improve aggregate performance with adaptive sizing in accumulators / avoiding reallocations in accumulators [EPIC] Improve aggregate performance with adaptive sizing in accumulators / avoiding reallocations in accumulators Aug 22, 2024