Skip to content

Commit

Permalink
Implement user defined planner for extract (apache#11215)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and comphead committed Jul 8, 2024
1 parent 286ae6a commit a665a9d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 16 deletions.
13 changes: 9 additions & 4 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,14 +967,19 @@ impl SessionState {
let field_access_planner =
Arc::new(functions_array::planner::FieldAccessPlanner) as _;

query
query = query
.with_user_defined_planner(array_planner)
.with_user_defined_planner(field_access_planner)
.with_user_defined_planner(field_access_planner);
}
#[cfg(not(feature = "array_expressions"))]
#[cfg(feature = "datetime_expressions")]
{
query
let extract_planner =
Arc::new(functions::datetime::planner::ExtractPlanner::default()) as _;

query = query.with_user_defined_planner(extract_planner);
}

query
}
}

Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(exprs))
}

// Plan the Extract expression, e.g., EXTRACT(month FROM foo)
// returns origin expression arguments if not possible
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod date_trunc;
pub mod from_unixtime;
pub mod make_date;
pub mod now;
pub mod planner;
pub mod to_char;
pub mod to_date;
pub mod to_timestamp;
Expand Down
36 changes: 36 additions & 0 deletions datafusion/functions/src/datetime/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! SQL planning extensions like [`ExtractPlanner`]

use datafusion_common::Result;
use datafusion_expr::{
expr::ScalarFunction,
planner::{PlannerResult, UserDefinedSQLPlanner},
Expr,
};

#[derive(Default)]
pub struct ExtractPlanner {}

impl UserDefinedSQLPlanner for ExtractPlanner {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::datetime::date_part(), args),
)))
}
}
24 changes: 12 additions & 12 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.parse_value(value, planner_context.prepare_param_data_types())
}
SQLExpr::Extract { field, expr } => {
let date_part = self
.context_provider
.get_function_meta("date_part")
.ok_or_else(|| {
internal_datafusion_err!(
"Unable to find expected 'date_part' function"
)
})?;
let args = vec![
let mut extract_args = vec![
Expr::Literal(ScalarValue::from(format!("{field}"))),
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
];
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
date_part, args,
)))

for planner in self.planners.iter() {
match planner.plan_extract(extract_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extract_args = args;
}
}
}

not_impl_err!("Extract not supported by UserDefinedExtensionPlanners: {extract_args:?}")
}

SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
Expand Down

0 comments on commit a665a9d

Please sign in to comment.