Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/extended.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,16 @@ jobs:
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run tests
- name: Run tests (force_hash_collisions)
run: |
cd datafusion
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --exclude datafusion-cli --workspace --lib --tests --features=force_hash_collisions,avro
cargo clean
- name: Run tests (force_hash_partial_collisions, #20724)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given how long our CI already runs, we probably want to avoid a new target (this will recompile a large part of DataFusion even though it only runs one test).

Another technique we might use to add coverage / find a reproducer here is fuzzing

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thank you 🙇 , I was about to do it after my lunch break

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I merged it on this branch, thanks a lot

run: |
cd datafusion
cargo test --profile ci -p datafusion --test core_integration --features=force_hash_partial_collisions -- memory_limit::test_no_duplicate_groups_after_spill --exact
cargo clean

sqllogictest-sqlite:
name: "Run sqllogictests with the sqlite test suite"
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ jobs:
run: cargo check --profile ci --no-default-features -p datafusion --features=encoding_expressions
- name: Check datafusion (force_hash_collisions)
run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_collisions
- name: Check datafusion (force_hash_partial_collisions)
run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_partial_collisions
- name: Check datafusion (math_expressions)
run: cargo check --profile ci --no-default-features -p datafusion --features=math_expressions
- name: Check datafusion (parquet)
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ parquet_encryption = [
"dep:hex",
]
force_hash_collisions = []
force_hash_partial_collisions = []
recursive_protection = ["dep:recursive"]
parquet = ["dep:parquet"]
sql = ["sqlparser"]
Expand Down
57 changes: 48 additions & 9 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::compute::take;
use arrow::datatypes::*;
#[cfg(not(feature = "force_hash_collisions"))]
#[cfg(not(all(
feature = "force_hash_collisions",
not(feature = "force_hash_partial_collisions")
)))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use itertools::Itertools;
use std::collections::HashMap;

#[cfg(not(feature = "force_hash_collisions"))]
#[cfg(not(all(
feature = "force_hash_collisions",
not(feature = "force_hash_partial_collisions")
)))]
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_large_list_view_array,
Expand Down Expand Up @@ -935,8 +941,11 @@ fn hash_run_array<R: RunEndIndexType>(

/// Internal helper function that hashes a single array and either initializes or combines
/// the hash values in the buffer.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
#[cfg(not(all(
feature = "force_hash_collisions",
not(feature = "force_hash_partial_collisions")
)))]
fn hash_single_array_impl(
array: &dyn Array,
random_state: &RandomState,
hashes_buffer: &mut [u64],
Expand Down Expand Up @@ -1007,17 +1016,47 @@ fn hash_single_array(
Ok(())
}

/// Test version of `hash_single_array` that forces all hashes to collide to zero.
#[cfg(feature = "force_hash_collisions")]
/// Default-build variant of `hash_single_array`.
///
/// Compiled only when neither `force_hash_collisions` nor
/// `force_hash_partial_collisions` is enabled. It forwards all arguments
/// unchanged to `hash_single_array_impl` and returns its outcome, so the
/// hashes written to `hashes_buffer` are exactly the ones that function
/// computes.
#[cfg(not(any(
    feature = "force_hash_collisions",
    feature = "force_hash_partial_collisions"
)))]
fn hash_single_array(
    array: &dyn Array,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    // No post-processing in this configuration: the real hashes are kept as-is.
    hash_single_array_impl(array, random_state, hashes_buffer, rehash)?;
    Ok(())
}

/// Test version: forces full hash collisions by setting every hash to 0.
///
/// Compiled only when `force_hash_collisions` is enabled and
/// `force_hash_partial_collisions` is not. The array, the random state and
/// the `rehash` flag are ignored: every slot of `hashes_buffer` is
/// overwritten with zero, whatever it held before.
///
/// Always returns `Ok(())`.
#[cfg(all(
    feature = "force_hash_collisions",
    not(feature = "force_hash_partial_collisions")
))]
fn hash_single_array(
    _array: &dyn Array,
    _random_state: &RandomState,
    hashes_buffer: &mut [u64],
    _rehash: bool,
) -> Result<()> {
    // `fill` zeroes the whole buffer in a single pass.
    hashes_buffer.fill(0);
    Ok(())
}

