diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9f8aa1cbdcaa..b5204b343f05 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -324,6 +324,15 @@ config_namespace! { /// Should DataFusion keep the columns used for partition_by in the output RecordBatches pub keep_partition_by_columns: bool, default = false + + /// Aggregation ratio (number of distinct groups / number of input rows) + /// threshold for skipping partial aggregation. If the value is greater + /// then partial aggregation will skip aggregation for further input + pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8 + + /// Number of input rows partial aggregation partition should process, before + /// aggregation ratio check and trying to switch to skipping aggregation mode + pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 } } diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs index f9af7850cb92..262646d8ba3a 100644 --- a/datafusion/expr/src/accumulator.rs +++ b/datafusion/expr/src/accumulator.rs @@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug { /// /// Intermediate state is used for "multi-phase" grouping in /// DataFusion, where an aggregate is computed in parallel with - /// multiple `Accumulator` instances, as illustrated below: + /// multiple `Accumulator` instances, as described below: /// /// # MultiPhase Grouping /// @@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug { /// `───────' `───────' /// ``` /// - /// The partial state is serialied as `Arrays` and then combined + /// The partial state is serialized as `Arrays` and then combined /// with other partial states from different instances of this /// Accumulator (that ran on different partitions, for example). /// @@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug { /// Note that [`ScalarValue::List`] can be used to pass multiple /// values if the number of intermediate values is not known at /// planning time (e.g. for `MEDIAN`) + /// + /// # Multi-phase repartitioned Grouping + /// + /// Many multi-phase grouping plans contain a Repartition operation + /// as well as shown below: + /// + /// ```text + /// ▲ ▲ + /// │ │ + /// │ │ + /// │ │ + /// │ │ + /// │ │ + /// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final + /// │GroupBy │ │GroupBy │ GroupBy has an entry for its + /// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case + /// │ │ │ │ that means half the entries) + /// └───────────────────────┘ └───────────────────────┘ + /// ▲ ▲ + /// │ │ + /// └─────────────┬────────────┘ + /// │ + /// │ + /// │ + /// ┌─────────────────────────┐ 3. Repartitioning by hash(group + /// │ Repartition │ keys) ensures that each distinct + /// │ HASH(x) │ group key now appears in exactly + /// └─────────────────────────┘ one partition + /// ▲ + /// │ + /// ┌───────────────┴─────────────┐ + /// │ │ + /// │ │ + /// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial + /// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all* + /// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups + /// └─────────────────────────┘ └──────────────────────────┘ + /// ▲ ▲ + /// │ ┌┘ + /// │ │ + /// .─────────. .─────────. + /// ,─' '─. ,─' '─. + /// ; Input : ; Input : 1. 
Since input data is + /// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin + /// ╲ ╱ ╲ ╱ distributed, each partition + /// '─. ,─' '─. ,─' likely has all distinct + /// `───────' `───────' + /// ``` + /// + /// This structure is used so that the `AggregateMode::Partial` accumulators + /// reduces the cardinality of the input as soon as possible. Typically, + /// each partial accumulator sees all groups in the input as the group keys + /// are evenly distributed across the input. + /// + /// The final output is computed by repartitioning the result of + /// [`Self::state`] from each Partial aggregate and `hash(group keys)` so + /// that each distinct group key appears in exactly one of the + /// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are + /// then unioned together to produce the overall final output. + /// + /// Here is an example that shows the distribution of groups in the + /// different phases + /// + /// ```text + /// ┌─────┐ ┌─────┐ + /// │ 1 │ │ 3 │ + /// ├─────┤ ├─────┤ + /// │ 2 │ │ 4 │ After repartitioning by + /// └─────┘ └─────┘ hash(group keys), each distinct + /// ┌─────┐ ┌─────┐ group key now appears in exactly + /// │ 1 │ │ 3 │ one partition + /// ├─────┤ ├─────┤ + /// │ 2 │ │ 4 │ + /// └─────┘ └─────┘ + /// + /// + /// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + /// + /// ┌─────┐ ┌─────┐ + /// │ 2 │ │ 2 │ + /// ├─────┤ ├─────┤ + /// │ 1 │ │ 2 │ + /// ├─────┤ ├─────┤ + /// │ 3 │ │ 3 │ + /// ├─────┤ ├─────┤ + /// │ 4 │ │ 1 │ + /// └─────┘ └─────┘ Input data is arbitrarily or + /// ... ... RoundRobin distributed, each + /// ┌─────┐ ┌─────┐ partition likely has all + /// │ 1 │ │ 4 │ distinct group keys + /// ├─────┤ ├─────┤ + /// │ 4 │ │ 3 │ + /// ├─────┤ ├─────┤ + /// │ 1 │ │ 1 │ + /// ├─────┤ ├─────┤ + /// │ 4 │ │ 3 │ + /// └─────┘ └─────┘ + /// + /// group values group values + /// in partition 0 in partition 1 + /// ``` fn state(&mut self) -> Result>; /// Updates the accumulator's state from an `Array` containing one diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs index 0d57c403bbe0..886bd8443e4d 100644 --- a/datafusion/expr/src/groups_accumulator.rs +++ b/datafusion/expr/src/groups_accumulator.rs @@ -18,7 +18,7 @@ //! Vectorized [`GroupsAccumulator`] use arrow_array::{ArrayRef, BooleanArray}; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; /// Describes how many rows should be emitted during grouping. #[derive(Debug, Clone, Copy)] @@ -128,6 +128,9 @@ pub trait GroupsAccumulator: Send { /// Returns the intermediate aggregate state for this accumulator, /// used for multi-phase grouping, resetting its internal state. /// + /// See [`Accumulator::state`] for more information on multi-phase + /// aggregation. + /// /// For example, `AVG` might return two arrays: `SUM` and `COUNT` /// but the `MIN` aggregate would just return a single array. /// @@ -135,11 +138,13 @@ pub trait GroupsAccumulator: Send { /// single `StructArray` rather than multiple arrays. /// /// See [`Self::evaluate`] for details on the required output - /// order and `emit_to`. + /// order and `emit_to`. + /// + /// [`Accumulator::state`]: crate::Accumulator::state fn state(&mut self, emit_to: EmitTo) -> Result>; /// Merges intermediate state (the output from [`Self::state`]) - /// into this accumulator's values. + /// into this accumulator's current state. 
     ///
     /// For some aggregates (such as `SUM`), `merge_batch` is the same
     /// as `update_batch`, but for some aggregates (such as `COUNT`,
@@ -158,8 +163,59 @@ pub trait GroupsAccumulator: Send {
         total_num_groups: usize,
     ) -> Result<()>;
 
+    /// Converts an input batch directly to the intermediate aggregate state.
+    ///
+    /// This is the equivalent of treating each input row as its own group. It
+    /// is invoked when the Partial phase of a multi-phase aggregation is not
+    /// reducing the cardinality enough to warrant spending more effort on
+    /// pre-aggregation (see `Background` section below), and switches to
+    /// passing intermediate state directly on to the next aggregation phase.
+    ///
+    /// Examples:
+    /// * `COUNT`: an array of 1s for each row in the input batch.
+    /// * `SUM/MIN/MAX`: the input values themselves.
+    ///
+    /// # Arguments
+    /// * `values`: the input arguments to the accumulator
+    /// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
+    ///
+    /// # Background
+    ///
+    /// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
+    /// Partial phase reduces the cardinality of the input data as soon as
+    /// possible in the plan.
+    ///
+    /// This strategy is very effective for queries with a small number of
+    /// groups, as most of the data is aggregated immediately and only a small
+    /// amount of data must be repartitioned (see [`Accumulator::state`] for
+    /// background).
+    ///
+    /// However, for queries with a large number of groups, the Partial phase
+    /// often does not reduce the cardinality enough to warrant the memory and
+    /// CPU cost of actually performing the aggregation. For such cases, the
+    /// HashAggregate operator will dynamically switch to passing intermediate
+    /// state directly to the next aggregation phase with minimal processing
+    /// using this method.
+    ///
+    /// [`Accumulator::state`]: crate::Accumulator::state
+    fn convert_to_state(
+        &self,
+        _values: &[ArrayRef],
+        _opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        not_impl_err!("Input batch conversion to state not implemented")
+    }
+
+    /// Returns `true` if [`Self::convert_to_state`] is implemented to support
+    /// intermediate aggregate state conversion.
+    fn supports_convert_to_state(&self) -> bool {
+        false
+    }
+
     /// Amount of memory used to store the state of this accumulator,
-    /// in bytes. This function is called once per batch, so it should
-    /// be `O(n)` to compute, not `O(num_groups)`
+    /// in bytes.
+    ///
+    /// This function is called once per batch, so it should be `O(n)` to
+    /// compute, not `O(num_groups)`
     fn size(&self) -> usize;
 }
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index f5eeef6b53bb..3a292b2b49bf 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// Return the fields used to store the intermediate state of this accumulator.
     ///
+    /// See [`Accumulator::state`] for background information.
+    ///
     /// args: [`StateFieldsArgs`] contains arguments passed to the
     /// aggregate function's accumulator.
     ///
@@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// # Notes
     ///
     /// Even if this function returns true, DataFusion will still use
-    /// `Self::accumulator` for certain queries, such as when this aggregate is
+    /// [`Self::accumulator`] for certain queries, such as when this aggregate is
     /// used as a window function or when there are no GROUP BY columns in the
     /// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 43ddd37cfb6f..4f2bd864832e 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -50,4 +50,14 @@ paste = "1.0.14" sqlparser = { workspace = true } [dev-dependencies] +arrow = { workspace = true, features = ["test_utils"] } +criterion = "0.5" rand = { workspace = true } + +[[bench]] +name = "count" +harness = false + +[[bench]] +name = "sum" +harness = false diff --git a/datafusion/functions-aggregate/benches/count.rs b/datafusion/functions-aggregate/benches/count.rs new file mode 100644 index 000000000000..875112ca8d47 --- /dev/null +++ b/datafusion/functions-aggregate/benches/count.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, BooleanArray}; +use arrow::datatypes::Int32Type; +use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::DFSchema; +use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; +use datafusion_functions_aggregate::count::Count; +use std::sync::Arc; + +fn prepare_accumulator() -> Box { + let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int32, true)])); + let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap(); + let accumulator_args = AccumulatorArgs { + data_type: &DataType::Int64, + schema: &schema, + dfschema: &df_schema, + ignore_nulls: false, + sort_exprs: &[], + is_reversed: false, + name: "COUNT(f)", + is_distinct: false, + input_types: &[DataType::Int32], + input_exprs: &[datafusion_expr::col("f")], + }; + let count_fn = Count::new(); + + count_fn + .create_groups_accumulator(accumulator_args) + .unwrap() +} + +fn convert_to_state_bench( + c: &mut Criterion, + name: &str, + values: ArrayRef, + opt_filter: Option<&BooleanArray>, +) { + let accumulator = prepare_accumulator(); + c.bench_function(name, |b| { + b.iter(|| { + black_box( + accumulator + .convert_to_state(&[values.clone()], opt_filter) + .unwrap(), + ) + }) + }); +} + +fn count_benchmark(c: &mut Criterion) { + let values = Arc::new(create_primitive_array::(8192, 0.0)) as ArrayRef; + convert_to_state_bench(c, "count convert state no nulls, no filter", values, None); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + convert_to_state_bench(c, "count convert state 30% nulls, no filter", values, None); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + convert_to_state_bench(c, "count convert state 70% nulls, no 
filter", values, None); + + let values = Arc::new(create_primitive_array::(8192, 0.0)) as ArrayRef; + let filter = create_boolean_array(8192, 0.0, 0.5); + convert_to_state_bench( + c, + "count convert state no nulls, filter", + values, + Some(&filter), + ); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + let filter = create_boolean_array(8192, 0.0, 0.5); + convert_to_state_bench( + c, + "count convert state nulls, filter", + values, + Some(&filter), + ); +} + +criterion_group!(benches, count_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-aggregate/benches/sum.rs b/datafusion/functions-aggregate/benches/sum.rs new file mode 100644 index 000000000000..dfaa93cdeff7 --- /dev/null +++ b/datafusion/functions-aggregate/benches/sum.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, BooleanArray}; +use arrow::datatypes::Int64Type; +use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::DFSchema; +use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; +use datafusion_functions_aggregate::sum::Sum; +use std::sync::Arc; + +fn prepare_accumulator(data_type: &DataType) -> Box { + let schema = Arc::new(Schema::new(vec![Field::new("f", data_type.clone(), true)])); + let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap(); + let accumulator_args = AccumulatorArgs { + data_type, + schema: &schema, + dfschema: &df_schema, + ignore_nulls: false, + sort_exprs: &[], + is_reversed: false, + name: "SUM(f)", + is_distinct: false, + input_types: &[data_type.clone()], + input_exprs: &[datafusion_expr::col("f")], + }; + let sum_fn = Sum::new(); + + sum_fn.create_groups_accumulator(accumulator_args).unwrap() +} + +fn convert_to_state_bench( + c: &mut Criterion, + name: &str, + values: ArrayRef, + opt_filter: Option<&BooleanArray>, +) { + let accumulator = prepare_accumulator(values.data_type()); + c.bench_function(name, |b| { + b.iter(|| { + black_box( + accumulator + .convert_to_state(&[values.clone()], opt_filter) + .unwrap(), + ) + }) + }); +} + +fn count_benchmark(c: &mut Criterion) { + let values = Arc::new(create_primitive_array::(8192, 0.0)) as ArrayRef; + convert_to_state_bench(c, "sum i64 convert state no nulls, no filter", values, None); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + convert_to_state_bench( + c, + "sum i64 convert state 30% nulls, no filter", + values, + None, + ); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + convert_to_state_bench( + c, + "sum i64 convert state 70% nulls, 
no filter", + values, + None, + ); + + let values = Arc::new(create_primitive_array::(8192, 0.0)) as ArrayRef; + let filter = create_boolean_array(8192, 0.0, 0.5); + convert_to_state_bench( + c, + "sum i64 convert state no nulls, filter", + values, + Some(&filter), + ); + + let values = Arc::new(create_primitive_array::(8192, 0.3)) as ArrayRef; + let filter = create_boolean_array(8192, 0.0, 0.5); + convert_to_state_bench( + c, + "sum i64 convert state nulls, filter", + values, + Some(&filter), + ); +} + +criterion_group!(benches, count_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 64eb7253f5c9..aea05442536e 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -23,6 +23,7 @@ use std::{fmt::Debug, sync::Arc}; use arrow::{ array::{ArrayRef, AsArray}, + compute, datatypes::{ DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, @@ -440,6 +441,71 @@ impl GroupsAccumulator for CountGroupsAccumulator { Ok(vec![Arc::new(counts) as ArrayRef]) } + /// Converts an input batch directly to a state batch + /// + /// The state of `COUNT` is always a single Int64Array: + /// * `1` (for non-null, non filtered values) + /// * `0` (for null values) + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let values = &values[0]; + + let state_array = match (values.logical_nulls(), opt_filter) { + (None, None) => { + // In case there is no nulls in input and no filter, returning array of 1 + Arc::new(Int64Array::from_value(1, values.len())) + } + (Some(nulls), None) => { + // If there are any nulls in input values -- casting `nulls` (true for values, false for nulls) + // of input array to Int64 + let nulls = BooleanArray::new(nulls.into_inner(), None); + compute::cast(&nulls, &DataType::Int64)? + } + (None, Some(filter)) => { + // If there is only filter + // - applying filter null mask to filter values by bitand filter values and nulls buffers + // (using buffers guarantees absence of nulls in result) + // - casting result of bitand to Int64 array + let (filter_values, filter_nulls) = filter.clone().into_parts(); + + let state_buf = match filter_nulls { + Some(filter_nulls) => &filter_values & filter_nulls.inner(), + None => filter_values, + }; + + let boolean_state = BooleanArray::new(state_buf, None); + compute::cast(&boolean_state, &DataType::Int64)? + } + (Some(nulls), Some(filter)) => { + // For both input nulls and filter + // - applying filter null mask to filter values by bitand filter values and nulls buffers + // (using buffers guarantees absence of nulls in result) + // - applying values null mask to filter buffer by another bitand on filter result and + // nulls from input values + // - casting result to Int64 array + let (filter_values, filter_nulls) = filter.clone().into_parts(); + + let filter_buf = match filter_nulls { + Some(filter_nulls) => &filter_values & filter_nulls.inner(), + None => filter_values, + }; + let state_buf = &filter_buf & nulls.inner(); + + let boolean_state = BooleanArray::new(state_buf, None); + compute::cast(&boolean_state, &DataType::Int64)? 
+ } + }; + + Ok(vec![state_array]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() } diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index debb36852b22..8d69646bd422 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::buffer::NullBuffer; +use arrow::compute; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; -use datafusion_common::Result; +use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use datafusion_expr::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; @@ -134,6 +136,63 @@ where self.update_batch(values, group_indices, opt_filter, total_num_groups) } + /// Converts an input batch directly to a state batch + /// + /// The state is: + /// - self.prim_fn for all non null, non filtered values + /// - null otherwise + /// + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let values = values[0].as_primitive::().clone(); + + // Initializing state with starting values + let initial_state = + PrimitiveArray::::from_value(self.starting_value, values.len()); + + // Recalculating values in case there is filter + let values = match opt_filter { + None => values, + Some(filter) => { + let (filter_values, filter_nulls) = filter.clone().into_parts(); + // Calculating filter mask as a result of bitand of filter, and converting it to null buffer + let filter_bool = match filter_nulls { + Some(filter_nulls) => filter_nulls.inner() & &filter_values, + None => filter_values, + }; + let filter_nulls = NullBuffer::from(filter_bool); + + // Rebuilding input values with a new nulls mask, which is equal to + // the union of original nulls and filter mask + let (dt, values_buf, original_nulls) = values.clone().into_parts(); + let nulls_buf = + NullBuffer::union(original_nulls.as_ref(), Some(&filter_nulls)); + PrimitiveArray::::new(values_buf, nulls_buf).with_data_type(dt) + } + }; + + let state_values = compute::binary_mut(initial_state, &values, |mut x, y| { + (self.prim_fn)(&mut x, y); + x + }); + let state_values = state_values + .map_err(|_| { + internal_datafusion_err!( + "initial_values underlying buffer must not be shared" + ) + })? + .map_err(DataFusionError::from)?; + + Ok(vec![Arc::new(state_values)]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + fn size(&self) -> usize { self.values.capacity() * std::mem::size_of::() + self.null_state.size() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 43f9f98283bb..8941418c12e1 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -56,11 +56,20 @@ mod topk; mod topk_stream; /// Hash aggregate modes +/// +/// See [`Accumulator::state`] for background information on multi-phase +/// aggregation and how these modes are used. 
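+///
+/// Note that the output schema of a `Partial` aggregate contains the
+/// intermediate state of each aggregate expression (for example
+/// `COUNT(x)[count]`), rather than the final aggregate values.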
#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AggregateMode { - /// Partial aggregate that can be applied in parallel across input partitions + /// Partial aggregate that can be applied in parallel across input + /// partitions. + /// + /// This is the first phase of a multi-phase aggregation. Partial, - /// Final aggregate that produces a single partition of output + /// Final aggregate that produces a single partition of output by combining + /// the output of multiple partial aggregates. + /// + /// This is the second phase of a multi-phase aggregation. Final, /// Final aggregate that works on pre-partitioned data. /// @@ -72,12 +81,15 @@ pub enum AggregateMode { /// Applies the entire logical aggregation operation in a single operator, /// as opposed to Partial / Final modes which apply the logical aggregation using /// two operators. + /// /// This mode requires that the input is a single partition (like Final) Single, /// Applies the entire logical aggregation operation in a single operator, /// as opposed to Partial / Final modes which apply the logical aggregation using /// two operators. - /// This mode requires that the input is partitioned by group key (like FinalPartitioned) + /// + /// This mode requires that the input is partitioned by group key (like + /// FinalPartitioned) SinglePartitioned, } @@ -2395,4 +2407,189 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_skip_aggregation_after_first_batch() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + let df_schema = DFSchema::try_from(Arc::clone(&schema))?; + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr: Vec> = + vec![create_aggregate_expr_with_dfschema( + &count_udaf(), + &[col("val", &schema)?], + &[datafusion_expr::col("val")], + &[], + &[], + &df_schema, + "COUNT(val)", + false, + false, + false, + )?]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = Arc::new(MemoryExec::try_new( + &[input_data], + Arc::clone(&schema), + None, + )?); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let mut session_config = SessionConfig::default(); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + ScalarValue::Int64(Some(2)), + ); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", + ScalarValue::Float64(Some(0.1)), + ); + + let ctx = TaskContext::default().with_session_config(session_config); + let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + + let expected = [ + "+-----+-------------------+", + "| key | COUNT(val)[count] |", + "+-----+-------------------+", + "| 1 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 4 | 1 |", + "+-----+-------------------+", + ]; + assert_batches_eq!(expected, &output); + + Ok(()) + } + + #[tokio::test] + async fn test_skip_aggregation_after_threshold() -> Result<()> { + let schema = 
Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + let df_schema = DFSchema::try_from(Arc::clone(&schema))?; + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr: Vec> = + vec![create_aggregate_expr_with_dfschema( + &count_udaf(), + &[col("val", &schema)?], + &[datafusion_expr::col("val")], + &[], + &[], + &df_schema, + "COUNT(val)", + false, + false, + false, + )?]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = Arc::new(MemoryExec::try_new( + &[input_data], + Arc::clone(&schema), + None, + )?); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let mut session_config = SessionConfig::default(); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + ScalarValue::Int64(Some(5)), + ); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", + ScalarValue::Float64(Some(0.1)), + ); + + let ctx = TaskContext::default().with_session_config(session_config); + let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + + let expected = [ + "+-----+-------------------+", + "| key | COUNT(val)[count] |", + "+-----+-------------------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 2 |", + "| 4 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 4 | 1 |", + "+-----+-------------------+", + ]; + assert_batches_eq!(expected, &output); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 167ca7240750..62ed79dad4aa 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -39,7 +39,7 @@ use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -62,6 +62,12 @@ pub(crate) enum ExecutionState { /// When producing output, the remaining rows to output are stored /// here and are sliced off as needed in batch_size chunks ProducingOutput(RecordBatch), + /// Produce intermediate aggregate state for each input row without + /// aggregation. 
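+    /// (each input batch is converted with `GroupsAccumulator::convert_to_state`
+    /// and emitted immediately, bypassing the hash table)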
+ /// + /// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] + SkippingAggregation, + /// All input has been consumed and all groups have been emitted Done, } @@ -90,6 +96,72 @@ struct SpillState { merging_group_by: PhysicalGroupBy, } +/// Tracks if the aggregate should skip partial aggregations +/// +/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] +struct SkipAggregationProbe { + /// Number of processed input rows + input_rows: usize, + /// Number of total group values for `input_rows` + num_groups: usize, + + /// Aggregation ratio check should be performed only when the + /// number of input rows exceeds this threshold + probe_rows_threshold: usize, + /// Maximum allowed value of `input_rows` / `num_groups` to + /// continue aggregation + probe_ratio_threshold: f64, + + /// Flag indicating that further data aggregation mey be skipped + should_skip: bool, + /// Flag indicating that further updates of `SkipAggregationProbe` + /// state won't make any effect + is_locked: bool, +} + +impl SkipAggregationProbe { + fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self { + Self { + input_rows: 0, + num_groups: 0, + probe_rows_threshold, + probe_ratio_threshold, + should_skip: false, + is_locked: false, + } + } + + /// Updates `SkipAggregationProbe` state: + /// - increments the number of input rows + /// - replaces the number of groups with the new value + /// - on `probe_rows_threshold` exceeded calculates + /// aggregation ratio and sets `should_skip` flag + /// - if `should_skip` is set, locks further state updates + fn update_state(&mut self, input_rows: usize, num_groups: usize) { + if self.is_locked { + return; + } + self.input_rows += input_rows; + self.num_groups = num_groups; + if self.input_rows >= self.probe_rows_threshold { + self.should_skip = self.num_groups as f64 / self.input_rows as f64 + >= self.probe_ratio_threshold; + self.is_locked = true; + } + } + + fn should_skip(&self) -> bool { + self.should_skip + } + + /// Provides an ability to externally set `should_skip` flag + /// to `false` and prohibit further state updates + fn forbid_skipping(&mut self) { + self.should_skip = false; + self.is_locked = true; + } +} + /// HashTable based Grouping Aggregator /// /// # Design Goals @@ -137,7 +209,7 @@ struct SpillState { /// of `x` and one accumulator for `SUM(y)`, specialized for the data /// type of `y`. /// -/// # Description +/// # Discussion /// /// [`group_values`] does not store any aggregate state inline. It only /// assigns "group indices", one for each (distinct) group value. The @@ -155,7 +227,25 @@ struct SpillState { /// /// [`group_values`]: Self::group_values /// -/// # Spilling +/// # Partial Aggregate and multi-phase grouping +/// +/// As described on [`Accumulator::state`], this operator is used in the context +/// "multi-phase" grouping when the mode is [`AggregateMode::Partial`]. +/// +/// An important optimization for multi-phase partial aggregation is to skip +/// partial aggregation when it is not effective enough to warrant the memory or +/// CPU cost, as is often the case for queries many distinct groups (high +/// cardinality group by). Memory is particularly important because each Partial +/// aggregator must store the intermediate state for each group. 
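+///
+/// For example, with the default settings, a partition that has seen 100,000
+/// input rows and produced at least 80,000 distinct groups will stop partial
+/// aggregation, as described below.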
+/// +/// If the ratio of the number of groups to the number of input rows exceeds a +/// threshold, and [`GroupsAccumulator::supports_convert_to_state`] is +/// supported, this operator will stop applying Partial aggregation and directly +/// pass the input rows to the next aggregation phase. +/// +/// [`Accumulator::state`]: datafusion_expr::Accumulator::state +/// +/// # Spilling (to disk) /// /// The sizes of group values and accumulators can become large. Before that causes out of memory, /// this hash aggregator outputs partial states early for partial aggregation or spills to local @@ -275,6 +365,10 @@ pub(crate) struct GroupedHashAggregateStream { /// the `GroupedHashAggregateStream` operation immediately switches to /// output mode and emits all groups. group_values_soft_limit: Option, + + /// Optional probe for skipping data aggregation, if supported by + /// current stream. + skip_aggregation_probe: Option, } impl GroupedHashAggregateStream { @@ -365,6 +459,36 @@ impl GroupedHashAggregateStream { merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), }; + // Skip aggregation is supported if: + // - aggregation mode is Partial + // - input is not ordered by GROUP BY expressions, + // since Final mode expects unique group values as its input + // - all accumulators support input batch to intermediate + // aggregate state conversion + // - there is only one GROUP BY expressions set + let skip_aggregation_probe = if agg.mode == AggregateMode::Partial + && matches!(group_ordering, GroupOrdering::None) + && accumulators + .iter() + .all(|acc| acc.supports_convert_to_state()) + && agg_group_by.is_single() + { + Some(SkipAggregationProbe::new( + context + .session_config() + .options() + .execution + .skip_partial_aggregation_probe_rows_threshold, + context + .session_config() + .options() + .execution + .skip_partial_aggregation_probe_ratio_threshold, + )) + } else { + None + }; + Ok(GroupedHashAggregateStream { schema: agg_schema, input, @@ -384,6 +508,7 @@ impl GroupedHashAggregateStream { runtime: context.runtime_env(), spill_state, group_values_soft_limit: agg.limit, + skip_aggregation_probe, }) } } @@ -434,12 +559,16 @@ impl Stream for GroupedHashAggregateStream { // new batch to aggregate Some(Ok(batch)) => { let timer = elapsed_compute.timer(); + let input_rows = batch.num_rows(); + // Make sure we have enough capacity for `batch`, otherwise spill extract_ok!(self.spill_previous_if_necessary(&batch)); // Do the grouping extract_ok!(self.group_aggregate_batch(batch)); + self.update_skip_aggregation_probe(input_rows); + // If we can begin emitting rows, do so, // otherwise keep consuming input assert!(!self.input_done); @@ -463,6 +592,8 @@ impl Stream for GroupedHashAggregateStream { extract_ok!(self.emit_early_if_necessary()); + extract_ok!(self.switch_to_skip_aggregation()); + timer.done(); } Some(Err(e)) => { @@ -476,6 +607,26 @@ impl Stream for GroupedHashAggregateStream { } } + ExecutionState::SkippingAggregation => { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let _timer = elapsed_compute.timer(); + let states = self.transform_to_states(batch)?; + return Poll::Ready(Some(Ok( + states.record_output(&self.baseline_metrics) + ))); + } + Some(Err(e)) => { + // inner had error, return to caller + return Poll::Ready(Some(Err(e))); + } + None => { + // inner is done, switching to `Done` state + self.exec_state = ExecutionState::Done; + } + } + } + ExecutionState::ProducingOutput(batch) => { // slice off a part of the batch, if needed 
let output_batch; @@ -484,6 +635,12 @@ impl Stream for GroupedHashAggregateStream { ( if self.input_done { ExecutionState::Done + } else if self + .skip_aggregation_probe + .as_ref() + .is_some_and(|probe| probe.should_skip()) + { + ExecutionState::SkippingAggregation } else { ExecutionState::ReadingInput }, @@ -797,4 +954,59 @@ impl GroupedHashAggregateStream { timer.done(); Ok(()) } + + // Updates skip aggregation probe state. + // In case stream has any spills, the probe is forcefully set to + // forbid aggregation skipping, and locked, since spilling resets + // total number of unique groups. + // + // Note: currently spilling is not supported for Partial aggregation + fn update_skip_aggregation_probe(&mut self, input_rows: usize) { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if !self.spill_state.spills.is_empty() { + probe.forbid_skipping(); + } else { + probe.update_state(input_rows, self.group_values.len()); + } + }; + } + + // In case the probe indicates that aggregation may be + // skipped, forces stream to produce currently accumulated output. + fn switch_to_skip_aggregation(&mut self) -> Result<()> { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if probe.should_skip() { + let batch = self.emit(EmitTo::All, false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); + } + } + + Ok(()) + } + + // Transforms input batch to intermediate aggregate state, without grouping it + fn transform_to_states(&self, batch: RecordBatch) -> Result { + let group_values = evaluate_group_by(&self.group_by, &batch)?; + let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; + let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + + let mut output = group_values.first().cloned().ok_or_else(|| { + internal_datafusion_err!("group_values expected to have at least one element") + })?; + + let iter = self + .accumulators + .iter() + .zip(input_values.iter()) + .zip(filter_values.iter()); + + for ((acc, values), opt_filter) in iter { + let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); + output.extend(acc.convert_to_state(values, opt_filter)?); + } + + let states_batch = RecordBatch::try_new(self.schema(), output)?; + + Ok(states_batch) + } } diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt new file mode 100644 index 000000000000..65efc24ec037 --- /dev/null +++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt @@ -0,0 +1,324 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The main goal of these tests is to verify correctness of transforming +# input values to state by accumulators, supporting `convert_to_state`. 
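+#
+# The probe thresholds below are deliberately set very low (together with a
+# small batch_size and two target partitions) so that partial aggregation
+# switches into skipping mode right away, while the queries still must produce
+# the same final results as regular aggregation.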
+ + +# Setup test data table +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +# Prepare settings to skip partial aggregation from the beginning +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 0; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 1; + +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# Grouping by unique fields allows to check all accumulators +query ITIIII +SELECT c5, c1, + COUNT(), + COUNT(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END), + COUNT() FILTER (WHERE c1 = 'b'), + COUNT(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b') +FROM aggregate_test_100 +GROUP BY 1, 2 ORDER BY 1 LIMIT 5; +---- +-2141999138 c 1 0 0 0 +-2141451704 a 1 1 0 0 +-2138770630 b 1 0 1 0 +-2117946883 d 1 0 0 0 +-2098805236 c 1 0 0 0 + +query ITIIII +SELECT c5, c1, + MIN(c5), + MIN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END), + MIN(c5) FILTER (WHERE c1 = 'b'), + MIN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b') +FROM aggregate_test_100 +GROUP BY 1, 2 ORDER BY 1 LIMIT 5; +---- +-2141999138 c -2141999138 NULL NULL NULL +-2141451704 a -2141451704 -2141451704 NULL NULL +-2138770630 b -2138770630 NULL -2138770630 NULL +-2117946883 d -2117946883 NULL NULL NULL +-2098805236 c -2098805236 NULL NULL NULL + +query ITIIII +SELECT c5, c1, + MAX(c5), + MAX(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END), + MAX(c5) FILTER (WHERE c1 = 'b'), + MAX(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b') +FROM aggregate_test_100 +GROUP BY 1, 2 ORDER BY 1 LIMIT 5; +---- +-2141999138 c -2141999138 NULL NULL NULL +-2141451704 a -2141451704 -2141451704 NULL NULL +-2138770630 b -2138770630 NULL -2138770630 NULL +-2117946883 d -2117946883 NULL NULL NULL +-2098805236 c -2098805236 NULL NULL NULL + +query ITIIII +SELECT c5, c1, + SUM(c5), + SUM(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END), + SUM(c5) FILTER (WHERE c1 = 'b'), + SUM(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b') +FROM aggregate_test_100 +GROUP BY 1, 2 ORDER BY 1 LIMIT 5; +---- +-2141999138 c -2141999138 NULL NULL NULL +-2141451704 a -2141451704 -2141451704 NULL NULL +-2138770630 b -2138770630 NULL -2138770630 NULL +-2117946883 d -2117946883 NULL NULL NULL +-2098805236 c -2098805236 NULL NULL NULL + +# Prepare settings to always skip aggregation after couple of batches +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4; + +# Inserting into nullable table with batch_size specified above +# to prevent creation on single in-memory batch +statement ok +CREATE TABLE aggregate_test_100_null ( + c2 TINYINT NOT NULL, + c5 INT NOT NULL, + c3 SMALLINT, + c11 FLOAT +); + +statement ok +INSERT INTO aggregate_test_100_null +SELECT + c2, + c5, + CASE 
WHEN c1 = 'e' THEN NULL ELSE c3 END as c3, + CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11 +FROM aggregate_test_100; + +# Test count varchar / int / float +query IIII +SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 22 22 22 +2 22 22 22 +3 19 19 19 +4 23 23 23 +5 14 14 14 + +# Test min / max for int / float +query IIIRR +SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 -1991133944 2143473091 0.064453244 0.89651865 +2 -2138770630 2053379412 0.055064857 0.8315913 +3 -2141999138 2030965207 0.034291923 0.9488028 +4 -1885422396 2064155045 0.028003037 0.7459874 +5 -2117946883 2025611582 0.12559289 0.87989986 + +# Test sum for int / float +query IIR +SELECT c2, sum(c5), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 -438598674 12.153253793716 +2 -8259865364 9.577824473381 +3 1956035476 9.590891361237 +4 16155718643 9.531112968922 +5 6449337880 7.074412226677 + +# Test count with nullable fields +query III +SELECT c2, count(c3), count(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 19 17 +2 17 19 +3 15 13 +4 16 19 +5 12 11 + +# Test min / max with nullable fields +query IIIRR +SELECT c2, min(c3), max(c3), min(c11), max(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 -99 125 0.064453244 0.89651865 +2 -117 122 0.09683716 0.8315913 +3 -101 123 0.034291923 0.94669616 +4 -117 123 0.028003037 0.7085086 +5 -101 118 0.12559289 0.87989986 + +# Test sum with nullable fields +query IIR +SELECT c2, sum(c3), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 367 12.153253793716 +2 184 9.577824473381 +3 395 9.590891361237 +4 29 9.531112968922 +5 -194 7.074412226677 + +# Enabling PG dialect for filtered aggregates tests +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# Test count with filter +query III +SELECT + c2, + count(c3) FILTER (WHERE c3 > 0), + count(c3) FILTER (WHERE c11 > 10) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 13 0 +2 13 0 +3 13 0 +4 13 0 +5 5 0 + +# Test min / max with filter +query III +SELECT + c2, + min(c3) FILTER (WHERE c3 > 0), + max(c3) FILTER (WHERE c3 < 0) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 12 -5 +2 1 -29 +3 13 -2 +4 3 -38 +5 36 -5 + +# Test sum with filter +query II +SELECT + c2, + sum(c3) FILTER (WHERE c1 != 'e' AND c3 > 0) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 612 +2 565 +3 466 +4 417 +5 284 + +# Test count with nullable fields and filter +query III +SELECT c2, + COUNT(c3) FILTER (WHERE c5 > 0), + COUNT(c11) FILTER(WHERE c5 > 0) +FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 11 6 +2 6 6 +3 8 6 +4 11 14 +5 8 7 + +# Test count with nullable fields and nullable filter +query III +SELECT c2, + COUNT(c3) FILTER (WHERE c11 > 0.5), + COUNT(c11) FILTER(WHERE c3 > 0) +FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 10 9 +2 7 8 +3 3 6 +4 3 7 +5 6 3 + +# Test min / max with nullable fields and filter +query IIIRR +SELECT c2, + MIN(c3) FILTER (WHERE c5 > 0), + MAX(c3) FILTER (WHERE c5 > 0), + MIN(c11) FILTER (WHERE c5 < 0), + MAX(c11) FILTER (WHERE c5 < 0) +FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 -99 103 0.2578469 0.89651865 +2 -48 93 0.09683716 0.8315913 +3 -76 123 0.034291923 0.94669616 +4 -117 123 0.06563997 0.57360977 +5 -94 68 0.12559289 0.75173044 + +# Test min / max with nullable fields and nullable filter +query III +SELECT c2, + MIN(c3) FILTER (WHERE c11 > 0.5), + 
MAX(c3) FILTER (WHERE c11 > 0.5) +FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 -99 125 +2 -106 122 +3 -76 73 +4 -117 47 +5 -82 118 + +# Test sum with nullable field and nullable / non-nullable filters +query IIIRR +SELECT c2, + SUM(c3) FILTER (WHERE c5 > 0), + SUM(c3) FILTER (WHERE c11 < 0.5), + SUM(c11) FILTER (WHERE c5 < 0), + SUM(c11) FILTER (WHERE c3 > 0) +FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 -3 77 7.214695632458 5.085060358047 +2 100 77 6.197732746601 3.150197088718 +3 109 211 2.80575042963 2.80632930994 +4 -171 56 2.10740506649 1.939846396446 +5 -86 -76 1.8741710186 1.600569307804 diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fef7bfe82174..0cbbbf3c608c 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -207,6 +207,8 @@ datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 +datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 +datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 @@ -294,6 +296,8 @@ datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistic datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system +datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input +datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). 
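As a usage note (not part of this patch): the new thresholds can also be adjusted programmatically when building a session. The sketch below is illustrative only; it assumes the usual `datafusion::prelude` re-exports and `SessionConfig::set_str`, the option keys are the ones added to `config.rs` in this diff, and the values are arbitrary examples.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Example values only: probe after 10,000 rows and give up on partial
    // aggregation once at least half of the probed rows form distinct groups.
    let config = SessionConfig::new()
        .set_str(
            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
            "10000",
        )
        .set_str(
            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
            "0.5",
        );
    let _ctx = SessionContext::new_with_config(config);
}
```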
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 78d0d7b0239f..badd07822ac2 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -89,6 +89,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | +| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
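
To make the core mechanism concrete, here is a self-contained sketch mirroring the `COUNT` conversion added in `count.rs` above: each input row becomes its own group, so its intermediate state is `1` if the row is non-null and selected by the filter, and `0` otherwise. The function and variable names are illustrative and not part of the patch.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int64Array};
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// Hypothetical helper: builds the per-row COUNT state for one input column.
fn count_convert_to_state(
    values: &ArrayRef,
    opt_filter: Option<&BooleanArray>,
) -> Result<ArrayRef, ArrowError> {
    Ok(match (values.logical_nulls(), opt_filter) {
        // No nulls and no filter: every row counts as 1
        (None, None) => Arc::new(Int64Array::from_value(1, values.len())) as ArrayRef,
        // Only nulls: the validity bitmap already encodes 1 (valid) / 0 (null)
        (Some(nulls), None) => {
            let valid = BooleanArray::new(nulls.into_inner(), None);
            compute::cast(&valid, &DataType::Int64)?
        }
        // Only a filter: treat filter nulls as "not selected"
        (None, Some(filter)) => {
            let (filter_values, filter_nulls) = filter.clone().into_parts();
            let bits = match filter_nulls {
                Some(filter_nulls) => &filter_values & filter_nulls.inner(),
                None => filter_values,
            };
            compute::cast(&BooleanArray::new(bits, None), &DataType::Int64)?
        }
        // Both: a row counts only if it is valid *and* selected by the filter
        (Some(nulls), Some(filter)) => {
            let (filter_values, filter_nulls) = filter.clone().into_parts();
            let bits = match filter_nulls {
                Some(filter_nulls) => &filter_values & filter_nulls.inner(),
                None => filter_values,
            };
            let bits = &bits & nulls.inner();
            compute::cast(&BooleanArray::new(bits, None), &DataType::Int64)?
        }
    })
}

fn main() -> Result<(), ArrowError> {
    let values: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
    let filter = BooleanArray::from(vec![true, true, false]);
    // Row 0 is non-null and selected, row 1 is null, row 2 is filtered out,
    // so the intermediate COUNT state is [1, 0, 0]
    let state = count_convert_to_state(&values, Some(&filter))?;
    println!("{state:?}");
    Ok(())
}
```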