Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sql_compound_identifier_to_expr to ExprPlanner #11487

Merged
merged 20 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion_common::{
config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema,
Result, TableReference,
Expand Down Expand Up @@ -168,6 +168,24 @@ pub trait ExprPlanner: Send + Sync {
fn plan_overlay(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

fn plan_get_field(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
dharanad marked this conversation as resolved.
Show resolved Hide resolved
Ok(PlannerResult::Original(args))
}
/// Plans compound identifier eg `db.schema.table`.
///
/// Note:
/// Currently compound identifier for outer query schema is not supported.
///
/// Returns empty expression arguments if not possible
dharanad marked this conversation as resolved.
Show resolved Hide resolved
fn plan_compound_identifier(
&self,
_filed: &Field,
dharanad marked this conversation as resolved.
Show resolved Hide resolved
_qualifier: Option<&TableReference>,
_nested_names: &[String],
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(vec![]))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
nvl2(),
arrow_typeof(),
named_struct(),
get_field(),
coalesce(),
make_map(),
map(),
Expand Down
35 changes: 33 additions & 2 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::DFSchema;
use arrow::datatypes::Field;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Column, DFSchema, ScalarValue, TableReference};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawDictionaryExpr};
use datafusion_expr::Expr;
use datafusion_expr::{lit, Expr};

use super::named_struct;

Expand Down Expand Up @@ -62,4 +63,34 @@ impl ExprPlanner for CoreFunctionPlanner {
ScalarFunction::new_udf(crate::string::overlay(), args),
)))
}

fn plan_compound_identifier(
&self,
field: &Field,
qualifier: Option<&TableReference>,
nested_names: &[String],
) -> Result<PlannerResult<Vec<Expr>>> {
// found matching field with no spare identifier(s)
if nested_names.is_empty() {
Ok(PlannerResult::Planned(Expr::Column(Column::from((
qualifier, field,
)))))
} else {
// found matching field with spare identifier(s) for nested field(s) in structure
// TODO: remove when can support multiple nested identifiers
if nested_names.len() > 1 {
return not_impl_err!(
"Nested identifiers not yet supported for column {}",
Column::from((qualifier, field)).quoted_flat_name()
);
}
let nested_name = nested_names[0].to_string();

let col = Expr::Column(Column::from((qualifier, field)));
let get_field_args = vec![col, lit(ScalarValue::from(nested_name))];
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::core::get_field(), get_field_args),
)))
}
}
}
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-physical-expr = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_optimizer::analyzer::Analyzer;
Expand Down Expand Up @@ -344,7 +345,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
.with_udaf(sum_udaf())
.with_udaf(count_udaf())
.with_udaf(avg_udaf());
let sql_to_rel = SqlToRel::new(&context_provider);
let sql_to_rel = SqlToRel::new(&context_provider)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
dharanad marked this conversation as resolved.
Show resolved Hide resolved
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