/// Test version: computes the real hashes, then keeps only their low 5 bits.
///
/// Compiled when `force_hash_partial_collisions` is enabled. Masking leaves
/// at most 32 distinct hash values, which produces a mix of colliding and
/// non-colliding keys. That mix is what exposes non-monotonic group index
/// bugs (#20724).
///
/// Any error from `hash_single_array_impl` is returned before the buffer is
/// masked.
#[cfg(feature = "force_hash_partial_collisions")]
fn hash_single_array(
    array: &dyn Array,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    // Five low-order bits: 2^5 = 32 possible hash values.
    const LOW_BITS_MASK: u64 = 0x1F;

    hash_single_array_impl(array, random_state, hashes_buffer, rehash)?;
    for hash in hashes_buffer.iter_mut() {
        *hash &= LOW_BITS_MASK;
    }
    Ok(())
}

Expand Down
7 changes: 7 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ default = [
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
# Used for testing ONLY: truncates hashes to 5 bits (32 distinct values) to create partial collisions.
# Unlike force_hash_collisions (all hashes = 0), this creates a mix of colliding and non-colliding keys,
# which triggers non-monotonic group indices in vectorized_intern (#20724).
force_hash_partial_collisions = [
"datafusion-physical-plan/force_hash_partial_collisions",
"datafusion-common/force_hash_partial_collisions",
]
math_expressions = ["datafusion-functions/math_expressions"]
parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"]
parquet_encryption = [
Expand Down
125 changes: 124 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use std::sync::{Arc, LazyLock};
#[cfg(feature = "extended_tests")]
mod memory_limit_validation;
mod repartition_mem_limit;
use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
use arrow::array::{
ArrayRef, DictionaryArray, Int32Array, Int64Array, RecordBatch, StringViewArray,
};
use arrow::compute::SortOptions;
use arrow::datatypes::{Int32Type, SchemaRef};
use arrow_schema::{DataType, Field, Schema};
Expand Down Expand Up @@ -56,6 +58,7 @@ use datafusion_physical_plan::collect as collect_batches;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use std::collections::HashSet;
use test_utils::AccessLogGenerator;

use async_trait::async_trait;
Expand Down Expand Up @@ -1172,3 +1175,123 @@ impl TableProvider for SortedTableProvider {
Ok(DataSourceExec::from_data_source(mem_conf))
}
}

// ============================================================================
// Regression tests for https://github.com/apache/datafusion/issues/20724
//
// When hash aggregation spills and switches to streaming merge,
// `group_values` must be recreated with the streaming variant.
// Otherwise `vectorized_intern` can produce non-monotonic group indices
// under hash collisions, causing `GroupOrderingFull` to prematurely
// emit groups → duplicate keys in output.
// ============================================================================

/// Helper: set up a [`SessionContext`] that forces spilling during aggregation.
///
/// * `memory_limit` - value passed to `FairSpillPool::new` for the memory pool
/// * `batch_size` - value passed to `SessionConfig::with_batch_size`
///
/// The session uses a single target partition, uncompressed spill files and
/// a disk manager in `DiskManagerMode::OsTmpDirectory` mode.
///
/// # Errors
///
/// Returns the error reported by `RuntimeEnvBuilder::build_arc` if the
/// runtime environment cannot be built.
async fn setup_spill_agg_context(
    memory_limit: usize,
    batch_size: usize,
) -> Result<SessionContext> {
    // Propagate a runtime construction failure to the caller instead of
    // panicking: this function already returns `Result`.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
        .with_disk_manager_builder(
            DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory),
        )
        .build_arc()?;

    let config = SessionConfig::new()
        .with_sort_spill_reservation_bytes(64 * 1024)
        .with_sort_in_place_threshold_bytes(0)
        .with_spill_compression(SpillCompression::Uncompressed)
        .with_batch_size(batch_size)
        .with_target_partitions(1);

    Ok(SessionContext::new_with_config_rt(config, runtime))
}

