.github/workflows/extended.yml (1 addition, 6 deletions)

@@ -154,16 +154,11 @@ jobs:
         uses: ./.github/actions/setup-builder
         with:
           rust-version: stable
-      - name: Run tests (force_hash_collisions)
+      - name: Run tests
         run: |
           cd datafusion
           cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --exclude datafusion-cli --workspace --lib --tests --features=force_hash_collisions,avro
           cargo clean
-      - name: Run tests (force_hash_partial_collisions, #20724)
-        run: |
-          cd datafusion
-          cargo test --profile ci -p datafusion --test core_integration --features=force_hash_partial_collisions -- memory_limit::test_no_duplicate_groups_after_spill --exact
-          cargo clean
 
   sqllogictest-sqlite:
     name: "Run sqllogictests with the sqlite test suite"

.github/workflows/rust.yml (2 deletions)

@@ -210,8 +210,6 @@ jobs:
         run: cargo check --profile ci --no-default-features -p datafusion --features=encoding_expressions
       - name: Check datafusion (force_hash_collisions)
         run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_collisions
-      - name: Check datafusion (force_hash_partial_collisions)
-        run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_partial_collisions
       - name: Check datafusion (math_expressions)
         run: cargo check --profile ci --no-default-features -p datafusion --features=math_expressions

datafusion/common/Cargo.toml (1 deletion)

@@ -49,7 +49,6 @@ parquet_encryption = [
     "dep:hex",
 ]
 force_hash_collisions = []
-force_hash_partial_collisions = []
 recursive_protection = ["dep:recursive"]
 parquet = ["dep:parquet"]
 sql = ["sqlparser"]

datafusion/common/src/hash_utils.rs (9 additions, 48 deletions)

@@ -22,18 +22,12 @@ use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
 use arrow::array::*;
 use arrow::compute::take;
 use arrow::datatypes::*;
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
+#[cfg(not(feature = "force_hash_collisions"))]
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use itertools::Itertools;
 use std::collections::HashMap;
 
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
+#[cfg(not(feature = "force_hash_collisions"))]
 use crate::cast::{
     as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
     as_generic_binary_array, as_large_list_array, as_large_list_view_array,

@@ -941,11 +935,8 @@ fn hash_run_array<R: RunEndIndexType>(
 
 /// Internal helper function that hashes a single array and either initializes or combines
 /// the hash values in the buffer.
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
-fn hash_single_array_impl(
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_single_array(
     array: &dyn Array,
     random_state: &RandomState,
     hashes_buffer: &mut [u64],

@@ -1016,47 +1007,17 @@ fn hash_single_array_impl(
     Ok(())
 }
 
-/// Dispatches to the appropriate `hash_single_array` implementation based on
-/// the enabled feature flags.
-#[cfg(not(any(
-    feature = "force_hash_collisions",
-    feature = "force_hash_partial_collisions"
-)))]
-fn hash_single_array(
-    array: &dyn Array,
-    random_state: &RandomState,
-    hashes_buffer: &mut [u64],
-    rehash: bool,
-) -> Result<()> {
-    hash_single_array_impl(array, random_state, hashes_buffer, rehash)
-}
-
-/// Test version: forces full hash collisions by setting all hashes to 0.
-#[cfg(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-))]
+/// Test version of `hash_single_array` that forces all hashes to collide to zero.
+#[cfg(feature = "force_hash_collisions")]
 fn hash_single_array(
     _array: &dyn Array,
     _random_state: &RandomState,
     hashes_buffer: &mut [u64],
     _rehash: bool,
 ) -> Result<()> {
-    hashes_buffer.iter_mut().for_each(|x| *x = 0);
-    Ok(())
-}
-
-/// Test version: truncates real hashes to 5 bits (32 distinct values) to create
-/// partial collisions that expose non-monotonic group index bugs (#20724).
-#[cfg(feature = "force_hash_partial_collisions")]
-fn hash_single_array(
-    array: &dyn Array,
-    random_state: &RandomState,
-    hashes_buffer: &mut [u64],
-    rehash: bool,
-) -> Result<()> {
-    hash_single_array_impl(array, random_state, hashes_buffer, rehash)?;
-    hashes_buffer.iter_mut().for_each(|h| *h &= 0x1F);
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
     Ok(())
 }
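
For readers skimming the diff: the two hash-override features differ only in what they do to the computed hash buffer. The following standalone Rust sketch (not part of the patch; the hash values are made up) illustrates the contrast described in the comments above: `force_hash_collisions` zeroes every hash, while the removed `force_hash_partial_collisions` kept only the low 5 bits via `& 0x1F`, leaving 32 possible values so that some keys collide and others do not.

fn main() {
    // Made-up 64-bit hashes standing in for real hash output.
    let hashes: [u64; 4] = [0x9E37_79B9, 0x85EB_CA6B, 0xC2B2_AE35, 0x27D4_EB2F];

    // force_hash_collisions: every hash becomes 0 -> one giant bucket.
    let full: Vec<u64> = hashes.iter().map(|_| 0).collect();

    // force_hash_partial_collisions (removed here): keep the low 5 bits,
    // so hashes fall in 0..32 and a large key set yields a mix of
    // colliding and non-colliding hashes.
    let partial: Vec<u64> = hashes.iter().map(|h| h & 0x1F).collect();

    assert!(full.iter().all(|&h| h == 0));
    assert!(partial.iter().all(|&h| h < 32));
    println!("full: {full:?}\npartial: {partial:?}");
}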

datafusion/core/Cargo.toml (7 deletions)

@@ -71,13 +71,6 @@ default = [
 encoding_expressions = ["datafusion-functions/encoding_expressions"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
-# Used for testing ONLY: truncates hashes to 5 bits (32 distinct values) to create partial collisions.
-# Unlike force_hash_collisions (all hashes = 0), this creates a mix of colliding and non-colliding keys,
-# which triggers non-monotonic group indices in vectorized_intern (#20724).
-force_hash_partial_collisions = [
-    "datafusion-physical-plan/force_hash_partial_collisions",
-    "datafusion-common/force_hash_partial_collisions",
-]
 math_expressions = ["datafusion-functions/math_expressions"]
 parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"]
 parquet_encryption = [

datafusion/core/tests/memory_limit/mod.rs (1 addition, 124 deletions)

@@ -24,9 +24,7 @@ use std::sync::{Arc, LazyLock};
 #[cfg(feature = "extended_tests")]
 mod memory_limit_validation;
 mod repartition_mem_limit;
-use arrow::array::{
-    ArrayRef, DictionaryArray, Int32Array, Int64Array, RecordBatch, StringViewArray,
-};
+use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow_schema::{DataType, Field, Schema};

@@ -58,7 +56,6 @@ use datafusion_physical_plan::collect as collect_batches;
 use datafusion_physical_plan::common::collect;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use rand::Rng;
-use std::collections::HashSet;
 use test_utils::AccessLogGenerator;
 
 use async_trait::async_trait;

@@ -1175,123 +1172,3 @@ impl TableProvider for SortedTableProvider {
         Ok(DataSourceExec::from_data_source(mem_conf))
     }
 }
-
-// ============================================================================
-// Regression tests for https://github.com/apache/datafusion/issues/20724
-//
-// When hash aggregation spills and switches to streaming merge,
-// `group_values` must be recreated with the streaming variant.
-// Otherwise `vectorized_intern` can produce non-monotonic group indices
-// under hash collisions, causing `GroupOrderingFull` to prematurely
-// emit groups → duplicate keys in output.
-// ============================================================================
-
-/// Helper: set up a session that forces spilling during aggregation.
-async fn setup_spill_agg_context(
-    memory_limit: usize,
-    batch_size: usize,
-) -> Result<SessionContext> {
-    let runtime = RuntimeEnvBuilder::new()
-        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
-        .with_disk_manager_builder(
-            DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory),
-        )
-        .build_arc()
-        .unwrap();
-
-    let config = SessionConfig::new()
-        .with_sort_spill_reservation_bytes(64 * 1024)
-        .with_sort_in_place_threshold_bytes(0)
-        .with_spill_compression(SpillCompression::Uncompressed)
-        .with_batch_size(batch_size)
-        .with_target_partitions(1);
-
-    Ok(SessionContext::new_with_config_rt(config, runtime))
-}
-
-/// Regression test for https://github.com/apache/datafusion/issues/20724
-///
-/// When hash aggregation spills and switches to streaming merge,
-/// `group_values` (GroupValuesColumn<false>) is not recreated with the
-/// streaming variant (<true>). This means `vectorized_intern` is used
-/// post-spill, which can produce non-monotonic group indices under hash
-/// collisions, causing `GroupOrderingFull` to prematurely emit groups
-/// and create duplicate keys in the output.
-///
-/// Requirements to trigger:
-/// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`)
-/// - `force_hash_partial_collisions` feature → truncated hashes create the mix
-///   of colliding/non-colliding keys needed for non-monotonic indices
-/// - `batch_size=50` → not a multiple of rows-per-group in the merged stream,
-///   so groups span batch boundaries and premature emission causes duplicates
-#[tokio::test]
-async fn test_no_duplicate_groups_after_spill() -> Result<()> {
-    let num_keys: i64 = 5000;
-    let rows_per_key: i64 = 4;
-    let total_rows = (num_keys * rows_per_key) as usize;
-
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("key_a", DataType::Int64, false),
-        Field::new("key_b", DataType::Int64, false),
-        Field::new("value", DataType::Int64, false),
-    ]));
-
-    let mut keys_a = Vec::with_capacity(total_rows);
-    let mut keys_b = Vec::with_capacity(total_rows);
-    let mut vals = Vec::with_capacity(total_rows);
-    for r in 0..rows_per_key {
-        for k in 0..num_keys {
-            keys_a.push(k / 100);
-            keys_b.push(k % 100);
-            vals.push(r * num_keys + k);
-        }
-    }
-
-    let mut batches = Vec::new();
-    for start in (0..total_rows).step_by(500) {
-        let end = (start + 500).min(total_rows);
-        batches.push(RecordBatch::try_new(
-            Arc::clone(&schema),
-            vec![
-                Arc::new(Int64Array::from(keys_a[start..end].to_vec())),
-                Arc::new(Int64Array::from(keys_b[start..end].to_vec())),
-                Arc::new(Int64Array::from(vals[start..end].to_vec())),
-            ],
-        )?);
-    }
-
-    let ctx = setup_spill_agg_context(128 * 1024, 50).await?;
-    let table = MemTable::try_new(schema, vec![batches])?;
-    ctx.register_table("t", Arc::new(table))?;
-
-    let df = ctx
-        .sql("SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b")
-        .await?;
-    let results =
-        collect_batches(df.create_physical_plan().await?, ctx.task_ctx()).await?;
-
-    let mut seen = HashSet::new();
-    for batch in &results {
-        let ka = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        let kb = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        for i in 0..batch.num_rows() {
-            assert!(
-                seen.insert((ka.value(i), kb.value(i))),
-                "DUPLICATE group key ({}, {}). \
-                 Bug #20724: group_values not recreated for streaming merge.",
-                ka.value(i),
-                kb.value(i),
-            );
-        }
-    }
-    assert_eq!(seen.len(), num_keys as usize);
-    Ok(())
-}
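
The deleted test above hinges on the claim that non-monotonic group indices make an ordering tracker emit groups too early. As a rough intuition aid only (a toy model, not DataFusion's actual `GroupOrderingFull` code), the sketch below shows how a tracker that assumes group indices only ever grow misbehaves when a colliding key maps back to an already-seen index:

fn main() {
    // Group indices as a full-ordering tracker might observe them after a
    // spill: index 2 reappears after index 5 has already been assigned.
    let group_indices = [0, 1, 2, 3, 4, 5, 2];

    let mut max_seen: u64 = 0;
    for &idx in &group_indices {
        if idx < max_seen {
            // A tracker that treated every group below `max_seen` as final
            // would already have emitted group 2; its key then shows up a
            // second time in the output, which is exactly what the test
            // asserted against with its HashSet of (key_a, key_b) pairs.
            println!("group {idx} reappears after {max_seen}");
        }
        max_seen = max_seen.max(idx);
    }
}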

datafusion/physical-plan/Cargo.toml (1 deletion)

@@ -39,7 +39,6 @@ workspace = true
 
 [features]
 force_hash_collisions = []
-force_hash_partial_collisions = ["datafusion-common/force_hash_partial_collisions"]
 test_utils = ["arrow/test_utils"]
 tokio_coop = []
 tokio_coop_fallback = []