Skip to content

Commit

Permalink
Removed Arc wrapping for AggregateFunctionExpr (#12353)
Browse files Browse the repository at this point in the history
  • Loading branch information
athultr1997 authored Sep 8, 2024
1 parent 3e1850d commit f5c47fa
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 83 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
///
/// # Parameters
///
/// * `aggr_exprs` - A vector of `Arc<AggregateFunctionExpr>` representing the
/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
/// aggregate expressions to be optimized.
/// * `prefix_requirement` - An array slice representing the ordering
/// requirements preceding the aggregate expressions.
Expand All @@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
/// successfully. Any errors occurring during the conversion process are
/// passed through.
fn try_convert_aggregate_if_better(
aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
aggr_exprs: Vec<AggregateFunctionExpr>,
prefix_requirement: &[PhysicalSortRequirement],
eq_properties: &EquivalenceProperties,
) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
) -> Result<Vec<AggregateFunctionExpr>> {
aggr_exprs
.into_iter()
.map(|aggr_expr| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1541,7 +1541,7 @@ pub fn create_window_expr(
}

type AggregateExprWithOptionalArgs = (
Arc<AggregateFunctionExpr>,
AggregateFunctionExpr,
// The filter clause, if any
Option<Arc<dyn PhysicalExpr>>,
// Ordering requirements, if any
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl TestAggregate {
}

/// Return appropriate expr depending if COUNT is for col or table (*)
pub fn count_expr(&self, schema: &Schema) -> Arc<AggregateFunctionExpr> {
pub fn count_expr(&self, schema: &Schema) -> AggregateFunctionExpr {
AggregateExprBuilder::new(count_udaf(), vec![self.column()])
.schema(Arc::new(schema.clone()))
.alias(self.column_name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
fn partial_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
aggr_expr: Vec<AggregateFunctionExpr>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Expand All @@ -104,7 +104,7 @@ fn partial_aggregate_exec(
fn final_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
aggr_expr: Vec<AggregateFunctionExpr>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Expand All @@ -130,7 +130,7 @@ fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
) -> Arc<AggregateFunctionExpr> {
) -> AggregateFunctionExpr {
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.alias(name)
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl AggregateExprBuilder {
}
}

pub fn build(self) -> Result<Arc<AggregateFunctionExpr>> {
pub fn build(self) -> Result<AggregateFunctionExpr> {
let Self {
fun,
args,
Expand Down Expand Up @@ -132,7 +132,7 @@ impl AggregateExprBuilder {
Some(alias) => alias,
};

Ok(Arc::new(AggregateFunctionExpr {
Ok(AggregateFunctionExpr {
fun: Arc::unwrap_or_clone(fun),
args,
data_type,
Expand All @@ -145,7 +145,7 @@ impl AggregateExprBuilder {
input_types: input_exprs_types,
is_reversed,
is_nullable,
}))
})
}

pub fn alias(mut self, alias: impl Into<String>) -> Self {
Expand Down Expand Up @@ -328,9 +328,9 @@ impl AggregateFunctionExpr {
/// not implement the method, returns an error. Order insensitive and hard
/// requirement aggregators return `Ok(None)`.
pub fn with_beneficial_ordering(
self: Arc<Self>,
self,
beneficial_ordering: bool,
) -> Result<Option<Arc<AggregateFunctionExpr>>> {
) -> Result<Option<AggregateFunctionExpr>> {
let Some(updated_fn) = self
.fun
.clone()
Expand Down Expand Up @@ -457,10 +457,10 @@ impl AggregateFunctionExpr {
/// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
/// For aggregates that do not support calculation in reverse,
/// returns None (which is the default value).
pub fn reverse_expr(&self) -> Option<Arc<AggregateFunctionExpr>> {
pub fn reverse_expr(&self) -> Option<AggregateFunctionExpr> {
match self.fun.reverse_udf() {
ReversedUDAF::NotSupported => None,
ReversedUDAF::Identical => Some(Arc::new(self.clone())),
ReversedUDAF::Identical => Some(self.clone()),
ReversedUDAF::Reversed(reverse_udf) => {
let reverse_ordering_req = reverse_order_bys(&self.ordering_req);
let mut name = self.name().to_string();
Expand Down Expand Up @@ -507,7 +507,7 @@ impl AggregateFunctionExpr {
&self,
_args: Vec<Arc<dyn PhysicalExpr>>,
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<AggregateFunctionExpr>> {
) -> Option<AggregateFunctionExpr> {
None
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
aggregate: Arc<AggregateFunctionExpr>,
aggregate: AggregateFunctionExpr,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
Expand All @@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr {
impl PlainAggregateWindowExpr {
/// Create a new aggregate window function expression
pub fn new(
aggregate: Arc<AggregateFunctionExpr>,
aggregate: AggregateFunctionExpr,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
Expand All @@ -64,7 +64,7 @@ impl PlainAggregateWindowExpr {
}

/// Get aggregate expr of AggregateWindowExpr
pub fn get_aggregate_expr(&self) -> &Arc<AggregateFunctionExpr> {
pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr {
&self.aggregate
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
aggregate: Arc<AggregateFunctionExpr>,
aggregate: AggregateFunctionExpr,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
Expand All @@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr {
impl SlidingAggregateWindowExpr {
/// Create a new (sliding) aggregate window function expression.
pub fn new(
aggregate: Arc<AggregateFunctionExpr>,
aggregate: AggregateFunctionExpr,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
Expand All @@ -64,7 +64,7 @@ impl SlidingAggregateWindowExpr {
}

/// Get the [AggregateFunctionExpr] of this object.
pub fn get_aggregate_expr(&self) -> &Arc<AggregateFunctionExpr> {
pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr {
&self.aggregate
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {

type GroupExprsRef<'a> = (
&'a PhysicalGroupBy,
&'a [Arc<AggregateFunctionExpr>],
&'a [AggregateFunctionExpr],
&'a [Option<Arc<dyn PhysicalExpr>>],
);

Expand Down
Loading

0 comments on commit f5c47fa

Please sign in to comment.