Skip to content

Commit

Permalink
add UserDefinedSQLPlanner for create struct
Browse files Browse the repository at this point in the history
  • Loading branch information
dharanad committed Jul 4, 2024
1 parent 6e63748 commit 30672ce
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 39 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl SessionState {

let user_defined_sql_planners: Vec<Arc<dyn UserDefinedSQLPlanner>> = vec![
Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
Arc::new(functions::core::planner::CreateStructPlanner),
// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::ArrayFunctionPlanner),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

fn plan_create_struct(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
nvl(),
nvl2(),
arrow_typeof(),
r#struct(),
named_struct(),
get_field(),
coalesce(),
Expand Down
13 changes: 13 additions & 0 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{PlannerResult, RawDictionaryExpr, UserDefinedSQLPlanner};
use datafusion_expr::Expr;

use super::named_struct;

Expand All @@ -38,3 +40,14 @@ impl UserDefinedSQLPlanner for CoreFunctionPlanner {
Ok(PlannerResult::Planned(named_struct().call(args)))
}
}

#[derive(Default)]
pub struct CreateStructPlanner;

impl UserDefinedSQLPlanner for CreateStructPlanner {
fn plan_create_struct(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::core::r#struct(), args),
)))
}
}
66 changes: 28 additions & 38 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

SQLExpr::Struct { values, fields } => {
self.parse_struct(values, fields, schema, planner_context)
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}
fn is_named_struct(values: &Vec<sqlparser::ast::Expr>) -> bool {
values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }))
}

if is_named_struct(&values) {
return self.create_named_struct(values, schema, planner_context);
} else {
let mut create_struct_args =
self.create_struct_expr(values, schema, planner_context)?;
for planner in self.planners.iter() {
match planner.plan_create_struct(create_struct_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
create_struct_args = args;
}
}
}
not_impl_err!("CreateStruct not supported by UserDefinedExtensionPlanners: {create_struct_args:?}")
}
}
SQLExpr::Position { expr, r#in } => {
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
Expand Down Expand Up @@ -658,28 +681,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
not_impl_err!("Unsupported dictionary literal: {raw_expr:?}")
}

/// Parses a struct(..) expression
fn parse_struct(
&self,
values: Vec<SQLExpr>,
fields: Vec<sqlparser::ast::StructField>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}

if values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }))
{
self.create_named_struct(values, input_schema, planner_context)
} else {
self.create_struct(values, input_schema, planner_context)
}
}

// Handles a call to struct(...) where the arguments are named. For example
// `struct (v as foo, v2 as bar)` by creating a call to the `named_struct` function
fn create_named_struct(
Expand Down Expand Up @@ -735,29 +736,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Handles a call to struct(...) where the arguments are not named. For example
// `struct (v, v2)` by creating a call to the `struct` function
// which will create a struct with fields named `c0`, `c1`, etc.
fn create_struct(
fn create_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = values
) -> Result<Vec<Expr>> {
values
.into_iter()
.map(|value| {
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
})
.collect::<Result<Vec<_>>>()?;
let struct_func = self
.context_provider
.get_function_meta("struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'struct' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
struct_func,
args,
)))
.collect::<Result<Vec<_>>>()
}

fn sql_in_list_to_expr(
Expand Down

0 comments on commit 30672ce

Please sign in to comment.