Generate GroupByHash output in multiple RecordBatches #11758

Draft · wants to merge 2 commits into main from feature/generate_GroupByHash_output_in_multiple_rb
Conversation

@JasonLi-cn (Contributor) commented Aug 1, 2024

Which issue does this PR close?

Closes #9562

If the community thinks this PR is reasonable, I will continue the work:

  1. Eliminate redundant code
  2. Modify the code further as recommended by review
  3. Benchmark

In addition, we need to discuss whether we should emit by batch_size when spill is true.
While running cargo test, I found that if we emit by batch_size when spill is true, some test cases such as aggregate_source_not_yielding_with_spill fail, because the number of RecordBatches increases and the BatchBuilder's push_batch therefore consumes more memory in update_merged_stream.

pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
    // grow the memory reservation by the full array memory size of the batch
    self.reservation.try_grow(batch.get_array_memory_size())?;
    let batch_idx = self.batches.len();
    self.batches.push((stream_idx, batch));
    // point this stream's cursor at the first row of the newly pushed batch
    self.cursors[stream_idx] = BatchCursor {
        batch_idx,
        row_idx: 0,
    };
    Ok(())
}

Personally, I prefer emitting by batch_size even when spill is true; otherwise the panic (apache/arrow-rs#6112 (comment)) is easily triggered by spilling.
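
For illustration, a minimal sketch (not the PR's code; split_into_batches is a hypothetical helper) of splitting one large emitted batch into batch_size-sized slices before spilling, using the zero-copy RecordBatch::slice API:

use arrow::record_batch::RecordBatch;

/// Hypothetical helper: split `batch` into slices of at most `batch_size` rows.
fn split_into_batches(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let mut out = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = batch_size.min(batch.num_rows() - offset);
        // `RecordBatch::slice` is zero-copy; it only adjusts offsets/lengths,
        // so the split itself does not duplicate the underlying buffers.
        out.push(batch.slice(offset, len));
        offset += len;
    }
    out
}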

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Aug 1, 2024
@JasonLi-cn changed the title from "Generate GroupByHash output in multiple RecordBatches (Just an attempt)" to "Generate GroupByHash output in multiple RecordBatches" on Aug 2, 2024
@alamb (Contributor) commented Aug 5, 2024

Thank you @JasonLi-cn

I wonder if we have tested the performance of this branch? I worry that the incremental output generation will result in copying the values multiple times (as each emit_to will effectively copy the remaining elements "down").

If this turns out to be a large performance overhead, then I think we could look into updating the accumulators to remember where they have emitted to, or maybe add an EmitNext or something that could handle remembering the offset 🤔
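
As a purely illustrative sketch of that "remember the offset" idea (EmitNext is not an existing DataFusion API; the struct and method names below are hypothetical), an accumulator could track how far it has already emitted:

/// Hypothetical bookkeeping an accumulator could carry so repeated emissions
/// hand out the next `n` groups without shifting the remaining groups down.
struct EmitOffset {
    /// number of groups already handed out by previous emit calls
    emitted: usize,
}

impl EmitOffset {
    /// Return the next range of group indexes to emit, advancing the offset.
    fn next_range(&mut self, n: usize, total_groups: usize) -> std::ops::Range<usize> {
        let start = self.emitted;
        let end = (start + n).min(total_groups);
        self.emitted = end;
        start..end
    }
}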

@JasonLi-cn (Contributor, Author) replied:

> I wonder if we have tested the performance of this branch? […]

Thank you @alamb. I'll run the aggregate benchmarks.

@JasonLi-cn force-pushed the feature/generate_GroupByHash_output_in_multiple_rb branch from f0daecf to c6d640b on August 6, 2024 02:02
@JasonLi-cn (Contributor, Author) commented Aug 6, 2024

Benchmark (main vs. this branch)

cargo bench --bench aggregate_query_sql
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
                        time:   [2.1563 ms 2.1686 ms 2.1824 ms]
                        change: [-0.1065% +1.0107% +2.0667%] (p = 0.07 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  7 (7.00%) high mild
  2 (2.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [2.0533 ms 2.0655 ms 2.0782 ms]
                        change: [-0.2686% +0.5618% +1.4604%] (p = 0.21 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

aggregate_query_no_group_by_count_distinct_wide
                        time:   [2.8570 ms 2.8720 ms 2.8884 ms]
                        change: [-1.1717% -0.2697% +0.6437%] (p = 0.57 > 0.05)
                        No change in performance detected.
Found 11 outliers among 100 measurements (11.00%)
  6 (6.00%) high mild
  5 (5.00%) high severe

aggregate_query_no_group_by_count_distinct_narrow
                        time:   [2.6355 ms 2.6427 ms 2.6502 ms]
                        change: [-0.3158% +0.1024% +0.5289%] (p = 0.64 > 0.05)
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild

aggregate_query_group_by
                        time:   [3.1874 ms 3.2218 ms 3.2633 ms]
                        change: [+0.8142% +2.0424% +3.3431%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  7 (7.00%) high severe

aggregate_query_group_by_with_filter
                        time:   [3.2268 ms 3.2375 ms 3.2499 ms]
                        change: [-0.8765% -0.3444% +0.1488%] (p = 0.19 > 0.05)
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by_u64 15 12
                        time:   [2.9613 ms 2.9724 ms 2.9841 ms]
                        change: [-1.6244% -1.0331% -0.4294%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_group_by_with_filter_u64 15 12
                        time:   [3.1977 ms 3.2092 ms 3.2215 ms]
                        change: [-0.5511% -0.0732% +0.4691%] (p = 0.77 > 0.05)
                        No change in performance detected.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  4 (4.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [6.4762 ms 6.5397 ms 6.6054 ms]
                        change: [+0.9528% +2.4183% +3.9063%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [6.2755 ms 6.3326 ms 6.3915 ms]
                        change: [-4.7095% -2.6423% -0.8346%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [5.4463 ms 5.4856 ms 5.5275 ms]
                        change: [-1.1171% -0.1131% +0.9433%] (p = 0.83 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_distinct_median
                        time:   [4.1459 ms 4.1710 ms 4.1962 ms]
                        change: [-1.0176% -0.1285% +0.7244%] (p = 0.78 > 0.05)
                        No change in performance detected.

The performance of aggregate_query_group_by and aggregate_query_group_by_u64_multiple_keys deteriorates slightly. What they have in common is the use of GroupValuesRows, so I suspect the regression is caused by GroupValuesRows: calling let mut output = self.row_converter.convert_rows(groups_rows)?; multiple times adds some overhead, because it results in more memory allocations.
In addition, because the other benchmarks haven't changed much, I think the performance overhead from the accumulators is small.

# aggregate_query_group_by
SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY utf8

# aggregate_query_group_by_u64_multiple_keys
SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY u64_wide, utf8
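
To illustrate the hypothesis above (a sketch under assumptions, not the actual GroupValuesRows code): emitting the same group values in k chunks means k calls to RowConverter::convert_rows, each of which allocates fresh output arrays, instead of a single allocation when everything is emitted at once.

use arrow::array::ArrayRef;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, Rows};

/// Convert `rows` back to arrays in `chunk`-sized pieces; every loop iteration
/// performs its own `convert_rows` call and hence its own round of allocation.
fn emit_in_chunks(
    converter: &RowConverter,
    rows: &Rows,
    chunk: usize,
) -> Result<Vec<Vec<ArrayRef>>, ArrowError> {
    let mut out = Vec::new();
    let mut start = 0;
    while start < rows.num_rows() {
        let end = (start + chunk).min(rows.num_rows());
        out.push(converter.convert_rows(rows.iter().skip(start).take(end - start))?);
        start = end;
    }
    Ok(out)
}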

This test may be incomplete; do you @alamb have any better test suggestions? 🤔

@alamb (Contributor) commented Aug 8, 2024

> This test may be incomplete; do you @alamb have any better test suggestions? 🤔

Hi @JasonLi-cn -- yes I think we should run the ClickBench and TPCH benchmarks using the script here https://github.com/apache/datafusion/tree/main/benchmarks

I am doing so now and will report results here

@alamb (Contributor) commented Aug 8, 2024

When trying to run the benchmarks on this branch, I hit a bug (#11833) that has since been fixed on main:

Query 19 avg time: 161.22 ms
Error: External(ArrowError(InvalidArgumentError("column types must match schema types, expected Decimal128(25, 2) but found Decimal128(38, 10) at column index 2"), None))

Thus I am going to merge up from main and try again.

@alamb (Contributor) commented Aug 8, 2024

🤔

Query 17 iteration 4 took 6144.7 ms and returned 10 rows
Q18: SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT\
 10;
./bench.sh: line 410: 1005229 Killed                  $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries\
/clickbench/queries.sql" -o ${RESULTS_FILE}
alamb@aal-dev:~/datafusion-benchmarking$

It appears on this branch the kernel killed the process due to out of memory (which does not happen on main)

alamb@aal-dev:~/datafusion-benchmarking$sudo dmesg | tail
[262830.944639] [1003196]     0 1003196     4230     1440    69632        0             0 sshd
[262830.944642] [1003273]  1001 1003273     4305     1341    69632        0             0 sshd
[262830.944644] [1003274]  1001 1003274     2308     1152    53248        0             0 bash
[262830.944646] [1003284]  1001 1003284     2080      864    57344        0             0 tmux: client
[262830.944648] [1003287]  1001 1003287     2765     1230    61440        0             0 bash
[262830.944650] [1005223]  1001 1005223     2007      768    57344        0             0 bash
[262830.944652] [1005229]  1001 1005229 25851352 16101822 170872832        0             0 dfbench
[262830.944654] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=user.slice,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1001.slice/session-14.scope,task=dfbench,pi\
d=1005229,uid=1001
[262830.944733] Out of memory: Killed process 1005229 (dfbench) total-vm:103405408kB, anon-rss:64403832kB, file-rss:3456kB, shmem-rss:0kB, UID:1001 pgtables:166868kB oom_score_adj:0
[262833.485378] oom_reaper: reaped process 1005229 (dfbench), now anon-rss:0kB, file-rss:1140kB, shmem-rss:0kB
alamb@aal-dev:~/datafusion-benchmarking$

@JasonLi-cn (Contributor, Author) replied:

> It appears on this branch the kernel killed the process due to out of memory (which does not happen on main) […]

Thank you @alamb 🙏. Let me analyze it further 🤔

@alamb (Contributor) commented Aug 10, 2024

> Thank you @alamb 🙏. Let me analyze it further 🤔

In order to actually generate the output in multiple batches and gain performance, I think we would need to change:

  1. The GroupValues storage (so that it never creates a large contiguous range)
  2. The GroupsAccumulators likewise to manage the internal state as multiple chunks and not as single chunks

This would likely require some sort of API change to the accumulators / etc

I wonder if we could find some way to do the implementation incrementally
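
As a very rough sketch of the "manage the internal state as multiple chunks" idea (purely illustrative; none of these types exist in DataFusion), an accumulator's per-group state could be kept in fixed-size blocks so one block can be emitted without copying the remaining groups down:

/// Hypothetical blocked state for a sum-style accumulator.
struct BlockedSums {
    block_size: usize,
    blocks: Vec<Vec<u64>>, // one Vec of per-group sums per block
}

impl BlockedSums {
    fn update(&mut self, group_index: usize, value: u64) {
        let (block, offset) = (group_index / self.block_size, group_index % self.block_size);
        // grow blocks lazily so `group_index` is always addressable
        while self.blocks.len() <= block {
            self.blocks.push(vec![0; self.block_size]);
        }
        self.blocks[block][offset] += value;
    }

    /// Emit the oldest block; removing it shifts later group indexes down by
    /// one whole block, rather than row by row.
    fn emit_first_block(&mut self) -> Option<Vec<u64>> {
        if self.blocks.is_empty() {
            None
        } else {
            Some(self.blocks.remove(0))
        }
    }
}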

@Rachelint (Contributor) commented Aug 10, 2024

> In order to actually generate the output in multiple batches and gain performance, I think we would need to change: […]

I agree; ultimately it should be a big change that switches the group values and related state to block-based management like DuckDB, and I am working on this (#11931).

But maybe just splitting the emit result still has benefits? It seems it can avoid calling the slice function many times, which really costs CPU.

@JasonLi-cn JasonLi-cn marked this pull request as draft August 11, 2024 10:30
@alamb (Contributor) commented Aug 11, 2024

> I agree; ultimately it should be a big change that switches the group values and related state to block-based management like DuckDB, and I am working on this (#11931).

I would personally suggest sketching out what this would look like in a first PR, without worrying about getting all the tests passing / compiling, etc.

If we try to port all the code at once to being managed in blocks, it is going to be a very large change.

I am thinking maybe we can take an incremental approach (like, for example, separately adding the ability to do blocked emission for https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html and

pub trait GroupValues: Send {
    /// Calculates the `groups` for each input row of `cols`
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
    /// Returns the number of bytes used by this [`GroupValues`]
    fn size(&self) -> usize;
    /// Returns true if this [`GroupValues`] is empty
    fn is_empty(&self) -> bool;
    /// The number of values stored in this [`GroupValues`]
    fn len(&self) -> usize;
    /// Emits the group values
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
    /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
    fn clear_shrink(&mut self, batch: &RecordBatch);
}

If we can set this up so that we get the pattern and a few common implementations set up in the first PR, then we can make subsequent PRs to port over the other parts of the aggregation 🤔
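
One way that incremental rollout could look (purely a sketch, not a proposed API; the trait and method names are assumptions) is an opt-in method with a default implementation, so existing GroupValues / GroupsAccumulator implementations keep compiling unchanged and only the ported ones override it:

/// Hypothetical opt-in marker for blocked emission.
trait BlockedEmission {
    /// Whether this implementation can emit its state block by block.
    /// Defaults to false so existing implementations need no changes.
    fn supports_blocked_emission(&self) -> bool {
        false
    }
}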

@Rachelint (Contributor) commented Aug 11, 2024

> I am thinking maybe we can take an incremental approach […] If we can set this up so that we get the pattern and a few common implementations set up in the first PR, then we can make subsequent PRs to port over the other parts of the aggregation 🤔

Yes, I want to do similar things for the incremental approach.

I tried something like it, but it is still not thorough enough, and after I found the reason why some tests failed, I realized it is actually hard to support the exact EmitTo::First used by streaming aggregation... We should actually disable the blocked mode in streaming aggregation.

I am planning to switch to the special blocked emission impl now. It seems we can do two things to introduce the new emit modes:

  • Define a support_blocked_emission for GroupValues and GroupsAccumulator.
  • Add two new EmitTo variants:
pub enum EmitTo {
    /// Emit all groups
    All,
    /// Emit only the first `n` groups and shift all existing group
    /// indexes down by `n`.
    ///
    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
    /// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`.
    First(usize),
    
    AllBlocks,
    
    FirstBlocks(usize),
}

What do you think of this way to support the special blocked emission?
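
For illustration, a minimal sketch (assuming the enum above and blocks stored as plain Vecs; nothing here is existing DataFusion code) of how an implementation might dispatch on the proposed variants:

/// Placeholder for one block of per-group state.
type Block = Vec<u64>;

enum EmitTo {
    All,
    First(usize),
    AllBlocks,
    FirstBlocks(usize),
}

fn emit_blocks(blocks: &mut Vec<Block>, emit_to: EmitTo) -> Vec<Block> {
    match emit_to {
        // blocked variants hand back whole blocks, so nothing is sliced or shifted row by row
        EmitTo::AllBlocks => std::mem::take(blocks),
        EmitTo::FirstBlocks(n) => blocks.drain(..n.min(blocks.len())).collect(),
        // the existing row-based variants keep today's behaviour (not sketched here)
        EmitTo::All | EmitTo::First(_) => unimplemented!("row-based emission"),
    }
}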

@JasonLi-cn (Contributor, Author) replied:

> Define a support_blocked_emission for GroupValues and GroupsAccumulator … add two new EmitTo variants (AllBlocks, FirstBlocks) […] What do you think of this way to support the special blocked emission?

Do we need to put batch_size in AllBlocks and FirstBlocks 🤔?
That's exactly what I was planning to do, but I found that the required code changes were very large.

@Rachelint (Contributor) commented Aug 12, 2024

@JasonLi-cn As I see it, GroupValues impls maybe should not care about the batch size? We can just do the split and merge work in GroupedHashAggregateStream::poll if, unfortunately, the batch size != block size (usually they will be equal).

Maybe we should implement the special block-based GroupValues impls like the following:

  • We pass the block size when initializing it
  • It manages the inner values block by block
  • It returns all blocks with the internal block size

We can always make the block size == batch size, so we can totally avoid any split operations.

I am making an attempt at it in #11943, and have done some related code changes.
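
A rough sketch of the design described above (assumptions for illustration, not the #11943 code): the block size is fixed at construction and chosen equal to the session batch_size, so every emitted block is already exactly one output batch and no slicing or merging is needed in the stream.

/// Hypothetical block-based group storage.
struct BlockedGroups {
    block_size: usize,
    blocks: Vec<Vec<u64>>, // placeholder: one block of group keys per inner Vec
}

impl BlockedGroups {
    fn new(block_size: usize) -> Self {
        Self { block_size, blocks: Vec::new() }
    }

    /// Append a group key, starting a new block when the current one is full.
    fn push_group(&mut self, key: u64) {
        match self.blocks.last_mut() {
            Some(last) if last.len() < self.block_size => last.push(key),
            _ => {
                let mut block = Vec::with_capacity(self.block_size);
                block.push(key);
                self.blocks.push(block);
            }
        }
    }

    /// Each returned block maps 1:1 to one output batch of at most
    /// `block_size` rows (the final block may be shorter).
    fn emit_all_blocks(&mut self) -> Vec<Vec<u64>> {
        std::mem::take(&mut self.blocks)
    }
}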

@JasonLi-cn (Contributor, Author) replied:

> Maybe we should implement the special block-based GroupValues impls like the following: pass the block size when initializing it; manage the inner values block by block; return all blocks with the internal block size […]

OK. How do we determine the value of block size?

@Rachelint (Contributor) replied:

> OK. How do we determine the value of block size?

I think maybe we can make it equal to batch_size in most cases, so that we can avoid any split operations while producing output. And for the corner case where, for example, the batch_size is too small, we can let it fall back to single block mode?
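
A tiny sketch of that policy (the threshold constant and helper name are assumptions, not from this discussion):

/// Assumed threshold below which blocked mode is not worth the bookkeeping.
const MIN_BLOCK_SIZE: usize = 128;

/// Returns `Some(block_size)` to enable blocked mode, or `None` to fall back
/// to the current single-block behaviour.
fn choose_block_size(batch_size: usize) -> Option<usize> {
    (batch_size >= MIN_BLOCK_SIZE).then_some(batch_size)
}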

@alamb (Contributor) commented Aug 14, 2024

> I think maybe we can make it equal to batch_size in most cases, so that we can avoid any split operations while producing output. And for the corner case where, for example, the batch_size is too small, we can let it fall back to single block mode?

Yes I think this is a good strategy

Labels: core (Core DataFusion crate)

Successfully merging this pull request may close these issues:
Generate GroupByHash output in multiple RecordBatches rather than one large one

3 participants