Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate GroupByHash output in multiple RecordBatches rather than one large one #9562

Open
alamb opened this issue Mar 11, 2024 · 8 comments · May be fixed by #11758
Open

Generate GroupByHash output in multiple RecordBatches rather than one large one #9562

alamb opened this issue Mar 11, 2024 · 8 comments · May be fixed by #11758
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Mar 11, 2024

Is your feature request related to a problem or challenge?

  1. The AggregateExec generates one single (giant) RecordBatch on output (source)
  2. Which is then emitted in parts (via RecordBatch::slice(), which does not actually allocate any additional memory) (source)

This has at least two potential downsides:

  1. No memory is freed until the GroupByHash has output every output row
  2. As we see in Further refine the Top K sort operator #9417, if there are upstream operators like TopK that hold references to any of these sliced RecordBatchs, those slices are treated as though they were an additional allocation that needs to be tracked (source)

Something like this in pictures:

                                                 Output             
                           ▲               RecordBatches are        
                           │                 slices into a          
                  ┌────────────────┐          single large          
                  │  RecordBatch   │─ ─ ─ ┐   output batch          
                  └────────────────┴ ─ ─ ┐                          
                           ▲              │                         
                           │             │        ┌────────────────┐
                  ┌────────────────┐      └ ─ ─ ─▶│                │
  output stream   │  RecordBatch   │     │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                  └────────────────┘      ─ ─ ─ ─▶│                │
                                                  ├ ─ ─ ─ ─ ─ ─ ─ ─│
                         ...                      │                │
                                                  │                │
                           ▲                      │      ...       │
                           │                      │                │
                  ┌────────────────┐              │                │
                  │  RecordBatch   ├ ─ ─ ┐        │                │
                  └────────────────┘              │                │
                           ▲             │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                           │              ─ ─ ─ ─▶│                │
                           │                      └────────────────┘
                           │                                        
               ┏━━━━━━━━━━━━━━━━━━━━━━━┓                            
               ┃                       ┃       Single RecordBatch   
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃    GroupByHashExec    ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            

Describe the solution you'd like

If we had infinite time / engineering hours I think a better approach would actually be to change GroupByHash so it didn't create a single giant contiguous RecordBatch

Instead it would be better if GroupByHash produced a Vec<RecordBatch> and then incrementally fed those batches out

Doing this would allow the GroupByHash to release memory incrementally as it output. This is analogous to how @korowa made join output incremental in #8658

Perhaps something like

                            ▲                                        
                            │                                        
                   ┌────────────────┐                    Output      
  output stream    │  RecordBatch   │              RecordBatches are 
                   └────────────────┘              created in smaller
                            ▲                      chunks and emitted
                            │                          one by one    
                            │                                        
                            │                                        
                ┏━━━━━━━━━━━━━━━━━━━━━━━┓          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃                       ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃    GroupByHashExec    ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃                       ┃                 ...        
                ┃                       ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
                                                    Vec<RecordBatch> 
                                                                     
                                                                     

Describe alternatives you've considered

No response

Additional context

@yjshen notes:

To improve AggExec's mono output pattern, #7065 might be similar to the idea of incremental output.

@guojidan
Copy link
Contributor

take

@guojidan
Copy link
Contributor

This issue is interesting, let me try implement it

@alamb
Copy link
Contributor Author

alamb commented Mar 13, 2024

FYI this issue may be tricky -- as it will be performance critical -- I will be happy to assist

@guojidan
Copy link
Contributor

this issue is a bit tricky for me 😢 , I can only think of the following approaches:
change the GroupedHashAggregateStream::emit() function return a vector Result<Vec<RecordBatch>>, each RecordBatch num_rows equal GroupedHashAggregateStream::batch_size, and change ExecutionState::ProducingOutput(RecordBatch) to ExecutionState::ProducingOutput(Vec<RecordBatch>), then GroupedHashAggregateStream::poll_next() function returns one element of a vector at one loop, like let output = batchs.pop() source

@alamb
Copy link
Contributor Author

alamb commented Mar 22, 2024

this issue is a bit tricky for me 😢 , I can only think of the following approaches: change the GroupedHashAggregateStream::emit() function return a vector Result<Vec<RecordBatch>>, each RecordBatch num_rows equal GroupedHashAggregateStream::batch_size, and change ExecutionState::ProducingOutput(RecordBatch) to ExecutionState::ProducingOutput(Vec<RecordBatch>), then GroupedHashAggregateStream::poll_next() function returns one element of a vector at one loop, like let output = batchs.pop() source

I think this approach sounds good -- nice proposal

One thing that could help keep the PRs small and manageable would be to switch the APIs as described above but you could avoid having to change all the GroupAccumulators in one PR by returning return a Vec<> of size 1 for most of them.

Then we can make subsequent PRs to switch over other groups accumulators as needed

@Rachelint
Copy link
Contributor

Rachelint commented Jul 30, 2024

Actually, I found the slice function is not trivial because the eager computation for the null count.
One way I could think to solve it is making the computation lazy, see arrow-rs pr(q32 1.10~1.15x faster without the computation for null count):
apache/arrow-rs#6155

The alternative may be the way mentioned in this issue.

@Rachelint
Copy link
Contributor

Rachelint commented Jul 30, 2024

Hi @guojidan, seems there is no update for some months, can I pick this up?
I want to try it and compare the effect with lazy null count computation.

@guojidan
Copy link
Contributor

Hi @guojidan, seems there is no update for some months, can I pick this up? I want to try it and compare the effect with lazy null count computation.

Thanks. You can take this. I am working on another project now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
3 participants