Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use datafusion_expr::{
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::InvariantLevel;
Expand Down Expand Up @@ -2006,7 +2006,8 @@ impl DefaultPhysicalPlanner {
input: &Arc<LogicalPlan>,
expr: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
let input_schema = input.as_ref().schema();
let input_logical_schema = input.as_ref().schema();
let input_physical_schema = input_exec.schema();
let physical_exprs = expr
.iter()
.map(|e| {
Expand All @@ -2025,7 +2026,7 @@ impl DefaultPhysicalPlanner {
// This depends on the invariant that logical schema field index MUST match
// with physical schema field index.
let physical_name = if let Expr::Column(col) = e {
match input_schema.index_of_column(col) {
match input_logical_schema.index_of_column(col) {
Ok(idx) => {
// index physical field using logical field index
Ok(input_exec.schema().field(idx).name().to_string())
Expand All @@ -2038,10 +2039,14 @@ impl DefaultPhysicalPlanner {
physical_name(e)
};

tuple_err((
self.create_physical_expr(e, input_schema, session_state),
physical_name,
))
let physical_expr =
self.create_physical_expr(e, input_logical_schema, session_state);

// Check for possible column name mismatches
let final_physical_expr =
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);

tuple_err((final_physical_expr, physical_name))
})
.collect::<Result<Vec<_>>>()?;

Expand All @@ -2061,6 +2066,35 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
//
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
fn maybe_fix_physical_column_name(
expr: Result<Arc<dyn PhysicalExpr>>,
input_physical_schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>> {
Comment on lines +2069 to +2077
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rename happens here in the logical plan builder to avoid errors while building the logical plan.

if let Ok(e) = &expr {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
let physical_field = input_physical_schema.field(column.index());
let expr_col_name = column.name();
let physical_name = physical_field.name();

if physical_name != expr_col_name {
if let Some(idx) = expr_col_name.find(':') {
let base_name = &expr_col_name[..idx];
if base_name == physical_name {
let updated_column = Column::new(physical_name, column.index());
return Ok(Arc::new(updated_column));
}
}
}
}
}
expr
}

struct OptimizationInvariantChecker<'a> {
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::PhysicalExpr;

use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_common::{internal_err, Result};

/// Stores the mapping between source expressions and target expressions for a
/// projection.
Expand Down Expand Up @@ -66,8 +66,8 @@ impl ProjectionMapping {
let idx = col.index();
let matching_input_field = input_schema.field(idx);
if col.name() != matching_input_field.name() {
let fixed_col = Column::new(col.name(), idx);
return Ok(Transformed::yes(Arc::new(fixed_col)));
return internal_err!("Input field name {} does not match with the projection expression {}",
matching_input_field.name(),col.name())
}
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Expand Down