let config = OptimizerContext::new().with_skip_failing_rules(false);
Expand Down
4 changes: 3 additions & 1 deletion datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion_expr::WindowUDF;
use datafusion_expr::{
logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource,
};
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_sql::{
Expand Down Expand Up @@ -54,7 +55,8 @@ fn main() {
let context_provider = MyContextProvider::new()
.with_udaf(sum_udaf())
.with_udaf(count_udaf());
let sql_to_rel = SqlToRel::new(&context_provider);
let sql_to_rel = SqlToRel::new(&context_provider)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
dharanad marked this conversation as resolved.
Show resolved Hide resolved
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// show the plan
Expand Down
48 changes: 20 additions & 28 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use arrow_schema::Field;
use sqlparser::ast::{Expr as SQLExpr, Ident};

use datafusion_common::{
internal_err, not_impl_err, plan_datafusion_err, Column, DFSchema, DataFusionError,
Result, ScalarValue, TableReference,
Result, TableReference,
};
use datafusion_expr::{expr::ScalarFunction, lit, Case, Expr};
use sqlparser::ast::{Expr as SQLExpr, Ident};
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{Case, Expr};

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_identifier_to_expr(
Expand Down Expand Up @@ -124,31 +127,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let search_result = search_dfschema(&ids, schema);
match search_result {
// found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names)) if !nested_names.is_empty() => {
// TODO: remove when can support multiple nested identifiers
if nested_names.len() > 1 {
return not_impl_err!(
"Nested identifiers not yet supported for column {}",
Column::from((qualifier, field)).quoted_flat_name()
);
}
let nested_name = nested_names[0].to_string();

let col = Expr::Column(Column::from((qualifier, field)));
if let Some(udf) =
self.context_provider.get_function_meta("get_field")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder could we plan the whole compound identifier, the benefit of plan_* function is able to customize the operator (in this case is compound identifier i.e. a.b.c) and expressions we get.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally i wanted to do that same, but this function uses IdentNormalizer field of SqlToRel struct. I am just thinking a way around & wanted to discuss the same here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the result of self.normalizer.normalize() as the planner arguments, similar to the spirit of others function that we take the result of sql_expr_to_logical_expr as the planner arguments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this revision ? Do you think we aligned here ? There is still more work to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn sql_compound_identifier_to_expr does many things, handles the logic for variable, compund indentifier & compund indentifier in outer query. In this change we are only supporting compund indentifier & not adding support compund indentifier in outer query.

{
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
udf,
vec![col, lit(ScalarValue::from(nested_name))],
)))
} else {
internal_err!("get_field not found")
Some((field, qualifier, nested_names)) => {
for planner in self.planners.iter() {
match planner.plan_compound_identifier(
field,
qualifier,
nested_names,
)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(_args) => {}
}
}
}
// found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
dharanad marked this conversation as resolved.
Show resolved Hide resolved
Ok(Expr::Column(Column::from((qualifier, field))))
not_impl_err!(
"Compound identifiers not supported by ExprPlanner: {ids:?}"
)
}
None => {
// return default where use all identifiers to not have a nested field
Expand Down
11 changes: 8 additions & 3 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use std::vec;

use arrow_schema::*;
Expand All @@ -28,6 +29,7 @@ use datafusion_sql::unparser::dialect::{
};
use datafusion_sql::unparser::{expr_to_sql, plan_to_sql, Unparser};

use datafusion_functions::core::planner::CoreFunctionPlanner;
use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect};
use sqlparser::parser::Parser;

Expand Down Expand Up @@ -156,7 +158,8 @@ fn roundtrip_statement() -> Result<()> {
let context = MockContextProvider::default()
.with_udaf(sum_udaf())
.with_udaf(count_udaf());
let sql_to_rel = SqlToRel::new(&context);
let sql_to_rel = SqlToRel::new(&context)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();

let roundtrip_statement = plan_to_sql(&plan)?;
Expand Down Expand Up @@ -185,7 +188,8 @@ fn roundtrip_crossjoin() -> Result<()> {
.parse_statement()?;

let context = MockContextProvider::default();
let sql_to_rel = SqlToRel::new(&context);
let sql_to_rel = SqlToRel::new(&context)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();

let roundtrip_statement = plan_to_sql(&plan)?;
Expand Down Expand Up @@ -248,7 +252,8 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
.parse_statement()?;

let context = MockContextProvider::default();
let sql_to_rel = SqlToRel::new(&context);
let sql_to_rel = SqlToRel::new(&context)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
let plan = sql_to_rel
.sql_statement_to_plan(statement)
.unwrap_or_else(|e| panic!("Failed to parse sql: {}\n{e}", query.sql));
Expand Down
5 changes: 4 additions & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::any::Any;
#[cfg(test)]
use std::collections::HashMap;
use std::sync::Arc;
use std::vec;

use arrow_schema::TimeUnit::Nanosecond;
Expand All @@ -37,6 +38,7 @@ use datafusion_sql::{
planner::{ParserOptions, SqlToRel},
};

use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_functions_aggregate::{
approx_median::approx_median_udaf, count::count_udaf,
};
Expand Down Expand Up @@ -2696,7 +2698,8 @@ fn logical_plan_with_dialect_and_options(
.with_udaf(avg_udaf())
.with_udaf(grouping_udaf());

let planner = SqlToRel::new_with_options(&context, options);
let planner = SqlToRel::new_with_options(&context, options)
.with_user_defined_planner(Arc::new(CoreFunctionPlanner::default()));
let result = DFParser::parse_sql_with_dialect(sql, dialect);
let mut ast = result?;
planner.statement_to_plan(ast.pop_front().unwrap())
Expand Down
Loading