@@ -81,7 +81,7 @@ use datafusion_expr::{
8181 WindowFrameBound , WriteOp ,
8282} ;
8383use datafusion_physical_expr:: aggregate:: { AggregateExprBuilder , AggregateFunctionExpr } ;
84- use datafusion_physical_expr:: expressions:: Literal ;
84+ use datafusion_physical_expr:: expressions:: { Column , Literal } ;
8585use datafusion_physical_expr:: LexOrdering ;
8686use datafusion_physical_optimizer:: PhysicalOptimizerRule ;
8787use datafusion_physical_plan:: execution_plan:: InvariantLevel ;
@@ -2000,7 +2000,8 @@ impl DefaultPhysicalPlanner {
20002000 input : & Arc < LogicalPlan > ,
20012001 expr : & [ Expr ] ,
20022002 ) -> Result < Arc < dyn ExecutionPlan > > {
2003- let input_schema = input. as_ref ( ) . schema ( ) ;
2003+ let input_logical_schema = input. as_ref ( ) . schema ( ) ;
2004+ let input_physical_schema = input_exec. schema ( ) ;
20042005 let physical_exprs = expr
20052006 . iter ( )
20062007 . map ( |e| {
@@ -2019,7 +2020,7 @@ impl DefaultPhysicalPlanner {
20192020 // This depends on the invariant that logical schema field index MUST match
20202021 // with physical schema field index.
20212022 let physical_name = if let Expr :: Column ( col) = e {
2022- match input_schema . index_of_column ( col) {
2023+ match input_logical_schema . index_of_column ( col) {
20232024 Ok ( idx) => {
20242025 // index physical field using logical field index
20252026 Ok ( input_exec. schema ( ) . field ( idx) . name ( ) . to_string ( ) )
@@ -2032,10 +2033,14 @@ impl DefaultPhysicalPlanner {
20322033 physical_name ( e)
20332034 } ;
20342035
2035- tuple_err ( (
2036- self . create_physical_expr ( e, input_schema, session_state) ,
2037- physical_name,
2038- ) )
2036+ let physical_expr =
2037+ self . create_physical_expr ( e, input_logical_schema, session_state) ;
2038+
2039+ // Check for possible column name mismatches
2040+ let final_physical_expr =
2041+ maybe_fix_physical_column_name ( physical_expr, & input_physical_schema) ;
2042+
2043+ tuple_err ( ( final_physical_expr, physical_name) )
20392044 } )
20402045 . collect :: < Result < Vec < _ > > > ( ) ?;
20412046
@@ -2055,6 +2060,40 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
20552060 }
20562061}
20572062
2063+ // Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2064+ // Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2065+ //
2066+ // This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2067+ // to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2068+ fn maybe_fix_physical_column_name (
2069+ expr : Result < Arc < dyn PhysicalExpr > > ,
2070+ input_physical_schema : & SchemaRef ,
2071+ ) -> Result < Arc < dyn PhysicalExpr > > {
2072+ if let Ok ( e) = & expr {
2073+ if let Some ( column) = e. as_any ( ) . downcast_ref :: < Column > ( ) {
2074+ let physical_field = input_physical_schema. field ( column. index ( ) ) ;
2075+ let expr_col_name = column. name ( ) ;
2076+ let physical_name = physical_field. name ( ) ;
2077+
2078+ if physical_name != expr_col_name {
2079+ // handle edge cases where the physical_name contains ':'.
2080+ let colon_count = physical_name. matches ( ':' ) . count ( ) ;
2081+ let mut splits = expr_col_name. match_indices ( ':' ) ;
2082+ let split_pos = splits. nth ( colon_count) ;
2083+
2084+ if let Some ( ( idx, _) ) = split_pos {
2085+ let base_name = & expr_col_name[ ..idx] ;
2086+ if base_name == physical_name {
2087+ let updated_column = Column :: new ( physical_name, column. index ( ) ) ;
2088+ return Ok ( Arc :: new ( updated_column) ) ;
2089+ }
2090+ }
2091+ }
2092+ }
2093+ }
2094+ expr
2095+ }
2096+
/// Holds a borrowed reference (`rule`) to a physical optimizer rule.
/// NOTE(review): judging by the name, presumably used to check plan invariants
/// around that rule — confirm against the impl, which is outside this view.
20582097struct OptimizationInvariantChecker < ' a > {
20592098 rule : & ' a Arc < dyn PhysicalOptimizerRule + Send + Sync > ,
20602099}
@@ -2650,6 +2689,30 @@ mod tests {
26502689 }
26512690 }
26522691
/// `maybe_fix_physical_column_name` must strip a `:`-suffix from a column
/// expression name only when the remainder is exactly the physical field
/// name, including when that physical name itself contains a colon.
#[test]
fn test_maybe_fix_colon_in_physical_name() {
    // The physical schema has a single field whose name contains a colon.
    let schema_ref: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "metric:avg",
        DataType::Int32,
        false,
    )]));

    // Runs the function under test on a column expression at index 0 with the
    // given name and returns the name of the resulting column.
    let fix = |name: &str| -> String {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new(name, 0));
        let fixed = maybe_fix_physical_column_name(Ok(expr), &schema_ref).unwrap();
        fixed
            .as_any()
            .downcast_ref::<Column>()
            .expect("Column")
            .name()
            .to_string()
    };

    // What might happen after deduplication: the suffix is dropped.
    assert_eq!(fix("metric:avg:1"), "metric:avg");

    // Names that already match are left alone.
    assert_eq!(fix("metric:avg"), "metric:avg");

    // Names that differ in any other way are left alone.
    assert_eq!(fix("metric"), "metric");
    assert_eq!(fix("other:1"), "other:1");
    assert_eq!(fix("metric:avgx:1"), "metric:avgx:1");
}
26532716 struct ErrorExtensionPlanner { }
26542717
26552718 #[ async_trait]
0 commit comments