Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: schema error when parsing order-by expressions #10234

Merged
merged 8 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context,
// Numeric literals in window function ORDER BY are treated as constants
false,
None,
)?;

let func_deps = schema.functional_dependencies();
Expand Down Expand Up @@ -219,8 +220,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
// User-defined aggregate functions (UDAFs) take precedence when they share a name with a scalar built-in function
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = self.order_by_to_sort_expr(
&order_by,
schema,
planner_context,
true,
None,
)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
// TODO: Support filter and distinct for UDAFs
Expand All @@ -236,8 +242,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = self.order_by_to_sort_expr(
&order_by,
schema,
planner_context,
true,
None,
)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema,
planner_context,
true,
None,
)?)
} else {
None
Expand Down
43 changes: 36 additions & 7 deletions datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,39 @@ use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Convert sql [OrderByExpr] to `Vec<Expr>`.
///
/// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index
/// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
/// `input_schema` and `additional_schema` are used to resolve column references in the order-by expressions.
/// `input_schema` is the schema of the input logical plan, typically derived from the SELECT list.
///
/// Usually order-by expressions can only reference the input plan's columns.
/// But the `SELECT ... FROM ... ORDER BY ...` syntax is a special case. Besides the input schema,
/// it can reference an `additional_schema` derived from the `FROM` clause.
///
/// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index into the
/// SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`). Literals only reference the `input_schema`.
///
/// If false, interpret numeric literals as constant values.
pub(crate) fn order_by_to_sort_expr(
&self,
exprs: &[OrderByExpr],
schema: &DFSchema,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
literal_to_column: bool,
additional_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
if exprs.is_empty() {
return Ok(vec![]);
}

let mut combined_schema;
let order_by_schema = match additional_schema {
Some(schema) => {
combined_schema = input_schema.clone();
combined_schema.merge(schema);
&combined_schema
}
None => input_schema,
};

let mut expr_vec = vec![];
for e in exprs {
let OrderByExpr {
Expand All @@ -52,17 +75,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return plan_err!(
"Order by index starts at 1 for column indexes"
);
} else if schema.fields().len() < field_index {
} else if input_schema.fields().len() < field_index {
return plan_err!(
"Order by column out of bounds, specified: {}, max: {}",
field_index,
schema.fields().len()
input_schema.fields().len()
);
}

Expr::Column(Column::from(schema.qualified_field(field_index - 1)))
Expr::Column(Column::from(
input_schema.qualified_field(field_index - 1),
))
}
e => self.sql_expr_to_logical_expr(e.clone(), schema, planner_context)?,
e => self.sql_expr_to_logical_expr(
e.clone(),
order_by_schema,
planner_context,
)?,
};
let asc = asc.unwrap_or(true);
expr_vec.push(Expr::Sort(Sort::new(
Expand Down
79 changes: 51 additions & 28 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_expr::{
Operator,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value,
Expr as SQLExpr, Offset as SQLOffset, Query, SelectInto, SetExpr, Value,
};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -46,29 +46,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let mut set_expr = query.body;
if let Some(with) = query.with {
self.plan_with_clause(with, planner_context)?;
}
// Take the `SelectInto` for later processing.
let select_into = match set_expr.as_mut() {
SetExpr::Select(select) => select.into.take(),
_ => None,
};
let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
let plan = self.order_by(plan, query.order_by, planner_context)?;
let mut plan = self.limit(plan, query.offset, query.limit)?;
if let Some(into) = select_into {
plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
column_defaults: vec![],
}))

let set_expr = *query.body;
match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
// Order-by expressions may refer to columns in the `FROM` clause,
// so we need to process `SELECT` and `ORDER BY` together.
let plan =
self.select_to_plan(*select, query.order_by, planner_context)?;
let plan = self.limit(plan, query.offset, query.limit)?;
// Process the `SELECT INTO` after `LIMIT`.
self.select_into(plan, select_into)
}
other => {
let plan = self.set_expr_to_plan(other, planner_context)?;
let order_by_rex = self.order_by_to_sort_expr(
&query.order_by,
plan.schema(),
planner_context,
true,
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.offset, query.limit)
}
}
Ok(plan)
}

