Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement user defined planner for create_struct & create_named_struct #11273

Merged
merged 11 commits into from
Jul 8, 2024
16 changes: 16 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,22 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

/// Plans a struct `struct(expression1[, ..., expression_n])`
/// or named struct `named_struct(expression1_name, expression1_input[, ..., expression_n_name, expression_n_input])`
/// literal based on the given input expressions.
/// This function takes a vector of expressions and a boolean flag indicating whether
/// the struct is a named struct.
///
/// Returns a `PlannerResult` containing either the planned struct expressions or the original
/// input expressions if planning is not possible.
fn plan_struct_literal(
&self,
args: Vec<Expr>,
_is_named_struct: bool,
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
2 changes: 0 additions & 2 deletions datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
nvl(),
nvl2(),
arrow_typeof(),
r#struct(),
named_struct(),
get_field(),
coalesce(),
]
Expand Down
19 changes: 19 additions & 0 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{PlannerResult, RawDictionaryExpr, UserDefinedSQLPlanner};
use datafusion_expr::Expr;

use super::named_struct;

Expand All @@ -37,4 +39,21 @@ impl UserDefinedSQLPlanner for CoreFunctionPlanner {
}
Ok(PlannerResult::Planned(named_struct().call(args)))
}

fn plan_struct_literal(
&self,
args: Vec<Expr>,
is_named_struct: bool,
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(
if is_named_struct {
crate::core::named_struct()
} else {
crate::core::r#struct()
},
args,
),
)))
}
}
96 changes: 41 additions & 55 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use datafusion_expr::planner::PlannerResult;
use datafusion_expr::planner::RawDictionaryExpr;
use datafusion_expr::planner::RawFieldAccessExpr;
use sqlparser::ast::{
CastKind, DictionaryField, Expr as SQLExpr, Subscript, TrimWhereField, Value,
CastKind, DictionaryField, Expr as SQLExpr, StructField, Subscript, TrimWhereField,
Value,
};

use datafusion_common::{
Expand Down Expand Up @@ -597,7 +598,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

SQLExpr::Struct { values, fields } => {
self.parse_struct(values, fields, schema, planner_context)
self.parse_struct(schema, planner_context, values, fields)
}
SQLExpr::Position { expr, r#in } => {
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
Expand Down Expand Up @@ -629,6 +630,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

/// Parses a struct(..) expression and plans it creation
fn parse_struct(
&self,
schema: &DFSchema,
planner_context: &mut PlannerContext,
values: Vec<sqlparser::ast::Expr>,
fields: Vec<StructField>,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}
let is_named_struct = values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }));

let mut create_struct_args = if is_named_struct {
self.create_named_struct_expr(values, schema, planner_context)?
} else {
self.create_struct_expr(values, schema, planner_context)?
};

for planner in self.planners.iter() {
match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => create_struct_args = args,
}
}
Comment on lines +654 to +659
Copy link
Contributor Author

@dharanad dharanad Jul 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have one StructPlan that returns struct or named_struct based on the values?

let mut create_struct_args = if is_named_struct {
            self.create_named_struct_expr(values, schema, planner_context)?
        } else {
            self.create_struct_expr(values, schema, planner_context)?
        };

The logic here should be inside planner

@jayzhan211 Is this what u suggesting ?

If you were suggesting to move create_struct_expr & create_named_struct_expr to planner. I see an issues over there i.e these functions intnerally call self.sql_expr_to_logical_expr. I would rather have it here to keep it simple. Sorry if i understood it wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe like #11318 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my understanding we also have a named_struct function which is an alias for struct function with input expressions optionally named.

In this PR i am not moving named_struct function to udp.

My understanding was to move create_named_struct & create_struct which were solely handling struct function.

Copy link
Contributor

@jayzhan211 jayzhan211 Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have one StructPlan that returns struct or named_struct based on the values?

let mut create_struct_args = if is_named_struct {
            self.create_named_struct_expr(values, schema, planner_context)?
        } else {
            self.create_struct_expr(values, schema, planner_context)?
        };

The logic here should be inside planner

@jayzhan211 Is this what u suggesting ?

If you were suggesting to move create_struct_expr & create_named_struct_expr to planner. I see an issues over there i.e these functions intnerally call self.sql_expr_to_logical_expr. I would rather have it here to keep it simple. Sorry if i understood it wrong.

Right, they are not possible to move into the planner. Another thing we can improve on is, forgetting about create_struct_expr and use create_named_struct_expr and named_struct. But, we can change this in next PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Liked mentioned earlier. Thats a great idea, i will make the change in another PR. Thanks for your help @jayzhan211

not_impl_err!("Struct not supported by UserDefinedExtensionPlanners: {create_struct_args:?}")
}

fn sql_position_to_expr(
&self,
substr_expr: SQLExpr,
Expand Down Expand Up @@ -683,37 +714,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
not_impl_err!("Unsupported dictionary literal: {raw_expr:?}")
}

/// Parses a struct(..) expression
fn parse_struct(
&self,
values: Vec<SQLExpr>,
fields: Vec<sqlparser::ast::StructField>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}

if values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }))
{
self.create_named_struct(values, input_schema, planner_context)
} else {
self.create_struct(values, input_schema, planner_context)
}
}

// Handles a call to struct(...) where the arguments are named. For example
// `struct (v as foo, v2 as bar)` by creating a call to the `named_struct` function
fn create_named_struct(
fn create_named_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = values
) -> Result<Vec<Expr>> {
Ok(values
.into_iter()
.enumerate()
.map(|(i, value)| {
Expand Down Expand Up @@ -742,47 +751,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

let named_struct_func = self
.context_provider
.get_function_meta("named_struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'named_struct' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
named_struct_func,
args,
)))
.collect())
}

// Handles a call to struct(...) where the arguments are not named. For example
// `struct (v, v2)` by creating a call to the `struct` function
// which will create a struct with fields named `c0`, `c1`, etc.
fn create_struct(
fn create_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = values
) -> Result<Vec<Expr>> {
values
.into_iter()
.map(|value| {
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
})
.collect::<Result<Vec<_>>>()?;
let struct_func = self
.context_provider
.get_function_meta("struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'struct' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
struct_func,
args,
)))
.collect::<Result<Vec<_>>>()
}

fn sql_in_list_to_expr(
Expand Down
Loading