diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 4e04da26f70b6..8f7b9cf8cb1a0 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -24,11 +24,12 @@ use crate::fuzz_cases::aggregation_fuzzer::{
 };
 
 use arrow::array::{
-    types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array, RecordBatch,
-    StringArray,
+    make_array, types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array,
+    RecordBatch, StringArray, StringViewArray, UInt64Array, UInt8Array,
 };
+use arrow::buffer::NullBuffer;
 use arrow::compute::concat_batches;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, UInt64Type};
 use arrow::util::pretty::pretty_format_batches;
 use arrow_schema::{Field, Schema, SchemaRef};
 use datafusion::datasource::memory::MemorySourceConfig;
@@ -767,3 +768,306 @@ async fn test_single_mode_aggregate_single_mode_aggregate_with_spill() -> Result<()>
 
     Ok(())
 }
+
+/// Test grouping by multiple columns in the extreme case where all the values are unique,
+/// i.e. there are exactly as many groups as input rows
+#[tokio::test]
+async fn test_group_by_multiple_columns_all_unique() -> Result<()> {
+    let scan_schema = Arc::new(Schema::new(vec![
+        Field::new("col_1", DataType::UInt8, true),
+        Field::new("idx", DataType::UInt64, false),
+        Field::new("grouping_key_1", DataType::Utf8, true),
+        Field::new("grouping_key_2", DataType::Utf8, false),
+        Field::new("grouping_key_3", DataType::Utf8View, true),
+        Field::new("grouping_key_4", DataType::Utf8View, false),
+        Field::new("grouping_key_5", DataType::Int32, true),
+        Field::new("grouping_key_6", DataType::Int32, false),
+    ]));
+
+    let nullable_grouping_expr_indices = scan_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .skip(1)
+        .filter(|(_, f)| f.is_nullable())
+        .map(|(index, _)| index)
+        .collect::<Vec<usize>>();
+
+    let record_batch_size = 100;
+    let number_of_record_batches = 1000;
+
+    #[derive(Clone)]
+    enum NullableDataGenConfig {
+        /// All the values in the array are nulls
+        All,
+
+        /// No null values
+        None,
+
+        /// Random nulls
+        Random,
+    }
+
+    /// Generate an infinite iterator of (column_index, null_config) pairs,
+    /// where column_index is the column whose nulls we change based on the null_config
+    ///
+    /// Each (column_index, null_config) pair is repeated [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`]
+    /// times so that even when batches are concatenated together, we still have a safe
+    /// margin for the all-nulls and no-nulls cases
+    ///
+    /// For example, if the nullable_column_indices are `[1, 3, 5]`
+    /// and [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`] is 2,
+    /// this will generate the following sequence:
+    /// ```text
+    /// (1, AllNulls), (1, AllNulls),
+    /// (1, NoNulls), (1, NoNulls),
+    /// (1, RandomNulls), (1, RandomNulls),
+    /// (3, AllNulls), (3, AllNulls),
+    /// (3, NoNulls), (3, NoNulls),
+    /// (3, RandomNulls), (3, RandomNulls),
+    /// (5, AllNulls), (5, AllNulls),
+    /// (5, NoNulls), (5, NoNulls),
+    /// (5, RandomNulls), (5, RandomNulls),
+    /// ```
+    fn generate_infinite_null_state_iter(
+        nullable_column_indices: Vec<usize>,
+    ) -> impl Iterator<Item = (usize, NullableDataGenConfig)> {
+        /// How many consecutive batches to generate with the same null configuration,
+        /// to make sure that even when batches are concatenated together, we still have
+        /// a safe margin for the all-nulls and no-nulls cases
+        const NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG: usize = 4;
+
+        nullable_column_indices
+            .into_iter()
+            // Create a (column_index, null_config) pair for each null config
+            .flat_map(|index| {
+                [
+                    (index, NullableDataGenConfig::All),
+                    (index, NullableDataGenConfig::None),
+                    (index, NullableDataGenConfig::Random),
+                ]
+            })
+            // Repeat each pair NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG times
+            // to make sure we have multiple consecutive batches with the same null configuration
+            .flat_map(|index| {
+                std::iter::repeat_n(index, NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG)
+            })
+            // Make it infinite so we can take as many as we need
+            .cycle()
+    }
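
As a standalone illustration of the `flat_map` + `repeat_n` + `cycle` shape used by `generate_infinite_null_state_iter`, here is a minimal sketch with plain string tags standing in for `NullableDataGenConfig` (all names here are hypothetical):

```rust
fn main() {
    const REPEATS: usize = 2;
    let configs = ["all", "none", "random"];

    let seq: Vec<(usize, &str)> = [1usize, 3, 5]
        .into_iter()
        // One (index, config) pair per null config
        .flat_map(|index| configs.map(|config| (index, config)))
        // Repeat each pair so consecutive batches share a config
        .flat_map(|pair| std::iter::repeat_n(pair, REPEATS))
        // Infinite supply; take only what is needed
        .cycle()
        .take(8)
        .collect();

    assert_eq!(
        seq,
        [
            (1, "all"), (1, "all"),
            (1, "none"), (1, "none"),
            (1, "random"), (1, "random"),
            (3, "all"), (3, "all"),
        ]
    );
}
```
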
+
+    let schema = Arc::clone(&scan_schema);
+
+    let input = (0..number_of_record_batches)
+        .zip(generate_infinite_null_state_iter(
+            nullable_grouping_expr_indices,
+        ))
+        .map(move |(batch_index, null_generate_config)| {
+            let unique_iterator = (batch_index * record_batch_size)
+                ..(batch_index * record_batch_size) + record_batch_size;
+
+            // Only one column at a time has nulls, to make sure all the group keys
+            // together are still unique
+
+            let mut columns = vec![
+                // col_1: nullable uint8
+                // The values do not really matter as we just count them
+                Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n(
+                    0,
+                    record_batch_size,
+                ))),
+                // idx: non-nullable uint64
+                Arc::new(UInt64Array::from_iter_values(
+                    unique_iterator.clone().map(|x| x as u64),
+                )) as ArrayRef,
+                // grouping_key_1: nullable string
+                Arc::new(StringArray::from_iter_values(
+                    unique_iterator.clone().map(|x| x.to_string()),
+                )) as ArrayRef,
+                // grouping_key_2: non-nullable string
+                Arc::new(StringArray::from_iter_values(
+                    unique_iterator.clone().map(|x| x.to_string()),
+                )) as ArrayRef,
+                // grouping_key_3: nullable string view
+                Arc::new(StringViewArray::from_iter_values(
+                    unique_iterator.clone().map(|x| x.to_string()),
+                )) as ArrayRef,
+                // grouping_key_4: non-nullable string view
+                Arc::new(StringViewArray::from_iter_values(
+                    unique_iterator.clone().map(|x| x.to_string()),
+                )) as ArrayRef,
+                // grouping_key_5: nullable int32
+                Arc::new(Int32Array::from_iter_values(
+                    unique_iterator.clone().map(|x| x as i32),
+                )) as ArrayRef,
+                // grouping_key_6: non-nullable int32
+                Arc::new(Int32Array::from_iter_values(
+                    unique_iterator.clone().map(|x| x as i32),
+                )) as ArrayRef,
+            ];
+
+            // Apply the null configuration to the selected column
+            let (column_to_set_nulls, null_config) = null_generate_config;
+
+            match null_config {
+                // We should already have no nulls by default
+                NullableDataGenConfig::None => {
+                    assert_eq!(columns[column_to_set_nulls].null_count(), 0);
+                }
+                // Change all values to nulls
+                NullableDataGenConfig::All => {
+                    columns[column_to_set_nulls] = set_nulls(
+                        &columns[column_to_set_nulls],
+                        NullBuffer::new_null(columns[column_to_set_nulls].len()),
+                    );
+                }
+                NullableDataGenConfig::Random => {
+                    let mut rnd = rng();
+
+                    let null_buffer: NullBuffer = (0..columns[column_to_set_nulls].len())
+                        .map(|_| {
+                            // ~30% nulls
+                            rnd.random::<f64>() > 0.3
+                        })
+                        .collect();
+
+                    columns[column_to_set_nulls] =
+                        set_nulls(&columns[column_to_set_nulls], null_buffer);
+                }
+            }
+
+            RecordBatch::try_new(Arc::clone(&schema), columns).map_err(Into::into)
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let input_row_count: usize = input.iter().map(|b| b.num_rows()).sum();
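
In the `Random` arm above, the collected booleans follow arrow's validity convention: `true` means valid and `false` means null, so `> 0.3` produces roughly 30% nulls. A minimal standalone check of that polarity:

```rust
use arrow::buffer::NullBuffer;

fn main() {
    // true = valid, false = null in arrow validity bitmaps
    let validity: NullBuffer = [true, false, true, true].into_iter().collect();

    assert_eq!(validity.null_count(), 1);
    assert!(validity.is_valid(0));
    assert!(validity.is_null(1));
}
```
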
+
+    let results = {
+        let session_config = SessionConfig::new().with_batch_size(record_batch_size);
+        let ctx = SessionContext::new_with_config(session_config);
+
+        let df = run_sql_on_input(
+            &ctx,
+            Arc::clone(&scan_schema),
+            vec![input.clone()],
+            // It is important to get the grouping keys back so we can test that we
+            // don't mess them up
+            r#"
+            SELECT
+                idx,
+                grouping_key_1,
+                grouping_key_2,
+                grouping_key_3,
+                grouping_key_4,
+                grouping_key_5,
+                grouping_key_6,
+                COUNT(col_1)
+            FROM t
+            GROUP BY
+                idx,
+                grouping_key_1,
+                grouping_key_2,
+                grouping_key_3,
+                grouping_key_4,
+                grouping_key_5,
+                grouping_key_6
+            "#,
+        )
+        .await?;
+
+        df.collect().await?
+    };
+
+    assert_eq!(
+        results.iter().map(|b| b.num_rows()).sum::<usize>(),
+        input_row_count,
+        "Must be exactly the same number of output rows as input rows"
+    );
+
+    // Sort the results for easier assertion
+    // We sort in a separate plan to make sure the sort does not affect the aggregation
+    let sorted_results = {
+        let session_config = SessionConfig::new().with_batch_size(record_batch_size);
+        let ctx = SessionContext::new_with_config(session_config);
+
+        let df = run_sql_on_input(
+            &ctx,
+            results[0].schema(),
+            vec![results],
+            "SELECT * FROM t ORDER BY idx",
+        )
+        .await?;
+
+        df.collect().await?
+    };
+
+    // Assert the output is sorted by idx
+    assert_eq!(
+        sorted_results
+            .iter()
+            .flat_map(|b| b
+                .column_by_name("idx")
+                .unwrap()
+                .as_primitive::<UInt64Type>()
+                .iter())
+            .collect::<Vec<_>>(),
+        (0..input_row_count)
+            .map(|x| Some(x as u64))
+            .collect::<Vec<_>>(),
+        "Output is not sorted by idx"
+    );
+
+    // The expected output batches are the input batches without `col_1` and with a
+    // count column (all values 1) appended at the end
+    let expected_output_batches = input.into_iter().map(|batch| {
+        // Remove the first column (col_1) which we are counting
+        let indices = (1..batch.schema().fields().len()).collect::<Vec<_>>();
+        let batch = batch.project(&indices).unwrap();
+
+        // Add the expected count column with all values set to 1
+        let count_result = vec![1; batch.num_rows()];
+        let count_array = Int64Array::from_iter_values(count_result);
+        let (_, mut arrays, _) = batch.into_parts();
+
+        arrays.push(Arc::new(count_array) as ArrayRef);
+
+        RecordBatch::try_new(sorted_results[0].schema(), arrays).unwrap()
+    });
+
+    for (output_batch, expected_batch) in
+        sorted_results.iter().zip(expected_output_batches)
+    {
+        assert_eq!(output_batch, &expected_batch);
+    }
+
+    Ok(())
+}
+
+/// Set the null buffer of an array to the specified null buffer
+///
+/// It is important that we keep the underlying values as-is and only modify the null
+/// buffer, so that we also test the case (for bytes, for example) where null values do
+/// not point to empty bytes
+fn set_nulls(array: &ArrayRef, null_buffer: NullBuffer) -> ArrayRef {
+    let null_count = null_buffer.null_count();
+
+    let array_data = array
+        .to_data()
+        .into_builder()
+        .nulls(Some(null_buffer))
+        .build()
+        .unwrap();
+
+    let array = make_array(array_data);
+
+    assert_eq!(array.null_count(), null_count);
+
+    array
+}
+
+async fn run_sql_on_input(
+    ctx: &SessionContext,
+    schema: SchemaRef,
+    partitions: Vec<Vec<RecordBatch>>,
+    sql: &str,
+) -> Result<DataFrame> {
+    let provider = MemTable::try_new(schema, partitions)?;
+
+    ctx.register_table("t", Arc::new(provider))?;
+
+    ctx.sql(sql).await
+}
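
To make the `set_nulls` contract concrete: the helper rebuilds the array with a new validity bitmap while leaving the value and offset buffers untouched, so a null slot can still sit on top of non-empty bytes. A small sketch using the same `ArrayData` round-trip:

```rust
use std::sync::Arc;

use arrow::array::{make_array, Array, ArrayRef, StringArray};
use arrow::buffer::NullBuffer;

fn main() {
    let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "bb", "ccc"]));

    // Mark row 1 as null without touching the value/offset buffers
    let nulls: NullBuffer = [true, false, true].into_iter().collect();
    let data = array
        .to_data()
        .into_builder()
        .nulls(Some(nulls))
        .build()
        .unwrap();
    let with_nulls = make_array(data);

    // The bytes "bb" are still physically present behind the null slot,
    // which is exactly the case the fuzz test wants to exercise
    assert_eq!(with_nulls.null_count(), 1);
}
```
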
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
index 03e26446f5751..76b8f193573c7 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
@@ -158,6 +158,39 @@ impl<const NULLABLE: bool> GroupColumn for BooleanGroupValueBuilder<NULLABLE> {
         Ok(())
     }
 
+    fn support_append_array_slice(&self) -> bool {
+        true
+    }
+
+    fn append_array_slice(
+        &mut self,
+        array: &ArrayRef,
+        start: usize,
+        length: usize,
+    ) -> Result<()> {
+        let array = array.as_boolean();
+
+        if NULLABLE {
+            if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
+                self.nulls.append_buffer(&nulls.slice(start, length));
+            } else {
+                self.nulls.append_n(length, false);
+            }
+        } else {
+            assert_eq!(
+                array.null_count(),
+                0,
+                "unexpected nulls in non-nullable input"
+            );
+            self.nulls.append_n(length, false);
+        }
+
+        self.buffer
+            .append_buffer(&array.values().slice(start, length));
+
+        Ok(())
+    }
+
     fn len(&self) -> usize {
         self.buffer.len()
     }
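
The nulls handling above (shared with the primitive builder later in this patch) leans on bitmap slicing: `NullBuffer::slice` handles the bit-level offset, and the sliced bitmap can be appended to a builder in one call. A small sketch of those two operations in isolation:

```rust
use arrow::buffer::{NullBuffer, NullBufferBuilder};

fn main() {
    // validity: [valid, null, valid, valid, null, valid]
    let nulls: NullBuffer =
        [true, false, true, true, false, true].into_iter().collect();

    // Rows 1..4 (length 3): [null, valid, valid]
    let slice = nulls.slice(1, 3);
    assert_eq!(slice.null_count(), 1);

    // Append the whole slice at once instead of row by row
    let mut builder = NullBufferBuilder::new(0);
    builder.append_buffer(&slice);

    let built = builder.finish().expect("slice contains a null");
    assert_eq!(built.len(), 3);
    assert_eq!(built.null_count(), 1);
}
```
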
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
index d52721c2ee6c3..9a3a6528f2af6 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
@@ -24,9 +24,9 @@ use arrow::array::{
     GenericBinaryArray, GenericByteArray, GenericStringArray, OffsetSizeTrait,
 };
 use arrow::buffer::{OffsetBuffer, ScalarBuffer};
-use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType, DataType, GenericBinaryType};
 use datafusion_common::utils::proxy::VecAllocExt;
-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, Result};
 use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
 use itertools::izip;
 use std::mem::size_of;
@@ -173,6 +173,75 @@ where
         Ok(())
     }
 
+    fn append_array_slice_inner<B>(
+        &mut self,
+        array: &ArrayRef,
+        start: usize,
+        length: usize,
+    ) -> Result<()>
+    where
+        B: ByteArrayType,
+    {
+        let array = array.as_bytes::<B>();
+
+        let offsets = array.offsets();
+        let bytes = array.value_data();
+
+        // Whether the slice consists entirely of nulls
+        let all_nulls: bool;
+
+        // 1. Append the nulls
+        if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
+            let nulls_slice = nulls.slice(start, length);
+            all_nulls = nulls_slice.null_count() == length;
+            self.nulls.append_buffer(&nulls_slice);
+        } else {
+            all_nulls = false;
+            self.nulls.append_n(length, false);
+        }
+
+        let values_start = offsets[start].as_usize();
+        let values_end = offsets[start + length].as_usize();
+
+        // 2. Append the offsets
+
+        if all_nulls {
+            let last_offset = self.buffer.len();
+
+            // If all values are null, we can just repeat the last offset
+            self.offsets
+                .extend(std::iter::repeat_n(O::usize_as(last_offset), length));
+        } else {
+            let new_base = self.buffer.len();
+            let old_base = offsets[start].as_usize();
+
+            self.offsets.extend(
+                offsets[start + 1..start + length + 1]
+                    .iter()
+                    .map(|&offset| O::usize_as(new_base + offset.as_usize() - old_base)),
+            );
+        }
+
+        // 3. Append the bytes
+
+        // Only append the actual bytes if not all values are null
+        if !all_nulls {
+            // Note: if the array has nulls we might copy some bytes that are not used
+
+            // Add all the bytes for the values directly to the byte buffer
+            self.buffer.append_slice(&bytes[values_start..values_end]);
+
+            if self.buffer.len() > self.max_buffer_size {
+                return exec_err!(
+                    "offset overflow, buffer size > {}",
+                    self.max_buffer_size
+                );
+            }
+        }
+
+        Ok(())
+    }
+
     fn do_equal_to_inner<B>(
         &self,
         lhs_row: usize,
@@ -329,6 +398,41 @@ where
         Ok(())
     }
 
+    fn support_append_array_slice(&self) -> bool {
+        true
+    }
+
+    fn append_array_slice(
+        &mut self,
+        column: &ArrayRef,
+        start: usize,
+        length: usize,
+    ) -> Result<()> {
+        match self.output_type {
+            OutputType::Binary => {
+                debug_assert!(matches!(
+                    column.data_type(),
+                    DataType::Binary | DataType::LargeBinary
+                ));
+                self.append_array_slice_inner::<GenericBinaryType<O>>(
+                    column, start, length,
+                )?
+            }
+            OutputType::Utf8 => {
+                debug_assert!(matches!(
+                    column.data_type(),
+                    DataType::Utf8 | DataType::LargeUtf8
+                ));
+                self.append_array_slice_inner::<GenericStringType<O>>(
+                    column, start, length,
+                )?
+            }
+            _ => unreachable!("View types should use `ArrowBytesViewMap`"),
+        };
+
+        Ok(())
+    }
+
     fn len(&self) -> usize {
         self.offsets.len() - 1
     }
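
The offset rewrite in `append_array_slice_inner` is plain rebasing: subtract the slice's first source offset (`old_base`) and add the destination buffer's current length (`new_base`). A worked example with concrete numbers, assuming `i32` offsets:

```rust
fn main() {
    // Source array ["a", "bb", "ccc", "dddd"] has offsets [0, 1, 3, 6, 10]
    let offsets: [i32; 5] = [0, 1, 3, 6, 10];
    let (start, length) = (1usize, 2usize); // rows "bb" and "ccc"

    // Destination byte buffer already holds 7 bytes
    let new_base: i32 = 7;
    let old_base = offsets[start]; // 1

    // Rebase offsets[start + 1 ..= start + length]
    let rebased: Vec<i32> = offsets[start + 1..start + length + 1]
        .iter()
        .map(|&offset| new_base + offset - old_base)
        .collect();

    // "bb" now ends at 7 + (3 - 1) = 9 and "ccc" at 7 + (6 - 1) = 12
    assert_eq!(rebased, vec![9, 12]);
}
```
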
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 9adf028eca7f6..6cde0e53135d5 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -88,6 +88,34 @@ pub trait GroupColumn: Send + Sync {
     /// The vectorized version `append_val`
     fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>;
 
+    /// Whether this builder supports the [`Self::append_array_slice`] optimization.
+    /// If this returns true, [`Self::append_array_slice`] must be implemented
+    fn support_append_array_slice(&self) -> bool {
+        false
+    }
+
+    /// Append a slice of values from `array`, starting at `start`, for `length` rows
+    ///
+    /// This is a special case of `vectorized_append` for when the rows are contiguous
+    ///
+    /// You should implement this to optimize large copies of contiguous values.
+    ///
+    /// This takes the unsliced array together with `start` and `length`, even though a
+    /// pre-sliced array would be more user-friendly, to allow optimizations that avoid
+    /// the additional computation a slice can incur
+    ///
+    /// Note: in order for this to be used, [`Self::support_append_array_slice`] must return true
+    fn append_array_slice(
+        &mut self,
+        _array: &ArrayRef,
+        _start: usize,
+        _length: usize,
+    ) -> Result<()> {
+        assert!(!self.support_append_array_slice(), "support_append_array_slice() returned true while append_array_slice() is not implemented");
+        not_impl_err!(
+            "append_array_slice is not implemented for this GroupColumn, please implement it together with support_append_array_slice"
+        )
+    }
+
     /// Returns the number of rows stored in this builder
     fn len(&self) -> usize;
 
@@ -229,6 +257,11 @@ struct VectorizedOperationBuffers {
     /// The `vectorized append` row indices buffer
     append_row_indices: Vec<usize>,
 
+    /// Whether all the values in `append_row_indices` are consecutive,
+    /// i.e. `append_row_indices[i] + 1 == append_row_indices[i + 1]`.
+    /// This is used to optimize the `vectorized_append` operation
+    are_row_indices_consecutive: bool,
+
     /// The `vectorized_equal_to` row indices buffer
     equal_to_row_indices: Vec<usize>,
 
@@ -246,12 +279,27 @@ struct VectorizedOperationBuffers {
 
 impl VectorizedOperationBuffers {
     fn clear(&mut self) {
-        self.append_row_indices.clear();
+        self.clear_append_row_indices();
         self.equal_to_row_indices.clear();
         self.equal_to_group_indices.clear();
         self.equal_to_results.clear();
         self.remaining_row_indices.clear();
     }
+
+    fn add_append_row_index(&mut self, row: usize) {
+        self.are_row_indices_consecutive = self.are_row_indices_consecutive
+            && self
+                .append_row_indices
+                .last()
+                .is_none_or(|last| last + 1 == row);
+
+        self.append_row_indices.push(row);
+    }
+
+    fn clear_append_row_indices(&mut self) {
+        self.append_row_indices.clear();
+        self.are_row_indices_consecutive = true;
+    }
 }
 
 impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
@@ -491,7 +539,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         batch_hashes: &[u64],
         groups: &mut [usize],
     ) {
-        self.vectorized_operation_buffers.append_row_indices.clear();
+        self.vectorized_operation_buffers.clear_append_row_indices();
         self.vectorized_operation_buffers
             .equal_to_row_indices
             .clear();
@@ -521,9 +569,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
             );
 
             // Add row index to `vectorized_append_row_indices`
-            self.vectorized_operation_buffers
-                .append_row_indices
-                .push(row);
+            self.vectorized_operation_buffers.add_append_row_index(row);
 
             // Set group index to row in `groups`
             groups[row] = current_group_idx;
@@ -571,11 +617,33 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         }
 
         let iter = self.group_values.iter_mut().zip(cols.iter());
-        for (group_column, col) in iter {
-            group_column.vectorized_append(
-                col,
-                &self.vectorized_operation_buffers.append_row_indices,
-            )?;
+        if self
+            .vectorized_operation_buffers
+            .are_row_indices_consecutive
+            && !self
+                .vectorized_operation_buffers
+                .append_row_indices
+                .is_empty()
+        {
+            let start = self.vectorized_operation_buffers.append_row_indices[0];
+            let length = self.vectorized_operation_buffers.append_row_indices.len();
+            for (group_column, col) in iter {
+                if group_column.support_append_array_slice() {
+                    group_column.append_array_slice(col, start, length)?;
+                } else {
+                    group_column.vectorized_append(
+                        col,
+                        &self.vectorized_operation_buffers.append_row_indices,
+                    )?;
+                }
+            }
+        } else {
+            for (group_column, col) in iter {
+                group_column.vectorized_append(
+                    col,
+                    &self.vectorized_operation_buffers.append_row_indices,
+                )?;
+            }
         }
 
         Ok(())
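
The consecutive-run detection in `add_append_row_index` is incremental: the flag stays true only while each pushed row extends the current run by exactly one. A toy version of the same idiom, with a hypothetical `RunTracker` type:

```rust
struct RunTracker {
    rows: Vec<usize>,
    consecutive: bool,
}

impl RunTracker {
    fn new() -> Self {
        Self {
            rows: Vec::new(),
            consecutive: true,
        }
    }

    fn push(&mut self, row: usize) {
        // Stays true only while each new row is previous + 1
        self.consecutive =
            self.consecutive && self.rows.last().is_none_or(|last| last + 1 == row);
        self.rows.push(row);
    }
}

fn main() {
    let mut tracker = RunTracker::new();
    for row in [4, 5, 6] {
        tracker.push(row);
    }
    // [4, 5, 6] is a single run, so the slice fast path applies
    assert!(tracker.consecutive);

    tracker.push(8);
    // A gap appeared, so the per-row vectorized append must be used
    assert!(!tracker.consecutive);
}
```
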
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
index df2cf4bdecce5..2d0e5fb0b65e1 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
@@ -239,6 +239,39 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
         Ok(())
     }
 
+    fn support_append_array_slice(&self) -> bool {
+        true
+    }
+
+    fn append_array_slice(
+        &mut self,
+        array: &ArrayRef,
+        start: usize,
+        length: usize,
+    ) -> Result<()> {
+        let array = array.as_primitive::<T>();
+
+        if NULLABLE {
+            if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
+                self.nulls.append_buffer(&nulls.slice(start, length));
+            } else {
+                self.nulls.append_n(length, false);
+            }
+        } else {
+            assert_eq!(
+                array.null_count(),
+                0,
+                "unexpected nulls in non-nullable input"
+            );
+            self.nulls.append_n(length, false);
+        }
+
+        self.group_values
+            .extend_from_slice(&array.values()[start..start + length]);
+
+        Ok(())
+    }
+
     fn len(&self) -> usize {
         self.group_values.len()
     }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs
index 6a84d685b6c79..0249702ea261f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs
@@ -61,6 +61,13 @@ impl MaybeNullBufferBuilder {
         }
     }
 
+    /// Append a [`NullBuffer`] to this builder
+    ///
+    /// This is useful when you want to concatenate two null buffers.
+    pub fn append_buffer(&mut self, other: &NullBuffer) {
+        self.nulls.append_buffer(other);
+    }
+
     /// return the number of heap allocated bytes used by this structure to store boolean values
     pub fn allocated_size(&self) -> usize {
         // NullBufferBuilder builder::allocated_size returns capacity in bits
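
Like the bytes case, the primitive fast path copies `array.values()[start..start + length]` straight from the physical buffer, ignoring validity: null slots still occupy a value slot (zero-filled by the constructor used below) and are masked out by the appended null buffer. A small check of that assumption:

```rust
use arrow::array::Int32Array;

fn main() {
    // Row 1 is null, but the physical values buffer still has a slot for it
    let array = Int32Array::from(vec![Some(10), None, Some(30)]);
    assert_eq!(array.null_count(), 1);

    // values() exposes the raw buffer regardless of validity
    let values = array.values();
    assert_eq!(values.len(), 3);
    assert_eq!(values[0], 10);
    assert_eq!(values[2], 30);
}
```
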