/// Wrap a plan in a limit
Expand Down Expand Up @@ -114,26 +120,43 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

/// Wrap the logical plan in a sort
fn order_by(
pub(super) fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
order_by: Vec<Expr>,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}

let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
// In case of `DISTINCT ON` we must capture the sort expressions since during the plan
// optimization we're effectively doing a `first_value` aggregation according to them.
let distinct_on = distinct_on.clone().with_sort_expr(order_by_rex)?;
let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
} else {
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
LogicalPlanBuilder::from(plan).sort(order_by)?.build()
}
}

/// Apply an optional `SELECT ... INTO` target to `plan`.
///
/// When `select_into` is `Some`, `plan` becomes the `input` of a
/// [`CreateMemoryTable`] DDL node named after the `INTO` target.
/// When it is `None`, `plan` is returned unchanged.
///
/// # Errors
///
/// Propagates any error from `object_name_to_table_reference` while
/// resolving the target table name.
fn select_into(
    &self,
    plan: LogicalPlan,
    select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
    // No `INTO` clause: hand the plan back untouched.
    let Some(into) = select_into else {
        return Ok(plan);
    };

    let create_table = CreateMemoryTable {
        name: self.object_name_to_table_reference(into.name)?,
        constraints: Constraints::empty(),
        input: Arc::new(plan),
        if_not_exists: false,
        or_replace: false,
        column_defaults: vec![],
    };

    Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
        create_table,
    )))
}
}
Expand Down
24 changes: 19 additions & 5 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::{Alias, Unnest};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
};
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
Expand All @@ -39,8 +39,8 @@ use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, ReplaceSelectItem, WildcardAdditionalOptions,
WindowType,
Distinct, Expr as SQLExpr, GroupByExpr, OrderByExpr, ReplaceSelectItem,
WildcardAdditionalOptions, WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

Expand All @@ -49,6 +49,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn select_to_plan(
&self,
mut select: Select,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
// check for unsupported syntax first
Expand Down Expand Up @@ -94,6 +95,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut combined_schema = base_plan.schema().as_ref().clone();
combined_schema.merge(projected_plan.schema());

// Order-by expressions prioritize referencing columns from the select list,
// then from the FROM clause.
let order_by_rex = self.order_by_to_sort_expr(
&order_by,
projected_plan.schema().as_ref(),
planner_context,
true,
Some(base_plan.schema().as_ref()),
)?;
let order_by_rex = normalize_cols(order_by_rex, &projected_plan)?;

// this alias map is resolved and looked up in both having exprs and group by exprs
let alias_map = extract_aliases(&select_exprs);

Expand Down Expand Up @@ -248,9 +260,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<_>>>()?;

// Build the final plan
return LogicalPlanBuilder::from(base_plan)
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.build();
.build()
}
}?;

Expand All @@ -274,6 +286,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};

let plan = self.order_by(plan, order_by_rex)?;

Ok(plan)
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, planner_context),
SetExpr::Select(s) => self.select_to_plan(*s, vec![], planner_context),
SetExpr::Values(v) => self.sql_values_to_plan(v, planner_context),
SetExpr::SetOperation {
op,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
let expr_vec =
self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
self.order_by_to_sort_expr(&expr, schema, planner_context, true, None)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in expr_vec.iter() {
for column in expr.to_columns()?.iter() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3646,7 +3646,7 @@ fn test_select_distinct_order_by() {
let sql = "SELECT distinct '1' from person order by id";

let expected =
"Error during planning: For SELECT DISTINCT, ORDER BY expressions id must appear in select list";
"Error during planning: For SELECT DISTINCT, ORDER BY expressions person.id must appear in select list";

// It should return error.
let result = logical_plan(sql);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,7 @@ set datafusion.explain.logical_plan_only = false;
statement ok
set datafusion.execution.target_partitions = 4;

# Planning inner nested loop join
# Planning inner nested loop join
# inputs are swapped due to inexact statistics + join reordering caused additional projection

query TT
Expand Down
Loading