diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 15d325288b071..12299956cf478 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -62,9 +62,7 @@ use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, -}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, @@ -85,7 +83,7 @@ use datafusion_expr::{ WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::expressions::{Column, Literal}; +use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::{ create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, }; @@ -2181,11 +2179,7 @@ impl DefaultPhysicalPlanner { let physical_expr = self.create_physical_expr(e, input_logical_schema, session_state); - // Check for possible column name mismatches - let final_physical_expr = - maybe_fix_physical_column_name(physical_expr, &input_physical_schema); - - tuple_err((final_physical_expr, physical_name)) + tuple_err((physical_expr, physical_name)) }) .collect::>>()?; @@ -2291,47 +2285,6 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } -// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names. -// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names. -// -// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'), -// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`. -fn maybe_fix_physical_column_name( - expr: Result>, - input_physical_schema: &SchemaRef, -) -> Result> { - let Ok(expr) = expr else { return expr }; - expr.transform_down(|node| { - if let Some(column) = node.as_any().downcast_ref::() { - let idx = column.index(); - let physical_field = input_physical_schema.field(idx); - let expr_col_name = column.name(); - let physical_name = physical_field.name(); - - if expr_col_name != physical_name { - // handle edge cases where the physical_name contains ':'. - let colon_count = physical_name.matches(':').count(); - let mut splits = expr_col_name.match_indices(':'); - let split_pos = splits.nth(colon_count); - - if let Some((i, _)) = split_pos { - let base_name = &expr_col_name[..i]; - if base_name == physical_name { - let updated_column = Column::new(physical_name, idx); - return Ok(Transformed::yes(Arc::new(updated_column))); - } - } - } - - // If names already match or fix is not possible, just leave it as it is - Ok(Transformed::no(node)) - } else { - Ok(Transformed::no(node)) - } - }) - .data() -} - struct OptimizationInvariantChecker<'a> { rule: &'a Arc, } @@ -2435,12 +2388,10 @@ mod tests { }; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; - use datafusion_expr::{ - col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore, - }; + use datafusion_expr::builder::subquery_alias; + use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore}; use datafusion_functions_aggregate::count::count_all; use datafusion_functions_aggregate::expr_fn::sum; - use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -3001,71 +2952,6 @@ mod tests { } } - #[tokio::test] - async fn test_maybe_fix_colon_in_physical_name() { - // The physical schema has a field name with a colon - let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]); - let schema_ref: SchemaRef = Arc::new(schema); - - // What might happen after deduplication - let logical_col_name = "metric:avg:1"; - let expr_with_suffix = - Arc::new(Column::new(logical_col_name, 0)) as Arc; - let expr_result = Ok(expr_with_suffix); - - // Call function under test - let fixed_expr = - maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap(); - - // Downcast back to Column so we can check the name - let col = fixed_expr - .as_any() - .downcast_ref::() - .expect("Column"); - - assert_eq!(col.name(), "metric:avg"); - } - - #[tokio::test] - async fn test_maybe_fix_nested_column_name_with_colon() { - let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]); - let schema_ref: SchemaRef = Arc::new(schema); - - // Construct the nested expr - let col_expr = Arc::new(Column::new("column:1", 0)) as Arc; - let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone())); - - // Create a binary expression and put the column inside - let binary_expr = Arc::new(BinaryExpr::new( - is_not_null_expr.clone(), - Operator::Or, - is_not_null_expr.clone(), - )) as Arc; - - let fixed_expr = - maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap(); - - let bin = fixed_expr - .as_any() - .downcast_ref::() - .expect("Expected BinaryExpr"); - - // Check that both sides where renamed - for expr in &[bin.left(), bin.right()] { - let is_not_null = expr - .as_any() - .downcast_ref::() - .expect("Expected IsNotNull"); - - let col = is_not_null - .arg() - .as_any() - .downcast_ref::() - .expect("Expected Column"); - - assert_eq!(col.name(), "column"); - } - } struct ErrorExtensionPlanner {} #[async_trait] @@ -3562,4 +3448,61 @@ digraph { Ok(()) } + + // Reproducer for DataFusion issue #17405: + // + // The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a` + // clause is missing from the explicit logical plan: + // + // SELECT a FROM ( + // -- SELECT left_table.a, right_table.a + // FROM left_table + // FULL JOIN right_table ON left_table.a = right_table.a + // ) AS alias + // GROUP BY a; + // + // As a result, the variables within `alias` subquery are not properly distinguished, which + // leads to a bug for logical and physical planning. + // + // The fix is to implicitly insert a Projection node to represent the missing SELECT clause to + // ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added. + #[tokio::test] + async fn subquery_alias_confusing_the_optimizer() -> Result<()> { + let state = make_session_state(); + + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema = Arc::new(schema); + + let table = MemTable::try_new(schema.clone(), vec![vec![]])?; + let table = Arc::new(table); + + let source = DefaultTableSource::new(table); + let source = Arc::new(source); + + let left = LogicalPlanBuilder::scan("left", source.clone(), None)?; + let right = LogicalPlanBuilder::scan("right", source, None)?.build()?; + + let join_keys = ( + vec![datafusion_common::Column::new(Some("left"), "a")], + vec![datafusion_common::Column::new(Some("right"), "a")], + ); + + let join = left.join(right, JoinType::Full, join_keys, None)?.build()?; + + let alias = subquery_alias(join, "alias")?; + + let planner = DefaultPhysicalPlanner::default(); + + let logical_plan = LogicalPlanBuilder::new(alias) + .aggregate(vec![col("a:1")], Vec::::new())? + .build()?; + let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?; + + let optimized_logical_plan = state.optimize(&logical_plan)?; + let _optimized_physical_plan = planner + .create_physical_plan(&optimized_logical_plan, &state) + .await?; + + Ok(()) + } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 511d8c27a5c44..ac699b9d30c3c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -18,6 +18,7 @@ //! This module provides a builder for creating LogicalPlans use std::any::Any; +use std::borrow::Cow; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::iter::once; @@ -1517,37 +1518,49 @@ impl ValuesFields { } } -// `name_map` tracks a mapping between a field name and the number of appearances of that field. -// -// Some field names might already come to this function with the count (number of times it appeared) -// as a suffix e.g. id:1, so there's still a chance of name collisions, for example, -// if these three fields passed to this function: "col:1", "col" and "col", the function -// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema. -// that's why we need the `seen` set, so the fields are always unique. -// -pub fn change_redundant_column(fields: &Fields) -> Vec { - let mut name_map = HashMap::new(); - let mut seen: HashSet = HashSet::new(); +/// Returns aliases to make field names unique. +/// +/// Returns a vector of optional aliases, one per input field. `None` means keep the original name, +/// `Some(alias)` means rename to the alias to ensure uniqueness. +/// +/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need +/// to maintain unique column names. +/// +/// # Example +/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers) +/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]` +pub fn unique_field_aliases(fields: &Fields) -> Vec> { + // Some field names might already come to this function with the count (number of times it appeared) + // as a suffix e.g. id:1, so there's still a chance of name collisions, for example, + // if these three fields passed to this function: "col:1", "col" and "col", the function + // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema. + // That's why we need the `seen` set, so the fields are always unique. + + // Tracks a mapping between a field name and the number of appearances of that field. + let mut name_map = HashMap::<&str, usize>::new(); + // Tracks all the fields and aliases that were previously seen. + let mut seen = HashSet::>::new(); fields - .into_iter() + .iter() .map(|field| { - let base_name = field.name(); - let count = name_map.entry(base_name.clone()).or_insert(0); - let mut new_name = base_name.clone(); + let original_name = field.name(); + let mut name = Cow::Borrowed(original_name); + + let count = name_map.entry(original_name).or_insert(0); - // Loop until we find a name that hasn't been used - while seen.contains(&new_name) { + // Loop until we find a name that hasn't been used. + while seen.contains(&name) { *count += 1; - new_name = format!("{base_name}:{count}"); + name = Cow::Owned(format!("{original_name}:{count}")); } - seen.insert(new_name.clone()); + seen.insert(name.clone()); - let mut modified_field = - Field::new(&new_name, field.data_type().clone(), field.is_nullable()); - modified_field.set_metadata(field.metadata().clone()); - modified_field + match name { + Cow::Borrowed(_) => None, + Cow::Owned(alias) => Some(alias), + } }) .collect() } @@ -2675,34 +2688,6 @@ mod tests { Ok(()) } - #[test] - fn test_change_redundant_column() -> Result<()> { - let t1_field_1 = Field::new("a", DataType::Int32, false); - let t2_field_1 = Field::new("a", DataType::Int32, false); - let t2_field_3 = Field::new("a", DataType::Int32, false); - let t2_field_4 = Field::new("a:1", DataType::Int32, false); - let t1_field_2 = Field::new("b", DataType::Int32, false); - let t2_field_2 = Field::new("b", DataType::Int32, false); - - let field_vec = vec![ - t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4, - ]; - let remove_redundant = change_redundant_column(&Fields::from(field_vec)); - - assert_eq!( - remove_redundant, - vec![ - Field::new("a", DataType::Int32, false), - Field::new("a:1", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("b:1", DataType::Int32, false), - Field::new("a:2", DataType::Int32, false), - Field::new("a:1:1", DataType::Int32, false), - ] - ); - Ok(()) - } - #[test] fn plan_builder_from_logical_plan() -> Result<()> { let plan = @@ -2787,4 +2772,39 @@ mod tests { Ok(()) } + + #[test] + fn test_unique_field_aliases() { + let t1_field_1 = Field::new("a", DataType::Int32, false); + let t2_field_1 = Field::new("a", DataType::Int32, false); + let t2_field_3 = Field::new("a", DataType::Int32, false); + let t2_field_4 = Field::new("a:1", DataType::Int32, false); + let t1_field_2 = Field::new("b", DataType::Int32, false); + let t2_field_2 = Field::new("b", DataType::Int32, false); + + let fields = vec![ + t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4, + ]; + let fields = Fields::from(fields); + + let remove_redundant = unique_field_aliases(&fields); + + // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1] + // First occurrence of each field name keeps original name (None), duplicates get + // incremental suffixes (:1, :2, etc.). + // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later + // conflicts with the last column which is _actually_ called `a:1` so we need to rename it + // as well to `a:1:1`. + assert_eq!( + remove_redundant, + vec![ + None, + Some("a:1".to_string()), + None, + Some("b:1".to_string()), + Some("a:2".to_string()), + Some("a:1:1".to_string()), + ] + ); + } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 7dc750a35c0ec..fbeb46ec180d6 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -29,9 +29,9 @@ use super::invariants::{ InvariantLevel, }; use super::DdlStatement; -use crate::builder::{change_redundant_column, unnest_with_options}; +use crate::builder::{unique_field_aliases, unnest_with_options}; use crate::expr::{ - intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction, + intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams, }; use crate::expr_rewriter::{ @@ -2239,13 +2239,45 @@ impl SubqueryAlias { alias: impl Into, ) -> Result { let alias = alias.into(); - let fields = change_redundant_column(plan.schema().fields()); - let meta_data = plan.schema().as_ref().metadata().clone(); - let schema: Schema = - DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into(); - // Since schema is the same, other than qualifier, we can use existing - // functional dependencies: + + // Since SubqueryAlias will replace all field qualification for the output schema of `plan`, + // no field must share the same column name as this would lead to ambiguity when referencing + // columns in parent logical nodes. + + // Compute unique aliases, if any, for each column of the input's schema. + let aliases = unique_field_aliases(plan.schema().fields()); + let is_projection_needed = aliases.iter().any(Option::is_some); + + // Insert a projection node, if needed, to make sure aliases are applied. + let plan = if is_projection_needed { + let projection_expressions = aliases + .iter() + .zip(plan.schema().iter()) + .map(|(alias, (qualifier, field))| { + let column = + Expr::Column(Column::new(qualifier.cloned(), field.name())); + match alias { + None => column, + Some(alias) => { + Expr::Alias(Alias::new(column, qualifier.cloned(), alias)) + } + } + }) + .collect(); + let projection = Projection::try_new(projection_expressions, plan)?; + Arc::new(LogicalPlan::Projection(projection)) + } else { + plan + }; + + // Requalify fields with the new `alias`. + let fields = plan.schema().fields().clone(); + let meta_data = plan.schema().metadata().clone(); let func_dependencies = plan.schema().functional_dependencies().clone(); + + let schema = DFSchema::from_unqualified_fields(fields, meta_data)?; + let schema = Schema::from(schema); + let schema = DFSchemaRef::new( DFSchema::try_from_qualified_schema(alias.clone(), &schema)? .with_functional_dependencies(func_dependencies)?, diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 6ea0de9379f69..a92fc2957cae3 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -605,26 +605,30 @@ mod tests { #[tokio::test] async fn test_multiple_joins() -> Result<()> { let plan_str = test_plan_to_string("multiple_joins.json").await?; - assert_eq!( + assert_snapshot!( plan_str, - "Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\ - \n Left Join: left.id = right.id\ - \n SubqueryAlias: left\ - \n Left Join: left.id = right.id\ - \n SubqueryAlias: left\ - \n Left Join: left.id = right.id\ - \n SubqueryAlias: left\ - \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ - \n Values: (Int64(1)), (Int64(2))\ - \n SubqueryAlias: right\ - \n Aggregate: groupBy=[[id, category]], aggr=[[]]\ - \n Values: (Int64(1), Utf8(\"info\")), (Int64(2), Utf8(\"low\"))\ - \n SubqueryAlias: right\ - \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ - \n Values: (Int64(1)), (Int64(2))\ - \n SubqueryAlias: right\ - \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ - \n Values: (Int64(1)), (Int64(2))" + @r#" + Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third + Left Join: left.id = right.id + SubqueryAlias: left + Projection: left.id, left.count(Int64(1)), left.id:1, left.category, right.id AS id:2, right.count(Int64(1)) AS count(Int64(1)):1 + Left Join: left.id = right.id + SubqueryAlias: left + Projection: left.id, left.count(Int64(1)), right.id AS id:1, right.category + Left Join: left.id = right.id + SubqueryAlias: left + Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]] + Values: (Int64(1)), (Int64(2)) + SubqueryAlias: right + Aggregate: groupBy=[[id, category]], aggr=[[]] + Values: (Int64(1), Utf8("info")), (Int64(2), Utf8("low")) + SubqueryAlias: right + Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]] + Values: (Int64(1)), (Int64(2)) + SubqueryAlias: right + Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]] + Values: (Int64(1)), (Int64(2)) + "# ); Ok(()) }