From c195050a40c3ac08ba8e59c02710696fd8fa4219 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 00:09:41 +0100 Subject: [PATCH 01/11] very rough implementation of custom operators --- datafusion/core/src/execution/context/mod.rs | 15 +- .../core/src/execution/session_state.rs | 13 +- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/operator.rs | 232 +++++++++++++----- datafusion/expr/src/type_coercion/binary.rs | 4 + .../physical-expr/src/expressions/binary.rs | 1 + datafusion/sql/src/expr/binary_op.rs | 8 +- datafusion/sql/src/planner.rs | 23 +- datafusion/sql/src/unparser/expr.rs | 1 + .../substrait/src/logical_plan/producer.rs | 1 + 10 files changed, 225 insertions(+), 75 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9ec0148d9122..e1f1ba2dda4c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -18,7 +18,6 @@ //! [`SessionContext`] API for registering data sources and executing queries use std::collections::HashSet; -use std::fmt::Debug; use std::sync::{Arc, Weak}; use super::options::ReadOptions; @@ -62,6 +61,7 @@ use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, Expr, UserDefinedLogicalNode, WindowUDF, }; +use datafusion_sql::planner::ParseCustomOperator; // backwards compatibility pub use crate::execution::session_state::SessionState; @@ -488,7 +488,7 @@ impl SessionContext { sql: &str, options: SQLOptions, ) -> Result { - let plan = self.state().create_logical_plan(sql).await?; + let plan = self.state().create_logical_plan(sql, options.parse_custom_operator.clone()).await?; options.verify_plan(&plan)?; self.execute_logical_plan(plan).await @@ -1464,7 +1464,7 @@ impl SerializerRegistry for EmptySerializerRegistry { /// Describes which SQL statements can be run. /// /// See [`SessionContext::sql_with_options`] for more details. 
-#[derive(Clone, Debug, Copy)] +#[derive(Clone)] pub struct SQLOptions { /// See [`Self::with_allow_ddl`] allow_ddl: bool, @@ -1472,6 +1472,8 @@ pub struct SQLOptions { allow_dml: bool, /// See [`Self::with_allow_statements`] allow_statements: bool, + /// Custom SQL operator parser + parse_custom_operator: Option, } impl Default for SQLOptions { @@ -1480,6 +1482,7 @@ impl Default for SQLOptions { allow_ddl: true, allow_dml: true, allow_statements: true, + parse_custom_operator: None, } } } @@ -1508,6 +1511,12 @@ impl SQLOptions { self } + /// Set the custom SQL operator parser + pub fn with_parse_custom_operator(mut self, parse_custom_operator: Option) -> Self { + self.parse_custom_operator = parse_custom_operator; + self + } + /// Return an error if the [`LogicalPlan`] has any nodes that are /// incompatible with this [`SQLOptions`]. pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> { diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index d2bac134b54a..acf36d2dc573 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -67,7 +67,7 @@ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_sql::parser::{DFParser, Statement}; -use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}; +use datafusion_sql::planner::{ContextProvider, ParseCustomOperator, ParserOptions, PlannerContext, SqlToRel}; use sqlparser::ast::Expr as SQLExpr; use sqlparser::dialect::dialect_from_str; use std::collections::hash_map::Entry; @@ -544,6 +544,7 @@ impl SessionState { pub async fn statement_to_plan( &self, statement: datafusion_sql::parser::Statement, + parse_custom_operator: Option, ) -> datafusion_common::Result { let references = self.resolve_table_references(&statement)?; @@ -563,16 +564,17 @@ impl 
SessionState { } } - let query = SqlToRel::new_with_options(&provider, self.get_parser_options()); + let query = SqlToRel::new_with_options(&provider, self.get_parser_options(parse_custom_operator)); query.statement_to_plan(statement) } - fn get_parser_options(&self) -> ParserOptions { + fn get_parser_options(&self, parse_custom_operator: Option) -> ParserOptions { let sql_parser_options = &self.config.options().sql_parser; ParserOptions { parse_float_as_decimal: sql_parser_options.parse_float_as_decimal, enable_ident_normalization: sql_parser_options.enable_ident_normalization, + parse_custom_operator, } } @@ -591,10 +593,11 @@ impl SessionState { pub async fn create_logical_plan( &self, sql: &str, + parse_custom_operator: Option, ) -> datafusion_common::Result { let dialect = self.config.options().sql_parser.dialect.as_str(); let statement = self.sql_to_statement(sql, dialect)?; - let plan = self.statement_to_plan(statement).await?; + let plan = self.statement_to_plan(statement, parse_custom_operator).await?; Ok(plan) } @@ -615,7 +618,7 @@ impl SessionState { tables: HashMap::new(), }; - let query = SqlToRel::new_with_options(&provider, self.get_parser_options()); + let query = SqlToRel::new_with_options(&provider, self.get_parser_options(None)); query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new()) } diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 89ee94f9f845..32798108f0e8 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -76,7 +76,7 @@ pub use function::{ pub use groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; -pub use operator::Operator; +pub use operator::{Operator, CustomOperator}; pub use partition_evaluator::PartitionEvaluator; pub use signature::{ ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, diff --git a/datafusion/expr/src/operator.rs 
b/datafusion/expr/src/operator.rs index 742511822a0f..1ed6f13cdcb6 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -23,6 +23,11 @@ use crate::Like; use std::fmt; use std::ops; use std::ops::Not; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion_common::Result; /// Operators applied to expressions #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] @@ -93,11 +98,17 @@ pub enum Operator { AtArrow, /// Arrow at, like `<@` ArrowAt, + /// Custom operator + Custom(CustomOperatorWrapper), } impl Operator { + pub fn custom(op: Arc) -> Self { + Operator::Custom(CustomOperatorWrapper(op)) + } + /// If the operator can be negated, return the negated operator - /// otherwise return None + /// otherwise return `None` pub fn negate(&self) -> Option { match self { Operator::Eq => Some(Operator::NotEq), @@ -131,51 +142,64 @@ impl Operator { | Operator::StringConcat | Operator::AtArrow | Operator::ArrowAt => None, + Operator::Custom(op) => op.0.negate(), } } /// Return true if the operator is a numerical operator. /// - /// For example, 'Binary(a, +, b)' would be a numerical expression. + /// For example, `Binary(a, +, b)` would be a numerical expression. /// PostgresSQL concept: pub fn is_numerical_operators(&self) -> bool { - matches!( - self, - Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - ) + if let Self::Custom(op) = self { + op.0.is_numerical_operators() + } else { + matches!( + self, + Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + ) + } } /// Return true if the operator is a comparison operator. /// - /// For example, 'Binary(a, >, b)' would be a comparison expression. + /// For example, `Binary(a, >, b)` would be a comparison expression. 
pub fn is_comparison_operator(&self) -> bool { - matches!( - self, - Operator::Eq - | Operator::NotEq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - | Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - ) + if let Self::Custom(op) = self { + op.0.is_comparison_operator() + } else { + matches!( + self, + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + | Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + ) + } } /// Return true if the operator is a logic operator. /// - /// For example, 'Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))' would + /// For example, `Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))` would /// be a logical expression. pub fn is_logic_operator(&self) -> bool { - matches!(self, Operator::And | Operator::Or) + if let Self::Custom(op) = self { + op.0.is_logic_operator() + } else { + matches!(self, Operator::And | Operator::Or) + } } /// Return the operator where swapping lhs and rhs wouldn't change the result. 
@@ -214,6 +238,7 @@ impl Operator { | Operator::BitwiseShiftRight | Operator::BitwiseShiftLeft | Operator::StringConcat => None, + Operator::Custom(op) => op.0.swap(), } } @@ -249,46 +274,129 @@ impl Operator { | Operator::StringConcat | Operator::AtArrow | Operator::ArrowAt => 0, + Operator::Custom(op) => op.0.precedence(), } } } impl fmt::Display for Operator { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let display = match &self { - Operator::Eq => "=", - Operator::NotEq => "!=", - Operator::Lt => "<", - Operator::LtEq => "<=", - Operator::Gt => ">", - Operator::GtEq => ">=", - Operator::Plus => "+", - Operator::Minus => "-", - Operator::Multiply => "*", - Operator::Divide => "/", - Operator::Modulo => "%", - Operator::And => "AND", - Operator::Or => "OR", - Operator::RegexMatch => "~", - Operator::RegexIMatch => "~*", - Operator::RegexNotMatch => "!~", - Operator::RegexNotIMatch => "!~*", - Operator::LikeMatch => "~~", - Operator::ILikeMatch => "~~*", - Operator::NotLikeMatch => "!~~", - Operator::NotILikeMatch => "!~~*", - Operator::IsDistinctFrom => "IS DISTINCT FROM", - Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM", - Operator::BitwiseAnd => "&", - Operator::BitwiseOr => "|", - Operator::BitwiseXor => "BIT_XOR", - Operator::BitwiseShiftRight => ">>", - Operator::BitwiseShiftLeft => "<<", - Operator::StringConcat => "||", - Operator::AtArrow => "@>", - Operator::ArrowAt => "<@", - }; - write!(f, "{display}") + match &self { + Operator::Eq => write!(f, "="), + Operator::NotEq => write!(f, "!="), + Operator::Lt => write!(f, "<"), + Operator::LtEq => write!(f, "<="), + Operator::Gt => write!(f, ">"), + Operator::GtEq => write!(f, ">="), + Operator::Plus => write!(f, "+"), + Operator::Minus => write!(f, "-"), + Operator::Multiply => write!(f, "*"), + Operator::Divide => write!(f, "/"), + Operator::Modulo => write!(f, "%"), + Operator::And => write!(f, "AND"), + Operator::Or => write!(f, "OR"), + Operator::RegexMatch => write!(f, "~"), + 
Operator::RegexIMatch => write!(f, "~*"), + Operator::RegexNotMatch => write!(f, "!~"), + Operator::RegexNotIMatch => write!(f, "!~*"), + Operator::LikeMatch => write!(f, "~~"), + Operator::ILikeMatch => write!(f, "~~*"), + Operator::NotLikeMatch => write!(f, "!~~"), + Operator::NotILikeMatch => write!(f, "!~~*"), + Operator::IsDistinctFrom => write!(f, "IS DISTINCT FROM"), + Operator::IsNotDistinctFrom => write!(f, "IS NOT DISTINCT FROM"), + Operator::BitwiseAnd => write!(f, "&"), + Operator::BitwiseOr => write!(f, "|"), + Operator::BitwiseXor => write!(f, "BIT_XOR"), + Operator::BitwiseShiftRight => write!(f, ">>"), + Operator::BitwiseShiftLeft => write!(f, "<<"), + Operator::StringConcat => write!(f, "||"), + Operator::AtArrow => write!(f, "@>"), + Operator::ArrowAt => write!(f, "<@"), + Operator::Custom(op) => write!(f, "{}", op.0), + } + } +} + +pub trait CustomOperator: fmt::Debug + fmt::Display + Send + Sync { + /// Use in `datafusion/expr/src/type_coercion/binary.rs::Signature`, but the struct there isn't public, + /// hence returning a tuple. + /// + /// Returns `(lhs_type, rhs_type, return_type)` + fn binary_signature(&self, lhs: &DataType, rhs: &DataType) -> Result<(DataType, DataType, DataType)>; + + /// Used by unparse to convert the operator back to SQL + fn op_to_sql(&self) -> Result; + + /// Name used to uniquely identify the operator, and in logical plan producer + fn name(&self) -> &'static str; + + /// If the operator can be negated, return the negated operator + /// otherwise return None + fn negate(&self) -> Option { + None + } + /// Return true if the operator is a numerical operator. + /// + /// For example, `Binary(a, +, b)` would be a numerical expression. + /// PostgresSQL concept: + fn is_numerical_operators(&self) -> bool { + false + } + + /// Return true if the operator is a comparison operator. + /// + /// For example, `Binary(a, >, b)` would be a comparison expression. 
+ fn is_comparison_operator(&self) -> bool { + false + } + + /// Return true if the operator is a logic operator. + /// + /// For example, `Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))` would + /// be a logical expression. + fn is_logic_operator(&self) -> bool { + false + } + + /// Return the operator where swapping lhs and rhs wouldn't change the result. + /// + /// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`. + fn swap(&self) -> Option { + None + } + + /// Get the operator precedence + /// use as a reference + fn precedence(&self) -> u8 { + 0 + } +} + +/// This is a somewhat hacky workaround for https://github.com/rust-lang/rust/issues/31740 +/// and generally for the complexity of deriving common traits for a trait object. +/// +/// This assumes that the String representation of the operator is unique. +#[derive(Debug, Clone)] +pub struct CustomOperatorWrapper(pub Arc); + +impl Eq for CustomOperatorWrapper {} + +impl PartialEq for CustomOperatorWrapper { + fn eq(&self, rhs: &Self) -> bool { + self.0.name() == rhs.0.name() + } +} + +impl PartialOrd for CustomOperatorWrapper { + fn partial_cmp(&self, rhs: &Self) -> Option { + self.0.name().partial_cmp(rhs.0.name()) + } +} + +impl Hash for CustomOperatorWrapper { + fn hash(&self, state: &mut H) { + self.0.name().hash(state) } } diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 5645a2a4dede..681a9b4d59be 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -189,6 +189,10 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result ) } } + Custom(op) => { + let (lhs, rhs, ret) = op.0.binary_signature(lhs, rhs)?; + Ok(Signature { lhs, rhs, ret }) + } } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index d19279c20d10..7eadd1bcec4f 100644 --- 
a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -638,6 +638,7 @@ impl BinaryExpr { AtArrow | ArrowAt => { unreachable!("ArrowAt and AtArrow should be rewritten to function") } + Custom(_) => internal_err!("Custom operator should be rewritten to functions"), } } } diff --git a/datafusion/sql/src/expr/binary_op.rs b/datafusion/sql/src/expr/binary_op.rs index fcb57e8a82e4..0f735bb697ac 100644 --- a/datafusion/sql/src/expr/binary_op.rs +++ b/datafusion/sql/src/expr/binary_op.rs @@ -53,7 +53,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { BinaryOperator::StringConcat => Ok(Operator::StringConcat), BinaryOperator::ArrowAt => Ok(Operator::ArrowAt), BinaryOperator::AtArrow => Ok(Operator::AtArrow), - _ => not_impl_err!("Unsupported SQL binary operator {op:?}"), + _ => { + if let Some(parse_custom_op) = &self.options.parse_custom_operator { + parse_custom_op(op).map(Operator::custom) + } else { + not_impl_err!("Unsupported SQL binary operator {op:?}") + } + }, } } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 30f95170a34f..063eff49e9a4 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -18,13 +18,13 @@ //! 
[`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) use std::collections::HashMap; use std::sync::Arc; -use std::vec; +use std::{fmt, vec}; use arrow_schema::*; use datafusion_common::{ field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError, }; -use datafusion_expr::WindowUDF; +use datafusion_expr::{WindowUDF, CustomOperator}; use sqlparser::ast::TimezoneInfo; use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo}; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; @@ -92,11 +92,27 @@ pub trait ContextProvider { fn udwf_names(&self) -> Vec; } +// pub trait ParseCustomOperator: fmt::Debug + Send + Sync { +// fn parse(&self, op: sqlparser::ast::BinaryOperator) -> Result>; +// } + +pub type ParseCustomOperator = Arc Result> + Send + Sync>; + /// SQL parser options -#[derive(Debug)] pub struct ParserOptions { pub parse_float_as_decimal: bool, pub enable_ident_normalization: bool, + pub parse_custom_operator: Option, +} + +impl fmt::Debug for ParserOptions { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ParserOptions") + .field("parse_float_as_decimal", &self.parse_float_as_decimal) + .field("enable_ident_normalization", &self.enable_ident_normalization) + .field("parse_custom_operator", &self.parse_custom_operator.is_some()) + .finish() + } } impl Default for ParserOptions { @@ -104,6 +120,7 @@ impl Default for ParserOptions { Self { parse_float_as_decimal: false, enable_ident_normalization: true, + parse_custom_operator: None, } } } diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index ad898de5987a..c359ade3850d 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -649,6 +649,7 @@ impl Unparser<'_> { Operator::BitwiseShiftRight => Ok(ast::BinaryOperator::PGBitwiseShiftRight), Operator::BitwiseShiftLeft => Ok(ast::BinaryOperator::PGBitwiseShiftLeft), Operator::StringConcat => 
Ok(ast::BinaryOperator::StringConcat), + Operator::Custom(op) => op.0.op_to_sql(), Operator::AtArrow => not_impl_err!("unsupported operation: {op:?}"), Operator::ArrowAt => not_impl_err!("unsupported operation: {op:?}"), } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 302f38606bfb..9f10802c4b92 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -723,6 +723,7 @@ pub fn operator_to_name(op: Operator) -> &'static str { Operator::BitwiseXor => "bitwise_xor", Operator::BitwiseShiftRight => "bitwise_shift_right", Operator::BitwiseShiftLeft => "bitwise_shift_left", + Operator::Custom(op) => op.0.name(), } } From 0e25ccfec3b99fa7da4d31711ec8042de1626361 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 10:08:09 +0100 Subject: [PATCH 02/11] implement register_parse_custom_operator --- datafusion/core/src/execution/context/mod.rs | 26 +++++++------ .../core/src/execution/session_state.rs | 34 +++++++++++----- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/operator.rs | 39 ++++++++++++------- .../physical-expr/src/expressions/binary.rs | 4 +- datafusion/sql/src/expr/binary_op.rs | 14 +++---- datafusion/sql/src/planner.rs | 29 ++++++-------- 7 files changed, 89 insertions(+), 59 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index e1f1ba2dda4c..596ed74e9f9b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -61,7 +61,6 @@ use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, Expr, UserDefinedLogicalNode, WindowUDF, }; -use datafusion_sql::planner::ParseCustomOperator; // backwards compatibility pub use crate::execution::session_state::SessionState; @@ -76,6 +75,7 @@ pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use 
datafusion_expr::execution_props::ExecutionProps; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; +use datafusion_sql::planner::ParseCustomOperator; mod avro; mod csv; @@ -488,7 +488,7 @@ impl SessionContext { sql: &str, options: SQLOptions, ) -> Result { - let plan = self.state().create_logical_plan(sql, options.parse_custom_operator.clone()).await?; + let plan = self.state().create_logical_plan(sql).await?; options.verify_plan(&plan)?; self.execute_logical_plan(plan).await @@ -1353,6 +1353,19 @@ impl SessionContext { .write() .register_table_options_extension(extension) } + + /// Registers a new [`ParseCustomOperator`] with the registry. + /// + /// `ParseCustomOperator` is used to parse custom operators from SQL, + /// e.g. `->>` or `?`. + pub fn register_parse_custom_operator( + &mut self, + parse_custom_operator: Arc, + ) -> Result<()> { + self.state + .write() + .register_parse_custom_operator(parse_custom_operator) + } } impl FunctionRegistry for SessionContext { @@ -1472,8 +1485,6 @@ pub struct SQLOptions { allow_dml: bool, /// See [`Self::with_allow_statements`] allow_statements: bool, - /// Custom SQL operator parser - parse_custom_operator: Option, } impl Default for SQLOptions { @@ -1482,7 +1493,6 @@ impl Default for SQLOptions { allow_ddl: true, allow_dml: true, allow_statements: true, - parse_custom_operator: None, } } } @@ -1511,12 +1521,6 @@ impl SQLOptions { self } - /// Set the custom SQL operator parser - pub fn with_parse_custom_operator(mut self, parse_custom_operator: Option) -> Self { - self.parse_custom_operator = parse_custom_operator; - self - } - /// Return an error if the [`LogicalPlan`] has any nodes that are /// incompatible with this [`SQLOptions`]. 
pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> { diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index acf36d2dc573..ec7a1a089bc6 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -67,7 +67,9 @@ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_sql::parser::{DFParser, Statement}; -use datafusion_sql::planner::{ContextProvider, ParseCustomOperator, ParserOptions, PlannerContext, SqlToRel}; +use datafusion_sql::planner::{ + ContextProvider, ParseCustomOperator, ParserOptions, PlannerContext, SqlToRel, +}; use sqlparser::ast::Expr as SQLExpr; use sqlparser::dialect::dialect_from_str; use std::collections::hash_map::Entry; @@ -91,6 +93,8 @@ pub struct SessionState { session_id: String, /// Responsible for analyzing and rewrite a logical plan before optimization analyzer: Analyzer, + /// Provides support for parsing custom SQL operators, e.g. 
`->>` or `?` + parse_custom_operators: Vec>, /// Responsible for optimizing a logical plan optimizer: Optimizer, /// Responsible for optimizing a physical execution plan @@ -221,6 +225,7 @@ impl SessionState { let mut new_self = SessionState { session_id, analyzer: Analyzer::new(), + parse_custom_operators: vec![], optimizer: Optimizer::new(), physical_optimizers: PhysicalOptimizer::new(), query_planner: Arc::new(DefaultQueryPlanner {}), @@ -543,8 +548,7 @@ impl SessionState { /// Convert an AST Statement into a LogicalPlan pub async fn statement_to_plan( &self, - statement: datafusion_sql::parser::Statement, - parse_custom_operator: Option, + statement: Statement, ) -> datafusion_common::Result { let references = self.resolve_table_references(&statement)?; @@ -564,17 +568,17 @@ impl SessionState { } } - let query = SqlToRel::new_with_options(&provider, self.get_parser_options(parse_custom_operator)); + let query = SqlToRel::new_with_options(&provider, self.get_parser_options()); query.statement_to_plan(statement) } - fn get_parser_options(&self, parse_custom_operator: Option) -> ParserOptions { + fn get_parser_options(&self) -> ParserOptions { let sql_parser_options = &self.config.options().sql_parser; ParserOptions { parse_float_as_decimal: sql_parser_options.parse_float_as_decimal, enable_ident_normalization: sql_parser_options.enable_ident_normalization, - parse_custom_operator, + parse_custom_operator: self.parse_custom_operators.clone(), } } @@ -593,11 +597,10 @@ impl SessionState { pub async fn create_logical_plan( &self, sql: &str, - parse_custom_operator: Option, ) -> datafusion_common::Result { let dialect = self.config.options().sql_parser.dialect.as_str(); let statement = self.sql_to_statement(sql, dialect)?; - let plan = self.statement_to_plan(statement, parse_custom_operator).await?; + let plan = self.statement_to_plan(statement).await?; Ok(plan) } @@ -618,7 +621,7 @@ impl SessionState { tables: HashMap::new(), }; - let query = 
SqlToRel::new_with_options(&provider, self.get_parser_options(None)); + let query = SqlToRel::new_with_options(&provider, self.get_parser_options()); query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new()) } @@ -878,6 +881,19 @@ impl SessionState { let udtf = self.table_functions.remove(name); Ok(udtf.map(|x| x.function().clone())) } + + /// Registers a new [`ParseCustomOperator`] with the registry. + /// + /// `ParseCustomOperator` is used to parse custom operators from SQL, + /// e.g. `->>` or `?`. + pub fn register_parse_custom_operator( + &mut self, + parse_custom_operator: Arc, + ) -> datafusion_common::Result<()> { + // TODO moved into FunctionRegistry? it would involve adding datafusion_sql as a dep of datafusion-expr + self.parse_custom_operators.push(parse_custom_operator); + Ok(()) + } } struct SessionContextProvider<'a> { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 32798108f0e8..7e8c38a22012 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -76,7 +76,7 @@ pub use function::{ pub use groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; -pub use operator::{Operator, CustomOperator}; +pub use operator::{CustomOperator, Operator}; pub use partition_evaluator::PartitionEvaluator; pub use signature::{ ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index 1ed6f13cdcb6..b03036cf5764 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -21,9 +21,9 @@ use crate::expr_fn::binary_expr; use crate::Expr; use crate::Like; use std::fmt; +use std::hash::{Hash, Hasher}; use std::ops; use std::ops::Not; -use std::hash::{Hash, Hasher}; use std::sync::Arc; use arrow::datatypes::DataType; @@ -99,12 +99,12 @@ pub enum Operator { /// Arrow at, like `<@` ArrowAt, /// Custom 
operator - Custom(CustomOperatorWrapper), + Custom(WrapCustomOperator), } impl Operator { - pub fn custom(op: Arc) -> Self { - Operator::Custom(CustomOperatorWrapper(op)) + pub fn custom(op: impl CustomOperator + 'static) -> Self { + Operator::Custom(WrapCustomOperator(Arc::new(op))) } /// If the operator can be negated, return the negated operator @@ -318,12 +318,22 @@ impl fmt::Display for Operator { } } +impl From for Operator { + fn from(op: T) -> Self { + Operator::Custom(WrapCustomOperator(Arc::new(op))) + } +} + pub trait CustomOperator: fmt::Debug + fmt::Display + Send + Sync { /// Use in `datafusion/expr/src/type_coercion/binary.rs::Signature`, but the struct there isn't public, /// hence returning a tuple. /// /// Returns `(lhs_type, rhs_type, return_type)` - fn binary_signature(&self, lhs: &DataType, rhs: &DataType) -> Result<(DataType, DataType, DataType)>; + fn binary_signature( + &self, + lhs: &DataType, + rhs: &DataType, + ) -> Result<(DataType, DataType, DataType)>; /// Used by unparse to convert the operator back to SQL fn op_to_sql(&self) -> Result; @@ -373,28 +383,31 @@ pub trait CustomOperator: fmt::Debug + fmt::Display + Send + Sync { } } -/// This is a somewhat hacky workaround for https://github.com/rust-lang/rust/issues/31740 -/// and generally for the complexity of deriving common traits for a trait object. +/// Wraps a [`CustomOperator`] and implements traits required by [`Operator`]. +/// +/// This uses [`CustomOperator::name`] for equality, partial equality, ordering, and hashing; and therefore assumes +/// it is unique for each custom operator. /// -/// This assumes that the String representation of the operator is unique. +/// See details on why dyn traits can't implement +/// `PartialEq` and friends. 
#[derive(Debug, Clone)] -pub struct CustomOperatorWrapper(pub Arc); +pub struct WrapCustomOperator(pub Arc); -impl Eq for CustomOperatorWrapper {} +impl Eq for WrapCustomOperator {} -impl PartialEq for CustomOperatorWrapper { +impl PartialEq for WrapCustomOperator { fn eq(&self, rhs: &Self) -> bool { self.0.name() == rhs.0.name() } } -impl PartialOrd for CustomOperatorWrapper { +impl PartialOrd for WrapCustomOperator { fn partial_cmp(&self, rhs: &Self) -> Option { self.0.name().partial_cmp(rhs.0.name()) } } -impl Hash for CustomOperatorWrapper { +impl Hash for WrapCustomOperator { fn hash(&self, state: &mut H) { self.0.name().hash(state) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 7eadd1bcec4f..5ba306d36999 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -638,7 +638,9 @@ impl BinaryExpr { AtArrow | ArrowAt => { unreachable!("ArrowAt and AtArrow should be rewritten to function") } - Custom(_) => internal_err!("Custom operator should be rewritten to functions"), + Custom(_) => { + internal_err!("Custom operator should be rewritten to functions") + } } } } diff --git a/datafusion/sql/src/expr/binary_op.rs b/datafusion/sql/src/expr/binary_op.rs index 0f735bb697ac..96cf12ef038e 100644 --- a/datafusion/sql/src/expr/binary_op.rs +++ b/datafusion/sql/src/expr/binary_op.rs @@ -22,6 +22,12 @@ use sqlparser::ast::BinaryOperator; impl<'a, S: ContextProvider> SqlToRel<'a, S> { pub(crate) fn parse_sql_binary_op(&self, op: BinaryOperator) -> Result { + for parse_custom_op in &self.options.parse_custom_operator { + if let Some(op) = parse_custom_op.parse(&op)? 
{ + return Ok(op); + } + } + match op { BinaryOperator::Gt => Ok(Operator::Gt), BinaryOperator::GtEq => Ok(Operator::GtEq), @@ -53,13 +59,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { BinaryOperator::StringConcat => Ok(Operator::StringConcat), BinaryOperator::ArrowAt => Ok(Operator::ArrowAt), BinaryOperator::AtArrow => Ok(Operator::AtArrow), - _ => { - if let Some(parse_custom_op) = &self.options.parse_custom_operator { - parse_custom_op(op).map(Operator::custom) - } else { - not_impl_err!("Unsupported SQL binary operator {op:?}") - } - }, + _ => not_impl_err!("Unsupported SQL binary operator {op:?}"), } } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 063eff49e9a4..638f1f39ecc7 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -24,7 +24,7 @@ use arrow_schema::*; use datafusion_common::{ field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError, }; -use datafusion_expr::{WindowUDF, CustomOperator}; +use datafusion_expr::{Operator, WindowUDF}; use sqlparser::ast::TimezoneInfo; use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo}; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; @@ -92,27 +92,22 @@ pub trait ContextProvider { fn udwf_names(&self) -> Vec; } -// pub trait ParseCustomOperator: fmt::Debug + Send + Sync { -// fn parse(&self, op: sqlparser::ast::BinaryOperator) -> Result>; -// } +pub trait ParseCustomOperator: fmt::Debug + Send + Sync { + /// Return a human readable name for this parser + fn name(&self) -> &str; -pub type ParseCustomOperator = Arc Result> + Send + Sync>; + /// potentially parse a custom operator. 
+ /// + /// Return `None` if the operator is not recognized + fn parse(&self, op: &sqlparser::ast::BinaryOperator) -> Result>; +} /// SQL parser options +#[derive(Debug)] pub struct ParserOptions { pub parse_float_as_decimal: bool, pub enable_ident_normalization: bool, - pub parse_custom_operator: Option, -} - -impl fmt::Debug for ParserOptions { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ParserOptions") - .field("parse_float_as_decimal", &self.parse_float_as_decimal) - .field("enable_ident_normalization", &self.enable_ident_normalization) - .field("parse_custom_operator", &self.parse_custom_operator.is_some()) - .finish() - } + pub parse_custom_operator: Vec>, } impl Default for ParserOptions { @@ -120,7 +115,7 @@ impl Default for ParserOptions { Self { parse_float_as_decimal: false, enable_ident_normalization: true, - parse_custom_operator: None, + parse_custom_operator: vec![], } } } From f031f66c065ffc14931f43707f63dd4a584e6971 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 10:22:58 +0100 Subject: [PATCH 03/11] cleanup --- datafusion/core/src/execution/context/mod.rs | 3 ++- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/operator.rs | 19 ++++++++----------- datafusion/expr/src/type_coercion/binary.rs | 5 +++-- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 596ed74e9f9b..ca0c02d60136 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -18,6 +18,7 @@ //! [`SessionContext`] API for registering data sources and executing queries use std::collections::HashSet; +use std::fmt::Debug; use std::sync::{Arc, Weak}; use super::options::ReadOptions; @@ -1477,7 +1478,7 @@ impl SerializerRegistry for EmptySerializerRegistry { /// Describes which SQL statements can be run. /// /// See [`SessionContext::sql_with_options`] for more details. 
-#[derive(Clone)] +#[derive(Clone, Debug, Copy)] pub struct SQLOptions { /// See [`Self::with_allow_ddl`] allow_ddl: bool, diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 7e8c38a22012..f0e27749a08a 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -76,7 +76,7 @@ pub use function::{ pub use groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; -pub use operator::{CustomOperator, Operator}; +pub use operator::{CustomOperator, Operator, WrapCustomOperator}; pub use partition_evaluator::PartitionEvaluator; pub use signature::{ ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index b03036cf5764..77e2f5fda666 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -103,10 +103,6 @@ pub enum Operator { } impl Operator { - pub fn custom(op: impl CustomOperator + 'static) -> Self { - Operator::Custom(WrapCustomOperator(Arc::new(op))) - } - /// If the operator can be negated, return the negated operator /// otherwise return `None` pub fn negate(&self) -> Option { @@ -169,8 +165,8 @@ impl Operator { /// /// For example, `Binary(a, >, b)` would be a comparison expression. pub fn is_comparison_operator(&self) -> bool { - if let Self::Custom(op) = self { - op.0.is_comparison_operator() + if let Self::Custom(WrapCustomOperator(op)) = self { + op.is_comparison_operator() } else { matches!( self, @@ -195,8 +191,8 @@ impl Operator { /// For example, `Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))` would /// be a logical expression. 
pub fn is_logic_operator(&self) -> bool { - if let Self::Custom(op) = self { - op.0.is_logic_operator() + if let Self::Custom(WrapCustomOperator(op)) = self { + op.is_logic_operator() } else { matches!(self, Operator::And | Operator::Or) } @@ -238,7 +234,7 @@ impl Operator { | Operator::BitwiseShiftRight | Operator::BitwiseShiftLeft | Operator::StringConcat => None, - Operator::Custom(op) => op.0.swap(), + Operator::Custom(WrapCustomOperator(op)) => op.swap(), } } @@ -274,7 +270,7 @@ impl Operator { | Operator::StringConcat | Operator::AtArrow | Operator::ArrowAt => 0, - Operator::Custom(op) => op.0.precedence(), + Operator::Custom(WrapCustomOperator(op)) => op.precedence(), } } } @@ -313,7 +309,7 @@ impl fmt::Display for Operator { Operator::StringConcat => write!(f, "||"), Operator::AtArrow => write!(f, "@>"), Operator::ArrowAt => write!(f, "<@"), - Operator::Custom(op) => write!(f, "{}", op.0), + Operator::Custom(WrapCustomOperator(op)) => write!(f, "{op}"), } } } @@ -346,6 +342,7 @@ pub trait CustomOperator: fmt::Debug + fmt::Display + Send + Sync { fn negate(&self) -> Option { None } + /// Return true if the operator is a numerical operator. /// /// For example, `Binary(a, +, b)` would be a numerical expression. 
diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 681a9b4d59be..6d5a348d9c1a 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -29,6 +29,7 @@ use arrow::datatypes::{ DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; +use crate::operator::WrapCustomOperator; use datafusion_common::{exec_datafusion_err, plan_datafusion_err, plan_err, Result}; /// The type signature of an instantiation of binary operator expression such as @@ -189,8 +190,8 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result ) } } - Custom(op) => { - let (lhs, rhs, ret) = op.0.binary_signature(lhs, rhs)?; + Custom(WrapCustomOperator(op)) => { + let (lhs, rhs, ret) = op.binary_signature(lhs, rhs)?; Ok(Signature { lhs, rhs, ret }) } } From 2fb7f7b52c449ecc1335265e5218a1df734cfa32 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 10:32:09 +0100 Subject: [PATCH 04/11] more cleanup --- datafusion/expr/src/type_coercion/binary.rs | 3 +-- datafusion/sql/src/unparser/expr.rs | 3 ++- datafusion/substrait/src/logical_plan/producer.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 6d5a348d9c1a..e517f5f7dc91 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -20,7 +20,7 @@ use std::collections::HashSet; use std::sync::Arc; -use crate::Operator; +use crate::{Operator, WrapCustomOperator}; use arrow::array::{new_empty_array, Array}; use arrow::compute::can_cast_types; @@ -29,7 +29,6 @@ use arrow::datatypes::{ DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; -use crate::operator::WrapCustomOperator; use datafusion_common::{exec_datafusion_err, plan_datafusion_err, plan_err, Result}; /// The type signature of an instantiation of binary operator expression such as diff --git 
a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index c359ade3850d..2214fcb44625 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -41,6 +41,7 @@ use datafusion_common::{ use datafusion_expr::{ expr::{Alias, Exists, InList, ScalarFunction, Sort, WindowFunction}, Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Operator, TryCast, + WrapCustomOperator, }; use super::Unparser; @@ -649,7 +650,7 @@ impl Unparser<'_> { Operator::BitwiseShiftRight => Ok(ast::BinaryOperator::PGBitwiseShiftRight), Operator::BitwiseShiftLeft => Ok(ast::BinaryOperator::PGBitwiseShiftLeft), Operator::StringConcat => Ok(ast::BinaryOperator::StringConcat), - Operator::Custom(op) => op.0.op_to_sql(), + Operator::Custom(WrapCustomOperator(op)) => op.op_to_sql(), Operator::AtArrow => not_impl_err!("unsupported operation: {op:?}"), Operator::ArrowAt => not_impl_err!("unsupported operation: {op:?}"), } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 9f10802c4b92..fb01bbb9136e 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use arrow_buffer::ToByteSlice; use datafusion::arrow::datatypes::IntervalUnit; use datafusion::logical_expr::{ - CrossJoin, Distinct, Like, Partitioning, WindowFrameUnits, + CrossJoin, Distinct, Like, Partitioning, WindowFrameUnits, WrapCustomOperator, }; use datafusion::{ arrow::datatypes::{DataType, TimeUnit}, @@ -723,7 +723,7 @@ pub fn operator_to_name(op: Operator) -> &'static str { Operator::BitwiseXor => "bitwise_xor", Operator::BitwiseShiftRight => "bitwise_shift_right", Operator::BitwiseShiftLeft => "bitwise_shift_left", - Operator::Custom(op) => op.0.name(), + Operator::Custom(WrapCustomOperator(op)) => op.name(), } } From 99d7ae0a17bd49274cc109de730e13220ba2b647 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: 
Thu, 27 Jun 2024 11:30:09 +0100 Subject: [PATCH 05/11] fix tests --- datafusion/sql/tests/sql_integration.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index f196d71d41de..977fd738e322 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -84,6 +84,7 @@ fn parse_decimals() { ParserOptions { parse_float_as_decimal: true, enable_ident_normalization: false, + ..Default::default() }, ); } @@ -137,6 +138,7 @@ fn parse_ident_normalization() { ParserOptions { parse_float_as_decimal: false, enable_ident_normalization, + ..Default::default() }, ); if plan.is_ok() { From 620a32fc6534be1519b8497bf7c5db7b26685779 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 13:32:07 +0100 Subject: [PATCH 06/11] move register_parse_custom_operator to FunctionRegistry, fix from_proto --- datafusion/core/src/execution/context/mod.rs | 29 ++++++++------- .../core/src/execution/session_state.rs | 35 +++++++++---------- datafusion/expr/src/lib.rs | 2 ++ datafusion/expr/src/parse_custom_operator.rs | 18 ++++++++++ datafusion/expr/src/registry.rs | 19 +++++++++- .../proto/src/logical_plan/from_proto.rs | 20 ++++++++--- .../proto/src/physical_plan/from_proto.rs | 5 ++- datafusion/sql/src/expr/binary_op.rs | 2 +- datafusion/sql/src/planner.rs | 14 ++------ 9 files changed, 91 insertions(+), 53 deletions(-) create mode 100644 datafusion/expr/src/parse_custom_operator.rs diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ca0c02d60136..e8e0edced4cf 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -60,7 +60,7 @@ use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ expr_rewriter::FunctionRewrite, logical_plan::{DdlStatement, Statement}, - Expr, UserDefinedLogicalNode, WindowUDF, + Expr, 
ParseCustomOperator, UserDefinedLogicalNode, WindowUDF, }; // backwards compatibility @@ -76,7 +76,6 @@ pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; -use datafusion_sql::planner::ParseCustomOperator; mod avro; mod csv; @@ -1354,19 +1353,6 @@ impl SessionContext { .write() .register_table_options_extension(extension) } - - /// Registers a new [`ParseCustomOperator`] with the registry. - /// - /// `ParseCustomOperator` is used to parse custom operators from SQL, - /// e.g. `->>` or `?`. - pub fn register_parse_custom_operator( - &mut self, - parse_custom_operator: Arc, - ) -> Result<()> { - self.state - .write() - .register_parse_custom_operator(parse_custom_operator) - } } impl FunctionRegistry for SessionContext { @@ -1404,6 +1390,19 @@ impl FunctionRegistry for SessionContext { ) -> Result<()> { self.state.write().register_function_rewrite(rewrite) } + + fn register_parse_custom_operator( + &mut self, + parse_custom_operator: Arc, + ) -> Result<()> { + self.state + .write() + .register_parse_custom_operator(parse_custom_operator) + } + + fn parse_custom_operators(&self) -> Vec> { + todo!(); + } } /// Create a new task context instance from SessionContext diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ec7a1a089bc6..f500076193a6 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -56,8 +56,8 @@ use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry}; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::var_provider::{is_system_variables, VarType}; use datafusion_expr::{ - AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, TableSource, - WindowUDF, + AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ParseCustomOperator, + 
ScalarUDF, TableSource, WindowUDF, }; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_optimizer::{ @@ -67,9 +67,7 @@ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_sql::parser::{DFParser, Statement}; -use datafusion_sql::planner::{ - ContextProvider, ParseCustomOperator, ParserOptions, PlannerContext, SqlToRel, -}; +use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}; use sqlparser::ast::Expr as SQLExpr; use sqlparser::dialect::dialect_from_str; use std::collections::hash_map::Entry; @@ -578,7 +576,7 @@ impl SessionState { ParserOptions { parse_float_as_decimal: sql_parser_options.parse_float_as_decimal, enable_ident_normalization: sql_parser_options.enable_ident_normalization, - parse_custom_operator: self.parse_custom_operators.clone(), + parse_custom_operator: self.parse_custom_operators(), } } @@ -881,19 +879,6 @@ impl SessionState { let udtf = self.table_functions.remove(name); Ok(udtf.map(|x| x.function().clone())) } - - /// Registers a new [`ParseCustomOperator`] with the registry. - /// - /// `ParseCustomOperator` is used to parse custom operators from SQL, - /// e.g. `->>` or `?`. - pub fn register_parse_custom_operator( - &mut self, - parse_custom_operator: Arc, - ) -> datafusion_common::Result<()> { - // TODO moved into FunctionRegistry? 
it would involve adding datafusion_sql as a dep of datafusion-expr - self.parse_custom_operators.push(parse_custom_operator); - Ok(()) - } } struct SessionContextProvider<'a> { @@ -1093,6 +1078,18 @@ impl FunctionRegistry for SessionState { self.analyzer.add_function_rewrite(rewrite); Ok(()) } + + fn register_parse_custom_operator( + &mut self, + parse_custom_operator: Arc, + ) -> datafusion_common::Result<()> { + self.parse_custom_operators.push(parse_custom_operator); + Ok(()) + } + + fn parse_custom_operators(&self) -> Vec> { + self.parse_custom_operators.clone() + } } impl OptimizerConfig for SessionState { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index f0e27749a08a..c434da82bcbe 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -48,6 +48,7 @@ pub mod function; pub mod groups_accumulator; pub mod interval_arithmetic; pub mod logical_plan; +pub mod parse_custom_operator; pub mod registry; pub mod simplify; pub mod sort_properties; @@ -77,6 +78,7 @@ pub use groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; pub use operator::{CustomOperator, Operator, WrapCustomOperator}; +pub use parse_custom_operator::ParseCustomOperator; pub use partition_evaluator::PartitionEvaluator; pub use signature::{ ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, diff --git a/datafusion/expr/src/parse_custom_operator.rs b/datafusion/expr/src/parse_custom_operator.rs new file mode 100644 index 000000000000..6e3d4462df77 --- /dev/null +++ b/datafusion/expr/src/parse_custom_operator.rs @@ -0,0 +1,18 @@ +use std::fmt::Debug; + +use datafusion_common::Result; +use sqlparser::ast::BinaryOperator; + +use crate::Operator; + +pub trait ParseCustomOperator: Debug + Send + Sync { + /// Return a human-readable name for this parser + fn name(&self) -> &str; + + /// potentially parse a custom operator. 
+ /// + /// Return `None` if the operator is not recognized + fn op_from_ast(&self, op: &BinaryOperator) -> Result>; + + fn op_from_name(&self, raw_op: &str) -> Result>; +} diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 70d0a21a870e..9cd04269d6c4 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -18,7 +18,9 @@ //! FunctionRegistry trait use crate::expr_rewriter::FunctionRewrite; -use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; +use crate::{ + AggregateUDF, ParseCustomOperator, ScalarUDF, UserDefinedLogicalNode, WindowUDF, +}; use datafusion_common::{not_impl_err, plan_datafusion_err, Result}; use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; @@ -108,6 +110,21 @@ pub trait FunctionRegistry { ) -> Result<()> { not_impl_err!("Registering FunctionRewrite") } + + /// Registers a new [`ParseCustomOperator`] with the registry. + /// + /// `ParseCustomOperator` is used to parse custom operators from SQL, + /// e.g. `->>` or `?`. + fn register_parse_custom_operator( + &mut self, + _parse_custom_operator: Arc, + ) -> Result<()> { + not_impl_err!("Registering ParseCustomOperator") + } + + fn parse_custom_operators(&self) -> Vec> { + vec![] + } } /// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode]. 
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 21331a94c18c..d223789df033 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -31,8 +31,8 @@ use datafusion_expr::{ AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, GroupingSet, GroupingSet::GroupingSets, - JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, - WindowFrameUnits, + JoinConstraint, JoinType, Like, Operator, ParseCustomOperator, TryCast, WindowFrame, + WindowFrameBound, WindowFrameUnits, }; use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error}; @@ -252,7 +252,10 @@ pub fn parse_expr( match expr_type { ExprType::BinaryExpr(binary_expr) => { - let op = from_proto_binary_op(&binary_expr.op)?; + let op = from_proto_binary_op( + &binary_expr.op, + &registry.parse_custom_operators(), + )?; let operands = parse_exprs(&binary_expr.operands, registry, codec)?; if operands.len() < 2 { @@ -676,7 +679,16 @@ fn parse_escape_char(s: &str) -> Result> { } } -pub fn from_proto_binary_op(op: &str) -> Result { +pub fn from_proto_binary_op( + op: &str, + parse_custom_operator: &[Arc], +) -> Result { + for parse_custom_op in parse_custom_operator { + if let Some(op) = parse_custom_op.op_from_name(op)? 
{ + return Ok(op); + } + } + match op { "And" => Ok(Operator::And), "Or" => Ok(Operator::Or), diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index b636c77641c7..545d2f81f408 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -241,7 +241,10 @@ pub fn parse_physical_expr( input_schema, codec, )?, - logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?, + logical_plan::from_proto::from_proto_binary_op( + &binary_expr.op, + &registry.parse_custom_operators(), + )?, parse_required_physical_expr( binary_expr.r.as_deref(), registry, diff --git a/datafusion/sql/src/expr/binary_op.rs b/datafusion/sql/src/expr/binary_op.rs index 96cf12ef038e..a9ca1583f448 100644 --- a/datafusion/sql/src/expr/binary_op.rs +++ b/datafusion/sql/src/expr/binary_op.rs @@ -23,7 +23,7 @@ use sqlparser::ast::BinaryOperator; impl<'a, S: ContextProvider> SqlToRel<'a, S> { pub(crate) fn parse_sql_binary_op(&self, op: BinaryOperator) -> Result { for parse_custom_op in &self.options.parse_custom_operator { - if let Some(op) = parse_custom_op.parse(&op)? { + if let Some(op) = parse_custom_op.op_from_ast(&op)? { return Ok(op); } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 638f1f39ecc7..b885afb18b50 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -18,13 +18,13 @@ //! 
[`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) use std::collections::HashMap; use std::sync::Arc; -use std::{fmt, vec}; +use std::vec; use arrow_schema::*; use datafusion_common::{ field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError, }; -use datafusion_expr::{Operator, WindowUDF}; +use datafusion_expr::{ParseCustomOperator, WindowUDF}; use sqlparser::ast::TimezoneInfo; use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo}; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; @@ -92,16 +92,6 @@ pub trait ContextProvider { fn udwf_names(&self) -> Vec; } -pub trait ParseCustomOperator: fmt::Debug + Send + Sync { - /// Return a human readable name for this parser - fn name(&self) -> &str; - - /// potentially parse a custom operator. - /// - /// Return `None` if the operator is not recognized - fn parse(&self, op: &sqlparser::ast::BinaryOperator) -> Result>; -} - /// SQL parser options #[derive(Debug)] pub struct ParserOptions { From 18290e61ece8c4a01ef9b1b4bbf6f363f55fea3c Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 13:35:34 +0100 Subject: [PATCH 07/11] woops, forgot license --- datafusion/expr/src/parse_custom_operator.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/expr/src/parse_custom_operator.rs b/datafusion/expr/src/parse_custom_operator.rs index 6e3d4462df77..e83e89d5c663 100644 --- a/datafusion/expr/src/parse_custom_operator.rs +++ b/datafusion/expr/src/parse_custom_operator.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::fmt::Debug; use datafusion_common::Result; From 5f21379f53ffa3870b24ce074ee40bb328a8ba76 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 13:45:21 +0100 Subject: [PATCH 08/11] fix todo on SessionContext --- datafusion/core/src/execution/context/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index e8e0edced4cf..89a50e5749bf 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1401,7 +1401,7 @@ impl FunctionRegistry for SessionContext { } fn parse_custom_operators(&self) -> Vec> { - todo!(); + self.state.read().parse_custom_operators() } } From f4bdfa04aa3ba21432c0a96378a54f67e462b7b5 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 15:58:38 +0100 Subject: [PATCH 09/11] add tests --- datafusion/core/tests/user_defined/mod.rs | 1 + .../user_defined_custom_operators.rs | 169 ++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 datafusion/core/tests/user_defined/user_defined_custom_operators.rs diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs index 6c6d966cc3aa..5e7689a7a178 100644 --- a/datafusion/core/tests/user_defined/mod.rs +++ b/datafusion/core/tests/user_defined/mod.rs @@ -16,6 +16,7 @@ // under the License. 
/// Tests for user defined Scalar functions +mod user_defined_custom_operators; mod user_defined_scalar_functions; /// Tests for User Defined Aggregate Functions diff --git a/datafusion/core/tests/user_defined/user_defined_custom_operators.rs b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs new file mode 100644 index 000000000000..e16c7bfd4884 --- /dev/null +++ b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+use arrow_array::RecordBatch; +use std::sync::Arc; + +use datafusion::arrow::datatypes::DataType; +use datafusion::common::config::ConfigOptions; +use datafusion::common::tree_node::Transformed; +use datafusion::common::DFSchema; +use datafusion::error::Result; +use datafusion::execution::FunctionRegistry; +use datafusion::logical_expr::expr_rewriter::FunctionRewrite; +use datafusion::logical_expr::{ + CustomOperator, Operator, ParseCustomOperator, WrapCustomOperator, +}; +use datafusion::prelude::*; +use datafusion::sql::sqlparser::ast::BinaryOperator; +use datafusion_common::assert_batches_eq; + +#[derive(Debug)] +enum MyCustomOperator { + Arrow, + LongArrow, +} + +impl std::fmt::Display for MyCustomOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MyCustomOperator::Arrow => write!(f, "->"), + MyCustomOperator::LongArrow => write!(f, "->>"), + } + } +} + +impl CustomOperator for MyCustomOperator { + fn binary_signature( + &self, + lhs: &DataType, + rhs: &DataType, + ) -> Result<(DataType, DataType, DataType)> { + Ok((lhs.clone(), rhs.clone(), lhs.clone())) + } + + fn op_to_sql(&self) -> Result { + match self { + MyCustomOperator::Arrow => Ok(BinaryOperator::Arrow), + MyCustomOperator::LongArrow => Ok(BinaryOperator::LongArrow), + } + } + + fn name(&self) -> &'static str { + match self { + MyCustomOperator::Arrow => "Arrow", + MyCustomOperator::LongArrow => "LongArrow", + } + } +} + +impl TryFrom<&str> for MyCustomOperator { + type Error = (); + + fn try_from(value: &str) -> std::result::Result { + match value { + "Arrow" => Ok(MyCustomOperator::Arrow), + "LongArrow" => Ok(MyCustomOperator::LongArrow), + _ => Err(()), + } + } +} + +#[derive(Debug)] +struct CustomOperatorParser; + +impl ParseCustomOperator for CustomOperatorParser { + fn name(&self) -> &str { + "CustomOperatorParser" + } + + fn op_from_ast(&self, op: &BinaryOperator) -> Result> { + match op { + BinaryOperator::Arrow => 
Ok(Some(MyCustomOperator::Arrow.into())), + BinaryOperator::LongArrow => Ok(Some(MyCustomOperator::LongArrow.into())), + _ => Ok(None), + } + } + + fn op_from_name(&self, raw_op: &str) -> Result> { + if let Ok(op) = MyCustomOperator::try_from(raw_op) { + Ok(Some(op.into())) + } else { + Ok(None) + } + } +} + +impl FunctionRewrite for CustomOperatorParser { + fn name(&self) -> &str { + "CustomOperatorParser" + } + + fn rewrite( + &self, + expr: Expr, + _schema: &DFSchema, + _config: &ConfigOptions, + ) -> Result> { + if let Expr::BinaryExpr(bin_expr) = &expr { + if let Operator::Custom(WrapCustomOperator(op)) = &bin_expr.op { + if let Ok(pg_op) = MyCustomOperator::try_from(op.name()) { + // return BinaryExpr with a different operator + let mut bin_expr = bin_expr.clone(); + bin_expr.op = match pg_op { + MyCustomOperator::Arrow => Operator::StringConcat, + MyCustomOperator::LongArrow => Operator::Plus, + }; + return Ok(Transformed::yes(Expr::BinaryExpr(bin_expr))); + } + } + } + Ok(Transformed::no(expr)) + } +} + +async fn plan_and_collect(sql: &str) -> Result> { + let mut ctx = SessionContext::new(); + ctx.register_function_rewrite(Arc::new(CustomOperatorParser))?; + ctx.register_parse_custom_operator(Arc::new(CustomOperatorParser))?; + ctx.sql(sql).await?.collect().await +} + +#[tokio::test] +async fn test_custom_operators_arrow() { + let actual = plan_and_collect("select 'foo'->'bar';").await.unwrap(); + let expected = [ + "+----------------------------+", + "| Utf8(\"foo\") -> Utf8(\"bar\") |", + "+----------------------------+", + "| foobar |", + "+----------------------------+", + ]; + assert_batches_eq!(&expected, &actual); +} + +#[tokio::test] +async fn test_custom_operators_long_arrow() { + let actual = plan_and_collect("select 1->>2;").await.unwrap(); + let expected = [ + "+-----------------------+", + "| Int64(1) ->> Int64(2) |", + "+-----------------------+", + "| 3 |", + "+-----------------------+", + ]; + assert_batches_eq!(&expected, &actual); +} From 
e17b82763e5a9ea7429af7d71c281c8197e224aa Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 16:14:18 +0100 Subject: [PATCH 10/11] clean up tests --- datafusion/core/tests/user_defined/mod.rs | 3 ++- .../tests/user_defined/user_defined_custom_operators.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs index 5e7689a7a178..a893ca02b322 100644 --- a/datafusion/core/tests/user_defined/mod.rs +++ b/datafusion/core/tests/user_defined/mod.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -/// Tests for user defined Scalar functions +/// Tests for custom operator parsing and substitution mod user_defined_custom_operators; +/// Tests for user defined Scalar functions mod user_defined_scalar_functions; /// Tests for User Defined Aggregate Functions diff --git a/datafusion/core/tests/user_defined/user_defined_custom_operators.rs b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs index e16c7bfd4884..3c0a23738f32 100644 --- a/datafusion/core/tests/user_defined/user_defined_custom_operators.rs +++ b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs @@ -14,13 +14,14 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
-use arrow_array::RecordBatch; + use std::sync::Arc; +use arrow_array::RecordBatch; use datafusion::arrow::datatypes::DataType; use datafusion::common::config::ConfigOptions; use datafusion::common::tree_node::Transformed; -use datafusion::common::DFSchema; +use datafusion::common::{DFSchema, assert_batches_eq}; use datafusion::error::Result; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::expr_rewriter::FunctionRewrite; @@ -29,7 +30,6 @@ use datafusion::logical_expr::{ }; use datafusion::prelude::*; use datafusion::sql::sqlparser::ast::BinaryOperator; -use datafusion_common::assert_batches_eq; #[derive(Debug)] enum MyCustomOperator { From 71b795626d8fd5103d396153feff70b8af28d883 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 27 Jun 2024 17:29:58 +0100 Subject: [PATCH 11/11] snoooooze, fmt --- .../core/tests/user_defined/user_defined_custom_operators.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_custom_operators.rs b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs index 3c0a23738f32..38ab7f615a9c 100644 --- a/datafusion/core/tests/user_defined/user_defined_custom_operators.rs +++ b/datafusion/core/tests/user_defined/user_defined_custom_operators.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; use arrow_array::RecordBatch; +use std::sync::Arc; use datafusion::arrow::datatypes::DataType; use datafusion::common::config::ConfigOptions; use datafusion::common::tree_node::Transformed; -use datafusion::common::{DFSchema, assert_batches_eq}; +use datafusion::common::{assert_batches_eq, DFSchema}; use datafusion::error::Result; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::expr_rewriter::FunctionRewrite;