Changes from all commits (26 commits)
156ee11
perf: Optimize `multi_group_by` when there are a lot of unique groups
rluvaton Sep 16, 2025
62ebc86
Merge branch 'main' into optimize-for-unique-groups
rluvaton Sep 16, 2025
3133196
updated based on cr
rluvaton Sep 16, 2025
00b5f07
update comment
rluvaton Sep 16, 2025
9476184
added fuzz test to test all values are unique in aggregate group by
rluvaton Sep 16, 2025
bbc62dc
Merge branch 'main' into optimize-for-unique-groups
rluvaton Sep 16, 2025
a052b39
added comment
rluvaton Sep 16, 2025
390bd68
don't always call `append_array_slice` if it is not supported
rluvaton Sep 17, 2025
0a78ede
add comment
rluvaton Sep 17, 2025
c1bcb17
add benchmark
rluvaton Sep 18, 2025
cc2e725
try optimization
rluvaton Sep 18, 2025
35c5a02
Revert "try optimization"
rluvaton Sep 18, 2025
093b069
reserve and use extend
rluvaton Sep 18, 2025
aa56172
Merge branch 'refs/heads/main' into optimize-for-unique-groups
rluvaton Oct 8, 2025
cbd7fc0
Merge branch 'main' into optimize-for-unique-groups
rluvaton Oct 8, 2025
1ce9617
support `append_array_slice` for boolean group values
rluvaton Oct 8, 2025
045020e
avoid push and use extend instead to make the compiler vectorize the …
rluvaton Oct 8, 2025
e8a94e1
fix: actually generate a lot of unique values in benchmark table
rluvaton Oct 8, 2025
42df966
Merge branch 'main' into fix-benchmark-value-generation
rluvaton Oct 8, 2025
e690921
Merge branch 'fix-benchmark-value-generation' into optimize-for-uniqu…
rluvaton Oct 8, 2025
155868d
format
rluvaton Oct 8, 2025
ff1a744
Merge branch 'fix-benchmark-value-generation' into optimize-for-uniqu…
rluvaton Oct 8, 2025
f4d0373
added multi group by benchmark on primitive only columns
rluvaton Oct 8, 2025
8f2974d
Merge branch 'fix-benchmark-value-generation' into optimize-for-uniqu…
rluvaton Oct 8, 2025
c9cbe59
Merge branch 'main' into optimize-for-unique-groups
rluvaton Nov 20, 2025
7d8db1a
optimize bytes
rluvaton Nov 20, 2025
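
Several of the commits above (093b069 "reserve and use extend", 045020e "avoid push and use extend …") apply the same micro-optimization: replace per-element `push` calls with a single `reserve` plus `extend` so the compiler can vectorize the copy. A minimal sketch of the pattern, using illustrative names that are not the PR's actual code:

```rust
/// Append `count` consecutive group indices starting at `start`.
/// Illustrative only: the function and types here are hypothetical,
/// not taken from the DataFusion source.
fn append_group_indices(out: &mut Vec<u64>, start: u64, count: usize) {
    // One capacity check up front instead of one per `push`.
    out.reserve(count);
    // Extending from an exact-size iterator compiles to a tight loop the
    // optimizer can unroll or vectorize, unlike a `push` loop that
    // re-checks capacity on every element.
    out.extend(start..start + count as u64);
}

fn main() {
    let mut groups = Vec::new();
    append_group_indices(&mut groups, 100, 4);
    assert_eq!(groups, vec![100, 101, 102, 103]);
}
```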
310 changes: 307 additions & 3 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -24,11 +24,12 @@ use crate::fuzz_cases::aggregation_fuzzer::{
};

use arrow::array::{
-    types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array, RecordBatch,
-    StringArray,
+    make_array, types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array,
+    RecordBatch, StringArray, StringViewArray, UInt64Array, UInt8Array,
};
+use arrow::buffer::NullBuffer;
use arrow::compute::concat_batches;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, UInt64Type};
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::{Field, Schema, SchemaRef};
use datafusion::datasource::memory::MemorySourceConfig;
@@ -767,3 +768,306 @@ async fn test_single_mode_aggregate_single_mode_aggregate_with_spill() -> Result

Ok(())
}

/// Tests grouping by multiple columns in the extreme case where all the values are unique,
/// i.e. there are as many groups as input rows
#[tokio::test]
async fn test_group_by_multiple_columns_all_unique() -> Result<()> {
let scan_schema = Arc::new(Schema::new(vec![
Field::new("col_1", DataType::UInt8, true),
Field::new("idx", DataType::UInt64, false),
Field::new("grouping_key_1", DataType::Utf8, true),
Field::new("grouping_key_2", DataType::Utf8, false),
Field::new("grouping_key_3", DataType::Utf8View, true),
Field::new("grouping_key_4", DataType::Utf8View, false),
Field::new("grouping_key_5", DataType::Int32, true),
Field::new("grouping_key_6", DataType::Int32, false),
]));

let nullable_grouping_expr_indices = scan_schema
.fields()
.iter()
.enumerate()
.skip(1)
.filter(|(_, f)| f.is_nullable())
.map(|(index, _)| index)
.collect::<Vec<_>>();

let record_batch_size = 100;
let number_of_record_batches = 1000;

#[derive(Clone)]
enum NullableDataGenConfig {
/// All the values in the array are nulls
All,

/// No nulls values
None,

/// Random nulls
Random,
}

/// Generate an infinite iterator of (column_index, null_config) pairs,
/// where column_index identifies the column whose nulls we set according to null_config
///
/// Each (column_index, null_config) pair is repeated [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`] times
/// so that even when batches are concatenated together, we keep a safe margin for the all-nulls and no-nulls cases
///
/// For example, if the nullable_column_indices are `[1, 3, 5]`
/// and [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`] is 2,
/// this will generate the following sequence:
/// ```text
/// (1, All), (1, All),
/// (1, None), (1, None),
/// (1, Random), (1, Random),
/// (3, All), (3, All),
/// (3, None), (3, None),
/// (3, Random), (3, Random),
/// (5, All), (5, All),
/// (5, None), (5, None),
/// (5, Random), (5, Random),
/// ```
fn generate_infinite_null_state_iter(
nullable_column_indices: Vec<usize>,
) -> impl Iterator<Item = (usize, NullableDataGenConfig)> {
/// How many consecutive batches to generate with the same null configuration,
/// keeping a safe margin for the all-nulls and no-nulls cases even when batches are concatenated together
const NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG: usize = 4;

nullable_column_indices
.into_iter()
// Create a pair of (column_index, null_config) for each null config
.flat_map(|index| {
[
(index, NullableDataGenConfig::All),
(index, NullableDataGenConfig::None),
(index, NullableDataGenConfig::Random),
]
})
// Repeat each (column_index, null_config) pair NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG times
// so we get multiple consecutive batches with the same null configuration
.flat_map(|pair| {
std::iter::repeat_n(pair, NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG)
})
// Make it infinite so we can just take as many as we need
.cycle()
}

let schema = Arc::clone(&scan_schema);

let input = (0..number_of_record_batches)
.zip(generate_infinite_null_state_iter(
nullable_grouping_expr_indices,
))
.map(move |(batch_index, null_generate_config)| {
let unique_iterator = (batch_index * record_batch_size)
..(batch_index * record_batch_size) + record_batch_size;

// Only one column at a time has nulls, to make sure the group keys taken together are still unique

let mut columns = vec![
// col_1: nullable uint8
// The values do not really matter as we only count them
Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n(
0,
record_batch_size,
))),
// idx: non-nullable uint64
Arc::new(UInt64Array::from_iter_values(
unique_iterator.clone().map(|x| x as u64),
)) as ArrayRef,
// grouping_key_1: nullable string
Arc::new(StringArray::from_iter_values(
unique_iterator.clone().map(|x| x.to_string()),
)) as ArrayRef,
// grouping_key_2: non-nullable string
Arc::new(StringArray::from_iter_values(
unique_iterator.clone().map(|x| x.to_string()),
)) as ArrayRef,
// grouping_key_3: nullable string view
Arc::new(StringViewArray::from_iter_values(
unique_iterator.clone().map(|x| x.to_string()),
)) as ArrayRef,
// grouping_key_4: non-nullable string view
Arc::new(StringViewArray::from_iter_values(
unique_iterator.clone().map(|x| x.to_string()),
)) as ArrayRef,
// grouping_key_5: nullable int32
Arc::new(Int32Array::from_iter_values(
unique_iterator.clone().map(|x| x as i32),
)) as ArrayRef,
// grouping_key_6: non-nullable int32
Arc::new(Int32Array::from_iter_values(
unique_iterator.clone().map(|x| x as i32),
)) as ArrayRef,
];

// Apply the null configuration to the selected column
let (column_to_set_nulls, null_config) = null_generate_config;

match null_config {
// We should already have no nulls by default
NullableDataGenConfig::None => {
assert_eq!(columns[column_to_set_nulls].null_count(), 0);
}
// Change all values to nulls
NullableDataGenConfig::All => {
columns[column_to_set_nulls] = set_nulls(
&columns[column_to_set_nulls],
NullBuffer::new_null(columns[column_to_set_nulls].len()),
);
}
NullableDataGenConfig::Random => {
let mut rnd = rng();

let null_buffer: NullBuffer = (0..columns[column_to_set_nulls].len())
.map(|_| {
// ~30% nulls
rnd.random::<f32>() > 0.3
})
.collect();

columns[column_to_set_nulls] =
set_nulls(&columns[column_to_set_nulls], null_buffer);
}
}

RecordBatch::try_new(Arc::clone(&schema), columns).map_err(Into::into)
})
.collect::<Result<Vec<RecordBatch>>>()?;

let input_row_count: usize = input.iter().map(|b| b.num_rows()).sum();

let results = {
let session_config = SessionConfig::new().with_batch_size(record_batch_size);
let ctx = SessionContext::new_with_config(session_config);

let df = run_sql_on_input(
&ctx,
Arc::clone(&scan_schema),
vec![input.clone()],
// It is important to select the grouping keys back so we can
// verify that we don't corrupt them
r#"
SELECT
idx,
grouping_key_1,
grouping_key_2,
grouping_key_3,
grouping_key_4,
grouping_key_5,
grouping_key_6,
COUNT(col_1)
FROM t
GROUP BY
idx,
grouping_key_1,
grouping_key_2,
grouping_key_3,
grouping_key_4,
grouping_key_5,
grouping_key_6
"#,
)
.await?;

df.collect().await?
};

assert_eq!(
results.iter().map(|b| b.num_rows()).sum::<usize>(),
input_row_count,
"Must be exactly the same number of output rows as input rows"
);

// Sort the results for easier assertion
// We sort in a separate plan to make sure the sort does not affect the aggregation
let sorted_results = {
let session_config = SessionConfig::new().with_batch_size(record_batch_size);
let ctx = SessionContext::new_with_config(session_config);

let df = run_sql_on_input(
&ctx,
results[0].schema(),
vec![results],
"SELECT * FROM t ORDER BY idx",
)
.await?;

df.collect().await?
};

// Assert the output is sorted by idx
assert_eq!(
sorted_results
.iter()
.flat_map(|b| b
.column_by_name("idx")
.unwrap()
.as_primitive::<UInt64Type>()
.iter())
.collect::<Vec<_>>(),
(0..input_row_count)
.map(|x| Some(x as u64))
.collect::<Vec<_>>(),
"Output is not sorted by idx"
);

// The expected output batches are the input batches without `col_1`, and with a count column (all values 1) appended at the end
let expected_output_batches = input.into_iter().map(|batch| {
// Remove the first column (col_1) which we are counting
let indices = (1..batch.schema().fields().len()).collect::<Vec<_>>();
let batch = batch.project(&indices).unwrap();

// Add the expected count column with all values set to 1
let count_result = vec![1; batch.num_rows()];
let count_array = Int64Array::from_iter_values(count_result);
let (_, mut arrays, _) = batch.into_parts();

arrays.push(Arc::new(count_array) as ArrayRef);

RecordBatch::try_new(sorted_results[0].schema(), arrays).unwrap()
});

for (output_batch, expected_batch) in
sorted_results.iter().zip(expected_output_batches)
{
assert_eq!(output_batch, &expected_batch);
}

Ok(())
}

/// Set the null buffer of an array to the specified null buffer.
/// It is important to keep the underlying values as-is and only modify the null buffer,
/// so that we also test (for bytes, for example) that null values do not point to empty bytes
fn set_nulls(array: &ArrayRef, null_buffer: NullBuffer) -> ArrayRef {
let null_count = null_buffer.null_count();

let array_data = array
.to_data()
.into_builder()
.nulls(Some(null_buffer))
.build()
.unwrap();

let array = make_array(array_data);

assert_eq!(array.null_count(), null_count);

array
}

async fn run_sql_on_input(
ctx: &SessionContext,
schema: SchemaRef,
partitions: Vec<Vec<RecordBatch>>,
sql: &str,
) -> Result<DataFrame> {
let provider = MemTable::try_new(schema, partitions)?;

ctx.register_table("t", Arc::new(provider))?;

ctx.sql(sql).await
}
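
For illustration, a minimal usage sketch of the `set_nulls` helper above. This is a hypothetical companion test, not part of the PR; it assumes it lives in the same module, so the arrow imports (`StringArray`, `NullBuffer`, `AsArray`) are already in scope:

```rust
#[test]
fn set_nulls_keeps_underlying_values() {
    // Mark rows 1 and 3 as null; the underlying value buffer must stay
    // untouched, which is the property the helper's doc comment calls out.
    let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let nulls: NullBuffer = (0..array.len()).map(|i| i % 2 == 0).collect();

    let with_nulls = set_nulls(&array, nulls);

    assert_eq!(with_nulls.null_count(), 2);
    // Valid slots still read back their original values.
    assert_eq!(with_nulls.as_string::<i32>().value(0), "a");
    assert_eq!(with_nulls.as_string::<i32>().value(2), "c");
}
```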
@@ -158,6 +158,39 @@ impl<const NULLABLE: bool> GroupColumn for BooleanGroupValueBuilder<NULLABLE> {
Ok(())
}

fn support_append_array_slice(&self) -> bool {
true
}

fn append_array_slice(
&mut self,
array: &ArrayRef,
start: usize,
length: usize,
) -> Result<()> {
let array = array.as_boolean();

if NULLABLE {
if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
self.nulls.append_buffer(&nulls.slice(start, length));
} else {
self.nulls.append_n(length, false);
}
} else {
assert_eq!(
array.null_count(),
0,
"unexpected nulls in non nullable input"
);
self.nulls.append_n(length, false);
}

self.buffer
.append_buffer(&array.values().slice(start, length));

Ok(())
}

fn len(&self) -> usize {
self.buffer.len()
}
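
Commit 390bd68 above makes the caller check `support_append_array_slice` before taking this bulk path. A hedged caller-side sketch of that shape, using a simplified stand-in trait rather than DataFusion's actual `GroupColumn` signatures:

```rust
/// Simplified stand-in for the capability-gated bulk append; the real
/// `GroupColumn` trait in DataFusion has different method signatures.
trait BulkAppend {
    fn support_append_array_slice(&self) -> bool;
    fn append_array_slice(&mut self, values: &[bool], start: usize, length: usize);
    fn append_value(&mut self, value: bool);
}

struct BoolBuilder(Vec<bool>);

impl BulkAppend for BoolBuilder {
    fn support_append_array_slice(&self) -> bool {
        true
    }
    fn append_array_slice(&mut self, values: &[bool], start: usize, length: usize) {
        // Bulk copy of the whole slice; for arrow booleans this would be a
        // packed-bit buffer copy, as in `append_buffer` above.
        self.0.extend_from_slice(&values[start..start + length]);
    }
    fn append_value(&mut self, value: bool) {
        self.0.push(value);
    }
}

/// Gate the fast path on the capability check, per commit 390bd68:
/// fall back to per-row appends when bulk slice appends are unsupported.
fn append_rows(col: &mut dyn BulkAppend, values: &[bool], start: usize, length: usize) {
    if col.support_append_array_slice() {
        col.append_array_slice(values, start, length);
    } else {
        for &v in &values[start..start + length] {
            col.append_value(v);
        }
    }
}

fn main() {
    let mut col = BoolBuilder(Vec::new());
    append_rows(&mut col, &[true, false, true, true], 1, 2);
    assert_eq!(col.0, vec![false, true]);
}
```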