/// Regression test for https://github.com/apache/datafusion/issues/20724
///
/// When hash aggregation spills and switches to streaming merge,
/// `group_values` (GroupValuesColumn<false>) is not recreated with the
/// streaming variant (<true>). This means `vectorized_intern` is used
/// post-spill, which can produce non-monotonic group indices under hash
/// collisions, causing `GroupOrderingFull` to prematurely emit groups
/// and create duplicate keys in the output.
///
/// Requirements to trigger:
/// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`)
/// - `force_hash_partial_collisions` feature → truncated hashes create the mix
/// of colliding/non-colliding keys needed for non-monotonic indices
/// - `batch_size=50` → not a multiple of rows-per-group in the merged stream,
/// so groups span batch boundaries and premature emission causes duplicates
#[tokio::test]
async fn test_no_duplicate_groups_after_spill() -> Result<()> {
let num_keys: i64 = 5000;
let rows_per_key: i64 = 4;
let total_rows = (num_keys * rows_per_key) as usize;

let schema = Arc::new(Schema::new(vec![
Field::new("key_a", DataType::Int64, false),
Field::new("key_b", DataType::Int64, false),
Field::new("value", DataType::Int64, false),
]));

let mut keys_a = Vec::with_capacity(total_rows);
let mut keys_b = Vec::with_capacity(total_rows);
let mut vals = Vec::with_capacity(total_rows);
for r in 0..rows_per_key {
for k in 0..num_keys {
keys_a.push(k / 100);
keys_b.push(k % 100);
vals.push(r * num_keys + k);
}
}

let mut batches = Vec::new();
for start in (0..total_rows).step_by(500) {
let end = (start + 500).min(total_rows);
batches.push(RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int64Array::from(keys_a[start..end].to_vec())),
Arc::new(Int64Array::from(keys_b[start..end].to_vec())),
Arc::new(Int64Array::from(vals[start..end].to_vec())),
],
)?);
}

let ctx = setup_spill_agg_context(128 * 1024, 50).await?;
let table = MemTable::try_new(schema, vec![batches])?;
ctx.register_table("t", Arc::new(table))?;

let df = ctx
.sql("SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b")
.await?;
let results =
collect_batches(df.create_physical_plan().await?, ctx.task_ctx()).await?;

let mut seen = HashSet::new();
for batch in &results {
let ka = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let kb = batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
for i in 0..batch.num_rows() {
assert!(
seen.insert((ka.value(i), kb.value(i))),
"DUPLICATE group key ({}, {}). \
Bug #20724: group_values not recreated for streaming merge.",
ka.value(i),
kb.value(i),
);
}
}
assert_eq!(seen.len(), num_keys as usize);
Ok(())
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ workspace = true

[features]
force_hash_collisions = []
force_hash_partial_collisions = ["datafusion-common/force_hash_partial_collisions"]
test_utils = ["arrow/test_utils"]
tokio_coop = []
tokio_coop_fallback = []
Expand Down
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream {
// on the grouping columns.
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());

// Recreate group_values to use streaming mode (GroupValuesColumn<true>
// with scalarized_intern) which preserves input row order, as required
// by GroupOrderingFull. This is only needed for multi-column group by,
// since single-column uses GroupValuesPrimitive which is always safe.
Comment thread
gboucher90 marked this conversation as resolved.
Outdated
let group_schema = self
.spill_state
.merging_group_by
.group_schema(&self.spill_state.spill_schema)?;
if group_schema.fields().len() > 1 {
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
Comment on lines +1278 to +1279
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note:

This only recreates `group_values` for multi-column GROUP BY (`fields().len() > 1`), since single-column GROUP BY uses `GroupValuesPrimitive`, which doesn't have the `vectorized_intern` bug. This avoids the memory overhead that was causing `aggregate_source_not_yielding_with_spill` to fail with its tight 2600-byte budget.

}

// Use `OutOfMemoryMode::ReportError` from this point on
// to ensure we don't spill the spilled data to disk again.
self.oom_mode = OutOfMemoryMode::ReportError;
Expand Down
Loading