Skip to content

Commit

Permalink
Improve CombinePartialFinalAggregate code (#12128)
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Aug 24, 2024
1 parent 8fd9d69 commit 2b6341c
Showing 1 changed file with 51 additions and 56 deletions.
107 changes: 51 additions & 56 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,62 +51,57 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|plan| {
let transformed =
plan.as_any()
.downcast_ref::<AggregateExec>()
.and_then(|agg_exec| {
if matches!(
agg_exec.mode(),
AggregateMode::Final | AggregateMode::FinalPartitioned
) {
agg_exec
.input()
.as_any()
.downcast_ref::<AggregateExec>()
.and_then(|input_agg_exec| {
if matches!(
input_agg_exec.mode(),
AggregateMode::Partial
) && can_combine(
(
agg_exec.group_expr(),
agg_exec.aggr_expr(),
agg_exec.filter_expr(),
),
(
input_agg_exec.group_expr(),
input_agg_exec.aggr_expr(),
input_agg_exec.filter_expr(),
),
) {
let mode =
if agg_exec.mode() == &AggregateMode::Final {
AggregateMode::Single
} else {
AggregateMode::SinglePartitioned
};
AggregateExec::try_new(
mode,
input_agg_exec.group_expr().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
input_agg_exec.input().clone(),
input_agg_exec.input_schema(),
)
.map(|combined_agg| {
combined_agg.with_limit(agg_exec.limit())
})
.ok()
.map(Arc::new)
} else {
None
}
})
} else {
None
}
});

// Check if the plan is AggregateExec
let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
return Ok(Transformed::no(plan));
};

if !matches!(
agg_exec.mode(),
AggregateMode::Final | AggregateMode::FinalPartitioned
) {
return Ok(Transformed::no(plan));
}

// Check if the input is AggregateExec
let Some(input_agg_exec) =
agg_exec.input().as_any().downcast_ref::<AggregateExec>()
else {
return Ok(Transformed::no(plan));
};

let transformed = if matches!(input_agg_exec.mode(), AggregateMode::Partial)
&& can_combine(
(
agg_exec.group_expr(),
agg_exec.aggr_expr(),
agg_exec.filter_expr(),
),
(
input_agg_exec.group_expr(),
input_agg_exec.aggr_expr(),
input_agg_exec.filter_expr(),
),
) {
let mode = if agg_exec.mode() == &AggregateMode::Final {
AggregateMode::Single
} else {
AggregateMode::SinglePartitioned
};
AggregateExec::try_new(
mode,
input_agg_exec.group_expr().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
input_agg_exec.input().clone(),
input_agg_exec.input_schema(),
)
.map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
.ok()
.map(Arc::new)
} else {
None
};
Ok(if let Some(transformed) = transformed {
Transformed::yes(transformed)
} else {
Expand Down

0 comments on commit 2b6341c

Please sign in to comment.