diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml
index 7697826837b4f..3837feb62226a 100644
--- a/.github/workflows/extended.yml
+++ b/.github/workflows/extended.yml
@@ -154,16 +154,11 @@ jobs:
         uses: ./.github/actions/setup-builder
         with:
           rust-version: stable
-      - name: Run tests (force_hash_collisions)
+      - name: Run tests
         run: |
           cd datafusion
           cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --exclude datafusion-cli --workspace --lib --tests --features=force_hash_collisions,avro
           cargo clean
-      - name: Run tests (force_hash_partial_collisions, #20724)
-        run: |
-          cd datafusion
-          cargo test --profile ci -p datafusion --test core_integration --features=force_hash_partial_collisions -- memory_limit::test_no_duplicate_groups_after_spill --exact
-          cargo clean
 
   sqllogictest-sqlite:
     name: "Run sqllogictests with the sqlite test suite"
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 9844a00559f4a..af37b470a498b 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -210,8 +210,6 @@ jobs:
         run: cargo check --profile ci --no-default-features -p datafusion --features=encoding_expressions
       - name: Check datafusion (force_hash_collisions)
         run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_collisions
-      - name: Check datafusion (force_hash_partial_collisions)
-        run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_partial_collisions
       - name: Check datafusion (math_expressions)
         run: cargo check --profile ci --no-default-features -p datafusion --features=math_expressions
       - name: Check datafusion (parquet)
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index d9a460de42e62..e4ba71e45c661 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -49,7 +49,6 @@ parquet_encryption = [
     "dep:hex",
 ]
 force_hash_collisions = []
-force_hash_partial_collisions = []
 recursive_protection = ["dep:recursive"]
 parquet = ["dep:parquet"]
 sql = ["sqlparser"]
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 13c8dfae37d97..3be6118c55ff2 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -22,18 +22,12 @@ use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
 use arrow::array::*;
 use arrow::compute::take;
 use arrow::datatypes::*;
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
+#[cfg(not(feature = "force_hash_collisions"))]
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use itertools::Itertools;
 use std::collections::HashMap;
 
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
+#[cfg(not(feature = "force_hash_collisions"))]
 use crate::cast::{
     as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
     as_generic_binary_array, as_large_list_array, as_large_list_view_array,
@@ -941,11 +935,8 @@ fn hash_run_array(
 
 /// Internal helper function that hashes a single array and either initializes or combines
 /// the hash values in the buffer.
-#[cfg(not(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-)))]
-fn hash_single_array_impl(
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_single_array(
     array: &dyn Array,
     random_state: &RandomState,
     hashes_buffer: &mut [u64],
     rehash: bool,
 ) -> Result<()> {
@@ -1016,47 +1007,17 @@ fn hash_single_array_impl(
     Ok(())
 }
 
-/// Dispatches to the appropriate `hash_single_array` implementation based on
-/// the enabled feature flags.
-#[cfg(not(any(
-    feature = "force_hash_collisions",
-    feature = "force_hash_partial_collisions"
-)))]
-fn hash_single_array(
-    array: &dyn Array,
-    random_state: &RandomState,
-    hashes_buffer: &mut [u64],
-    rehash: bool,
-) -> Result<()> {
-    hash_single_array_impl(array, random_state, hashes_buffer, rehash)
-}
-
-/// Test version: forces full hash collisions by setting all hashes to 0.
-#[cfg(all(
-    feature = "force_hash_collisions",
-    not(feature = "force_hash_partial_collisions")
-))]
+/// Test version of `hash_single_array` that forces all hashes to collide by setting them to zero.
+#[cfg(feature = "force_hash_collisions")]
 fn hash_single_array(
     _array: &dyn Array,
     _random_state: &RandomState,
     hashes_buffer: &mut [u64],
     _rehash: bool,
 ) -> Result<()> {
-    hashes_buffer.iter_mut().for_each(|x| *x = 0);
-    Ok(())
-}
-
-/// Test version: truncates real hashes to 5 bits (32 distinct values) to create
-/// partial collisions that expose non-monotonic group index bugs (#20724).
-#[cfg(feature = "force_hash_partial_collisions")]
-fn hash_single_array(
-    array: &dyn Array,
-    random_state: &RandomState,
-    hashes_buffer: &mut [u64],
-    rehash: bool,
-) -> Result<()> {
-    hash_single_array_impl(array, random_state, hashes_buffer, rehash)?;
-    hashes_buffer.iter_mut().for_each(|h| *h &= 0x1F);
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
     Ok(())
 }
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index fce0d1c7cfe9c..9beb94497a5fd 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -71,13 +71,6 @@ default = [
 encoding_expressions = ["datafusion-functions/encoding_expressions"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
-# Used for testing ONLY: truncates hashes to 5 bits (32 distinct values) to create partial collisions.
-# Unlike force_hash_collisions (all hashes = 0), this creates a mix of colliding and non-colliding keys,
-# which triggers non-monotonic group indices in vectorized_intern (#20724).
-force_hash_partial_collisions = [
-    "datafusion-physical-plan/force_hash_partial_collisions",
-    "datafusion-common/force_hash_partial_collisions",
-]
 math_expressions = ["datafusion-functions/math_expressions"]
 parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"]
 parquet_encryption = [
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 98230dad8710a..ff8c512cbd22e 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -24,9 +24,7 @@ use std::sync::{Arc, LazyLock};
 #[cfg(feature = "extended_tests")]
 mod memory_limit_validation;
 mod repartition_mem_limit;
-use arrow::array::{
-    ArrayRef, DictionaryArray, Int32Array, Int64Array, RecordBatch, StringViewArray,
-};
+use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow_schema::{DataType, Field, Schema};
@@ -58,7 +56,6 @@ use datafusion_physical_plan::collect as collect_batches;
 use datafusion_physical_plan::common::collect;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use rand::Rng;
-use std::collections::HashSet;
 use test_utils::AccessLogGenerator;
 
 use async_trait::async_trait;
@@ -1175,123 +1172,3 @@ impl TableProvider for SortedTableProvider {
         Ok(DataSourceExec::from_data_source(mem_conf))
     }
 }
-
-// ============================================================================
-// Regression tests for https://github.com/apache/datafusion/issues/20724
-//
-// When hash aggregation spills and switches to streaming merge,
-// `group_values` must be recreated with the streaming variant.
-// Otherwise `vectorized_intern` can produce non-monotonic group indices
-// under hash collisions, causing `GroupOrderingFull` to prematurely
-// emit groups → duplicate keys in output.
-// ============================================================================
-
-/// Helper: set up a session that forces spilling during aggregation.
-async fn setup_spill_agg_context(
-    memory_limit: usize,
-    batch_size: usize,
-) -> Result<SessionContext> {
-    let runtime = RuntimeEnvBuilder::new()
-        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
-        .with_disk_manager_builder(
-            DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory),
-        )
-        .build_arc()
-        .unwrap();
-
-    let config = SessionConfig::new()
-        .with_sort_spill_reservation_bytes(64 * 1024)
-        .with_sort_in_place_threshold_bytes(0)
-        .with_spill_compression(SpillCompression::Uncompressed)
-        .with_batch_size(batch_size)
-        .with_target_partitions(1);
-
-    Ok(SessionContext::new_with_config_rt(config, runtime))
-}
-
-/// Regression test for https://github.com/apache/datafusion/issues/20724
-///
-/// When hash aggregation spills and switches to streaming merge,
-/// `group_values` (`GroupValuesColumn`) is not recreated with the
-/// streaming variant. This means `vectorized_intern` is used
-/// post-spill, which can produce non-monotonic group indices under hash
-/// collisions, causing `GroupOrderingFull` to prematurely emit groups
-/// and create duplicate keys in the output.
-///
-/// Requirements to trigger:
-/// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`)
-/// - `force_hash_partial_collisions` feature → truncated hashes create the mix
-///   of colliding/non-colliding keys needed for non-monotonic indices
-/// - `batch_size=50` → not a multiple of rows-per-group in the merged stream,
-///   so groups span batch boundaries and premature emission causes duplicates
-#[tokio::test]
-async fn test_no_duplicate_groups_after_spill() -> Result<()> {
-    let num_keys: i64 = 5000;
-    let rows_per_key: i64 = 4;
-    let total_rows = (num_keys * rows_per_key) as usize;
-
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("key_a", DataType::Int64, false),
-        Field::new("key_b", DataType::Int64, false),
-        Field::new("value", DataType::Int64, false),
-    ]));
-
-    let mut keys_a = Vec::with_capacity(total_rows);
-    let mut keys_b = Vec::with_capacity(total_rows);
-    let mut vals = Vec::with_capacity(total_rows);
-    for r in 0..rows_per_key {
-        for k in 0..num_keys {
-            keys_a.push(k / 100);
-            keys_b.push(k % 100);
-            vals.push(r * num_keys + k);
-        }
-    }
-
-    let mut batches = Vec::new();
-    for start in (0..total_rows).step_by(500) {
-        let end = (start + 500).min(total_rows);
-        batches.push(RecordBatch::try_new(
-            Arc::clone(&schema),
-            vec![
-                Arc::new(Int64Array::from(keys_a[start..end].to_vec())),
-                Arc::new(Int64Array::from(keys_b[start..end].to_vec())),
-                Arc::new(Int64Array::from(vals[start..end].to_vec())),
-            ],
-        )?);
-    }
-
-    let ctx = setup_spill_agg_context(128 * 1024, 50).await?;
-    let table = MemTable::try_new(schema, vec![batches])?;
-    ctx.register_table("t", Arc::new(table))?;
-
-    let df = ctx
-        .sql("SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b")
-        .await?;
-    let results =
-        collect_batches(df.create_physical_plan().await?, ctx.task_ctx()).await?;
-
-    let mut seen = HashSet::new();
-    for batch in &results {
-        let ka = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        let kb = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .unwrap();
-        for i in 0..batch.num_rows() {
-            assert!(
-                seen.insert((ka.value(i), kb.value(i))),
-                "DUPLICATE group key ({}, {}). \
-                 Bug #20724: group_values not recreated for streaming merge.",
-                ka.value(i),
-                kb.value(i),
-            );
-        }
-    }
-    assert_eq!(seen.len(), num_keys as usize);
-    Ok(())
-}
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index acb6c2ac4fcfd..6a28486cca5dc 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -39,7 +39,6 @@ workspace = true
 
 [features]
 force_hash_collisions = []
-force_hash_partial_collisions = ["datafusion-common/force_hash_partial_collisions"]
 test_utils = ["arrow/test_utils"]
 tokio_coop = []
 tokio_coop_fallback = []
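For context on the two test-only hash modes this patch touches, the standalone sketch below contrasts them. It is illustrative code, not part of the patch: `std`'s `RandomState` and plain `u64` keys are stand-ins for the `ahash::RandomState` and Arrow arrays that `hash_utils.rs` actually operates on.

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::hash::BuildHasher;

/// `force_hash_collisions` mode (kept by this patch): every row hashes to 0,
/// so every key collides with every other key.
fn full_collision_hash(_key: u64) -> u64 {
    0
}

/// `force_hash_partial_collisions` mode (removed by this patch): compute a
/// real hash, then keep only the low 5 bits, leaving at most 32 distinct
/// hash values. Distinct keys now sometimes collide and sometimes do not.
fn partial_collision_hash(key: u64, state: &RandomState) -> u64 {
    state.hash_one(key) & 0x1F
}

fn main() {
    let state = RandomState::new();
    let keys: Vec<u64> = (0..1000).collect();

    let full: HashSet<u64> = keys.iter().map(|&k| full_collision_hash(k)).collect();
    let partial: HashSet<u64> = keys
        .iter()
        .map(|&k| partial_collision_hash(k, &state))
        .collect();

    // All 1000 keys share a single hash under full collisions, but spread
    // over up to 32 buckets under partial collisions: a mix of colliding
    // and non-colliding keys.
    assert_eq!(full.len(), 1);
    assert!(partial.len() > 1 && partial.len() <= 32);
    println!("full: {} distinct hash(es), partial: {}", full.len(), partial.len());
}
```

The all-zero mode exercises collision handling in isolation, while the removed 5-bit truncation produced the colliding/non-colliding mix that, per the comments in the removed test, could surface non-monotonic group indices in spilling aggregations (#20724).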