Skip to content

Commit

Permalink
Use schema_name to create the physical_name (#11977)
Browse files Browse the repository at this point in the history
More consistency and less opportunity for column name mismatch.
  • Loading branch information
joroKr21 committed Aug 21, 2024
1 parent 846befb commit 1c7209b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 273 deletions.
13 changes: 3 additions & 10 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, create_function_physical_name, physical_name, AggregateFunction, Alias,
GroupingSet, WindowFunction,
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
Expand Down Expand Up @@ -1569,12 +1568,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let name = if let Some(name) = name {
name
} else {
create_function_physical_name(
func.name(),
*distinct,
args,
order_by.as_ref(),
)?
physical_name(e)?
};

let physical_args =
Expand All @@ -1588,8 +1582,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
None => None,
};

let ignore_nulls = null_treatment
.unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
== NullTreatment::IgnoreNulls;

let (agg_expr, filter, order_by) = {
Expand Down
272 changes: 12 additions & 260 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue,
TableReference,
plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
};
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
Expand Down Expand Up @@ -1082,7 +1081,7 @@ impl Expr {
/// For example, for a projection (e.g. `SELECT <expr>`) the resulting arrow
/// [`Schema`] will have a field with this name.
///
/// Note that the resulting string is subtlety different than the `Display`
/// Note that the resulting string is subtlety different from the `Display`
/// representation for certain `Expr`. Some differences:
///
/// 1. [`Expr::Alias`], which shows only the alias itself
Expand All @@ -1104,6 +1103,7 @@ impl Expr {
}

/// Returns a full and complete string representation of this expression.
#[deprecated(note = "use format! instead")]
pub fn canonical_name(&self) -> String {
format!("{self}")
}
Expand Down Expand Up @@ -2386,263 +2386,13 @@ fn fmt_function(
write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
}

pub fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

pub fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(Alias { name, .. }) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{} ", create_physical_name(e, false)?);
}
for (w, t) in &case.when_then_expr {
let _ = write!(
name,
"WHEN {} THEN {} ",
create_physical_name(w, false)?,
create_physical_name(t, false)?
);
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {} ", create_physical_name(e, false)?);
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::ScalarFunction(fun) => fun.func.schema_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func,
distinct,
args,
filter: _,
order_by,
null_treatment: _,
}) => {
create_function_physical_name(func.name(), *distinct, args, order_by.as_ref())
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => {
not_impl_err!("EXISTS is not yet supported in the physical plan")
}
Expr::InSubquery(_) => {
not_impl_err!("IN subquery is not yet supported in the physical plan")
}
Expr::ScalarSubquery(_) => {
not_impl_err!("Scalar subqueries are not yet supported in the physical plan")
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT {op_name} {pattern}{escape}"))
} else {
Ok(format!("{expr} {op_name} {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive: _,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
None => Ok(format!("*{}", options)),
},
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expr::OuterReferenceColumn(_, _) => {
internal_err!("Create physical name does not support OuterReferenceColumn")
}
/// The name of the column (field) that this `Expr` will produce in the physical plan.
/// The difference from [Expr::schema_name] is that top-level columns are unqualified.
pub fn physical_name(expr: &Expr) -> Result<String> {
if let Expr::Column(col) = expr {
Ok(col.name.clone())
} else {
Ok(expr.schema_name().to_string())
}
}

Expand All @@ -2658,6 +2408,7 @@ mod test {
use std::any::Any;

#[test]
#[allow(deprecated)]
fn format_case_when() -> Result<()> {
let expr = case(col("a"))
.when(lit(1), lit(true))
Expand All @@ -2670,6 +2421,7 @@ mod test {
}

#[test]
#[allow(deprecated)]
fn format_cast() -> Result<()> {
let expr = Expr::Cast(Cast {
expr: Box::new(Expr::Literal(ScalarValue::Float32(Some(1.23)))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr::expr::create_function_physical_name;
use datafusion_expr::AggregateUDF;
use datafusion_expr::ReversedUDAF;
use datafusion_expr_common::accumulator::Accumulator;
Expand Down Expand Up @@ -112,8 +111,7 @@ impl AggregateExprBuilder {
let data_type = fun.return_type(&input_exprs_types)?;
let is_nullable = fun.is_nullable();
let name = match alias {
// TODO: Ideally, we should build the name from physical expressions
None => create_function_physical_name(fun.name(), is_distinct, &[], None)?,
None => return internal_err!("alias should be provided"),
Some(alias) => alias,
};

Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,7 @@ mod tests {
.map(|order_by_expr| {
let ordering_req = order_by_expr.unwrap_or_default();
AggregateExprBuilder::new(array_agg_udaf(), vec![Arc::clone(col_a)])
.alias("a")
.order_by(ordering_req.to_vec())
.schema(Arc::clone(&test_schema))
.build()
Expand Down

0 comments on commit 1c7209b

Please sign in to comment.