diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index f430a87e190d..ac6a3b669ab7 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -62,6 +62,7 @@ dashmap = { workspace = true }
 datafusion = { workspace = true, default-features = true, features = ["avro"] }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
 datafusion-optimizer = { workspace = true, default-features = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-proto = { workspace = true }
diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs
index 1259f90d6449..f107731e5b91 100644
--- a/datafusion-examples/examples/advanced_udaf.rs
+++ b/datafusion-examples/examples/advanced_udaf.rs
@@ -17,6 +17,7 @@
 
 use arrow_schema::{Field, Schema};
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::EmitToExt;
 use datafusion_physical_expr::NullState;
 use std::{any::Any, sync::Arc};
 
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 1e1c5d5424b0..22f7cc4929e8 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -338,6 +338,19 @@ config_namespace! {
         /// if the source of statistics is accurate.
         /// We plan to make this the default in the future.
         pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
+
+        /// Should DataFusion use the blocked approach to manage the group
+        /// values and their related states in accumulators. By default, the
+        /// single approach is used: values are managed within one large block
+        /// (think of it as a `Vec`). As this block grows, it often triggers
+        /// numerous copies, resulting in poor performance.
+        /// If this flag is set to `true`, the blocked approach is used instead:
+        /// a block is allocated with a capacity based on a predefined block
+        /// size, and when it reaches its limit we allocate a new block (again
+        /// with a capacity based on the predefined block size) instead of
+        /// expanding the current one and copying the data.
+        /// We plan to make this the default once it is sufficiently tested.
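+        ///
+        /// For example, the flag can be toggled through the session config
+        /// (an illustrative sketch; it mirrors how the fuzz tests set it):
+        ///
+        /// ```text
+        /// let mut config = SessionConfig::default();
+        /// config = config.set(
+        ///     "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
+        ///     &ScalarValue::Boolean(Some(true)),
+        /// );
+        /// ```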
+        pub enable_aggregation_intermediate_states_blocked_approach: bool, default = false
     }
 }
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 62e9be63983c..ce89ad363cf4 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -33,15 +33,20 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
 use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_common::ScalarValue;
+use datafusion_execution::disk_manager::DiskManagerConfig;
+use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_execution::TaskContext;
 use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::PhysicalSortExpr;
 use datafusion_physical_plan::InputOrderMode;
+use rand::seq::SliceRandom;
 use test_utils::{add_empty_batches, StringBatchGenerator};
 
 use hashbrown::HashMap;
 use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::{thread_rng, Rng, SeedableRng};
 use tokio::task::JoinSet;
 
 /// Tests that streaming aggregate and batch (non streaming) aggregate produce
@@ -65,7 +70,7 @@ async fn streaming_aggregate_test() {
     for i in 0..n {
         let test_idx = i % test_cases.len();
         let group_by_columns = test_cases[test_idx].clone();
-        join_set.spawn(run_aggregate_test(
+        join_set.spawn(run_streaming_aggregate_test(
             make_staggered_batches::<true>(1000, distinct, i as u64),
             group_by_columns,
         ));
@@ -77,13 +82,59 @@ async fn streaming_aggregate_test() {
     }
 }
 
+/// Tests that the blocked approach aggregate and batch (single block)
+/// aggregate produce same results
+#[tokio::test(flavor = "multi_thread")]
+async fn blocked_approach_aggregate_test() {
+    let test_cases = [
+        vec!["a"],
+        vec!["b", "a"],
+        vec!["c", "a"],
+        vec!["c", "b", "a"],
+        vec!["d", "a"],
+        vec!["d", "b", "a"],
+        vec!["d", "c", "a"],
+        vec!["d", "c", "b", "a"],
+    ];
+
+    let n_batch_size = 10;
+    let mut rng = thread_rng();
+    let mut all_batch_sizes = (1..=50_usize).collect::<Vec<_>>();
+    all_batch_sizes.shuffle(&mut rng);
+    let batch_sizes = &all_batch_sizes[0..n_batch_size];
+
+    let n = 300;
+    let distincts = vec![10, 20];
+    for distinct in distincts {
+        let mut join_set = JoinSet::new();
+        for batch_size in batch_sizes {
+            for i in 0..n {
+                let test_idx = i % test_cases.len();
+                let group_by_columns = test_cases[test_idx].clone();
+                join_set.spawn(run_blocked_approach_aggregate_test(
+                    make_staggered_batches::<true>(1000, distinct, i as u64),
+                    group_by_columns,
+                    *batch_size,
+                ));
+            }
+        }
+        while let Some(join_handle) = join_set.join_next().await {
+            // propagate errors
+            join_handle.unwrap();
+        }
+    }
+}
+
 /// Perform batch and streaming aggregation with same input
 /// and verify outputs of `AggregateExec` with pipeline breaking stream `GroupedHashAggregateStream`
 /// and non-pipeline breaking stream `BoundedAggregateStream` produces same result.
-async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
-    let schema = input1[0].schema();
-    let session_config = SessionConfig::new().with_batch_size(50);
-    let ctx = SessionContext::new_with_config(session_config);
+async fn run_streaming_aggregate_test(
+    test_data: Vec<RecordBatch>,
+    group_by_columns: Vec<&str>,
+) {
+    let schema = test_data[0].schema();
+
+    // Define test data source exec
     let mut sort_keys = vec![];
     for ordering_col in ["a", "b", "c"] {
         sort_keys.push(PhysicalSortExpr {
@@ -92,17 +143,138 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
         })
     }
 
-    let concat_input_record = concat_batches(&schema, &input1).unwrap();
+    let concat_input_record = concat_batches(&schema, &test_data).unwrap();
     let usual_source = Arc::new(
         MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
     );
 
     let running_source = Arc::new(
-        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+        MemoryExec::try_new(&[test_data.clone()], schema.clone(), None)
             .unwrap()
             .with_sort_information(vec![sort_keys]),
     );
 
+    // Define test task ctx
+    let session_config = SessionConfig::new().with_batch_size(50);
+    let ctx = SessionContext::new_with_config(session_config);
+
+    // Run and check
+    let usual_aggr_ctx = AggrTestContext {
+        data_source_exec: usual_source,
+        task_ctx: ctx.task_ctx(),
+    };
+
+    let running_aggr_ctx = AggrTestContext {
+        data_source_exec: running_source,
+        task_ctx: ctx.task_ctx(),
+    };
+
+    run_aggregate_test_internal(
+        test_data,
+        usual_aggr_ctx,
+        running_aggr_ctx,
+        |collected_usual, collected_running| {
+            assert!(collected_running.len() > 2);
+            // Running should produce more chunks than the usual AggregateExec.
+            // Otherwise it means that we cannot generate results in running mode.
+            assert!(collected_running.len() > collected_usual.len());
+        },
+        group_by_columns,
+    )
+    .await;
+}
+
+/// Perform batch and blocked approach aggregations, and then verify their outputs.
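+///
+/// The "usual" side aggregates the concatenated input with the blocked
+/// approach disabled, while the "running" side enables
+/// `enable_aggregation_intermediate_states_blocked_approach` (with spilling
+/// disabled), so any mismatch points at the blocked state management.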
+async fn run_blocked_approach_aggregate_test(
+    test_data: Vec<RecordBatch>,
+    group_by_columns: Vec<&str>,
+    batch_size: usize,
+) {
+    let schema = test_data[0].schema();
+
+    // Define test data source exec
+    let concat_input_record = concat_batches(&schema, &test_data).unwrap();
+    let usual_source = Arc::new(
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
+    );
+
+    let running_source = Arc::new(
+        MemoryExec::try_new(&[test_data.clone()], schema.clone(), None).unwrap(),
+    );
+
+    // Define test task ctx
+    // Usual task ctx
+    let mut session_config = SessionConfig::default();
+    session_config = session_config.set(
+        "datafusion.execution.batch_size",
+        &ScalarValue::UInt64(Some(batch_size as u64)),
+    );
+    let usual_ctx = Arc::new(TaskContext::default().with_session_config(session_config));
+
+    // Running task ctx
+    let mut session_config = SessionConfig::default();
+    session_config = session_config.set(
+        "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
+        &ScalarValue::Boolean(Some(true)),
+    );
+    session_config = session_config.set(
+        "datafusion.execution.batch_size",
+        &ScalarValue::UInt64(Some(batch_size as u64)),
+    );
+
+    let runtime = Arc::new(
+        RuntimeEnv::new(
+            RuntimeConfig::default().with_disk_manager(DiskManagerConfig::Disabled),
+        )
+        .unwrap(),
+    );
+    let running_ctx = Arc::new(
+        TaskContext::default()
+            .with_session_config(session_config)
+            .with_runtime(runtime),
+    );
+
+    // Run and check
+    let usual_aggr_ctx = AggrTestContext {
+        data_source_exec: usual_source,
+        task_ctx: usual_ctx,
+    };
+
+    let running_aggr_ctx = AggrTestContext {
+        data_source_exec: running_source,
+        task_ctx: running_ctx,
+    };
+
+    run_aggregate_test_internal(
+        test_data,
+        usual_aggr_ctx,
+        running_aggr_ctx,
+        |_, _| {},
+        group_by_columns,
+    )
+    .await;
+}
+
+/// Context of the fuzz aggregation tests
+struct AggrTestContext {
+    data_source_exec: Arc<dyn ExecutionPlan>,
+    task_ctx: Arc<TaskContext>,
+}
+
+/// The internal test function that runs a normal aggregation (without any
+/// optimizations) and an optimized aggregation (e.g. streaming, blocked
+/// approach) over the same input, and verifies that their outputs match.
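+///
+/// The `extra_checks` closure receives the collected left and right output
+/// batches, in that order, for any additional assertions.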
+async fn run_aggregate_test_internal<C>(
+    test_data: Vec<RecordBatch>,
+    left_aggr_ctx: AggrTestContext,
+    right_aggr_ctx: AggrTestContext,
+    extra_checks: C,
+    group_by_columns: Vec<&str>,
+) where
+    C: Fn(&[RecordBatch], &[RecordBatch]),
+{
+    let schema = test_data[0].schema();
+
     let aggregate_expr = vec![
         AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
@@ -117,42 +289,44 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
         .collect::<Vec<_>>();
     let group_by = PhysicalGroupBy::new_single(expr);
 
-    let aggregate_exec_running = Arc::new(
+    let aggregate_exec_usual = Arc::new(
         AggregateExec::try_new(
             AggregateMode::Partial,
             group_by.clone(),
             aggregate_expr.clone(),
             vec![None],
-            running_source,
+            left_aggr_ctx.data_source_exec.clone(),
             schema.clone(),
         )
         .unwrap(),
     ) as Arc<dyn ExecutionPlan>;
 
-    let aggregate_exec_usual = Arc::new(
+    let aggregate_exec_running = Arc::new(
        AggregateExec::try_new(
             AggregateMode::Partial,
             group_by.clone(),
             aggregate_expr.clone(),
             vec![None],
-            usual_source,
+            right_aggr_ctx.data_source_exec.clone(),
             schema.clone(),
         )
         .unwrap(),
    ) as Arc<dyn ExecutionPlan>;
 
-    let task_ctx = ctx.task_ctx();
-    let collected_usual = collect(aggregate_exec_usual.clone(), task_ctx.clone())
-        .await
-        .unwrap();
+    let collected_usual =
+        collect(aggregate_exec_usual.clone(), left_aggr_ctx.task_ctx.clone())
+            .await
+            .unwrap();
+
+    let collected_running = collect(
+        aggregate_exec_running.clone(),
+        right_aggr_ctx.task_ctx.clone(),
+    )
+    .await
+    .unwrap();
+
+    extra_checks(&collected_usual, &collected_running);
 
-    let collected_running = collect(aggregate_exec_running.clone(), task_ctx.clone())
-        .await
-        .unwrap();
-    assert!(collected_running.len() > 2);
-    // Running should produce more chunk than the usual AggregateExec.
-    // Otherwise it means that we cannot generate result in running mode.
-    assert!(collected_running.len() > collected_usual.len());
     // compare
     let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
     let running_formatted = pretty_format_batches(&collected_running)
@@ -187,7 +361,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
             displayable(aggregate_exec_running.as_ref()).indent(false),
             usual_formatted,
             running_formatted,
-            pretty_format_batches(&input1).unwrap(),
+            pretty_format_batches(&test_data).unwrap(),
         );
     }
 }
@@ -311,6 +485,7 @@ async fn group_by_string_test(
     let actual = extract_result_counts(results);
     assert_eq!(expected, actual);
 }
+
 async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {
     struct Visitor {
         expected_sort: bool,
diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs
index 156e21d9ae20..9d0d0aa47588 100644
--- a/datafusion/expr-common/src/groups_accumulator.rs
+++ b/datafusion/expr-common/src/groups_accumulator.rs
@@ -18,7 +18,7 @@
 //! Vectorized [`GroupsAccumulator`]
 
 use arrow::array::{ArrayRef, BooleanArray};
-use datafusion_common::{not_impl_err, Result};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
 
 /// Describes how many rows should be emitted during grouping.
 #[derive(Debug, Clone, Copy)]
@@ -31,29 +31,14 @@ pub enum EmitTo {
     /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
     /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
     First(usize),
-}
-
-impl EmitTo {
-    /// Removes the number of rows from `v` required to emit the right
-    /// number of rows, returning a `Vec` with elements taken, and the
-    /// remaining values in `v`.
-    ///
-    /// This avoids copying if Self::All
-    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
-        match self {
-            Self::All => {
-                // Take the entire vector, leave new (empty) vector
-                std::mem::take(v)
-            }
-            Self::First(n) => {
-                // get end n+1,.. values into t
-                let mut t = v.split_off(*n);
-                // leave n+1,.. in v
-                std::mem::swap(v, &mut t);
-                t
-            }
-        }
-    }
+    /// Emit the next block of the blocked managed groups
+    ///
+    /// The flag's meaning:
+    /// - `true` means new groups may still be added,
+    ///   so we need to shift the remaining values down.
+    /// - `false` means no new groups will be added,
+    ///   so we don't need to shift the values down.
+    NextBlock(bool),
 }
 
 /// `GroupAccumulator` implements a single aggregate (e.g. AVG) and
@@ -143,6 +128,32 @@ pub trait GroupsAccumulator: Send {
     /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
 
+    /// Returns `true` if this accumulator supports blocked mode.
+    fn supports_blocked_mode(&self) -> bool {
+        false
+    }
+
+    /// Alter the block size in the accumulator
+    ///
+    /// If the target block size is `None`, a single big block
+    /// (think of it as a `Vec`) is used to manage the state.
+    ///
+    /// If the target block size is `Some(blk_size)`, we try to set
+    /// the block size to `blk_size`; the attempt only succeeds when
+    /// the accumulator supports blocked mode.
+    ///
+    /// NOTICE: After altering the block size, all previously
+    /// accumulated data will be cleared.
+    ///
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        if block_size.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "this accumulator doesn't support blocked mode yet".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
+
     /// Merges intermediate state (the output from [`Self::state`])
     /// into this accumulator's current state.
     ///
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
index b5eb36c3fac7..0b0670b1a2e6 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
@@ -23,6 +23,14 @@ pub mod bool_op;
 pub mod nulls;
 pub mod prim_op;
 
+use std::{
+    cmp::min,
+    collections::VecDeque,
+    fmt::{self, Debug},
+    iter,
+    ops::{Index, IndexMut},
+};
+
 use arrow::{
     array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
     compute,
@@ -35,6 +43,12 @@ use datafusion_common::{
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 
+pub const MAX_PREALLOC_BLOCK_SIZE: usize = 8192;
+const FLAT_GROUP_INDEX_ID_MASK: u64 = 0;
+const FLAT_GROUP_INDEX_OFFSET_MASK: u64 = u64::MAX;
+const BLOCKED_GROUP_INDEX_ID_MASK: u64 = 0xffffffff00000000;
+const BLOCKED_GROUP_INDEX_OFFSET_MASK: u64 = 0x00000000ffffffff;
+
 /// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
 ///
 /// While [`Accumulator`] are simpler to implement and can support
@@ -411,6 +425,315 @@ impl VecAllocExt for Vec<T> {
     }
 }
 
+pub trait EmitToExt {
+    /// Removes the number of rows from `v` required to emit the right
+    /// number of rows, returning a `Vec` with elements taken, and the
+    /// remaining values in `v`.
+    ///
+    /// This avoids copying if Self::All
+    fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T>;
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    /// - For `EmitTo::NextBlock`, the first block is taken and returned.
+    /// - For `EmitTo::All` and `EmitTo::First`, only a single block is
+    ///   expected (flat mode), and they behave like `take_needed`.
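+    ///
+    /// For illustration, a minimal sketch of blocked emission (it mirrors
+    /// `test_take_need_from_blocks` below):
+    ///
+    /// ```text
+    /// let mut blocks: VecBlocks<i32> = Blocks::new();
+    /// blocks.push_block(vec![1, 2, 3, 4]);
+    /// blocks.push_block(vec![5, 6]);
+    ///
+    /// // each `NextBlock` emission pops one whole block
+    /// let emit = EmitTo::NextBlock(false);
+    /// assert_eq!(emit.take_needed_from_blocks(&mut blocks), vec![1, 2, 3, 4]);
+    /// assert_eq!(emit.take_needed_from_blocks(&mut blocks), vec![5, 6]);
+    /// ```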
+    fn take_needed_from_blocks<T>(&self, blocks: &mut VecBlocks<T>) -> Vec<T>;
+}
+
+impl EmitToExt for EmitTo {
+    fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
+        match self {
+            Self::All => {
+                // Take the entire vector, leave new (empty) vector
+                std::mem::take(v)
+            }
+            Self::First(n) => {
+                let split_at = min(v.len(), *n);
+                // get end n+1,.. values into t
+                let mut t = v.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(v, &mut t);
+                t
+            }
+            Self::NextBlock(_) => unreachable!(
+                "cannot support blocked emission in take_needed, use take_needed_from_blocks instead"
+            ),
+        }
+    }
+
+    fn take_needed_from_blocks<T>(&self, blocks: &mut VecBlocks<T>) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(blocks.num_blocks() == 1);
+                blocks.pop_first_block().unwrap_or_default()
+            }
+            Self::First(n) => {
+                debug_assert!(blocks.num_blocks() == 1);
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => blocks.pop_first_block().unwrap_or_default(),
+        }
+    }
+}
+
+/// Blocked style group index used in blocked mode group values and accumulators
+///
+/// In blocked mode (is_blocked = true):
+/// - High 32 bits represent the `block_id`
+/// - Low 32 bits represent the `block_offset`
+///
+/// In flat mode (is_blocked = false):
+/// - `block_id` is always 0
+/// - All 64 bits are used to represent the `block_offset`
+///
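+/// For illustration, a minimal sketch of the packing scheme (the values
+/// mirror `test_blocked_group_index_build` below):
+///
+/// ```text
+/// // blocked mode: packed = (block_id << 32) | block_offset
+/// let packed = ((42_u64 << 32) | 2) as usize;
+/// let index = BlockedGroupIndexBuilder::new(true).build(packed);
+/// assert_eq!((index.block_id(), index.block_offset()), (42, 2));
+/// assert_eq!(index.as_packed_index(), packed);
+/// ```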
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BlockedGroupIndex {
+    pub block_id: u32,
+    pub block_offset: u64,
+}
+
+impl BlockedGroupIndex {
+    #[inline]
+    pub fn new_from_parts(block_id: u32, block_offset: u64) -> Self {
+        Self {
+            block_id,
+            block_offset,
+        }
+    }
+
+    #[inline]
+    pub fn block_id(&self) -> usize {
+        self.block_id as usize
+    }
+
+    #[inline]
+    pub fn block_offset(&self) -> usize {
+        self.block_offset as usize
+    }
+
+    #[inline]
+    pub fn as_packed_index(&self) -> usize {
+        (((self.block_id as u64) << 32) | self.block_offset) as usize
+    }
+}
+
+pub struct BlockedGroupIndexBuilder {
+    block_id_mask: u64,
+    block_offset_mask: u64,
+}
+
+impl BlockedGroupIndexBuilder {
+    #[inline]
+    pub fn new(is_blocked: bool) -> Self {
+        if is_blocked {
+            Self {
+                block_id_mask: BLOCKED_GROUP_INDEX_ID_MASK,
+                block_offset_mask: BLOCKED_GROUP_INDEX_OFFSET_MASK,
+            }
+        } else {
+            Self {
+                block_id_mask: FLAT_GROUP_INDEX_ID_MASK,
+                block_offset_mask: FLAT_GROUP_INDEX_OFFSET_MASK,
+            }
+        }
+    }
+
+    #[inline]
+    pub fn build(&self, packed_index: usize) -> BlockedGroupIndex {
+        let block_id = (((packed_index as u64) & self.block_id_mask) >> 32) as u32;
+        let block_offset = (packed_index as u64) & self.block_offset_mask;
+
+        BlockedGroupIndex {
+            block_id,
+            block_offset,
+        }
+    }
+}
+
+/// The basic data structure for blocked aggregation intermediate results
+///
+/// The reason we don't use `VecDeque` directly:
+///
+/// `current` and `current_mut` will be called frequently,
+/// and if we used `VecDeque` directly, they would be mapped
+/// to its `back` and `back_mut`.
+///
+/// `back` and `back_mut` are implemented via indexed access,
+/// which needs some address computation and is a bit more
+/// expensive than keeping the latest block in `current` and
+/// just returning a reference to it directly.
+///
+/// This small optimization can bring a slight performance improvement
+/// in the single block case (e.g. when the blocked optimization is disabled).
+///
+pub struct Blocks<T> {
+    /// The current block, it should be pushed into `previous`
+    /// when the next block is pushed
+    current: Option<T>,
+
+    /// Previous blocks pushed before `current`
+    previous: VecDeque<T>,
+}
+
+impl<T> Blocks<T> {
+    pub fn new() -> Self {
+        Self {
+            current: None,
+            previous: VecDeque::new(),
+        }
+    }
+
+    #[inline]
+    pub fn current(&self) -> Option<&T> {
+        self.current.as_ref()
+    }
+
+    #[inline]
+    pub fn current_mut(&mut self) -> Option<&mut T> {
+        self.current.as_mut()
+    }
+
+    pub fn push_block(&mut self, block: T) {
+        // If empty, use the block as the initial current
+        if self.current.is_none() {
+            self.current = Some(block);
+            return;
+        }
+
+        // Take and push the old current to `previous`,
+        // use input `block` as the new `current`
+        let old_cur = std::mem::replace(&mut self.current, Some(block)).unwrap();
+        self.previous.push_back(old_cur);
+    }
+
+    pub fn pop_first_block(&mut self) -> Option<T> {
+        // If `previous` is not empty, pop the first of them
+        if !self.previous.is_empty() {
+            return self.previous.pop_front();
+        }
+
+        // Otherwise, we pop the current
+        std::mem::take(&mut self.current)
+    }
+
+    pub fn num_blocks(&self) -> usize {
+        if self.current.is_none() {
+            return 0;
+        }
+
+        self.previous.len() + 1
+    }
+
+    // TODO: maybe impl a specific Iterator rather than using a trait object,
+    // it can slightly improve performance by eliminating the dynamic dispatch.
+    pub fn iter(&self) -> Box<dyn Iterator<Item = &T> + '_> {
+        // If current is None, it means no data, return empty iter
+        if self.current.is_none() {
+            return Box::new(iter::empty());
+        }
+
+        let cur_iter = iter::once(self.current.as_ref().unwrap());
+
+        if !self.previous.is_empty() {
+            let previous_iter = self.previous.iter();
+            Box::new(previous_iter.chain(cur_iter))
+        } else {
+            Box::new(cur_iter)
+        }
+    }
+
+    // TODO: maybe impl a specific Iterator rather than using a trait object,
+    // it can slightly improve performance by eliminating the dynamic dispatch.
+    pub fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut T> + '_> {
+        // If current is None, it means no data, return empty iter
+        if self.current.is_none() {
+            return Box::new(iter::empty());
+        }
+
+        let cur_iter = iter::once(self.current.as_mut().unwrap());
+
+        if !self.previous.is_empty() {
+            let previous_iter = self.previous.iter_mut();
+            Box::new(previous_iter.chain(cur_iter))
+        } else {
+            Box::new(cur_iter)
+        }
+    }
+
+    pub fn clear(&mut self) {
+        *self = Self::new();
+    }
+}
+
+impl<T: Debug> fmt::Debug for Blocks<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Blocks")
+            .field("current", &self.current)
+            .field("previous", &self.previous)
+            .finish()
+    }
+}
+
+impl<T> Index<usize> for Blocks<T> {
+    type Output = T;
+
+    fn index(&self, index: usize) -> &T {
+        if index < self.previous.len() {
+            &self.previous[index]
+        } else {
+            self.current.as_ref().unwrap()
+        }
+    }
+}
+
+impl<T> IndexMut<usize> for Blocks<T> {
+    fn index_mut(&mut self, index: usize) -> &mut T {
+        if index < self.previous.len() {
+            &mut self.previous[index]
+        } else {
+            self.current.as_mut().unwrap()
+        }
+    }
+}
+
+impl<T> Default for Blocks<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub type VecBlocks<T> = Blocks<Vec<T>>;
+
+impl<T> VecBlocks<T> {
+    pub fn capacity(&self) -> usize {
+        let cur_cap = self.current.as_ref().map(|blk| blk.capacity()).unwrap_or(0);
+        let prev_cap = self.previous.iter().map(|p| p.capacity()).sum::<usize>();
+
+        cur_cap + prev_cap
+    }
+
+    pub fn len(&self) -> usize {
+        let cur_len = self.current.as_ref().map(|blk| blk.len()).unwrap_or(0);
+        let prev_len = self.previous.iter().map(|p| p.len()).sum::<usize>();
+
+        cur_len + prev_len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.current.is_none()
+    }
+}
+
 fn get_filter_at_indices(
     opt_filter: Option<&BooleanArray>,
     indices: &PrimitiveArray<UInt32Type>,
@@ -450,3 +773,260 @@ pub(crate) fn slice_and_maybe_filter(
         Ok(sliced_arrays)
     }
 }
+
+/// Expand blocked values to a big enough size for holding `total_num_groups` groups.
+///
+/// For example,
+///
+/// before expanding:
+///   values: [x, x, x], [x, x, x] (blocks=2, block_size=3)
+///   total_num_groups: 8
+///
+/// after expanding:
+///   values: [x, x, x], [x, x, x], [default, default]
+///
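+/// For illustration, a sketch of a call sequence (the behavior is exercised
+/// by `test_ensure_room_for_values` below):
+///
+/// ```text
+/// let mut blocks: VecBlocks<i32> = Blocks::new();
+/// // 3 groups with block size 4 -> one partially filled block
+/// ensure_enough_room_for_values(&mut blocks, 3, Some(4), 0);
+/// assert_eq!((blocks.num_blocks(), blocks.len()), (1, 3));
+/// // 8 groups -> two full blocks
+/// ensure_enough_room_for_values(&mut blocks, 8, Some(4), 0);
+/// assert_eq!((blocks.num_blocks(), blocks.len()), (2, 8));
+/// ```
+///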
+pub fn ensure_enough_room_for_values<T: Clone>(
+    values: &mut VecBlocks<T>,
+    total_num_groups: usize,
+    block_size: Option<usize>,
+    default_value: T,
+) {
+    let calc_block_size = block_size.unwrap_or(usize::MAX);
+    // In blocked mode, we ensure the blks are enough first,
+    // and then ensure slots in blks are enough.
+    let (mut cur_blk_idx, exist_slots) = if values.num_blocks() > 0 {
+        let cur_blk_idx = values.num_blocks() - 1;
+        let exist_slots =
+            (values.num_blocks() - 1) * calc_block_size + values.current().unwrap().len();
+
+        (cur_blk_idx, exist_slots)
+    } else {
+        (0, 0)
+    };
+
+    // No new groups, don't need to expand, just return.
+    if exist_slots >= total_num_groups {
+        return;
+    }
+
+    // Ensure blks are enough.
+    let exist_blks = values.num_blocks();
+    let new_blks = total_num_groups.saturating_add(calc_block_size - 1) / calc_block_size
+        - exist_blks;
+
+    if new_blks > 0 {
+        let prealloc_size = block_size.unwrap_or(0).min(MAX_PREALLOC_BLOCK_SIZE);
+        for _ in 0..new_blks {
+            values.push_block(Vec::with_capacity(calc_block_size.min(prealloc_size)));
+        }
+    }
+
+    // Ensure slots are enough.
+    let mut new_slots = total_num_groups - exist_slots;
+
+    // Expand current blk.
+    let cur_blk_rest_slots = calc_block_size - values[cur_blk_idx].len();
+    if cur_blk_rest_slots >= new_slots {
+        // We just need to expand the current block.
+        values[cur_blk_idx].extend(iter::repeat(default_value.clone()).take(new_slots));
+        return;
+    }
+
+    // Expand current blk to full, and expand next blks
+    values[cur_blk_idx]
+        .extend(iter::repeat(default_value.clone()).take(cur_blk_rest_slots));
+    new_slots -= cur_blk_rest_slots;
+    cur_blk_idx += 1;
+
+    // Expand whole blks
+    let expand_blks = new_slots / calc_block_size;
+    for _ in 0..expand_blks {
+        values[cur_blk_idx]
+            .extend(iter::repeat(default_value.clone()).take(calc_block_size));
+        cur_blk_idx += 1;
+    }
+
+    // Expand the last blk if needed
+    let last_expand_slots = new_slots % calc_block_size;
+    if last_expand_slots > 0 {
+        values
+            .current_mut()
+            .unwrap()
+            .extend(iter::repeat(default_value.clone()).take(last_expand_slots));
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_ensure_room_for_values() {
+        let mut blocks = VecBlocks::new();
+        let block_size = 4;
+
+        // 0 total_num_groups, should be no blocks
+        ensure_enough_room_for_values(&mut blocks, 0, Some(block_size), 0);
+        assert_eq!(blocks.num_blocks(), 0);
+        assert_eq!(blocks.len(), 0);
+
+        // 0 -> 3 total_num_groups, blocks should look like:
+        // [d, d, d, empty]
+        ensure_enough_room_for_values(&mut blocks, 3, Some(block_size), 0);
+        assert_eq!(blocks.num_blocks(), 1);
+        assert_eq!(blocks.len(), 3);
+
+        // 3 -> 8 total_num_groups, blocks should look like:
+        // [d, d, d, d], [d, d, d, d]
+        ensure_enough_room_for_values(&mut blocks, 8, Some(block_size), 0);
+        assert_eq!(blocks.num_blocks(), 2);
+        assert_eq!(blocks.len(), 8);
+
+        // 8 -> 13 total_num_groups, blocks should look like:
+        // [d, d, d, d], [d, d, d, d], [d, d, d, d], [d, empty, empty, empty]
+        ensure_enough_room_for_values(&mut blocks, 13, Some(block_size), 0);
+        assert_eq!(blocks.num_blocks(), 4);
+        assert_eq!(blocks.len(), 13);
+
+        // Block size none means we will always use one single block
+        // [] -> [d, d, d,...,d]
+        blocks.clear();
+        ensure_enough_room_for_values(&mut blocks, 42, None, 0);
+        assert_eq!(blocks.num_blocks(), 1);
+        assert_eq!(blocks.len(), 42);
+    }
+
+    #[test]
+    fn test_blocks_ops() {
+        let mut blocks = VecBlocks::<u64>::new();
+
+        // Test empty blocks
+        assert!(blocks.current().is_none());
+        assert!(blocks.current_mut().is_none());
+        assert!(blocks.pop_first_block().is_none());
+        assert_eq!(blocks.num_blocks(), 0);
+        {
+            let mut iter = blocks.iter();
+            assert!(iter.next().is_none());
+        }
+        {
+            let mut iter_mut = blocks.iter_mut();
+            assert!(iter_mut.next().is_none());
+        }
+
+        // Test push block
+        for cnt in 0..100 {
+            blocks.push_block(Vec::with_capacity(4));
+
+            assert!(blocks.current().is_some());
+            assert!(blocks.current_mut().is_some());
+            assert_eq!(blocks.num_blocks(), cnt + 1);
+
+            let block_num = blocks.iter().count();
+            assert_eq!(block_num, cnt + 1);
+            let block_num = blocks.iter_mut().count();
+            assert_eq!(block_num, cnt + 1);
+        }
+
+        // Test pop block
+        for cnt in 0..100 {
+            blocks.pop_first_block();
+
+            let rest_blk_num = 100 - cnt - 1;
+            assert_eq!(blocks.num_blocks(), rest_blk_num);
+            if rest_blk_num > 0 {
+                assert!(blocks.current().is_some());
+                assert!(blocks.current_mut().is_some());
+            } else {
+                assert!(blocks.current().is_none());
+                assert!(blocks.current_mut().is_none());
+            }
+
+            let block_num = blocks.iter().count();
+            assert_eq!(block_num, rest_blk_num);
+            let block_num = blocks.iter_mut().count();
+            assert_eq!(block_num, rest_blk_num);
+        }
+    }
+
+    #[test]
+    fn test_take_need() {
+        let values = vec![1, 2, 3, 4, 5, 6, 7, 8];
+
+        // Test emit all
+        let emit = EmitTo::All;
+        let mut source =
values.clone(); + let expected = values.clone(); + let actual = emit.take_needed(&mut source); + assert_eq!(actual, expected); + assert!(source.is_empty()); + + // Test emit first n + // n < source len + let emit = EmitTo::First(4); + let mut origin = values.clone(); + let expected = origin[0..4].to_vec(); + let rest_expected = origin[4..].to_vec(); + let actual = emit.take_needed(&mut origin); + assert_eq!(actual, expected); + assert_eq!(origin, rest_expected); + + // n > source len + let emit = EmitTo::First(9); + let mut origin = values.clone(); + let expected = values.clone(); + let actual = emit.take_needed(&mut origin); + assert_eq!(actual, expected); + assert!(origin.is_empty()); + } + + #[test] + fn test_take_need_from_blocks() { + let block1 = vec![1, 2, 3, 4]; + let block2 = vec![5, 6, 7, 8]; + + let mut values = VecBlocks::new(); + values.push_block(block1.clone()); + values.push_block(block2.clone()); + + // Test emit block + let emit = EmitTo::NextBlock(false); + let actual = emit.take_needed_from_blocks(&mut values); + assert_eq!(actual, block1); + + let actual = emit.take_needed_from_blocks(&mut values); + assert_eq!(actual, block2); + + let actual = emit.take_needed_from_blocks(&mut values); + assert!(actual.is_empty()); + } + + #[test] + fn test_blocked_group_index_build() { + let group_index1 = 1; + let group_index2 = (42_u64 << 32) | 2; + let group_index3 = ((u32::MAX as u64) << 32) | 3; + + let index_builder = BlockedGroupIndexBuilder::new(false); + let flat1 = index_builder.build(group_index1 as usize); + let flat2 = index_builder.build(group_index2 as usize); + let flat3 = index_builder.build(group_index3 as usize); + let expected1 = BlockedGroupIndex::new_from_parts(0, group_index1); + let expected2 = BlockedGroupIndex::new_from_parts(0, group_index2); + let expected3 = BlockedGroupIndex::new_from_parts(0, group_index3); + assert_eq!(flat1, expected1); + assert_eq!(flat2, expected2); + assert_eq!(flat3, expected3); + + let index_builder = BlockedGroupIndexBuilder::new(true); + let blocked1 = index_builder.build(group_index1 as usize); + let blocked2 = index_builder.build(group_index2 as usize); + let blocked3 = index_builder.build(group_index3 as usize); + let expected1 = BlockedGroupIndex::new_from_parts(0, 1); + let expected2 = BlockedGroupIndex::new_from_parts(42, 2); + let expected3 = BlockedGroupIndex::new_from_parts(u32::MAX, 3); + assert_eq!(blocked1, expected1); + assert_eq!(blocked2, expected2); + assert_eq!(blocked3, expected3); + } +} diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index a0475fe8e446..770a10454880 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -24,6 +24,11 @@ use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; + +use crate::aggregate::groups_accumulator::{ + BlockedGroupIndex, BlockedGroupIndexBuilder, Blocks, MAX_PREALLOC_BLOCK_SIZE, +}; + /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. 
 ///
@@ -228,7 +233,126 @@ impl NullState {
                 }
                 first_n_null
             }
+            EmitTo::NextBlock(_) => {
+                unreachable!("can't support blocked emission in flat NullState")
+            }
         };
         NullBuffer::new(nulls)
     }
 }
 
+/// Similar to [NullState], but designed for the blocked version accumulator
+#[derive(Debug)]
+pub struct BlockedNullState {
+    /// Have we seen any non-filtered input values for `group_index`?
+    ///
+    /// If the bit for group `i` is true, we have seen at least one
+    /// non null value for group `i`
+    ///
+    /// If the bit for group `i` is false, we have not seen any values that
+    /// pass the filter yet for group `i`
+    seen_values_blocks: Blocks<BooleanBufferBuilder>,
+
+    block_size: Option<usize>,
+}
+
+impl Default for BlockedNullState {
+    fn default() -> Self {
+        Self::new(None)
+    }
+}
+
+impl BlockedNullState {
+    pub fn new(block_size: Option<usize>) -> Self {
+        Self {
+            seen_values_blocks: Blocks::new(),
+            block_size,
+        }
+    }
+
+    /// return the size of all buffers allocated by this null state, not including self
+    pub fn size(&self) -> usize {
+        // capacity is in bits, so convert to bytes
+        self.seen_values_blocks
+            .iter()
+            .map(|blk| blk.capacity() / 8)
+            .sum::<usize>()
+    }
+
+    /// Similar to [NullState::accumulate]
+    pub fn accumulate<T, F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &PrimitiveArray<T>,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        T: ArrowPrimitiveType + Send,
+        F: FnMut(&BlockedGroupIndex, T::Native) + Send,
+    {
+        debug_assert!(total_num_groups > 0);
+        debug_assert_eq!(values.values().len(), group_indices.len());
+
+        // ensure the seen_values is big enough (start everything at
+        // "not seen" valid)
+        ensure_enough_room_for_nulls(
+            &mut self.seen_values_blocks,
+            total_num_groups,
+            self.block_size,
+            false,
+        );
+        let seen_values_blocks = &mut self.seen_values_blocks;
+        let group_index_builder =
+            BlockedGroupIndexBuilder::new(self.block_size.is_some());
+
+        do_blocked_accumulate(
+            group_indices,
+            values,
+            opt_filter,
+            &group_index_builder,
+            value_fn,
+            |group_index| {
+                seen_values_blocks[group_index.block_id()]
+                    .set_bit(group_index.block_offset(), true);
+            },
+        )
+    }
+
+    /// Similar to [NullState::build], but supports the blocked version accumulator
+    pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+        if self.seen_values_blocks.num_blocks() == 0 {
+            return NullBuffer::new(BooleanBufferBuilder::new(0).finish());
+        }
+
+        let nulls = match emit_to {
+            EmitTo::All => {
+                debug_assert!(self.block_size.is_none());
+                self.seen_values_blocks.current_mut().unwrap().finish()
+            }
+            EmitTo::First(n) => {
+                debug_assert!(self.block_size.is_none());
+
+                // split off the first N values in seen_values
+                //
+                // TODO make this more efficient rather than two
+                // copies and bitwise manipulation
+                let blk = self.seen_values_blocks.current_mut().unwrap();
+                let nulls = blk.finish();
+                let first_n_null: BooleanBuffer = nulls.iter().take(n).collect();
+                // reset the existing seen buffer
+                for seen in nulls.iter().skip(n) {
+                    blk.append(seen);
+                }
+                first_n_null
+            }
+            EmitTo::NextBlock(_) => {
+                debug_assert!(self.block_size.is_some());
+                let mut cur_blk = self.seen_values_blocks.pop_first_block().unwrap();
+                cur_blk.finish()
+            }
+        };
+
+        NullBuffer::new(nulls)
+    }
+}
@@ -386,7 +510,7 @@ pub fn accumulate_indices(
     opt_filter: Option<&BooleanArray>,
     mut index_fn: F,
 ) where
-    F: FnMut(usize) + Send,
+    F: FnMut(usize),
 {
     match (nulls, opt_filter) {
         (None, None) => {
@@ -462,6 +586,118 @@ pub fn accumulate_indices(
         }
     }
 }
 
+fn do_blocked_accumulate<T, F1, F2>(
+    group_indices: &[usize],
+    values: &PrimitiveArray<T>,
+ opt_filter: Option<&BooleanArray>, + group_index_builder: &BlockedGroupIndexBuilder, + mut value_fn: F1, + mut set_valid_fn: F2, +) where + T: ArrowPrimitiveType + Send, + F1: FnMut(&BlockedGroupIndex, T::Native) + Send, + F2: FnMut(&BlockedGroupIndex) + Send, +{ + let data: &[T::Native] = values.values(); + match (values.null_count() > 0, opt_filter) { + // no nulls, no filter, + (false, None) => { + let iter = group_indices.iter().zip(data.iter()); + for (&group_index, &new_value) in iter { + let blocked_index = group_index_builder.build(group_index); + set_valid_fn(&blocked_index); + value_fn(&blocked_index, new_value); + } + } + // nulls, no filter + (true, None) => { + let nulls = values.nulls().unwrap(); + // This is based on (ahem, COPY/PASTE) arrow::compute::aggregate::sum + // iterate over in chunks of 64 bits for more efficient null checking + let group_indices_chunks = group_indices.chunks_exact(64); + let data_chunks = data.chunks_exact(64); + let bit_chunks = nulls.inner().bit_chunks(); + + let group_indices_remainder = group_indices_chunks.remainder(); + let data_remainder = data_chunks.remainder(); + + group_indices_chunks + .zip(data_chunks) + .zip(bit_chunks.iter()) + .for_each(|((group_index_chunk, data_chunk), mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + group_index_chunk.iter().zip(data_chunk.iter()).for_each( + |(&group_index, &new_value)| { + // valid bit was set, real value + let is_valid = (mask & index_mask) != 0; + if is_valid { + let blocked_index = + group_index_builder.build(group_index); + set_valid_fn(&blocked_index); + value_fn(&blocked_index, new_value); + } + index_mask <<= 1; + }, + ) + }); + + // handle any remaining bits (after the initial 64) + let remainder_bits = bit_chunks.remainder_bits(); + group_indices_remainder + .iter() + .zip(data_remainder.iter()) + .enumerate() + .for_each(|(i, (&group_index, &new_value))| { + let is_valid = remainder_bits & (1 << i) != 0; + if is_valid { + let blocked_index = group_index_builder.build(group_index); + set_valid_fn(&blocked_index); + value_fn(&blocked_index, new_value); + } + }); + } + // no nulls, but a filter + (false, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + // The performance with a filter could be improved by + // iterating over the filter in chunks, rather than a single + // iterator. TODO file a ticket + group_indices + .iter() + .zip(data.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, &new_value), filter_value)| { + if let Some(true) = filter_value { + let blocked_index = group_index_builder.build(group_index); + set_valid_fn(&blocked_index); + value_fn(&blocked_index, new_value); + } + }) + } + // both null values and filters + (true, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + // The performance with a filter could be improved by + // iterating over the filter in chunks, rather than using + // iterators. TODO file a ticket + filter + .iter() + .zip(group_indices.iter()) + .zip(values.iter()) + .for_each(|((filter_value, &group_index), new_value)| { + if let Some(true) = filter_value { + if let Some(new_value) = new_value { + let blocked_index = group_index_builder.build(group_index); + set_valid_fn(&blocked_index); + value_fn(&blocked_index, new_value); + } + } + }) + } + } +} + /// Ensures that `builder` contains a `BooleanBufferBuilder with at /// least `total_num_groups`. 
 ///
@@ -478,6 +714,77 @@ fn initialize_builder(
     builder
 }
 
+/// Similar to [initialize_builder], but designed for the blocked version accumulator
+fn ensure_enough_room_for_nulls(
+    builder_blocks: &mut Blocks<BooleanBufferBuilder>,
+    total_num_groups: usize,
+    block_size: Option<usize>,
+    default_value: bool,
+) {
+    let calc_block_size = block_size.unwrap_or(usize::MAX);
+    // In blocked mode, we ensure the blks are enough first,
+    // and then ensure slots in blks are enough.
+    let (mut cur_blk_idx, exist_slots) = if builder_blocks.num_blocks() > 0 {
+        let cur_blk_idx = builder_blocks.num_blocks() - 1;
+        let exist_slots = (builder_blocks.num_blocks() - 1) * calc_block_size
+            + builder_blocks.current().unwrap().len();
+
+        (cur_blk_idx, exist_slots)
+    } else {
+        (0, 0)
+    };
+
+    // No new groups, don't need to expand, just return.
+    if exist_slots >= total_num_groups {
+        return;
+    }
+
+    // Ensure blks are enough
+    let exist_blks = builder_blocks.num_blocks();
+    let new_blks = (total_num_groups.saturating_add(calc_block_size - 1)
+        / calc_block_size)
+        - exist_blks;
+    if new_blks > 0 {
+        let prealloc_size = block_size.unwrap_or(0).min(MAX_PREALLOC_BLOCK_SIZE);
+        for _ in 0..new_blks {
+            builder_blocks.push_block(BooleanBufferBuilder::new(
+                calc_block_size.min(prealloc_size),
+            ));
+        }
+    }
+
+    // Ensure slots are enough.
+    let mut new_slots = total_num_groups - exist_slots;
+
+    // Expand current blk.
+    let cur_blk_rest_slots = calc_block_size - builder_blocks[cur_blk_idx].len();
+    if cur_blk_rest_slots >= new_slots {
+        builder_blocks[cur_blk_idx].append_n(new_slots, default_value);
+        return;
+    }
+
+    // Expand current blk to full, and expand next blks
+    builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value);
+    new_slots -= cur_blk_rest_slots;
+    cur_blk_idx += 1;
+
+    // Expand whole blks
+    let expand_blks = new_slots / calc_block_size;
+    for _ in 0..expand_blks {
+        builder_blocks[cur_blk_idx].append_n(calc_block_size, default_value);
+        cur_blk_idx += 1;
+    }
+
+    // Expand the last blk if needed
+    let last_expand_slots = new_slots % calc_block_size;
+    if last_expand_slots > 0 {
+        builder_blocks
+            .current_mut()
+            .unwrap()
+            .append_n(last_expand_slots, default_value);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -486,8 +793,262 @@ mod test {
     use rand::{rngs::ThreadRng, Rng};
     use std::collections::HashSet;
 
+    /// Null state behaviors needed in the accumulate tests
+    trait TestNullState {
+        fn accumulate<T>(
+            &mut self,
+            group_indices: &[usize],
+            values: &PrimitiveArray<T>,
+            opt_filter: Option<&BooleanArray>,
+            total_num_groups: usize,
+            accumulated_values: &mut Vec<(usize, T::Native)>,
+        ) where
+            T: ArrowPrimitiveType + Send;
+
+        fn build_bool_buffer(&self) -> BooleanBuffer;
+
+        fn build_null_buffer(&mut self) -> NullBuffer;
+    }
+
+    /// The original `NullState`
+    impl TestNullState for NullState {
+        fn accumulate<T>(
+            &mut self,
+            group_indices: &[usize],
+            values: &PrimitiveArray<T>,
+            opt_filter: Option<&BooleanArray>,
+            total_num_groups: usize,
+            accumulated_values: &mut Vec<(usize, T::Native)>,
+        ) where
+            T: ArrowPrimitiveType + Send,
+        {
+            self.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                |group_index, value| {
+                    accumulated_values.push((group_index, value));
+                },
+            );
+        }
+
+        fn build_bool_buffer(&self) -> BooleanBuffer {
+            self.seen_values.finish_cloned()
+        }
+
+        fn build_null_buffer(&mut self) -> NullBuffer {
+            self.build(EmitTo::All)
+        }
+    }
+
+    /// The new `BlockedNullState` in flat mode
+    struct BlockedNullStateInFlatMode(BlockedNullState);
+
+    impl BlockedNullStateInFlatMode {
+        fn new() -> Self {
+            let null_state = BlockedNullState::new(None);
+
+            Self(null_state)
+        }
+    }
+
+    impl TestNullState for BlockedNullStateInFlatMode {
+        fn accumulate<T>(
+            &mut self,
+            group_indices: &[usize],
+            values: &PrimitiveArray<T>,
+            opt_filter: Option<&BooleanArray>,
+            total_num_groups: usize,
+            accumulated_values: &mut Vec<(usize, T::Native)>,
+        ) where
+            T: ArrowPrimitiveType + Send,
+        {
+            self.0.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                |group_index, value| {
+                    accumulated_values.push((group_index.as_packed_index(), value));
+                },
+            );
+        }
+
+        fn build_bool_buffer(&self) -> BooleanBuffer {
+            self.0.seen_values_blocks.current().unwrap().finish_cloned()
+        }
+
+        fn build_null_buffer(&mut self) -> NullBuffer {
+            self.0.build(EmitTo::All)
+        }
+    }
+
+    /// The new `BlockedNullState` in blocked mode
+    struct BlockedNullStateInBlockedMode {
+        null_state: BlockedNullState,
+        block_size: usize,
+    }
+
+    impl BlockedNullStateInBlockedMode {
+        fn new() -> Self {
+            let null_state = BlockedNullState::new(Some(4));
+
+            Self {
+                null_state,
+                block_size: 4,
+            }
+        }
+    }
+
+    impl TestNullState for BlockedNullStateInBlockedMode {
+        fn accumulate<T>(
+            &mut self,
+            group_indices: &[usize],
+            values: &PrimitiveArray<T>,
+            opt_filter: Option<&BooleanArray>,
+            total_num_groups: usize,
+            accumulated_values: &mut Vec<(usize, T::Native)>,
+        ) where
+            T: ArrowPrimitiveType + Send,
+        {
+            // Convert group indices to blocked style
+            let blocked_indices = group_indices
+                .iter()
+                .map(|idx| {
+                    let block_id = *idx / self.block_size;
+                    let block_offset = *idx % self.block_size;
+                    BlockedGroupIndex::new_from_parts(
+                        block_id as u32,
+                        block_offset as u64,
+                    )
+                    .as_packed_index()
+                })
+                .collect::<Vec<_>>();
+
+            self.null_state.accumulate(
+                &blocked_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                |blocked_index, value| {
+                    let flat_index = blocked_index.block_id() * self.block_size
+                        + blocked_index.block_offset();
+                    accumulated_values.push((flat_index, value));
+                },
+ ); + } + + fn build_bool_buffer(&self) -> BooleanBuffer { + let mut ret_builder = BooleanBufferBuilder::new(0); + for blk in self.null_state.seen_values_blocks.iter() { + let buf = blk.finish_cloned(); + for seen in buf.iter() { + ret_builder.append(seen); + } + } + ret_builder.finish() + } + + fn build_null_buffer(&mut self) -> NullBuffer { + let mut ret_builder = BooleanBufferBuilder::new(0); + loop { + let blk = self.null_state.build(EmitTo::NextBlock(false)); + if blk.is_empty() { + break; + } + + for seen in blk.iter() { + ret_builder.append(seen); + } + } + + NullBuffer::new(ret_builder.finish()) + } + } + + /// Accumulate test mode + /// - Original, test the original `NullState` + /// - Flat, test the `BlockedNullState` in flat mode + /// - Blocked, test the `BlockedNullState` in blocked mode + #[derive(Debug, Clone, Copy)] + enum AccumulateTest { + Original, + Flat, + Blocked, + } + + impl AccumulateTest { + fn run( + &self, + group_indices: &[usize], + values: &UInt32Array, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) { + match self { + AccumulateTest::Original => { + Fixture::accumulate_test( + group_indices, + values, + opt_filter, + total_num_groups, + NullState::new(), + ); + } + AccumulateTest::Flat => { + Fixture::accumulate_test( + group_indices, + values, + opt_filter, + total_num_groups, + BlockedNullStateInFlatMode::new(), + ); + } + AccumulateTest::Blocked => { + Fixture::accumulate_test( + group_indices, + values, + opt_filter, + total_num_groups, + BlockedNullStateInBlockedMode::new(), + ); + } + } + } + } + + #[test] + fn accumulate_test_original() { + do_accumulate_test(AccumulateTest::Original); + } + + #[test] + fn accumulate_test_flat() { + do_accumulate_test(AccumulateTest::Flat); + } + + #[test] + fn accumulate_test_blocked() { + do_accumulate_test(AccumulateTest::Blocked); + } + + #[test] + fn accumulate_fuzz_test_original() { + do_accumulate_fuzz_test(AccumulateTest::Original); + } + + #[test] + fn accumulate_fuzz_test_flat() { + do_accumulate_fuzz_test(AccumulateTest::Flat); + } + #[test] - fn accumulate() { + fn accumulate_fuzz_test_blocked() { + do_accumulate_fuzz_test(AccumulateTest::Blocked); + } + + fn do_accumulate_test(accumulate_test: AccumulateTest) { let group_indices = (0..100).collect(); let values = (0..100).map(|i| (i + 1) * 10).collect(); let values_with_nulls = (0..100) @@ -515,15 +1076,15 @@ mod test { values, values_with_nulls, filter, + accumulate_test, } .run() } - #[test] - fn accumulate_fuzz() { + fn do_accumulate_fuzz_test(accumulate_test: AccumulateTest) { let mut rng = rand::thread_rng(); for _ in 0..100 { - Fixture::new_random(&mut rng).run(); + Fixture::new_random(&mut rng, accumulate_test).run(); } } @@ -541,10 +1102,13 @@ mod test { /// filter (defaults to None) filter: BooleanArray, + + /// tested null state for value test + accumulate_test: AccumulateTest, } impl Fixture { - fn new_random(rng: &mut ThreadRng) -> Self { + fn new_random(rng: &mut ThreadRng, accumulate_test: AccumulateTest) -> Self { // Number of input values in a batch let num_values: usize = rng.gen_range(1..200); // number of distinct groups @@ -592,6 +1156,7 @@ mod test { values, values_with_nulls, filter, + accumulate_test, } } @@ -616,10 +1181,15 @@ mod test { let filter = &self.filter; // no null, no filters - Self::accumulate_test(group_indices, &values_array, None, total_num_groups); + self.accumulate_test.run( + group_indices, + &values_array, + None, + total_num_groups, + ); // nulls, no filters - Self::accumulate_test( + 
self.accumulate_test.run(
+                group_indices,
+                &values_with_nulls_array,
+                None,
@@ -627,7 +1197,7 @@
             );
 
             // no nulls, filters
-            Self::accumulate_test(
+            self.accumulate_test.run(
                 group_indices,
                 &values_array,
                 Some(filter),
@@ -635,7 +1205,7 @@
             );
 
             // nulls, filters
-            Self::accumulate_test(
+            self.accumulate_test.run(
                 group_indices,
                 &values_with_nulls_array,
                 Some(filter),
@@ -646,17 +1216,19 @@
             );
         }
 
         /// Calls `NullState::accumulate` and `accumulate_indices` to
         /// ensure it generates the correct values.
        ///
-        fn accumulate_test(
+        fn accumulate_test<S: TestNullState>(
             group_indices: &[usize],
             values: &UInt32Array,
             opt_filter: Option<&BooleanArray>,
             total_num_groups: usize,
+            null_state_for_value_test: S,
         ) {
             Self::accumulate_values_test(
                 group_indices,
                 values,
                 opt_filter,
                 total_num_groups,
+                null_state_for_value_test,
             );
             Self::accumulate_indices_test(group_indices, values.nulls(), opt_filter);
@@ -665,6 +1237,8 @@
             let avg: usize = values.iter().filter_map(|v| v.map(|v| v as usize)).sum();
             let boolean_values: BooleanArray =
                 values.iter().map(|v| v.map(|v| v as usize > avg)).collect();
+
+            // TODO: test the `BlockedNullState` after supporting `accumulate_boolean` in it
             Self::accumulate_boolean_test(
                 group_indices,
                 &boolean_values,
@@ -675,23 +1249,21 @@
 
         /// This is effectively a different implementation of
         /// accumulate that we compare with the above implementation
-        fn accumulate_values_test(
+        fn accumulate_values_test<S: TestNullState>(
             group_indices: &[usize],
             values: &UInt32Array,
             opt_filter: Option<&BooleanArray>,
             total_num_groups: usize,
+            mut null_state: S,
         ) {
             let mut accumulated_values = vec![];
-            let mut null_state = NullState::new();
 
             null_state.accumulate(
                 group_indices,
                 values,
                 opt_filter,
                 total_num_groups,
-                |group_index, value| {
-                    accumulated_values.push((group_index, value));
-                },
+                &mut accumulated_values,
             );
 
             // Figure out the expected values
@@ -726,13 +1298,13 @@
 
             assert_eq!(accumulated_values, expected_values,
                 "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");
 
-            let seen_values = null_state.seen_values.finish_cloned();
+            let seen_values = null_state.build_bool_buffer();
             mock.validate_seen_values(&seen_values);
 
             // Validate the final buffer (one value per group)
             let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
 
-            let null_buffer = null_state.build(EmitTo::All);
+            let null_buffer = null_state.build_null_buffer();
 
             assert_eq!(null_buffer, expected_null_buffer);
         }
@@ -894,4 +1466,46 @@
             .collect()
         }
     }
+
+    #[test]
+    fn test_ensure_room_for_nulls() {
+        let mut blocks: Blocks<BooleanBufferBuilder> = Blocks::new();
+        let block_size = 4;
+
+        // Block size < usize::MAX
+        // 0 total_num_groups, should be no blocks
+        ensure_enough_room_for_nulls(&mut blocks, 0, Some(block_size), false);
+        assert_eq!(blocks.num_blocks(), 0);
+        let total_len = blocks.iter().map(|blk| blk.len()).sum::<usize>();
+        assert_eq!(total_len, 0);
+
+        // 0 -> 3 total_num_groups, blocks should look like:
+        // [d, d, d, empty]
+        ensure_enough_room_for_nulls(&mut blocks, 3, Some(block_size), false);
+        assert_eq!(blocks.num_blocks(), 1);
+        let total_len = blocks.iter().map(|blk| blk.len()).sum::<usize>();
+        assert_eq!(total_len, 3);
+
+        // 3 -> 8 total_num_groups, blocks should look like:
+        // [d, d, d, d], [d, d, d, d]
+        ensure_enough_room_for_nulls(&mut blocks, 8, Some(block_size), false);
+        assert_eq!(blocks.num_blocks(), 2);
+        let total_len = blocks.iter().map(|blk| blk.len()).sum::<usize>();
+        assert_eq!(total_len, 8);
+
+        // 8 -> 13 total_num_groups, blocks should look like:
+        // [d, d, d, d], [d, d, d, d], [d, d, d, d], [d, empty, empty, empty]
+        ensure_enough_room_for_nulls(&mut blocks, 13, Some(block_size), false);
+        assert_eq!(blocks.num_blocks(), 4);
+        let total_len = blocks.iter().map(|blk| blk.len()).sum::<usize>();
+        assert_eq!(total_len, 13);
+
+        // Block size none means we will always use one single block
+        // [] -> [d, d, d,...,d]
+        blocks.clear();
+        ensure_enough_room_for_nulls(&mut blocks, 42, None, false);
+        assert_eq!(blocks.num_blocks(), 1);
+        let total_len = blocks.iter().map(|blk| blk.len()).sum::<usize>();
+        assert_eq!(total_len, 42);
+    }
 }
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
index 149312e5a9c0..24e9249e8687 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
 use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
 use arrow::buffer::BooleanBuffer;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 
 use super::accumulate::NullState;
@@ -117,6 +117,11 @@ where
             }
             first_n
         }
+        EmitTo::NextBlock(_) => {
+            return Err(DataFusionError::NotImplemented(
+                "blocked group values management is not supported".to_string(),
+            ))
+        }
     };
 
     let nulls = self.null_state.build(emit_to);
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
index 8bbcf756c37c..29afe63e3c40 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -25,7 +25,10 @@ use arrow::datatypes::DataType;
 use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 
-use super::accumulate::NullState;
+use crate::aggregate::groups_accumulator::accumulate::BlockedNullState;
+use crate::aggregate::groups_accumulator::{
+    ensure_enough_room_for_values, Blocks, EmitToExt, VecBlocks,
+};
 
 /// An accumulator that implements a single operation over
 /// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -43,7 +46,7 @@ where
     F: Fn(&mut T::Native, T::Native) + Send + Sync,
 {
     /// values per group, stored as the native type
-    values: Vec<T::Native>,
+    values_blocks: VecBlocks<T::Native>,
 
     /// The output type (needed for Decimal precision and scale)
     data_type: DataType,
@@ -52,10 +55,12 @@ where
     starting_value: T::Native,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: BlockedNullState,
 
     /// Function that computes the primitive result
     prim_fn: F,
+
+    block_size: Option<usize>,
 }
 
 impl<T, F> PrimitiveGroupsAccumulator<T, F>
 where
     T: ArrowPrimitiveType + Send,
     F: Fn(&mut T::Native, T::Native) + Send + Sync,
 {
     pub fn new(data_type: &DataType, prim_fn: F) -> Self {
         Self {
-            values: vec![],
+            values_blocks: Blocks::new(),
             data_type: data_type.clone(),
-            null_state: NullState::new(),
+            null_state: BlockedNullState::new(None),
             starting_value: T::default_value(),
             prim_fn,
+            block_size: None,
         }
     }
 
@@ -92,20 +98,29 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return
Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-
         // NullState dispatches / handles tracking nulls and groups that saw no values
+        ensure_enough_room_for_values(
+            &mut self.values_blocks,
+            total_num_groups,
+            self.block_size,
+            self.starting_value,
+        );
+
         self.null_state.accumulate(
             group_indices,
             values,
             opt_filter,
             total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
+            |index, new_value| {
+                let value =
+                    &mut self.values_blocks[index.block_id()][index.block_offset()];
                 (self.prim_fn)(value, new_value);
             },
         );
@@ -114,10 +129,11 @@ where
     }
 
     fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
-        let values = emit_to.take_needed(&mut self.values);
+        let values = emit_to.take_needed_from_blocks(&mut self.values_blocks);
         let nulls = self.null_state.build(emit_to);
         let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) // no copy
             .with_data_type(self.data_type.clone());
+
         Ok(Arc::new(values))
     }
 
@@ -195,6 +211,19 @@ where
     }
 
     fn size(&self) -> usize {
-        self.values.capacity() * std::mem::size_of::<T::Native>() + self.null_state.size()
+        self.values_blocks.capacity() * std::mem::size_of::<T::Native>()
+            + self.null_state.size()
+    }
+
+    fn supports_blocked_mode(&self) -> bool {
+        true
+    }
+
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        self.values_blocks.clear();
+        self.null_state = BlockedNullState::new(block_size);
+        self.block_size = block_size;
+
+        Ok(())
     }
 }
diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs
index ddad76a8734b..b6ba14dfcb32 100644
--- a/datafusion/functions-aggregate/src/average.rs
+++ b/datafusion/functions-aggregate/src/average.rs
@@ -36,10 +36,13 @@ use datafusion_expr::{
     Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
 };
 
-use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::BlockedNullState;
 use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::{
     filtered_null_mask, set_nulls,
 };
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
+    ensure_enough_room_for_values, Blocks, EmitToExt, VecBlocks,
+};
 
 use datafusion_functions_aggregate_common::utils::DecimalAverager;
 use log::debug;
@@ -396,16 +399,18 @@ where
     return_data_type: DataType,
 
     /// Count per group (use u64 to make UInt64Array)
-    counts: Vec<u64>,
+    counts: VecBlocks<u64>,
 
     /// Sums per group, stored as the native type
-    sums: Vec<T::Native>,
+    sums: VecBlocks<T::Native>,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: BlockedNullState,
 
     /// Function that computes the final average (value / count)
     avg_fn: F,
+
+    block_size: Option<usize>,
 }
 
 impl<T, F> AvgGroupsAccumulator<T, F>
 where
@@ -422,10 +427,11 @@ where
         Self {
             return_data_type: return_data_type.clone(),
             sum_data_type: sum_data_type.clone(),
-            counts: vec![],
-            sums: vec![],
-            null_state: NullState::new(),
+            counts: Blocks::new(),
+            sums: Blocks::new(),
+            null_state: BlockedNullState::new(None),
             avg_fn,
+            block_size: None,
         }
     }
 }
@@ -442,22 +448,40 @@ where
         opt_filter: Option<&array::BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
         // increment counts, update sums
- self.counts.resize(total_num_groups, 0); - self.sums.resize(total_num_groups, T::default_value()); + ensure_enough_room_for_values( + &mut self.counts, + total_num_groups, + self.block_size, + 0, + ); + ensure_enough_room_for_values( + &mut self.sums, + total_num_groups, + self.block_size, + T::default_value(), + ); + self.null_state.accumulate( group_indices, values, opt_filter, total_num_groups, - |group_index, new_value| { - let sum = &mut self.sums[group_index]; - *sum = sum.add_wrapping(new_value); + |blocked_index, new_value| { + let sum = &mut self.sums[blocked_index.block_id()] + [blocked_index.block_offset()]; + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; - self.counts[group_index] += 1; + *sum = sum.add_wrapping(new_value); + *count += 1; }, ); @@ -465,8 +489,8 @@ where } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let counts = emit_to.take_needed(&mut self.counts); - let sums = emit_to.take_needed(&mut self.sums); + let counts = emit_to.take_needed_from_blocks(&mut self.counts); + let sums = emit_to.take_needed_from_blocks(&mut self.sums); let nulls = self.null_state.build(emit_to); assert_eq!(nulls.len(), sums.len()); @@ -505,10 +529,10 @@ where let nulls = self.null_state.build(emit_to); let nulls = Some(nulls); - let counts = emit_to.take_needed(&mut self.counts); + let counts = emit_to.take_needed_from_blocks(&mut self.counts); let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy - let sums = emit_to.take_needed(&mut self.sums); + let sums = emit_to.take_needed_from_blocks(&mut self.sums); let sums = PrimitiveArray::::new(sums.into(), nulls) // zero copy .with_data_type(self.sum_data_type.clone()); @@ -525,31 +549,50 @@ where opt_filter: Option<&array::BooleanArray>, total_num_groups: usize, ) -> Result<()> { + if total_num_groups == 0 { + return Ok(()); + } + assert_eq!(values.len(), 2, "two arguments to merge_batch"); + // first batch is counts, second is partial sums let partial_counts = values[0].as_primitive::(); let partial_sums = values[1].as_primitive::(); - // update counts with partial counts - self.counts.resize(total_num_groups, 0); + + // update counts with partial counts + update sums + ensure_enough_room_for_values( + &mut self.counts, + total_num_groups, + self.block_size, + 0, + ); + ensure_enough_room_for_values( + &mut self.sums, + total_num_groups, + self.block_size, + T::default_value(), + ); + self.null_state.accumulate( group_indices, partial_counts, opt_filter, total_num_groups, - |group_index, partial_count| { - self.counts[group_index] += partial_count; + |blocked_index, partial_count| { + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += partial_count; }, ); - // update sums - self.sums.resize(total_num_groups, T::default_value()); self.null_state.accumulate( group_indices, partial_sums, opt_filter, total_num_groups, - |group_index, new_value: ::Native| { - let sum = &mut self.sums[group_index]; + |blocked_index, new_value: ::Native| { + let sum = &mut self.sums[blocked_index.block_id()] + [blocked_index.block_offset()]; *sum = sum.add_wrapping(new_value); }, ); @@ -585,4 +628,17 @@ where self.counts.capacity() * std::mem::size_of::() + self.sums.capacity() * std::mem::size_of::() } + + fn supports_blocked_mode(&self) -> bool { + true + } + + fn alter_block_size(&mut self, block_size: Option) -> Result<()> { + self.counts.clear(); + self.sums.clear(); + self.null_state = BlockedNullState::new(block_size); + 
self.block_size = block_size; + + Ok(()) + } } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 417e28e72a71..24a8c09671a5 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -17,6 +17,9 @@ use ahash::RandomState; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{ + ensure_enough_room_for_values, BlockedGroupIndexBuilder, Blocks, EmitToExt, VecBlocks, +}; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; @@ -358,12 +361,17 @@ struct CountGroupsAccumulator { /// output type of count is `DataType::Int64`. Thus by using `i64` /// for the counts, the output [`Int64Array`] can be created /// without copy. - counts: Vec, + counts: VecBlocks, + + block_size: Option, } impl CountGroupsAccumulator { pub fn new() -> Self { - Self { counts: vec![] } + Self { + counts: Blocks::new(), + block_size: None, + } } } @@ -375,18 +383,33 @@ impl GroupsAccumulator for CountGroupsAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { + if total_num_groups == 0 { + return Ok(()); + } + assert_eq!(values.len(), 1, "single argument to update_batch"); let values = &values[0]; // Add one to each group's counter for each non null, non // filtered value - self.counts.resize(total_num_groups, 0); + ensure_enough_room_for_values( + &mut self.counts, + total_num_groups, + self.block_size, + 0, + ); + + let group_index_builder = + BlockedGroupIndexBuilder::new(self.block_size.is_some()); accumulate_indices( group_indices, values.logical_nulls().as_ref(), opt_filter, |group_index| { - self.counts[group_index] += 1; + let blocked_index = group_index_builder.build(group_index); + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += 1; }, ); @@ -400,6 +423,10 @@ impl GroupsAccumulator for CountGroupsAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { + if total_num_groups == 0 { + return Ok(()); + } + assert_eq!(values.len(), 1, "one argument to merge_batch"); // first batch is counts, second is partial sums let partial_counts = values[0].as_primitive::(); @@ -409,7 +436,16 @@ impl GroupsAccumulator for CountGroupsAccumulator { let partial_counts = partial_counts.values(); // Adds the counts with the partial counts - self.counts.resize(total_num_groups, 0); + ensure_enough_room_for_values( + &mut self.counts, + total_num_groups, + self.block_size, + 0, + ); + + let group_index_builder = + BlockedGroupIndexBuilder::new(self.block_size.is_some()); + match opt_filter { Some(filter) => filter .iter() @@ -417,12 +453,18 @@ impl GroupsAccumulator for CountGroupsAccumulator { .zip(partial_counts.iter()) .for_each(|((filter_value, &group_index), partial_count)| { if let Some(true) = filter_value { - self.counts[group_index] += partial_count; + let blocked_index = group_index_builder.build(group_index); + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += partial_count; } }), None => group_indices.iter().zip(partial_counts.iter()).for_each( |(&group_index, partial_count)| { - self.counts[group_index] += partial_count; + let blocked_index = group_index_builder.build(group_index); + let count = &mut self.counts[blocked_index.block_id()] + [blocked_index.block_offset()]; + *count += 
partial_count; }, ), } @@ -431,8 +473,7 @@ impl GroupsAccumulator for CountGroupsAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let counts = emit_to.take_needed(&mut self.counts); - + let counts = emit_to.take_needed_from_blocks(&mut self.counts); // Count is always non null (null inputs just don't contribute to the overall values) let nulls = None; let array = PrimitiveArray::::new(counts.into(), nulls); @@ -442,8 +483,9 @@ impl GroupsAccumulator for CountGroupsAccumulator { // return arrays for counts fn state(&mut self, emit_to: EmitTo) -> Result> { - let counts = emit_to.take_needed(&mut self.counts); + let counts = emit_to.take_needed_from_blocks(&mut self.counts); let counts: PrimitiveArray = Int64Array::from(counts); // zero copy, no nulls + Ok(vec![Arc::new(counts) as ArrayRef]) } @@ -515,6 +557,17 @@ impl GroupsAccumulator for CountGroupsAccumulator { fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() } + + fn supports_blocked_mode(&self) -> bool { + true + } + + fn alter_block_size(&mut self, block_size: Option) -> Result<()> { + self.counts.clear(); + self.block_size = block_size; + + Ok(()) + } } /// count null values for multiple columns diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs index 367a8669ab7d..f6f5e495420f 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -36,7 +36,8 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, GroupsAccumulator, Signature, Volatility, }; use datafusion_functions_aggregate_common::{ - aggregate::groups_accumulator::accumulate::accumulate, stats::StatsType, + aggregate::groups_accumulator::{accumulate::accumulate, EmitToExt}, + stats::StatsType, }; make_udaf_expr_and_func!( diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs index f789af8b8a02..2a5d4da8bbc5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs @@ -17,6 +17,7 @@ use crate::aggregates::group_values::GroupValues; use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch}; +use datafusion_common::DataFusionError; use datafusion_expr::EmitTo; use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType}; @@ -115,6 +116,11 @@ impl GroupValues for GroupValuesByes { emit_group_values } + EmitTo::NextBlock(_) => { + return Err(DataFusionError::NotImplemented( + "this group values doesn't support blocked mode yet".to_string(), + )) + } }; Ok(vec![group_values]) diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs index 1a0cb90a16d4..78f558112f83 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs @@ -17,6 +17,7 @@ use crate::aggregates::group_values::GroupValues; use arrow_array::{Array, ArrayRef, RecordBatch}; +use datafusion_common::DataFusionError; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap; @@ -116,6 +117,11 @@ impl GroupValues for GroupValuesBytesView { emit_group_values } + EmitTo::NextBlock(_) => { + return Err(DataFusionError::NotImplemented( + "this group values doesn't support blocked mode yet".to_string(), 
+                ))
+            }
        };

        Ok(vec![group_values])
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index be7ac934d7bc..a7f86f3ba23f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -19,7 +19,7 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::{downcast_primitive, ArrayRef};
 use arrow_schema::{DataType, SchemaRef};
 use bytes_view::GroupValuesBytesView;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};

 pub(crate) mod primitive;
 use datafusion_expr::EmitTo;
@@ -52,6 +52,32 @@ pub trait GroupValues: Send {

     /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
     fn clear_shrink(&mut self, batch: &RecordBatch);
+
+    /// Returns `true` if this group values supports blocked mode.
+    fn supports_blocked_mode(&self) -> bool {
+        false
+    }
+
+    /// Alter the block size in the accumulator
+    ///
+    /// If the target block size is `None`, a single big block
+    /// (think of it as one large `Vec`) is used to manage the state.
+    ///
+    /// If the target block size is `Some(blk_size)`, it will try to
+    /// set the block size to `blk_size`; the attempt only succeeds
+    /// when the implementation supports blocked mode.
+    ///
+    /// NOTICE: Altering the block size clears all previously stored data.
+    ///
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        if block_size.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "this group values doesn't support blocked mode yet".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
 }

 pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs
index d5b7f1b11ac5..5f35faed3640 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs
@@ -25,7 +25,7 @@ use arrow_array::cast::AsArray;
 use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray};
 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
 use arrow_schema::DataType;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_expr::EmitTo;
 use half::f16;
@@ -206,6 +206,11 @@ where
                 std::mem::swap(&mut self.values, &mut split);
                 build_primitive(split, null_group)
             }
+            EmitTo::NextBlock(_) => {
+                return Err(DataFusionError::NotImplemented(
+                    "this group values doesn't support blocked mode yet".to_string(),
+                ))
+            }
         };
         Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
     }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs
index dc948e28bb2d..2ff433ac7a87 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/row.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
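The `row.rs` changes below lean on `BlockedGroupIndex` / `BlockedGroupIndexBuilder` to pack a `(block_id, block_offset)` pair into the single `usize` stored in the hash table. Here is a hedged sketch of how such packing could work; only the API shape (`new_from_parts`, `block_id`, `block_offset`, `as_packed_index`) comes from this PR, and the 32/32 bit split is an assumption for illustration.

```rust
// A packed blocked group index: high 32 bits are assumed to hold the block
// id, low 32 bits the offset within the block (layout is illustrative).
#[derive(Clone, Copy)]
struct BlockedGroupIndex(u64);

impl BlockedGroupIndex {
    fn new_from_parts(block_id: u32, block_offset: u64) -> Self {
        Self(((block_id as u64) << 32) | (block_offset & 0xFFFF_FFFF))
    }

    fn block_id(&self) -> usize {
        (self.0 >> 32) as usize
    }

    fn block_offset(&self) -> usize {
        (self.0 & 0xFFFF_FFFF) as usize
    }

    fn as_packed_index(&self) -> usize {
        // Assumes a 64-bit target, matching the `usize` stored in the map.
        self.0 as usize
    }
}

// In flat (non-blocked) mode the whole packed value is just the offset, so
// a builder can switch behavior once instead of branching at every call.
struct BlockedGroupIndexBuilder {
    blocked: bool,
}

impl BlockedGroupIndexBuilder {
    fn new(blocked: bool) -> Self {
        Self { blocked }
    }

    fn build(&self, packed: usize) -> BlockedGroupIndex {
        if self.blocked {
            BlockedGroupIndex(packed as u64)
        } else {
            // Flat mode: block 0, offset = the plain group index.
            BlockedGroupIndex::new_from_parts(0, packed as u64)
        }
    }
}
```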
+use std::mem;
+
 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
 use arrow::compute::cast;
@@ -26,6 +28,9 @@ use datafusion_common::hash_utils::create_hashes;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
+    BlockedGroupIndex, BlockedGroupIndexBuilder, Blocks,
+};
 use hashbrown::raw::RawTable;

 /// A [`GroupValues`] making use of [`Rows`]
@@ -57,7 +62,7 @@ pub struct GroupValuesRows {
     /// important for multi-column group keys.
     ///
     /// [`Row`]: arrow::row::Row
-    group_values: Option<Rows>,
+    group_values: Blocks<Rows>,

     /// reused buffer to store hashes
     hashes_buffer: Vec<u64>,
@@ -67,6 +72,9 @@ pub struct GroupValuesRows {

     /// Random state for creating hashes
     random_state: RandomState,
+
+    /// Block size of this `GroupValuesRows` (`None` means blocked mode is disabled)
+    block_size: Option<usize>,
 }

 impl GroupValuesRows {
@@ -85,15 +93,17 @@ impl GroupValuesRows {
         let starting_data_capacity = 64 * starting_rows_capacity;
         let rows_buffer =
             row_converter.empty_rows(starting_rows_capacity, starting_data_capacity);
+
         Ok(Self {
             schema,
             row_converter,
             map,
             map_size: 0,
-            group_values: None,
+            group_values: Blocks::new(),
             hashes_buffer: Default::default(),
             rows_buffer,
             random_state: Default::default(),
+            block_size: None,
         })
     }
 }

@@ -106,10 +116,15 @@ impl GroupValues for GroupValuesRows {
         self.row_converter.append(group_rows, cols)?;
         let n_rows = group_rows.num_rows();

-        let mut group_values = match self.group_values.take() {
-            Some(group_values) => group_values,
-            None => self.row_converter.empty_rows(0, 0),
-        };
+        let mut group_values = mem::take(&mut self.group_values);
+        if group_values.num_blocks() == 0 {
+            let block = match self.block_size {
+                Some(blk_size) => self.row_converter.empty_rows(blk_size, 0),
+                None => self.row_converter.empty_rows(0, 0),
+            };
+
+            group_values.push_block(block);
+        }

         // tracks to which group each of the input rows belongs
         groups.clear();
@@ -120,17 +135,25 @@
         batch_hashes.resize(n_rows, 0);
         create_hashes(cols, &self.random_state, batch_hashes)?;

+        let group_index_builder =
+            BlockedGroupIndexBuilder::new(self.block_size.is_some());
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
             let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
                 // Somewhat surprisingly, this closure can be called even if the
                 // hash doesn't match, so check the hash first with an integer
                 // comparison first avoid the more expensive comparison with
                 // group value.
https://github.com/apache/datafusion/pull/11718
-                target_hash == *exist_hash
-                // verify that the group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx (aka group_values @ row)
-                && group_rows.row(row) == group_values.row(*group_idx)
+                if target_hash != *exist_hash {
+                    return false;
+                }
+
+                // verify that the group that we are inserting with hash is
+                // actually the same key value as the group in
+                // existing_idx (aka group_values @ row)
+                let blocked_index = group_index_builder.build(*group_idx);
+                group_rows.row(row)
+                    == group_values[blocked_index.block_id()]
+                        .row(blocked_index.block_offset())
             });

             let group_idx = match entry {
@@ -139,8 +162,28 @@
                 // 1.2 Need to create new entry for the group
                 None => {
                     // Add new entry to aggr_state and save newly created index
-                    let group_idx = group_values.num_rows();
-                    group_values.push(group_rows.row(row));
+                    if let Some(blk_size) = self.block_size {
+                        if group_values.current().unwrap().num_rows() == blk_size {
+                            // Use blk_size as offset cap,
+                            // and old block's buffer size as buffer cap
+                            let new_buf_cap =
+                                rows_buffer_size(group_values.current().unwrap());
+                            let new_blk =
+                                self.row_converter.empty_rows(blk_size, new_buf_cap);
+                            group_values.push_block(new_blk);
+                        }
+                    }
+
+                    let blk_id = group_values.num_blocks() - 1;
+                    let cur_blk = group_values.current_mut().unwrap();
+                    let blk_offset = cur_blk.num_rows();
+                    cur_blk.push(group_rows.row(row));
+
+                    let blocked_index = BlockedGroupIndex::new_from_parts(
+                        blk_id as u32,
+                        blk_offset as u64,
+                    );
+                    let group_idx = blocked_index.as_packed_index();

                     // for hasher function, use precomputed hash value
                     self.map.insert_accounted(
                         (target_hash, group_idx),
                         |(hash, _group_index)| *hash,
                         &mut self.map_size,
                     );
+
                     group_idx
                 }
             };
             groups.push(group_idx);
         }

-        self.group_values = Some(group_values);
+        self.group_values = group_values;

         Ok(())
     }

     fn size(&self) -> usize {
-        let group_values_size = self.group_values.as_ref().map(|v| v.size()).unwrap_or(0);
+        // TODO: support size stats in `Blocks`,
+        // it is too expensive to calculate it again and again.
+        let group_values_size = self
+            .group_values
+            .iter()
+            .map(|rows| rows.size())
+            .sum::<usize>();
         self.row_converter.size() + group_values_size + self.map_size
@@ -173,34 +223,45 @@
     }

     fn len(&self) -> usize {
-        self.group_values
-            .as_ref()
-            .map(|group_values| group_values.num_rows())
-            .unwrap_or(0)
+        let num_blocks = self.group_values.num_blocks();
+        if num_blocks == 0 {
+            return 0;
+        }
+
+        let mut group_len = if let Some(blk_size) = self.block_size {
+            (num_blocks - 1) * blk_size
+        } else {
+            0
+        };
+
+        group_len += self.group_values.current().unwrap().num_rows();
+
+        group_len
     }

     fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
-        let mut group_values = self
-            .group_values
-            .take()
-            .expect("Can not emit from empty rows");
+        let mut group_values = mem::take(&mut self.group_values);

         let mut output = match emit_to {
             EmitTo::All => {
-                let output = self.row_converter.convert_rows(&group_values)?;
-                group_values.clear();
-                output
+                debug_assert!(self.block_size.is_none());
+
+                let blk = group_values.pop_first_block().unwrap();
+                self.row_converter.convert_rows(blk.into_iter())?
            }
            EmitTo::First(n) => {
-                let groups_rows = group_values.iter().take(n);
+                debug_assert!(self.block_size.is_none());
+
+                let blk = group_values.current_mut().unwrap();
+                let groups_rows = blk.iter().take(n);
                 let output = self.row_converter.convert_rows(groups_rows)?;
                 // Clear out first n group keys by copying them to a new Rows.
                 // TODO file some ticket in arrow-rs to make this more efficient?
                 let mut new_group_values = self.row_converter.empty_rows(0, 0);
-                for row in group_values.iter().skip(n) {
+                for row in blk.iter().skip(n) {
                     new_group_values.push(row);
                 }
-                std::mem::swap(&mut new_group_values, &mut group_values);
+                std::mem::swap(&mut new_group_values, blk);

                 // SAFETY: self.map outlives iterator and is not modified concurrently
                 unsafe {
@@ -216,6 +277,40 @@
                 }
                 output
             }
+            EmitTo::NextBlock(true) => {
+                debug_assert!(self.block_size.is_some());
+
+                let cur_blk = group_values.pop_first_block().unwrap();
+                let output = self.row_converter.convert_rows(cur_blk.iter())?;
+
+                unsafe {
+                    let group_index_builder =
+                        BlockedGroupIndexBuilder::new(self.block_size.is_some());
+                    for bucket in self.map.iter() {
+                        // Decrement the block id of each remaining entry by 1
+                        let group_idx = bucket.as_ref().1;
+                        let old_blk_idx = group_index_builder.build(group_idx);
+                        match old_blk_idx.block_id().checked_sub(1) {
+                            // Entry was in a later block, shift its block id down
+                            Some(new_blk_id) => {
+                                let new_group_idx = BlockedGroupIndex::new_from_parts(
+                                    new_blk_id as u32,
+                                    old_blk_idx.block_offset,
+                                );
+                                bucket.as_mut().1 = new_group_idx.as_packed_index();
+                            }
+                            // Entry was in the emitted block, so remove it from the table
+                            None => self.map.erase(bucket),
+                        }
+                    }
+                }
+
+                output
+            }
+            EmitTo::NextBlock(false) => {
+                let cur_blk = group_values.pop_first_block().unwrap();
+                self.row_converter.convert_rows(cur_blk.iter())?
+ } }; // TODO: Materialize dictionaries in group keys (#7647) @@ -232,20 +327,37 @@ impl GroupValues for GroupValuesRows { } } - self.group_values = Some(group_values); + self.group_values = group_values; + Ok(output) } fn clear_shrink(&mut self, batch: &RecordBatch) { let count = batch.num_rows(); - self.group_values = self.group_values.take().map(|mut rows| { - rows.clear(); - rows - }); + self.group_values.clear(); self.map.clear(); self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared self.map_size = self.map.capacity() * std::mem::size_of::<(u64, usize)>(); self.hashes_buffer.clear(); self.hashes_buffer.shrink_to(count); } + + fn supports_blocked_mode(&self) -> bool { + true + } + + fn alter_block_size(&mut self, block_size: Option) -> Result<()> { + self.map.clear(); + self.group_values.clear(); + self.block_size = block_size; + + Ok(()) + } +} + +#[inline] +fn rows_buffer_size(rows: &Rows) -> usize { + let total_size = rows.size(); + let offset_size = (rows.num_rows() + 1) * mem::size_of::(); + total_size - offset_size - mem::size_of::() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c3bc7b042e65..8ad29bbbe862 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1207,8 +1207,11 @@ mod tests { ScalarValue, }; use datafusion_execution::config::SessionConfig; + use datafusion_execution::disk_manager::DiskManagerConfig; use datafusion_execution::memory_pool::FairSpillPool; - use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_execution::runtime_env::{ + RuntimeConfig, RuntimeEnv, RuntimeEnvBuilder, + }; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; @@ -2487,6 +2490,131 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_blocked_approach() -> Result<()> { + // Define data + let schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::UInt32, false), + Field::new("a2", DataType::UInt32, false), + Field::new("b", DataType::Float64, false), + ])); + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), + Arc::new(UInt32Array::from(vec![4, 4, 3, 2])), + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), + Arc::new(UInt32Array::from(vec![4, 4, 3, 2])), + Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), + Arc::new(UInt32Array::from(vec![2, 3, 3, 4])), + Arc::new(Float64Array::from(vec![5.0, 4.0, 3.0, 2.0])), + ], + ) + .unwrap(), + ]; + + // Define plan + let group_by = PhysicalGroupBy::new_single(vec![ + (col("a1", &schema)?, "a1".to_string()), + (col("a2", &schema)?, "a2".to_string()), + ]); + + let aggr_expr = + vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias(String::from("AVG(b)")) + .build()?, + ]; + + let input = Arc::new(MemoryExec::try_new( + &[input_data], + Arc::clone(&schema), + None, + )?); + + let partial_aggr_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr.clone(), + vec![None], + Arc::clone(&input) as Arc, + 
Arc::clone(&schema), + )?); + + let merge = Arc::new(CoalescePartitionsExec::new(partial_aggr_exec)); + + let final_group_by = PhysicalGroupBy::new_single(vec![ + (col("a1", &schema)?, "a1".to_string()), + (col("a2", &schema)?, "a2".to_string()), + ]); + + let merged_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Final, + final_group_by, + aggr_expr, + vec![None], + merge, + schema, + )?); + + // Define task context + let mut session_config = SessionConfig::default(); + session_config = session_config.set( + "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach", + &ScalarValue::Boolean(Some(true)), + ); + session_config = session_config.set( + "datafusion.execution.batch_size", + &ScalarValue::UInt64(Some(1)), + ); + + let runtime = Arc::new( + RuntimeEnv::new( + RuntimeConfig::default().with_disk_manager(DiskManagerConfig::Disabled), + ) + .unwrap(), + ); + let ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + + // Run and check + let output = collect(merged_aggregate.execute(0, Arc::new(ctx))?).await?; + + let expected = [ + "+----+----+--------+", + "| a1 | a2 | AVG(b) |", + "+----+----+--------+", + "| 2 | 4 | 2.5 |", + "| 3 | 4 | 2.5 |", + "| 3 | 3 | 3.0 |", + "| 4 | 2 | 2.5 |", + "| 2 | 2 | 5.0 |", + "| 4 | 4 | 2.0 |", + "+----+----+--------+", + ]; + assert_batches_eq!(expected, &output); + + Ok(()) + } + #[test] fn group_exprs_nullable() -> Result<()> { let input_schema = Arc::new(Schema::new(vec![ diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 0332131d4b57..b1de97922111 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -43,7 +43,7 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::TaskContext; +use datafusion_execution::{DiskManager, TaskContext}; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; @@ -60,6 +60,16 @@ pub(crate) enum ExecutionState { /// When producing output, the remaining rows to output are stored /// here and are sliced off as needed in batch_size chunks ProducingOutput(RecordBatch), + /// Producing output block by block. + /// + /// It is the blocked version `ProducingOutput` and will be used + /// when blocked optimization is enabled. + /// + /// An optional `blocks num` exists in it: + /// - `Some(n)` represents `n` blocks should be produced. + /// - `None` represents we should produce blocks until exhausted. + /// + ProducingBlocks(Option), /// Produce intermediate aggregate state for each input row without /// aggregation. /// @@ -263,8 +273,6 @@ impl SkipAggregationProbe { /// The accumulator state is not managed by this operator (e.g in the /// hash table). 
///
-/// [`group_values`]: Self::group_values
-///
 /// # Partial Aggregate and multi-phase grouping
 ///
 /// As described on [`Accumulator::state`], this operator is used in the context
@@ -333,6 +341,31 @@ impl SkipAggregationProbe {
 /// │ 2 │ 2 │ 3.0 │     │ 2 │ 2 │ 3.0 │     └────────────┘
 /// └─────────────────┘ └─────────────────┘
 /// ```
+///
+/// # Blocked approach for intermediate values
+///
+/// An important optimization for [`group_values`] and [`accumulators`]
+/// is to manage values using the blocked approach.
+///
+/// In the original method, values are managed within a single large block
+/// (think of it as one large `Vec`). As this block grows, it often triggers
+/// numerous copies, resulting in poor performance.
+///
+/// In contrast, the blocked approach allocates capacity for the block
+/// based on a predefined block size up front. When the block reaches its
+/// limit, we allocate a new block (also with capacity based on the same
+/// predefined block size) instead of expanding the current one and copying
+/// the data. This method eliminates unnecessary copies and significantly
+/// improves performance. For a good introduction to the blocked approach,
+/// see [#7065].
+///
+/// The conditions that trigger the blocked mode can be found in
+/// [`maybe_enable_blocked_group_states`].
+///
+/// [`group_values`]: Self::group_values
+/// [`accumulators`]: Self::accumulators
+/// [#7065]: https://github.com/apache/datafusion/issues/7065
+///
 pub(crate) struct GroupedHashAggregateStream {
     // ========================================================================
     // PROPERTIES:
@@ -417,6 +450,9 @@ pub(crate) struct GroupedHashAggregateStream {
     /// current stream.
     skip_aggregation_probe: Option<SkipAggregationProbe>,

+    /// Have we enabled the blocked optimization for group values and accumulators.
+    is_blocked_approach_on: bool,
+
     // ========================================================================
     // EXECUTION RESOURCES:
     // Fields related to managing execution resources and monitoring performance.
@@ -475,7 +511,7 @@ impl GroupedHashAggregateStream {
         };

         // Instantiate the accumulators
-        let accumulators: Vec<_> = aggregate_exprs
+        let mut accumulators: Vec<_> = aggregate_exprs
             .iter()
             .map(create_group_accumulator)
             .collect::<Result<_>>()?;
@@ -505,7 +541,7 @@ impl GroupedHashAggregateStream {
             ordering.as_slice(),
         )?;

-        let group_values = new_group_values(group_schema)?;
+        let mut group_values = new_group_values(group_schema)?;
         timer.done();

         let exec_state = ExecutionState::ReadingInput;
@@ -549,6 +585,15 @@ impl GroupedHashAggregateStream {
             None
         };

+        // Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+        let enable_blocked_group_states = maybe_enable_blocked_group_states(
+            &context,
+            group_values.as_mut(),
+            &mut accumulators,
+            batch_size,
+            &group_ordering,
+        )?;
+
         Ok(GroupedHashAggregateStream {
             schema: agg_schema,
             input,
@@ -569,10 +614,63 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit,
             skip_aggregation_probe,
+            is_blocked_approach_on: enable_blocked_group_states,
         })
     }
 }

+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+/// - `enable_aggregation_intermediate_states_blocked_approach` is true
+/// - It is not a streaming aggregation (blocked mode can't support `EmitTo::First(n)`)
+/// - Spilling is disabled (more consideration is needed to support it efficiently)
+/// - The accumulator list is not empty (the logic for the empty case is not settled yet)
+/// - [`GroupValues::supports_blocked_mode`] and all [`GroupsAccumulator::supports_blocked_mode`] are true
+///
+/// [`GroupValues::supports_blocked_mode`]: crate::aggregates::group_values::GroupValues::supports_blocked_mode
+/// [`GroupsAccumulator::supports_blocked_mode`]: datafusion_expr::GroupsAccumulator::supports_blocked_mode
+///
+// TODO: support blocked optimization in streaming, spilling, and maybe empty accumulators case?
+fn maybe_enable_blocked_group_states(
+    context: &TaskContext,
+    group_values: &mut dyn GroupValues,
+    accumulators: &mut [Box<dyn GroupsAccumulator>],
+    block_size: usize,
+    group_ordering: &GroupOrdering,
+) -> Result<bool> {
+    if !context
+        .session_config()
+        .options()
+        .execution
+        .enable_aggregation_intermediate_states_blocked_approach
+        || !matches!(group_ordering, GroupOrdering::None)
+        || accumulators.is_empty()
+        || enable_spilling(context.runtime_env().disk_manager.as_ref())
+    {
+        return Ok(false);
+    }
+
+    let group_supports_blocked = group_values.supports_blocked_mode();
+    let accumulators_support_blocked =
+        accumulators.iter().all(|acc| acc.supports_blocked_mode());
+
+    match (group_supports_blocked, accumulators_support_blocked) {
+        (true, true) => {
+            group_values.alter_block_size(Some(block_size))?;
+            accumulators
+                .iter_mut()
+                .try_for_each(|acc| acc.alter_block_size(Some(block_size)))?;
+            Ok(true)
+        }
+        _ => Ok(false),
+    }
+}
+
+#[inline]
+fn enable_spilling(disk_manager: &DiskManager) -> bool {
+    disk_manager.tmp_files_enabled()
+}
+
 /// Create an accumulator for `agg_expr` -- a [`GroupsAccumulator`] if
 /// that is supported by the aggregate, or a
 /// [`GroupsAccumulatorAdapter`] if not.
@@ -718,6 +816,48 @@ impl Stream for GroupedHashAggregateStream {
                     )));
                 }

+                ExecutionState::ProducingBlocks(blocks) => {
+                    let emit_to = if let Some(blk) = blocks {
+                        if *blk > 0 {
+                            self.exec_state =
+                                ExecutionState::ProducingBlocks(Some(*blk - 1));
+
+                            EmitTo::NextBlock(true)
+                        } else {
+                            self.exec_state = if self.input_done {
+                                ExecutionState::Done
+                            } else if self.should_skip_aggregation() {
+                                ExecutionState::SkippingAggregation
+                            } else {
+                                ExecutionState::ReadingInput
+                            };
+                            continue;
+                        }
+                    } else {
+                        EmitTo::NextBlock(false)
+                    };
+
+                    let emit_result = self.emit(emit_to, false);
+                    if emit_result.is_err() {
+                        return Poll::Ready(Some(emit_result));
+                    }
+
+                    let emit_batch = emit_result.unwrap();
+                    if emit_batch.num_rows() == 0 {
+                        self.exec_state = if self.input_done {
+                            ExecutionState::Done
+                        } else if self.should_skip_aggregation() {
+                            ExecutionState::SkippingAggregation
+                        } else {
+                            ExecutionState::ReadingInput
+                        };
+                    }
+
+                    return Poll::Ready(Some(Ok(
+                        emit_batch.record_output(&self.baseline_metrics)
+                    )));
+                }
+
                 ExecutionState::Done => {
                     // release the memory reservation since sending back output batch itself needs
                     // some memory reservation, so make some room for it.
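The `ProducingBlocks(Option<usize>)` arm above amounts to the following loop, shown here as a standalone sketch for readability. `emit_next_block` is a hypothetical closure standing in for `self.emit(EmitTo::NextBlock(..), false)`; the real operator interleaves this with transitions back to `ReadingInput` / `SkippingAggregation` instead of collecting batches.

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

// With `Some(n)` we emit exactly `n` blocks (the memory-pressure path);
// with `None` we emit until the group values are exhausted (final output).
fn produce_blocks(
    mut emit_next_block: impl FnMut() -> Result<RecordBatch>,
    mut remaining: Option<usize>,
) -> Result<Vec<RecordBatch>> {
    let mut out = Vec::new();
    loop {
        // `Some(0)` means the requested number of blocks has been produced.
        if remaining == Some(0) {
            break;
        }
        let batch = emit_next_block()?;
        // An empty batch signals that all blocks have been drained.
        if batch.num_rows() == 0 {
            break;
        }
        out.push(batch);
        if let Some(n) = remaining.as_mut() {
            *n -= 1;
        }
    }
    Ok(out)
}
```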
@@ -937,9 +1077,14 @@ impl GroupedHashAggregateStream { && matches!(self.mode, AggregateMode::Partial) && self.update_memory_reservation().is_err() { - let n = self.group_values.len() / self.batch_size * self.batch_size; - let batch = self.emit(EmitTo::First(n), false)?; - self.exec_state = ExecutionState::ProducingOutput(batch); + if !self.is_blocked_approach_on { + let n = self.group_values.len() / self.batch_size * self.batch_size; + let batch = self.emit(EmitTo::First(n), false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); + } else { + let blocks = self.group_values.len() / self.batch_size; + self.exec_state = ExecutionState::ProducingBlocks(Some(blocks)); + } } Ok(()) } @@ -976,6 +1121,18 @@ impl GroupedHashAggregateStream { None, self.reservation.new_empty(), )?; + + // We should disable the blocked optimization for `GroupValues` and `GroupAccumulator`s here, + // because the blocked mode can't support `Emit::First(exact n)` which is needed in + // streaming aggregation. + if self.is_blocked_approach_on { + self.group_values.alter_block_size(None)?; + self.accumulators + .iter_mut() + .try_for_each(|acc| acc.alter_block_size(None))?; + self.is_blocked_approach_on = false; + } + self.input_done = false; self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); Ok(()) @@ -997,8 +1154,12 @@ impl GroupedHashAggregateStream { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); self.exec_state = if self.spill_state.spills.is_empty() { - let batch = self.emit(EmitTo::All, false)?; - ExecutionState::ProducingOutput(batch) + if !self.is_blocked_approach_on { + let batch = self.emit(EmitTo::All, false)?; + ExecutionState::ProducingOutput(batch) + } else { + ExecutionState::ProducingBlocks(None) + } } else { // If spill files exist, stream-merge them. 
self.update_merged_stream()?; @@ -1030,8 +1191,12 @@ impl GroupedHashAggregateStream { fn switch_to_skip_aggregation(&mut self) -> Result<()> { if let Some(probe) = self.skip_aggregation_probe.as_mut() { if probe.should_skip() { - let batch = self.emit(EmitTo::All, false)?; - self.exec_state = ExecutionState::ProducingOutput(batch); + if !self.is_blocked_approach_on { + let batch = self.emit(EmitTo::All, false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); + } else { + self.exec_state = ExecutionState::ProducingBlocks(None); + } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 7acdf25b6596..4ff3ae902f23 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -172,6 +172,7 @@ datafusion.catalog.newlines_in_values false datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.enable_aggregation_intermediate_states_blocked_approach false datafusion.execution.enable_recursive_ctes true datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_ignore_subdirectory true @@ -262,6 +263,7 @@ datafusion.catalog.newlines_in_values false Specifies whether newlines in (quote datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files +datafusion.execution.enable_aggregation_intermediate_states_blocked_approach false Should DataFusion use the the blocked approach to manage the groups values and their related states in accumulators. By default, the single approach will be used, values are managed within a single large block (can think of it as a Vec). As this block grows, it often triggers numerous copies, resulting in poor performance. If setting this flag to `true`, the blocked approach will be used. And the blocked approach allocates capacity for the block based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) We plan to make this the default in the future when tests are enough. datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). 
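For completeness, the new option can also be flipped programmatically, mirroring what the sqllogictest above checks through `information_schema`. A minimal sketch using the public `SessionConfig` API (the option name is the one added in this PR):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_blocked_aggregation() -> SessionContext {
    // Enable the blocked approach; the key mirrors the
    // `information_schema` row shown above.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
        true,
    );
    SessionContext::new_with_config(config)
}
```

As `maybe_enable_blocked_group_states` shows, the configured `datafusion.execution.batch_size` is what gets used as the block size once the flag is on.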
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f34d148f092f..feb15b2af7d5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,90 +35,91 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | default | description | -| ----------------------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | true | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | -| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. 
Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | -| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | -| datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | -| datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | -| datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | -| datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. | -| datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | -| datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_statistics_size | 4096 | (writing) Sets max statistics size for any column. 
If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. | -| datafusion.execution.parquet.created_by | datafusion version 42.0.0 | (writing) Sets "created by" property | -| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | -| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page | -| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files | -| datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files | -| datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | -| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. 
This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | -| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | -| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | -| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | -| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | -| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | -| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | -| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | -| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | -| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. | -| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
| -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. 
| -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | -| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | -| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | -| datafusion.explain.show_sizes | true | When set to true, the explain statement will print the partition sizes | -| datafusion.explain.show_schema | false | When set to true, the explain statement will print schema information | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.enable_options_value_normalization | true | When set to true, SQL parser will normalize options value (convert value to lowercase) | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. | -| datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. 
| +| key | default | description | +| ---------------------------------------------------------------------------- | ------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | true | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | +| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches. This is especially useful for buffer-in-memory batches, since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | +| datafusion.execution.time_zone | +00:00 | The default time zone. Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | +| datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
| +| datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skips the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try to fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: one read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | +| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | +| datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | +| datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version. Valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting. Note that this default setting is not the same as the default parquet writer setting. | +| datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | +| datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column. Valid values are: "none", "chunk", and "page". These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.max_statistics_size | 4096 | (writing) Sets max statistics size for any column. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.
| +| datafusion.execution.parquet.created_by | datafusion version 42.0.0 | (writing) Sets "created by" property | +| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | +| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page | +| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files | +| datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel, leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | +| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
| +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | +| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | +| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | +| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | +| datafusion.execution.listing_table_ignore_subdirectory | true | Should subdirectories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | +| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | +| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | +| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the observed ratio is greater than this threshold, partial aggregation will skip aggregation for further input | +| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows a partial aggregation partition should process before checking the aggregation ratio and trying to switch to skipping aggregation mode | +| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans if the source of statistics is accurate. We plan to make this the default in the future. | +| datafusion.execution.enable_aggregation_intermediate_states_blocked_approach | false | Should DataFusion use the blocked approach to manage groups values and their related states in accumulators. By default, the single approach is used: values are managed within one single large block (think of it as a `Vec`), and as this block grows, it often triggers numerous copies, resulting in poor performance. If this flag is set to `true`, the blocked approach is used instead: capacity for each block is allocated up front based on a predefined block size, and when a block reaches its limit, a new block (also with the predefined block-size-based capacity) is allocated instead of expanding the current one and copying the data. We plan to make this the default in the future once it has been sufficiently tested. | +| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | +| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total file size in bytes to perform file scan repartitioning. | +| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | +| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering. If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long-running execution, all types of joins may encounter out-of-memory errors. | +| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | +| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally.
When this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below, which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | +| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted (i.e., by setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`). When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. | +| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | +| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes below which one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows below which one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | +| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | +| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
| +| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | +| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | +| datafusion.explain.show_sizes | true | When set to true, the explain statement will print the partition sizes | +| datafusion.explain.show_schema | false | When set to true, the explain statement will print schema information | +| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, the SQL parser will parse floats as decimal type | +| datafusion.sql_parser.enable_ident_normalization | true | When set to true, the SQL parser will normalize identifiers (convert them to lowercase when not quoted) | +| datafusion.sql_parser.enable_options_value_normalization | true | When set to true, the SQL parser will normalize option values (convert them to lowercase) | +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. | +| datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion cannot enforce such limits. |
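For anyone trying out this change, here is a minimal sketch (not part of this diff) of how the new option could be enabled. It assumes only a build that includes this patch; `SessionConfig::set_bool`, `SessionContext::new_with_config`, and SQL `SET` statements are existing DataFusion APIs:

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable the blocked approach for aggregation intermediate states
    // programmatically; `set_bool` works for any boolean config key.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
        true,
    );
    let ctx = SessionContext::new_with_config(config);

    // The same option can also be toggled at runtime via SQL.
    ctx.sql(
        "SET datafusion.execution.enable_aggregation_intermediate_states_blocked_approach = true",
    )
    .await?;

    // Grouped aggregations now manage their intermediate states in
    // fixed-size blocks rather than one growing contiguous block.
    let df = ctx
        .sql("SELECT a, count(*) FROM (VALUES (1), (2), (2)) AS t(a) GROUP BY a")
        .await?;
    df.show().await?;
    Ok(())
}
```

And a toy illustration (not DataFusion's actual accumulator code) of why the blocked layout described in the config docs above avoids copies: a single `Vec` reallocates and moves all existing values as it grows, while a list of fixed-size blocks only ever appends a fresh block.

```rust
/// Hypothetical blocked storage for group states, sketched for illustration only.
struct BlockedValues {
    block_size: usize,
    blocks: Vec<Vec<u64>>,
}

impl BlockedValues {
    fn new(block_size: usize) -> Self {
        Self { block_size, blocks: Vec::new() }
    }

    fn push(&mut self, value: u64) {
        match self.blocks.last_mut() {
            // The current block still has room: append without touching old data.
            Some(block) if block.len() < self.block_size => block.push(value),
            // Otherwise allocate a new block with the predefined capacity
            // instead of growing (and copying) the existing one.
            _ => {
                let mut block = Vec::with_capacity(self.block_size);
                block.push(value);
                self.blocks.push(block);
            }
        }
    }
}
```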