Expand wildcard projections correctly over USING joins.

Wildcard expansion moves from `sql::utils::expand_wildcard(expr, schema)` to
`logical_plan::builder::expand_wildcard(schema, plan)`. The new helper asks the
plan for its USING-join column sets, sorts each set, keeps the first column and
skips the rest, so a column named in a USING clause appears only once in the
expanded projection. `Column` gains `PartialOrd`/`Ord` so the sets can be
sorted, and the `builder` module becomes `pub(crate)` so the SQL planner can
import the helper. One test is added for the builder and one for the planner.

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 41f29c4b99052..85c4aea99ff5f 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -17,7 +17,10 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use arrow::{
     datatypes::{Schema, SchemaRef},
@@ -220,10 +223,7 @@ impl LogicalPlanBuilder {
         for e in expr {
             match e {
                 Expr::Wildcard => {
-                    (0..input_schema.fields().len()).for_each(|i| {
-                        projected_expr
-                            .push(Expr::Column(input_schema.field(i).qualified_column()))
-                    });
+                    projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
                 }
                 _ => projected_expr
                     .push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -508,6 +508,47 @@ pub fn union_with_alias(
     })
 }
 
+/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
+pub(crate) fn expand_wildcard(
+    schema: &DFSchema,
+    plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+    let using_columns = plan.using_columns()?;
+    let columns_to_skip = using_columns
+        .into_iter()
+        // For each USING JOIN condition, only expand to one column in projection
+        .map(|cols| {
+            let mut cols = cols.into_iter().collect::<Vec<_>>();
+            // sort join columns to make sure we consistently keep the same
+            // qualified column
+            cols.sort();
+            cols.into_iter().skip(1)
+        })
+        .flatten()
+        .collect::<HashSet<_>>();
+
+    if columns_to_skip.is_empty() {
+        Ok(schema
+            .fields()
+            .iter()
+            .map(|f| Expr::Column(f.qualified_column()))
+            .collect::<Vec<Expr>>())
+    } else {
+        Ok(schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                let col = f.qualified_column();
+                if !columns_to_skip.contains(&col) {
+                    Some(Expr::Column(col))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<Expr>>())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
@@ -587,6 +628,27 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn plan_using_join_wildcard_projection() -> Result<()> {
+        let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?
+            .join_using(&t2, JoinType::Inner, vec!["id"])?
+            .project(vec![Expr::Wildcard])?
+            .build()?;
+
+        // id column should only show up once in projection
+        let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
+        \n  Join: Using #t1.id = #t2.id\
+        \n    TableScan: t1 projection=None\
+        \n    TableScan: t2 projection=None";
+
+        assert_eq!(expected, format!("{:?}", plan));
+
+        Ok(())
+    }
+
     #[test]
     fn plan_builder_union_combined_single_union() -> Result<()> {
         let plan = LogicalPlanBuilder::scan_empty(
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 59c99797e0cd8..2eee140f47fe5 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,7 +34,7 @@ use std::fmt;
 use std::sync::Arc;
 
 /// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct Column {
     /// relation/table name.
     pub relation: Option<String>,
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 2c751abdad349..f381e316669e4 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -21,7 +21,7 @@
 //! Logical query plans can then be optimized and executed directly, or translated into
 //! physical query plans and executed.
 
-mod builder;
+pub(crate) mod builder;
 mod dfschema;
 mod display;
 mod expr;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index b954b6a97950c..2504dfaa6f236 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
 use crate::logical_plan::dfschema::DFSchemaRef;
 use crate::sql::parser::FileType;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::collections::HashSet;
 use std::{
+    collections::HashSet,
     fmt::{self, Display},
     sync::Arc,
 };
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f89ba3f659c88..41b4e20f15f3d 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
 use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
-    and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
-    LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
+    and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
+    DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
+    ToDFSchema,
 };
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
@@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
 use super::{
     parser::DFParser,
     utils::{
-        can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
+        can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
         find_aggregate_exprs, find_column_exprs, find_window_exprs,
         group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
         resolve_positions_to_exprs,
@@ -687,9 +688,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .iter()
             .map(|expr| self.sql_select_to_rex(expr, input_schema))
             .collect::<Result<Vec<Expr>>>()?
-            .iter()
-            .flat_map(|expr| expand_wildcard(expr, input_schema))
-            .map(|expr| normalize_col(expr, plan))
+            .into_iter()
+            .map(|expr| {
+                Ok(match expr {
+                    Expr::Wildcard => expand_wildcard(input_schema, plan)?,
+                    _ => vec![normalize_col(expr, plan)?],
+                })
+            })
+            .flat_map(|res| match res {
+                Ok(v) => v.into_iter().map(Ok).collect(),
+                Err(e) => vec![Err(e)],
+            })
             .collect::<Result<Vec<Expr>>>()
     }
 
@@ -2773,6 +2782,19 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn project_wildcard_on_join_with_using() {
+        let sql = "SELECT * \
+            FROM lineitem \
+            JOIN lineitem as lineitem2 \
+            USING (l_item_id)";
+        let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
+        \n  Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
+        \n    TableScan: lineitem projection=None\
+        \n    TableScan: lineitem2 projection=None";
+        quick_test(sql, expected);
+    }
+
     #[test]
     fn equijoin_explicit_syntax_3_tables() {
         let sql = "SELECT id, order_id, l_description \
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 28243360c412f..41bcd205800df 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -17,7 +17,7 @@
 
 //! SQL Utility Functions
 
-use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{Expr, LogicalPlan};
 use crate::scalar::ScalarValue;
 use crate::{
     error::{DataFusionError, Result},
@@ -25,18 +25,6 @@ use crate::{
 };
 use std::collections::HashMap;
 
-/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
-    match expr {
-        Expr::Wildcard => schema
-            .fields()
-            .iter()
-            .map(|f| Expr::Column(f.qualified_column()))
-            .collect::<Vec<Expr>>(),
-        _ => vec![expr.clone()],
-    }
-}
-
 /// Collect all deeply nested `Expr::AggregateFunction` and
 /// `Expr::AggregateUDF`. They are returned in order of occurrence (depth
 /// first), with duplicates omitted.