@@ -81,7 +81,7 @@ use datafusion_expr::{
8181 WindowFrameBound , WriteOp ,
8282} ;
8383use datafusion_physical_expr:: aggregate:: { AggregateExprBuilder , AggregateFunctionExpr } ;
84- use datafusion_physical_expr:: expressions:: Literal ;
84+ use datafusion_physical_expr:: expressions:: { Column , Literal } ;
8585use datafusion_physical_expr:: LexOrdering ;
8686use datafusion_physical_optimizer:: PhysicalOptimizerRule ;
8787use datafusion_physical_plan:: execution_plan:: InvariantLevel ;
@@ -2006,7 +2006,8 @@ impl DefaultPhysicalPlanner {
20062006 input : & Arc < LogicalPlan > ,
20072007 expr : & [ Expr ] ,
20082008 ) -> Result < Arc < dyn ExecutionPlan > > {
2009- let input_schema = input. as_ref ( ) . schema ( ) ;
2009+ let input_logical_schema = input. as_ref ( ) . schema ( ) ;
2010+ let input_physical_schema = input_exec. schema ( ) ;
20102011 let physical_exprs = expr
20112012 . iter ( )
20122013 . map ( |e| {
@@ -2025,7 +2026,7 @@ impl DefaultPhysicalPlanner {
20252026 // This depends on the invariant that logical schema field index MUST match
20262027 // with physical schema field index.
20272028 let physical_name = if let Expr :: Column ( col) = e {
2028- match input_schema . index_of_column ( col) {
2029+ match input_logical_schema . index_of_column ( col) {
20292030 Ok ( idx) => {
20302031 // index physical field using logical field index
20312032 Ok ( input_exec. schema ( ) . field ( idx) . name ( ) . to_string ( ) )
@@ -2038,10 +2039,14 @@ impl DefaultPhysicalPlanner {
20382039 physical_name ( e)
20392040 } ;
20402041
2041- tuple_err ( (
2042- self . create_physical_expr ( e, input_schema, session_state) ,
2043- physical_name,
2044- ) )
2042+ let physical_expr =
2043+ self . create_physical_expr ( e, input_logical_schema, session_state) ;
2044+
2045+ // Check for possible column name mismatches
2046+ let final_physical_expr =
2047+ maybe_fix_physical_column_name ( physical_expr, & input_physical_schema) ;
2048+
2049+ tuple_err ( ( final_physical_expr, physical_name) )
20452050 } )
20462051 . collect :: < Result < Vec < _ > > > ( ) ?;
20472052
@@ -2061,6 +2066,35 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
20612066 }
20622067}
20632068
2069+ // Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2070+ // Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2071+ //
2072+ // This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2073+ // to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2074+ fn maybe_fix_physical_column_name (
2075+ expr : Result < Arc < dyn PhysicalExpr > > ,
2076+ input_physical_schema : & SchemaRef ,
2077+ ) -> Result < Arc < dyn PhysicalExpr > > {
2078+ if let Ok ( e) = & expr {
2079+ if let Some ( column) = e. as_any ( ) . downcast_ref :: < Column > ( ) {
2080+ let physical_field = input_physical_schema. field ( column. index ( ) ) ;
2081+ let expr_col_name = column. name ( ) ;
2082+ let physical_name = physical_field. name ( ) ;
2083+
2084+ if physical_name != expr_col_name {
2085+ if let Some ( idx) = expr_col_name. find ( ':' ) {
2086+ let base_name = & expr_col_name[ ..idx] ;
2087+ if base_name == physical_name {
2088+ let updated_column = Column :: new ( physical_name, column. index ( ) ) ;
2089+ return Ok ( Arc :: new ( updated_column) ) ;
2090+ }
2091+ }
2092+ }
2093+ }
2094+ }
2095+ expr
2096+ }
2097+
20642098struct OptimizationInvariantChecker < ' a > {
20652099 rule : & ' a Arc < dyn PhysicalOptimizerRule + Send + Sync > ,
20662100}
0 commit comments