diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index ba3e68e4011f..cd6faf4f7884 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1361,9 +1361,14 @@ dependencies = [ name = "datafusion-physical-expr-common" version = "37.1.0" dependencies = [ + "ahash", "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", + "half", + "hashbrown 0.14.3", + "paste", ] [[package]] diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index d1202c83d526..6ef738380531 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -36,6 +36,13 @@ name = "datafusion_physical_expr_common" path = "src/lib.rs" [dependencies] +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } arrow = { workspace = true } +arrow-schema = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +half = { workspace = true } +hashbrown = { version = "0.14", features = ["raw"] } +paste = "^1.0" diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr-common/src/expressions/binary.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/binary.rs rename to datafusion/physical-expr-common/src/expressions/binary.rs index 7c57dc050db5..7a67035f8e92 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr-common/src/expressions/binary.rs @@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc}; use crate::expressions::datum::{apply, apply_cmp}; use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::array::*; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; @@ -623,7 +623,9 @@ pub fn binary( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{col, lit, try_cast, Literal}; + use crate::expressions::column::col; + use crate::expressions::literal::{lit, Literal}; + use crate::expressions::try_cast::try_cast; use arrow::datatypes::{ ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef, }; diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr-common/src/expressions/binary/kernels.rs similarity index 100% rename from datafusion/physical-expr/src/expressions/binary/kernels.rs rename to datafusion/physical-expr-common/src/expressions/binary/kernels.rs diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr-common/src/expressions/cast.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/cast.rs rename to datafusion/physical-expr-common/src/expressions/cast.rs index e87c643cdeb5..36cdcd170838 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr-common/src/expressions/cast.rs @@ -16,8 +16,8 @@ // under the License. 
use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use std::any::Any; use std::fmt; use std::hash::{Hash, Hasher}; @@ -217,7 +217,7 @@ pub fn cast( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{ diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr-common/src/expressions/datum.rs similarity index 96% rename from datafusion/physical-expr/src/expressions/datum.rs rename to datafusion/physical-expr-common/src/expressions/datum.rs index 2bb79922cfec..201bf104f20e 100644 --- a/datafusion/physical-expr/src/expressions/datum.rs +++ b/datafusion/physical-expr-common/src/expressions/datum.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::BooleanArray; use arrow::array::{ArrayRef, Datum}; use arrow::error::ArrowError; -use arrow_array::BooleanArray; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; use std::sync::Arc; @@ -25,7 +25,7 @@ use std::sync::Arc; /// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs` /// /// This maps arrow-rs' [`Datum`] kernels to DataFusion's [`ColumnarValue`] abstraction -pub(crate) fn apply( +pub fn apply( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, @@ -49,7 +49,7 @@ pub(crate) fn apply( } /// Applies a binary [`Datum`] comparison kernel `f` to `lhs` and `rhs` -pub(crate) fn apply_cmp( +pub fn apply_cmp( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr-common/src/expressions/in_list.rs similarity index 97% rename from datafusion/physical-expr/src/expressions/in_list.rs rename to datafusion/physical-expr-common/src/expressions/in_list.rs index 07185b4d6527..e1f2e1d5ff68 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr-common/src/expressions/in_list.rs @@ -22,18 +22,21 @@ use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use crate::physical_expr::PhysicalExpr; use crate::physical_expr::{down_cast_any_ref, physical_exprs_bag_equal}; -use crate::PhysicalExpr; -use arrow::array::*; +use arrow::array::downcast_primitive_array; +use arrow::array::{ + as_largestring_array, downcast_array, downcast_dictionary_array, Array, + ArrayAccessor, ArrayData, ArrayIter, ArrayRef, BooleanArray, +}; use arrow::buffer::BooleanBuffer; use arrow::compute::kernels::boolean::{not, or_kleene}; use arrow::compute::kernels::cmp::eq; use arrow::compute::take; -use arrow::datatypes::*; +use arrow::datatypes::{i256, DataType, Schema}; use arrow::record_batch::RecordBatch; use arrow::util::bit_iterator::BitIndexIterator; -use arrow::{downcast_dictionary_array, downcast_primitive_array}; use datafusion_common::cast::{ as_boolean_array, as_generic_binary_array, as_string_array, }; @@ -455,15 +458,29 @@ pub fn in_list( #[cfg(test)] mod tests { - use arrow::{array::StringArray, datatypes::Field}; - use super::*; - use crate::expressions; - use crate::expressions::{col, lit, try_cast}; + + use crate::expressions::cast::cast; + use crate::expressions::column::col; + use crate::expressions::literal::lit; + use crate::expressions::try_cast::try_cast; + use arrow::array::BinaryArray; + use arrow::array::Date32Array; + use 
arrow::array::Date64Array; + use arrow::array::Decimal128Array; + use arrow::array::Float64Array; + use arrow::array::Int32Array; + use arrow::array::Int64Array; + use arrow::array::StringArray; + use arrow::array::TimestampMicrosecondArray; + use arrow::array::UInt16DictionaryArray; use datafusion_common::plan_err; use datafusion_common::Result; use datafusion_expr::type_coercion::binary::comparison_coercion; + use arrow::datatypes::Field; + use arrow::datatypes::TimeUnit; + type InListCastResult = (Arc, Vec>); // Try to do the type coercion for list physical expr. @@ -1109,8 +1126,8 @@ mod tests { // list of phy expr let mut phy_exprs = vec![ lit(1i64), - expressions::cast(lit(2i32), &schema, DataType::Int64)?, - expressions::try_cast(lit(3.13f32), &schema, DataType::Int64)?, + cast(lit(2i32), &schema, DataType::Int64)?, + try_cast(lit(3.13f32), &schema, DataType::Int64)?, ]; let result = try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); @@ -1120,8 +1137,8 @@ mod tests { try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); // cast(cast(lit())), but the cast to the same data type, one case will be ignored - phy_exprs.push(expressions::cast( - expressions::cast(lit(2i32), &schema, DataType::Int64)?, + phy_exprs.push(cast( + cast(lit(2i32), &schema, DataType::Int64)?, &schema, DataType::Int64, )?); @@ -1130,15 +1147,15 @@ mod tests { phy_exprs.clear(); // case(cast(lit())), the cast to the diff data type - phy_exprs.push(expressions::cast( - expressions::cast(lit(2i32), &schema, DataType::Int64)?, + phy_exprs.push(cast( + cast(lit(2i32), &schema, DataType::Int64)?, &schema, DataType::Int32, )?); try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); // column - phy_exprs.push(expressions::col("a", &schema)?); + phy_exprs.push(col("a", &schema)?); assert!(try_cast_static_filter_to_set(&phy_exprs, &schema).is_err()); Ok(()) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr-common/src/expressions/is_not_null.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/is_not_null.rs rename to datafusion/physical-expr-common/src/expressions/is_not_null.rs index 2e6a2bec9cab..c4d3e27b1fb5 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr-common/src/expressions/is_not_null.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::compute; use arrow::{ datatypes::{DataType, Schema}, @@ -115,7 +115,7 @@ pub fn is_not_null(arg: Arc) -> Result> #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{BooleanArray, StringArray}, datatypes::*, diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr-common/src/expressions/is_null.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/is_null.rs rename to datafusion/physical-expr-common/src/expressions/is_null.rs index 3ad4058dd649..fa14f364354b 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr-common/src/expressions/is_null.rs @@ -27,7 +27,7 @@ use arrow::{ }; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -116,7 +116,7 @@ pub fn 
is_null(arg: Arc) -> Result> { #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{BooleanArray, StringArray}, datatypes::*, diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr-common/src/expressions/like.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/like.rs rename to datafusion/physical-expr-common/src/expressions/like.rs index 6e0beeb0beea..e76e0b9f01ef 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr-common/src/expressions/like.rs @@ -18,7 +18,7 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::{physical_expr::down_cast_any_ref, PhysicalExpr}; +use crate::physical_expr::{down_cast_any_ref, PhysicalExpr}; use crate::expressions::datum::apply_cmp; use arrow::record_batch::RecordBatch; @@ -174,7 +174,7 @@ pub fn like( #[cfg(test)] mod test { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::array::*; use arrow_schema::Field; use datafusion_common::cast::as_boolean_array; diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr-common/src/expressions/literal.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/literal.rs rename to datafusion/physical-expr-common/src/expressions/literal.rs index cd3b51f09105..6c94aef3599f 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr-common/src/expressions/literal.rs @@ -22,8 +22,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::{ datatypes::{DataType, Schema}, diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/physical-expr-common/src/expressions/mod.rs index d102422081dc..48fd2a1a07a3 100644 --- a/datafusion/physical-expr-common/src/expressions/mod.rs +++ b/datafusion/physical-expr-common/src/expressions/mod.rs @@ -15,4 +15,18 @@ // specific language governing permissions and limitations // under the License. +//! 
Defines physical expressions that can evaluated at runtime during query execution + +#[macro_use] +pub mod binary; +pub mod cast; pub mod column; +pub mod datum; +pub mod in_list; +pub mod is_not_null; +pub mod is_null; +pub mod like; +pub mod literal; +pub mod negative; +pub mod not; +pub mod try_cast; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr-common/src/expressions/negative.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/negative.rs rename to datafusion/physical-expr-common/src/expressions/negative.rs index d6dd3ddbea5e..16044cf3abdc 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr-common/src/expressions/negative.rs @@ -22,8 +22,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::{ compute::kernels::numeric::neg_wrapping, @@ -173,7 +173,7 @@ pub fn negative( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{col, Column}; + use crate::expressions::column::{col, Column}; use arrow::array::*; use arrow::datatypes::*; diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr-common/src/expressions/not.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/not.rs rename to datafusion/physical-expr-common/src/expressions/not.rs index f17df73e3070..5bc5550df36a 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr-common/src/expressions/not.rs @@ -23,7 +23,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::{cast::as_boolean_array, Result, ScalarValue}; @@ -123,7 +123,7 @@ pub fn not(arg: Arc) -> Result> { #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{array::BooleanArray, datatypes::*}; use datafusion_common::Result; diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr-common/src/expressions/try_cast.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/try_cast.rs rename to datafusion/physical-expr-common/src/expressions/try_cast.rs index ddfe49dda7a3..8a43d95ab6cc 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr-common/src/expressions/try_cast.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::compute; use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType, Schema}; @@ -148,7 +148,7 @@ pub fn try_cast( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::array::{ Decimal128Array, Decimal128Builder, StringArray, Time64NanosecondArray, }; diff --git a/datafusion/physical-expr-common/src/intervals/cp_solver.rs b/datafusion/physical-expr-common/src/intervals/cp_solver.rs new file mode 100644 index 000000000000..28cf7c7b3974 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/cp_solver.rs @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Constraint propagator/solver for custom PhysicalExpr graphs. + +use arrow::datatypes::DataType; +use datafusion_common::{internal_err, Result}; +use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; +use datafusion_expr::Operator; + +use super::utils::{ + convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, +}; + +/// This function refines intervals `left_child` and `right_child` by applying +/// constraint propagation through `parent` via operation. The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. +/// +/// Assuming that x,y and p has ranges [xL, xU], [yL, yU], and [pL, pU], we +/// apply the following operations: +/// - For plus operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. +/// - For minus operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. +/// - For multiplication operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU]. +/// - For division operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. +pub fn propagate_arithmetic( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + let inverse_op = get_inverse_op(*op)?; + match (left_child.data_type(), right_child.data_type()) { + // If we have a child whose type is a time interval (i.e. DataType::Interval), + // we need special handling since timestamp differencing results in a + // Duration type. + (DataType::Timestamp(..), DataType::Interval(_)) => { + propagate_time_interval_at_right( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + (DataType::Interval(_), DataType::Timestamp(..)) => { + propagate_time_interval_at_left( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + _ => { + // First, propagate to the left: + match apply_operator(&inverse_op, parent, right_child)? + .intersect(left_child)? + { + // Left is feasible: + Some(value) => Ok( + // Propagate to the right using the new left. + propagate_right(&value, parent, right_child, op, &inverse_op)? + .map(|right| (value, right)), + ), + // If the left child is infeasible, short-circuit. + None => Ok(None), + } + } + } +} + +/// This function refines intervals `left_child` and `right_child` by applying +/// comparison propagation through `parent` via operation. 
The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. +/// Two intervals can be ordered in 6 ways for a Gt `>` operator: +/// ```text +/// (1): Infeasible, short-circuit +/// left: | ================ | +/// right: | ======================== | +/// +/// (2): Update both interval +/// left: | ====================== | +/// right: | ====================== | +/// | +/// V +/// left: | ======= | +/// right: | ======= | +/// +/// (3): Update left interval +/// left: | ============================== | +/// right: | ========== | +/// | +/// V +/// left: | ===================== | +/// right: | ========== | +/// +/// (4): Update right interval +/// left: | ========== | +/// right: | =========================== | +/// | +/// V +/// left: | ========== | +/// right | ================== | +/// +/// (5): No change +/// left: | ============================ | +/// right: | =================== | +/// +/// (6): No change +/// left: | ==================== | +/// right: | =============== | +/// +/// -inf --------------------------------------------------------------- +inf +/// ``` +pub fn propagate_comparison( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + if parent == &Interval::CERTAINLY_TRUE { + match op { + Operator::Eq => left_child.intersect(right_child).map(|result| { + result.map(|intersection| (intersection.clone(), intersection)) + }), + Operator::Gt => satisfy_greater(left_child, right_child, true), + Operator::GtEq => satisfy_greater(left_child, right_child, false), + Operator::Lt => satisfy_greater(right_child, left_child, true) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(right_child, left_child, false) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else if parent == &Interval::CERTAINLY_FALSE { + match op { + Operator::Eq => { + // TODO: Propagation is not possible until we support interval sets. + Ok(None) + } + Operator::Gt => satisfy_greater(right_child, left_child, false), + Operator::GtEq => satisfy_greater(right_child, left_child, true), + Operator::Lt => satisfy_greater(left_child, right_child, false) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(left_child, right_child, true) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else { + // Uncertainty cannot change any end-point of the intervals. + Ok(None) + } +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the right side of the operation. +fn propagate_time_interval_at_right( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. 
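Aside (not part of the patch): a minimal sketch of how the `propagate_arithmetic` formulas documented above play out on concrete intervals. It assumes the module layout introduced by this diff (`datafusion_physical_expr_common::intervals::cp_solver`) together with the existing `Interval` API from `datafusion_expr`; the numeric values are made up for illustration.

```rust
use datafusion_common::Result;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::Operator;
use datafusion_physical_expr_common::intervals::cp_solver::propagate_arithmetic;

fn main() -> Result<()> {
    // Suppose x + y is known to lie in [5, 7], with x in [0, 10] and y in [4, 4].
    let parent = Interval::make(Some(5_i64), Some(7_i64))?;
    let x = Interval::make(Some(0_i64), Some(10_i64))?;
    let y = Interval::make(Some(4_i64), Some(4_i64))?;

    // Following the plus rule above:
    //   x <- ([5,7] - [4,4]) ∩ [0,10] = [1,3]
    //   y <- ([5,7] - [1,3]) ∩ [4,4]  = [4,4]
    let propagated = propagate_arithmetic(&Operator::Plus, &parent, &x, &y)?;
    assert_eq!(
        propagated,
        Some((
            Interval::make(Some(1_i64), Some(3_i64))?,
            Interval::make(Some(4_i64), Some(4_i64))?,
        ))
    );
    Ok(())
}
```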
+ let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { + match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { + Some(value) => { + propagate_right(left_child, parent, &duration, op, inverse_op)? + .and_then(|right| convert_duration_type_to_interval(&right)) + .map(|right| (value, right)) + } + None => None, + } + } else { + apply_operator(inverse_op, parent, right_child)? + .intersect(left_child)? + .map(|value| (value, right_child.clone())) + }; + Ok(result) +} + +/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. +fn propagate_right( + left: &Interval, + parent: &Interval, + right: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + match op { + Operator::Minus => apply_operator(op, left, parent), + Operator::Plus => apply_operator(inverse_op, parent, left), + Operator::Divide => apply_operator(op, left, parent), + Operator::Multiply => apply_operator(inverse_op, parent, left), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + }? + .intersect(right) +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the left side of the operation. +fn propagate_time_interval_at_left( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. + let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { + match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? { + Some(value) => { + let left = convert_duration_type_to_interval(&value); + let right = propagate_right(&value, parent, right_child, op, inverse_op)?; + match (left, right) { + (Some(left), Some(right)) => Some((left, right)), + _ => None, + } + } + None => None, + } + } else { + propagate_right(left_child, parent, right_child, op, inverse_op)? + .map(|right| (left_child.clone(), right)) + }; + Ok(result) +} + +fn reverse_tuple((first, second): (T, U)) -> (U, T) { + (second, first) +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::TimeUnit; + use datafusion_common::ScalarValue; + + use super::*; + + #[test] + fn test_propagate_comparison() -> Result<()> { + // In the examples below: + // `left` is unbounded: [?, ?], + // `right` is known to be [1000,1000] + // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] + let left = Interval::make_unbounded(&DataType::Int64)?; + let right = Interval::make(Some(1000_i64), Some(1000_i64))?; + assert_eq!( + (Some(( + Interval::make(None, Some(999_i64))?, + Interval::make(Some(1000_i64), Some(1000_i64))?, + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? 
+ ); + + let left = + Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + None + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), None), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + let left = Interval::make_unbounded(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + ))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + Ok(()) + } +} diff --git a/datafusion/physical-expr-common/src/intervals/mod.rs b/datafusion/physical-expr-common/src/intervals/mod.rs new file mode 100644 index 000000000000..7022bf2c42b9 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Interval arithmetic and constraint propagation library + +pub mod cp_solver; +pub mod utils; diff --git a/datafusion/physical-expr-common/src/intervals/utils.rs b/datafusion/physical-expr-common/src/intervals/utils.rs new file mode 100644 index 000000000000..7e8fad259a96 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/utils.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for the interval arithmetic library + +use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_expr::{interval_arithmetic::Interval, Operator}; + +const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; +const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; +const DT_MS_MASK: i64 = 0xFFFF_FFFF; + +// This function returns the inverse operator of the given operator. +pub fn get_inverse_op(op: Operator) -> Result { + match op { + Operator::Plus => Ok(Operator::Minus), + Operator::Minus => Ok(Operator::Plus), + Operator::Multiply => Ok(Operator::Divide), + Operator::Divide => Ok(Operator::Multiply), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + } +} + +/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. +pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_interval_bound_to_duration(interval.lower()), + convert_interval_bound_to_duration(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. +fn convert_interval_bound_to_duration( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) + .ok() + .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), + ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) + .ok() + .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), + _ => None, + } +} + +/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. +/// Otherwise, it returns an error. +fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { + let months = mdn >> 96; + let days = (mdn & MDN_DAY_MASK) >> 64; + let nanoseconds = mdn & MDN_NS_MASK; + + if months == 0 && days == 0 { + nanoseconds + .try_into() + .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) + } else { + internal_err!( + "The interval cannot have a non-zero month or day value for duration convertibility" + ) + } +} + +/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. +/// Otherwise, it returns an error. +fn interval_dt_to_duration_ms(dt: &i64) -> Result { + let days = dt >> 32; + let milliseconds = dt & DT_MS_MASK; + + if days == 0 { + Ok(milliseconds) + } else { + internal_err!( + "The interval cannot have a non-zero day value for duration convertibility" + ) + } +} + +/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. 
+pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_duration_bound_to_interval(interval.lower()), + convert_duration_bound_to_interval(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. +fn convert_duration_bound_to_interval( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::DurationNanosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration)) + } + ScalarValue::DurationMicrosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) + } + ScalarValue::DurationMillisecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32)) + } + ScalarValue::DurationSecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) + } + _ => None, + } +} diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 53e3134a1b05..de3b61c035ad 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -17,6 +17,7 @@ pub mod aggregate; pub mod expressions; +pub mod intervals; pub mod physical_expr; pub mod sort_expr; pub mod sort_properties; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index be6358e73c99..66a57b485af8 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -24,11 +24,31 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; + use datafusion_common::utils::DataPtr; -use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_common::{ + exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, +}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::expr::{Alias, InList}; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::ColumnarValue; +use datafusion_expr::var_provider::{is_system_variables, VarType}; +use datafusion_expr::{ + Between, BinaryExpr, Cast, ColumnarValue, Expr, GetFieldAccess, GetIndexedField, + Like, Operator, TryCast, +}; +use crate::expressions::binary::binary; +use crate::expressions::cast::cast; +use crate::expressions::column::Column; +use crate::expressions::in_list::in_list; +use crate::expressions::is_not_null::is_not_null; +use crate::expressions::is_null::is_null; +use crate::expressions::like::like; +use crate::expressions::literal::{lit, Literal}; +use crate::expressions::negative::negative; +use crate::expressions::not::not; +use crate::expressions::try_cast::try_cast; use crate::sort_properties::SortProperties; use crate::utils::scatter; @@ -209,3 +229,376 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { any } } + +/// Checks whether the given physical expression slices are equal in the sense +/// of bags (multi-sets), disregarding their orderings. +pub fn physical_exprs_bag_equal( + lhs: &[Arc], + rhs: &[Arc], +) -> bool { + // TODO: Once we can use `HashMap`s with `Arc`, this + // function should use a `HashMap` to reduce computational complexity. 
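Aside (not part of the patch): a small sketch of the interval/duration conversion helpers added in `intervals/utils.rs` above. Paths assume this PR's layout; per the helpers' documentation, an `IntervalMonthDayNano` bound converts to a `Duration` only when its month and day fields are zero, and the conversion round-trips.

```rust
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr_common::intervals::utils::{
    convert_duration_type_to_interval, convert_interval_type_to_duration,
};

fn main() -> Result<()> {
    // [1_000 ns, 5_000 ns] expressed as month/day/nanosecond interval bounds.
    let interval = Interval::try_new(
        ScalarValue::new_interval_mdn(0, 0, 1_000),
        ScalarValue::new_interval_mdn(0, 0, 5_000),
    )?;

    // Month and day fields are zero, so both bounds convert to Durations...
    let duration = convert_interval_type_to_duration(&interval).expect("convertible");
    assert_eq!(
        duration,
        Interval::try_new(
            ScalarValue::DurationNanosecond(Some(1_000)),
            ScalarValue::DurationNanosecond(Some(5_000)),
        )?
    );

    // ...and convert back to the original time-interval representation.
    assert_eq!(convert_duration_type_to_interval(&duration), Some(interval));
    Ok(())
}
```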
+ if lhs.len() == rhs.len() { + let mut rhs_vec = rhs.to_vec(); + for expr in lhs { + if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) { + rhs_vec.swap_remove(idx); + } else { + return false; + } + } + true + } else { + false + } +} + +/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 +/// AS int)`. +/// +/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical +/// planning, and can be evaluated directly on a [RecordBatch]. They are +/// normally created from [Expr] by a [PhysicalPlanner] and can be created +/// directly using [create_physical_expr]. +/// +/// A Physical expression knows its type, nullability and how to evaluate itself. +/// +/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html +/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html +/// +/// # Example: Create `PhysicalExpr` from `Expr` +/// ``` +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::DFSchema; +/// # use datafusion_expr::{Expr, col, lit}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// // For a logical expression `a = 1`, we can create a physical expression +/// let expr = col("a").eq(lit(1)); +/// // To create a PhysicalExpr we need 1. a schema +/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// let df_schema = DFSchema::try_from(schema).unwrap(); +/// // 2. ExecutionProps +/// let props = ExecutionProps::new(); +/// // We can now create a PhysicalExpr: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// ``` +/// +/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] +/// ``` +/// # use std::sync::Arc; +/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::{assert_batches_eq, DFSchema}; +/// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// # let expr = col("a").eq(lit(1)); +/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); +/// # let props = ExecutionProps::new(); +/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// // Input of [1,2,3] +/// let input_batch = RecordBatch::try_from_iter(vec![ +/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) +/// ]).unwrap(); +/// // The result is a ColumnarValue (either an Array or a Scalar) +/// let result = physical_expr.evaluate(&input_batch).unwrap(); +/// // In this case, a BooleanArray with the result of the comparison +/// let ColumnarValue::Array(arr) = result else { +/// panic!("Expected an array") +/// }; +/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false])); +/// ``` +/// +/// [ColumnarValue]: datafusion_expr::ColumnarValue +/// +/// Create a physical expression from a logical expression ([Expr]). 
+/// +/// # Arguments +/// +/// * `e` - The logical expression +/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references +/// to qualified or unqualified fields by name. +#[allow(clippy::only_used_in_recursion)] +pub fn create_physical_expr( + e: &Expr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result> { + let input_schema: &Schema = &input_dfschema.into(); + + match e { + Expr::Alias(Alias { expr, .. }) => { + Ok(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::Column(c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + Expr::ScalarVariable(_, variable_names) => { + if is_system_variables(variable_names) { + match execution_props.get_var_provider(VarType::System) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => plan_err!("No system variable provider found"), + } + } else { + match execution_props.get_var_provider(VarType::UserDefined) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => plan_err!("No user defined variable provider found"), + } + } + } + Expr::IsTrue(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(true))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotTrue(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(true))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsFalse(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(false))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotFalse(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(false))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsUnknown(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(None)), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotUnknown(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(None)), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + // Create physical expressions for left and right operands + let lhs = create_physical_expr(left, input_dfschema, execution_props)?; + let rhs = create_physical_expr(right, input_dfschema, execution_props)?; + // Note that the logical planner is responsible + // for type coercion on the arguments (e.g. if one + // argument was originally Int32 and one was + // Int64 they will both be coerced to Int64). + // + // There should be no coercion during physical + // planning. 
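Aside (not part of the patch): the `IsTrue`/`IsNotTrue`/`IsFalse`/`IsUnknown` arms above lower the SQL `IS` predicates into null-safe comparisons before creating the physical expression. A tiny sketch of that rewrite, built only from `datafusion_expr` helpers that the patch itself calls; the printed form is approximate.

```rust
use datafusion_expr::{binary_expr, col, lit, Expr, Operator};

fn main() {
    // `flag IS TRUE` is planned as the null-safe comparison
    // `flag IS NOT DISTINCT FROM TRUE`, so a NULL input evaluates to
    // false rather than NULL, matching SQL `IS TRUE` semantics.
    let rewritten: Expr =
        binary_expr(col("flag"), Operator::IsNotDistinctFrom, lit(true));
    // Prints something like: flag IS NOT DISTINCT FROM Boolean(true)
    println!("{rewritten}");
}
```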
+ binary(lhs, *op, rhs, input_schema) + } + Expr::Like(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive, + }) => { + if escape_char.is_some() { + return exec_err!("LIKE does not support escape_char"); + } + let physical_expr = + create_physical_expr(expr, input_dfschema, execution_props)?; + let physical_pattern = + create_physical_expr(pattern, input_dfschema, execution_props)?; + like( + *negated, + *case_insensitive, + physical_expr, + physical_pattern, + input_schema, + ) + } + // Expr::Case(case) => { + // let expr: Option> = if let Some(e) = &case.expr { + // Some(create_physical_expr( + // e.as_ref(), + // input_dfschema, + // execution_props, + // )?) + // } else { + // None + // }; + // let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case + // .when_then_expr + // .iter() + // .map(|(w, t)| (w.as_ref(), t.as_ref())) + // .unzip(); + // let when_expr = + // create_physical_exprs(when_expr, input_dfschema, execution_props)?; + // let then_expr = + // create_physical_exprs(then_expr, input_dfschema, execution_props)?; + // let when_then_expr: Vec<(Arc, Arc)> = + // when_expr + // .iter() + // .zip(then_expr.iter()) + // .map(|(w, t)| (w.clone(), t.clone())) + // .collect(); + // let else_expr: Option> = + // if let Some(e) = &case.else_expr { + // Some(create_physical_expr( + // e.as_ref(), + // input_dfschema, + // execution_props, + // )?) + // } else { + // None + // }; + // Ok(expressions::case(expr, when_then_expr, else_expr)?) + // } + Expr::Cast(Cast { expr, data_type }) => cast( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::TryCast(TryCast { expr, data_type }) => try_cast( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::Not(expr) => { + not(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::Negative(expr) => negative( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + ), + Expr::IsNull(expr) => { + is_null(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::IsNotNull(expr) => { + is_not_null(create_physical_expr(expr, input_dfschema, execution_props)?) 
+ } + Expr::GetIndexedField(GetIndexedField { expr: _, field }) => match field { + GetFieldAccess::NamedStructField { name: _ } => { + internal_err!( + "NamedStructField should be rewritten in OperatorToFunction" + ) + } + GetFieldAccess::ListIndex { key: _ } => { + internal_err!("ListIndex should be rewritten in OperatorToFunction") + } + GetFieldAccess::ListRange { + start: _, + stop: _, + stride: _, + } => { + internal_err!("ListRange should be rewritten in OperatorToFunction") + } + }, + + // Expr::ScalarFunction(ScalarFunction { func_def, args }) => { + // let physical_args = + // create_physical_exprs(args, input_dfschema, execution_props)?; + + // match func_def { + // ScalarFunctionDefinition::BuiltIn(fun) => { + // functions::create_builtin_physical_expr( + // fun, + // &physical_args, + // input_schema, + // execution_props, + // ) + // } + // ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( + // fun.clone().as_ref(), + // &physical_args, + // input_schema, + // args, + // input_dfschema, + // ), + // ScalarFunctionDefinition::Name(_) => { + // internal_err!("Function `Expr` with name should be resolved.") + // } + // } + // } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => { + let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; + let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; + let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; + + // rewrite the between into the two binary operators + let binary_expr = binary( + binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, + Operator::And, + binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, + input_schema, + ); + + if *negated { + not(binary_expr?) + } else { + binary_expr + } + } + Expr::InList(InList { + expr, + list, + negated, + }) => match expr.as_ref() { + Expr::Literal(ScalarValue::Utf8(None)) => Ok(lit(ScalarValue::Boolean(None))), + _ => { + let value_expr = + create_physical_expr(expr, input_dfschema, execution_props)?; + + let list_exprs = + create_physical_exprs(list, input_dfschema, execution_props)?; + in_list(value_expr, list_exprs, negated, input_schema) + } + }, + other => { + not_impl_err!("Physical plan does not support logical expression {other:?}") + } + } +} + +/// Create vector of Physical Expression from a vector of logical expression +pub fn create_physical_exprs<'a, I>( + exprs: I, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result>> +where + I: IntoIterator, +{ + exprs + .into_iter() + .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .collect::>>() +} diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils/mod.rs similarity index 100% rename from datafusion/physical-expr-common/src/utils.rs rename to datafusion/physical-expr-common/src/utils/mod.rs diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 688d5ce6eabf..50cad65b829d 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -17,21 +17,9 @@ //! 
Defines physical expressions that can evaluated at runtime during query execution -#[macro_use] -mod binary; mod case; -mod cast; mod column; -mod datum; -mod in_list; -mod is_not_null; -mod is_null; -mod like; -mod literal; -mod negative; mod no_op; -mod not; -mod try_cast; /// Module with some convenient methods used in expression building pub mod helpers { @@ -79,21 +67,27 @@ pub use datafusion_functions_aggregate::first_last::{ FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue, }; -pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; -pub use cast::{cast, cast_with_options, CastExpr}; pub use column::UnKnownColumn; pub use datafusion_expr::utils::format_state_name; +pub use datafusion_physical_expr_common::expressions::binary::{binary, BinaryExpr}; +pub use datafusion_physical_expr_common::expressions::cast::{ + cast, cast_with_options, CastExpr, +}; pub use datafusion_physical_expr_common::expressions::column::{col, Column}; -pub use in_list::{in_list, InListExpr}; -pub use is_not_null::{is_not_null, IsNotNullExpr}; -pub use is_null::{is_null, IsNullExpr}; -pub use like::{like, LikeExpr}; -pub use literal::{lit, Literal}; -pub use negative::{negative, NegativeExpr}; +pub use datafusion_physical_expr_common::expressions::in_list::{in_list, InListExpr}; +pub use datafusion_physical_expr_common::expressions::is_not_null::{ + is_not_null, IsNotNullExpr, +}; +pub use datafusion_physical_expr_common::expressions::is_null::{is_null, IsNullExpr}; +pub use datafusion_physical_expr_common::expressions::like::{like, LikeExpr}; +pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal}; +pub use datafusion_physical_expr_common::expressions::negative::{ + negative, NegativeExpr, +}; +pub use datafusion_physical_expr_common::expressions::not::{not, NotExpr}; +pub use datafusion_physical_expr_common::expressions::try_cast::{try_cast, TryCastExpr}; pub use no_op::NoOp; -pub use not::{not, NotExpr}; -pub use try_cast::{try_cast, TryCastExpr}; #[cfg(test)] pub(crate) mod tests { diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 3bd059afa6be..49981036d3ff 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -21,23 +21,23 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use super::utils::{ - convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, -}; use crate::expressions::Literal; use crate::utils::{build_dag, ExprTreeNode}; use crate::PhysicalExpr; use arrow_schema::{DataType, Schema}; -use datafusion_common::{internal_err, Result}; -use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; -use datafusion_expr::Operator; +use datafusion_common::Result; +use datafusion_expr::interval_arithmetic::Interval; use petgraph::graph::NodeIndex; use petgraph::stable_graph::{DefaultIx, StableGraph}; use petgraph::visit::{Bfs, Dfs, DfsPostOrder, EdgeRef}; use petgraph::Outgoing; +pub use datafusion_physical_expr_common::intervals::cp_solver::{ + propagate_arithmetic, propagate_comparison, +}; + // Interval arithmetic provides a way to perform mathematical operations on // intervals, which represent a range of possible values rather than a single // point value. 
This allows for the propagation of ranges through mathematical @@ -198,157 +198,6 @@ impl PartialEq for ExprIntervalGraphNode { } } -/// This function refines intervals `left_child` and `right_child` by applying -/// constraint propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. -/// -/// Assuming that x,y and p has ranges [xL, xU], [yL, yU], and [pL, pU], we -/// apply the following operations: -/// - For plus operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. -/// - For minus operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. -/// - For multiplication operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU]. -/// - For division operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. -pub fn propagate_arithmetic( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - let inverse_op = get_inverse_op(*op)?; - match (left_child.data_type(), right_child.data_type()) { - // If we have a child whose type is a time interval (i.e. DataType::Interval), - // we need special handling since timestamp differencing results in a - // Duration type. - (DataType::Timestamp(..), DataType::Interval(_)) => { - propagate_time_interval_at_right( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - (DataType::Interval(_), DataType::Timestamp(..)) => { - propagate_time_interval_at_left( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - _ => { - // First, propagate to the left: - match apply_operator(&inverse_op, parent, right_child)? - .intersect(left_child)? - { - // Left is feasible: - Some(value) => Ok( - // Propagate to the right using the new left. - propagate_right(&value, parent, right_child, op, &inverse_op)? - .map(|right| (value, right)), - ), - // If the left child is infeasible, short-circuit. - None => Ok(None), - } - } - } -} - -/// This function refines intervals `left_child` and `right_child` by applying -/// comparison propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. 
-/// Two intervals can be ordered in 6 ways for a Gt `>` operator: -/// ```text -/// (1): Infeasible, short-circuit -/// left: | ================ | -/// right: | ======================== | -/// -/// (2): Update both interval -/// left: | ====================== | -/// right: | ====================== | -/// | -/// V -/// left: | ======= | -/// right: | ======= | -/// -/// (3): Update left interval -/// left: | ============================== | -/// right: | ========== | -/// | -/// V -/// left: | ===================== | -/// right: | ========== | -/// -/// (4): Update right interval -/// left: | ========== | -/// right: | =========================== | -/// | -/// V -/// left: | ========== | -/// right | ================== | -/// -/// (5): No change -/// left: | ============================ | -/// right: | =================== | -/// -/// (6): No change -/// left: | ==================== | -/// right: | =============== | -/// -/// -inf --------------------------------------------------------------- +inf -/// ``` -pub fn propagate_comparison( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - if parent == &Interval::CERTAINLY_TRUE { - match op { - Operator::Eq => left_child.intersect(right_child).map(|result| { - result.map(|intersection| (intersection.clone(), intersection)) - }), - Operator::Gt => satisfy_greater(left_child, right_child, true), - Operator::GtEq => satisfy_greater(left_child, right_child, false), - Operator::Lt => satisfy_greater(right_child, left_child, true) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(right_child, left_child, false) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else if parent == &Interval::CERTAINLY_FALSE { - match op { - Operator::Eq => { - // TODO: Propagation is not possible until we support interval sets. - Ok(None) - } - Operator::Gt => satisfy_greater(right_child, left_child, false), - Operator::GtEq => satisfy_greater(right_child, left_child, true), - Operator::Lt => satisfy_greater(left_child, right_child, false) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(left_child, right_child, true) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else { - // Uncertainty cannot change any end-point of the intervals. - Ok(None) - } -} - impl ExprIntervalGraph { pub fn try_new(expr: Arc, schema: &Schema) -> Result { // Build the full graph: @@ -624,107 +473,15 @@ impl ExprIntervalGraph { } } -/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. -fn propagate_right( - left: &Interval, - parent: &Interval, - right: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - match op { - Operator::Minus => apply_operator(op, left, parent), - Operator::Plus => apply_operator(inverse_op, parent, left), - Operator::Divide => apply_operator(op, left, parent), - Operator::Multiply => apply_operator(inverse_op, parent, left), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - }? - .intersect(right) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. 
However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the left side of the operation. -fn propagate_time_interval_at_left( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. - let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { - match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? { - Some(value) => { - let left = convert_duration_type_to_interval(&value); - let right = propagate_right(&value, parent, right_child, op, inverse_op)?; - match (left, right) { - (Some(left), Some(right)) => Some((left, right)), - _ => None, - } - } - None => None, - } - } else { - propagate_right(left_child, parent, right_child, op, inverse_op)? - .map(|right| (left_child.clone(), right)) - }; - Ok(result) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the right side of the operation. -fn propagate_time_interval_at_right( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. - let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { - match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { - Some(value) => { - propagate_right(left_child, parent, &duration, op, inverse_op)? - .and_then(|right| convert_duration_type_to_interval(&right)) - .map(|right| (value, right)) - } - None => None, - } - } else { - apply_operator(inverse_op, parent, right_child)? - .intersect(left_child)? 
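The key detail in the two time-interval helpers above is the gate: only intervals with zero month and day fields have a fixed nanosecond length, so only those are converted to a duration, refined, and converted back; anything else is passed through untouched. A minimal standalone sketch of that flow, using placeholder types rather than `ScalarValue`/`Interval`:

```rust
// `IntervalValue` models an IntervalMonthDayNano scalar and `DurationNs`
// a nanosecond duration; both are illustrative stand-ins only.
#[derive(Clone, Copy, Debug, PartialEq)]
struct IntervalValue {
    months: i32,
    days: i32,
    nanos: i64,
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct DurationNs(i64);

/// Only month/day-free intervals have a fixed length, so only those can
/// be converted to a duration for propagation.
fn to_duration(iv: IntervalValue) -> Option<DurationNs> {
    (iv.months == 0 && iv.days == 0).then_some(DurationNs(iv.nanos))
}

fn to_interval(d: DurationNs) -> IntervalValue {
    IntervalValue { months: 0, days: 0, nanos: d.0 }
}

/// Propagation gate: if the child can be viewed as a duration, refine it
/// in the duration domain and map the result back; otherwise keep it as is.
fn propagate_interval_child(
    child: IntervalValue,
    refine: impl Fn(DurationNs) -> Option<DurationNs>,
) -> Option<IntervalValue> {
    match to_duration(child) {
        Some(duration) => refine(duration).map(to_interval),
        // A "2 months" interval has no fixed nanosecond length: pass it through.
        None => Some(child),
    }
}

fn main() {
    let precise = IntervalValue { months: 0, days: 0, nanos: 5_000 };
    let calendar = IntervalValue { months: 2, days: 0, nanos: 0 };
    // Pretend propagation caps the duration at 3_000 ns.
    let cap = |d: DurationNs| Some(DurationNs(d.0.min(3_000)));
    assert_eq!(
        propagate_interval_child(precise, cap),
        Some(IntervalValue { months: 0, days: 0, nanos: 3_000 })
    );
    // The calendar interval is passed through unchanged.
    assert_eq!(propagate_interval_child(calendar, cap), Some(calendar));
}
```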
- .map(|value| (value, right_child.clone())) - }; - Ok(result) -} - -fn reverse_tuple((first, second): (T, U)) -> (U, T) { - (second, first) -} - #[cfg(test)] mod tests { use super::*; use crate::expressions::{BinaryExpr, Column}; use crate::intervals::test_utils::gen_conjunctive_numerical_expr; - use arrow::datatypes::TimeUnit; use arrow_schema::{DataType, Field}; use datafusion_common::ScalarValue; + use datafusion_expr::Operator; use itertools::Itertools; use rand::rngs::StdRng; @@ -1477,90 +1234,6 @@ mod tests { Ok(()) } - #[test] - fn test_propagate_comparison() -> Result<()> { - // In the examples below: - // `left` is unbounded: [?, ?], - // `right` is known to be [1000,1000] - // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] - let left = Interval::make_unbounded(&DataType::Int64)?; - let right = Interval::make(Some(1000_i64), Some(1000_i64))?; - assert_eq!( - (Some(( - Interval::make(None, Some(999_i64))?, - Interval::make(Some(1000_i64), Some(1000_i64))?, - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - let left = - Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - None - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), None), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - let left = Interval::make_unbounded(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - ))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - Ok(()) - } - #[test] fn test_propagate_or() -> Result<()> { let expr = Arc::new(BinaryExpr::new( diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index e188b2d56bae..ff7fd63126a6 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -25,14 +25,8 @@ use crate::{ }; use arrow_schema::{DataType, SchemaRef}; -use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; -use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; -const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; -const DT_MS_MASK: i64 = 0xFFFF_FFFF; - /// Indicates whether interval arithmetic is supported for the given expression. /// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. 
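The removed `reverse_tuple` helper and the `Lt`/`LtEq` arms above encode a small but useful trick: less-than propagation is just greater-than propagation with the operands swapped and the result swapped back. A self-contained sketch of that reduction, with a deliberately simplified `refine_gt` standing in for the real bound tightening:

```rust
// Plain closed interval used only for this illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Iv(i64, i64);

fn reverse_tuple<T, U>((first, second): (T, U)) -> (U, T) {
    (second, first)
}

/// Simplified "greater-than" refinement: raise `left`'s lower bound above
/// `right`'s lower bound and lower `right`'s upper bound below `left`'s
/// upper bound (strictly).
fn refine_gt(left: Iv, right: Iv) -> Option<(Iv, Iv)> {
    let new_left = Iv(left.0.max(right.0 + 1), left.1);
    let new_right = Iv(right.0, right.1.min(left.1 - 1));
    (new_left.0 <= new_left.1 && new_right.0 <= new_right.1)
        .then_some((new_left, new_right))
}

/// "Less-than" comes for free: swap, refine, swap back.
fn refine_lt(left: Iv, right: Iv) -> Option<(Iv, Iv)> {
    refine_gt(right, left).map(reverse_tuple)
}

fn main() {
    // left ∈ [0, 2000] and left < right with right = [1000, 1000]
    // leaves right alone and caps left at 999, matching the removed
    // `test_propagate_comparison` example above.
    assert_eq!(
        refine_lt(Iv(0, 2000), Iv(1000, 1000)),
        Some((Iv(0, 999), Iv(1000, 1000)))
    );
}
```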
/// We do not support every type of [`Operator`]s either. Over time, this check @@ -65,17 +59,6 @@ pub fn check_support(expr: &Arc, schema: &SchemaRef) -> bool { } } -// This function returns the inverse operator of the given operator. -pub fn get_inverse_op(op: Operator) -> Result { - match op { - Operator::Plus => Ok(Operator::Minus), - Operator::Minus => Ok(Operator::Plus), - Operator::Multiply => Ok(Operator::Divide), - Operator::Divide => Ok(Operator::Multiply), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - } -} - /// Indicates whether interval arithmetic is supported for the given operator. pub fn is_operator_supported(op: &Operator) -> bool { matches!( @@ -109,96 +92,3 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool { | &DataType::Float32 ) } - -/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. -pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_interval_bound_to_duration(interval.lower()), - convert_interval_bound_to_duration(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. -fn convert_interval_bound_to_duration( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) - .ok() - .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), - ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) - .ok() - .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), - _ => None, - } -} - -/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. -pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_duration_bound_to_interval(interval.lower()), - convert_duration_bound_to_interval(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. -fn convert_duration_bound_to_interval( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::DurationNanosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration)) - } - ScalarValue::DurationMicrosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) - } - ScalarValue::DurationMillisecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32)) - } - ScalarValue::DurationSecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) - } - _ => None, - } -} - -/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. -/// Otherwise, it returns an error. 
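The duration-to-interval conversion removed above is mostly about units: nanosecond and microsecond durations become month-day-nano intervals, millisecond and second durations become day-time intervals, with microseconds scaled to nanoseconds and seconds to milliseconds. A standalone sketch of that mapping, using local enums in place of the corresponding `ScalarValue` variants:

```rust
// Illustrative stand-ins for ScalarValue::Duration* and the two
// time-interval representations; not DataFusion types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Duration {
    Nanosecond(i64),
    Microsecond(i64),
    Millisecond(i64),
    Second(i64),
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum IntervalBound {
    // (months, days, nanoseconds)
    MonthDayNano(i32, i32, i64),
    // (days, milliseconds)
    DayTime(i32, i32),
}

fn duration_to_interval(d: Duration) -> IntervalBound {
    match d {
        Duration::Nanosecond(ns) => IntervalBound::MonthDayNano(0, 0, ns),
        // Microseconds are scaled up to nanoseconds.
        Duration::Microsecond(us) => IntervalBound::MonthDayNano(0, 0, us * 1000),
        Duration::Millisecond(ms) => IntervalBound::DayTime(0, ms as i32),
        // Seconds are scaled up to milliseconds.
        Duration::Second(s) => IntervalBound::DayTime(0, s as i32 * 1000),
    }
}

fn main() {
    assert_eq!(
        duration_to_interval(Duration::Microsecond(5)),
        IntervalBound::MonthDayNano(0, 0, 5_000)
    );
    assert_eq!(
        duration_to_interval(Duration::Second(2)),
        IntervalBound::DayTime(0, 2_000)
    );
}
```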
-fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { - let months = mdn >> 96; - let days = (mdn & MDN_DAY_MASK) >> 64; - let nanoseconds = mdn & MDN_NS_MASK; - - if months == 0 && days == 0 { - nanoseconds - .try_into() - .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) - } else { - internal_err!( - "The interval cannot have a non-zero month or day value for duration convertibility" - ) - } -} - -/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. -/// Otherwise, it returns an error. -fn interval_dt_to_duration_ms(dt: &i64) -> Result { - let days = dt >> 32; - let milliseconds = dt & DT_MS_MASK; - - if days == 0 { - Ok(milliseconds) - } else { - internal_err!( - "The interval cannot have a non-zero day value for duration convertibility" - ) - } -} diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 127194f681a5..236cc7d99b98 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -21,6 +21,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use itertools::izip; pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref; +pub use datafusion_physical_expr_common::physical_expr::physical_exprs_bag_equal; /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; @@ -44,29 +45,6 @@ pub fn physical_exprs_equal( lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs)) } -/// Checks whether the given physical expression slices are equal in the sense -/// of bags (multi-sets), disregarding their orderings. -pub fn physical_exprs_bag_equal( - lhs: &[Arc], - rhs: &[Arc], -) -> bool { - // TODO: Once we can use `HashMap`s with `Arc`, this - // function should use a `HashMap` to reduce computational complexity. - if lhs.len() == rhs.len() { - let mut rhs_vec = rhs.to_vec(); - for expr in lhs { - if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) { - rhs_vec.swap_remove(idx); - } else { - return false; - } - } - true - } else { - false - } -} - /// This utility function removes duplicates from the given `exprs` vector. /// Note that this function does not necessarily preserve its input ordering. pub fn deduplicate_physical_exprs(exprs: &mut Vec>) { diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index bf7b52f1c147..965c50c21921 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -109,6 +109,14 @@ pub fn create_physical_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result> { + use datafusion_physical_expr_common::physical_expr::create_physical_expr as create_physical_expr_common; + + // PR #10074: Temporary solution, after all the logic is moved to common, we can remove this function + let res = create_physical_expr_common(e, input_dfschema, execution_props); + if res.is_ok() { + return res; + } + let input_schema: &Schema = &input_dfschema.into(); match e {
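As a footnote on the `IntervalMonthDayNano` helpers removed from `intervals/utils.rs` above: the `MDN_DAY_MASK`/`MDN_NS_MASK` constants decode a packed `i128` with months in bits 96..128, days in bits 64..96 and nanoseconds in bits 0..64. The sketch below reproduces that decoding with plain `String` errors instead of `DataFusionError`, purely for illustration:

```rust
// Bit masks matching the layout used by the removed helper.
const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000;
const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;

/// Returns the nanosecond part when both month and day fields are zero;
/// such intervals are the only ones with a fixed duration.
fn interval_mdn_to_duration_ns(mdn: i128) -> Result<i64, String> {
    let months = mdn >> 96;
    let days = (mdn & MDN_DAY_MASK) >> 64;
    let nanoseconds = mdn & MDN_NS_MASK;

    if months == 0 && days == 0 {
        nanoseconds
            .try_into()
            .map_err(|_| "resulting duration exceeds i64::MAX".to_string())
    } else {
        Err("non-zero month or day field; not convertible to a duration".to_string())
    }
}

/// Packs the three fields the same way, for the round trip below.
fn pack_mdn(months: i32, days: i32, nanos: i64) -> i128 {
    ((months as i128) << 96) | (((days as u32) as i128) << 64) | ((nanos as u64) as i128)
}

fn main() {
    // A pure nanosecond interval converts ...
    assert_eq!(interval_mdn_to_duration_ns(pack_mdn(0, 0, 42)), Ok(42));
    // ... while a calendar interval ("1 month") does not.
    assert!(interval_mdn_to_duration_ns(pack_mdn(1, 0, 0)).is_err());
}
```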