diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 0fa47671d303a..c6bd53356f2e4 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -28,7 +28,7 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use crate::cast::{
     as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
     as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
-    as_string_array, as_string_view_array, as_struct_array,
+    as_string_array, as_string_view_array, as_struct_array, as_union_array,
 };
 use crate::error::Result;
 use crate::error::{_internal_datafusion_err, _internal_err};
@@ -417,6 +417,55 @@ where
     Ok(())
 }
 
+/// Hashes every row of a union array into `hashes_buffer`.
+///
+/// Each child array is hashed in full with [`create_hashes`], starting from a
+/// zeroed buffer. For row `i`, the hash of the child value selected by
+/// `array.type_id(i)` and `array.value_offset(i)` is then folded into
+/// `hashes_buffer[i]` with `combine_hashes`.
+///
+/// # Errors
+/// Propagates any error returned by [`create_hashes`] for a child array.
+///
+/// # Panics
+/// Panics if `array.data_type()` is not `DataType::Union`, if `hashes_buffer` is
+/// shorter than `array`, or if a row's type id has no entry among the union fields.
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_union_array(
+    array: &UnionArray,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+) -> Result<()> {
+    use std::collections::HashMap;
+
+    let DataType::Union(union_fields, _mode) = array.data_type() else {
+        unreachable!()
+    };
+
+    // Hash every child once up front, keyed by its type id.
+    let mut child_hashes = HashMap::with_capacity(union_fields.len());
+
+    for (type_id, _field) in union_fields.iter() {
+        let child = array.child(type_id);
+        let mut child_hash_buffer = vec![0; child.len()];
+        create_hashes([child], random_state, &mut child_hash_buffer)?;
+
+        child_hashes.insert(type_id, child_hash_buffer);
+    }
+
+    // Slicing to `array.len()` keeps the panic on a too-short buffer while letting
+    // the loop walk the output slots directly instead of indexing them by `i`.
+    for (i, hash) in hashes_buffer[..array.len()].iter_mut().enumerate() {
+        let type_id = array.type_id(i);
+        let child_offset = array.value_offset(i);
+
+        let child_hash = child_hashes.get(&type_id).expect("invalid type_id");
+        *hash = combine_hashes(*hash, child_hash[child_offset]);
+    }
+
+    Ok(())
+}
+
 #[cfg(not(feature = "force_hash_collisions"))]
 fn hash_fixed_list_array(
     array: &FixedSizeListArray,
@@ -497,6 +546,10 @@ fn hash_single_array(
             let array = as_fixed_size_list_array(array)?;
             hash_fixed_list_array(array, random_state, hashes_buffer)?;
         }
+        DataType::Union(_, _) => {
+            let array = as_union_array(array)?;
+            hash_union_array(array, random_state, hashes_buffer)?;
+        }
         _ => {
             // This is internal because we should have caught this before.
             return _internal_err!(
@@ -1168,4 +1221,119 @@ mod tests {
             "Error message should mention reentrancy: {err_msg}",
         );
     }
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_sparse_union_arrays() {
+        // logical array: [int(5), str("foo"), int(10), int(5)]
+        let int_array = Int32Array::from(vec![Some(5), None, Some(10), Some(5)]);
+        let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
+
+        let type_ids = vec![0_i8, 1, 0, 0].into();
+        let children = vec![
+            Arc::new(int_array) as ArrayRef,
+            Arc::new(str_array) as ArrayRef,
+        ];
+
+        let union_fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, true))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
+        let array_ref = Arc::new(array) as ArrayRef;
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let mut hashes = vec![0; array_ref.len()];
+        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
+
+        // Rows 0 and 3 both have type_id=0 (int) with value 5
+        assert_eq!(hashes[0], hashes[3]);
+        // Row 0 (int 5) vs Row 2 (int 10) - different values
+        assert_ne!(hashes[0], hashes[2]);
+        // Row 0 (int) vs Row 1 (string) - different types
+        assert_ne!(hashes[0], hashes[1]);
+    }
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_sparse_union_arrays_with_nulls() {
+        // logical array: [int(5), str("foo"), int(null), str(null)]
+        let int_array = Int32Array::from(vec![Some(5), None, None, None]);
+        let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
+
+        let type_ids = vec![0, 1, 0, 1].into();
+        let children = vec![
+            Arc::new(int_array) as ArrayRef,
+            Arc::new(str_array) as ArrayRef,
+        ];
+
+        let union_fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, true))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
+        let array_ref = Arc::new(array) as ArrayRef;
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let mut hashes = vec![0; array_ref.len()];
+        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
+
+        // row 2 (int null) and row 3 (str null) should have the same hash
+        // because they are both null values
+        assert_eq!(hashes[2], hashes[3]);
+
+        // row 0 (int 5) vs row 2 (int null) - different (value vs null)
+        assert_ne!(hashes[0], hashes[2]);
+
+        // row 1 (str "foo") vs row 3 (str null) - different (value vs null)
+        assert_ne!(hashes[1], hashes[3]);
+    }
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_dense_union_arrays() {
+        // creates a dense union array with int and string types
+        // [67, "norm", 100, "macdonald", 67]
+        let int_array = Int32Array::from(vec![67, 100, 67]);
+        let str_array = StringArray::from(vec!["norm", "macdonald"]);
+
+        let type_ids = vec![0, 1, 0, 1, 0].into();
+        let offsets = vec![0, 0, 1, 1, 2].into();
+        let children = vec![
+            Arc::new(int_array) as ArrayRef,
+            Arc::new(str_array) as ArrayRef,
+        ];
+
+        let union_fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, false))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, false))),
+        ]
+        .into_iter()
+        .collect();
+
+        let array =
+            UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();
+        let array_ref = Arc::new(array) as ArrayRef;
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let mut hashes = vec![0; array_ref.len()];
+        create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
+
+        // 67 vs "norm"
+        assert_ne!(hashes[0], hashes[1]);
+        // 67 vs 100
+        assert_ne!(hashes[0], hashes[2]);
+        // "norm" vs "macdonald"
+        assert_ne!(hashes[1], hashes[3]);
+        // 100 vs "macdonald"
+        assert_ne!(hashes[2], hashes[3]);
+        // 67 vs 67
+        assert_eq!(hashes[0], hashes[4]);
+    }
 }