diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 607572862290..3c76da51a9d4 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -134,10 +134,6 @@ impl AggregateExpr for Avg { is_row_accumulator_support_dtype(&self.sum_data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } - fn create_row_accumulator( &self, start_index: usize, @@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator { )), } } + fn supports_retract_batch(&self) -> bool { + true + } fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 15df28b4e38a..22cb2512fc42 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -133,10 +133,6 @@ impl AggregateExpr for Count { true } - fn supports_bounded_execution(&self) -> bool { - true - } - fn create_row_accumulator( &self, start_index: usize, @@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator { Ok(ScalarValue::Int64(Some(self.count))) } + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) } diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index f811dae7b560..e3c061dc1354 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -125,10 +125,6 @@ impl AggregateExpr for Max { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } - fn create_row_accumulator( &self, start_index: usize, @@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator { Ok(self.max.clone()) } + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() } @@ -825,10 +825,6 @@ impl AggregateExpr for Min { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } - fn create_row_accumulator( &self, start_index: usize, @@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator { Ok(self.min.clone()) } + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 09fd9bcfc524..7d2316c532a0 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { false } - /// Specifies whether this aggregate function can run using bounded memory. - /// Any accumulator returning "true" needs to implement `retract_batch`. - fn supports_bounded_execution(&self) -> bool { - false - } - /// RowAccumulator to access/update row-based aggregation state in-place. /// Currently, row accumulator only supports states of fixed-sized type. /// diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 1c70dc67beeb..efa55f060264 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -131,10 +131,6 @@ impl AggregateExpr for Sum { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } - fn create_row_accumulator( &self, start_index: usize, @@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator { } } + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() } diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index c8a4797a5288..5892f7f3f3b0 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.aggregate.supports_bounded_execution() - && !self.window_frame.end_bound.is_unbounded() + !self.window_frame.end_bound.is_unbounded() } } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index a03267c03532..828bc7218fa2 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr { } else if evaluator.include_rank() { let columns = self.sort_columns(batch)?; let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?; - evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points) + evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points) } else { let (values, _) = self.get_values_orderbys(batch)?; evaluator.evaluate_all(&values, num_rows) diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index 47f2e4208d71..9040165ac9e0 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist { pub(crate) struct CumeDistEvaluator; impl PartitionEvaluator for CumeDistEvaluator { - fn evaluate_with_rank_all( + fn evaluate_all_with_rank( &self, num_rows: usize, ranks_in_partition: &[Range], @@ -109,7 +109,7 @@ mod tests { ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank_all(num_rows, &ranks)?; + .evaluate_all_with_rank(num_rows, &ranks)?; let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs index 0dfad0e80f05..e518e89a75d0 100644 --- a/datafusion/physical-expr/src/window/partition_evaluator.rs +++ b/datafusion/physical-expr/src/window/partition_evaluator.rs @@ -69,7 +69,7 @@ use std::ops::Range; /// /// # Stateless `PartitionEvaluator` /// -/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the +/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank`] is called with values for the /// entire partition. /// /// # Stateful `PartitionEvaluator` @@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send { )) } - /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window + /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window /// functions that only need the rank of a row within its window /// frame. /// @@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send { /// (3,4), /// ] /// ``` - fn evaluate_with_rank_all( + fn evaluate_all_with_rank( &self, _num_rows: usize, _ranks_in_partition: &[Range], @@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send { /// Can this function be evaluated with (only) rank /// - /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`] + /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank`] fn include_rank(&self) -> bool { false } diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index be184ca891de..59a08358cda6 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator { } } - fn evaluate_with_rank_all( + fn evaluate_all_with_rank( &self, num_rows: usize, ranks_in_partition: &[Range], @@ -236,7 +236,7 @@ mod tests { ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank_all(num_rows, &ranks)?; + .evaluate_all_with_rank(num_rows, &ranks)?; let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); @@ -248,7 +248,7 @@ mod tests { ranks: Vec>, expected: Vec, ) -> Result<()> { - let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?; + let result = expr.create_evaluator()?.evaluate_all_with_rank(8, &ranks)?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 709f8d23be36..1494129cf897 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -139,8 +139,7 @@ impl WindowExpr for SlidingAggregateWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.aggregate.supports_bounded_execution() - && !self.window_frame.end_bound.is_unbounded() + !self.window_frame.end_bound.is_unbounded() } }