Minor: remove old create_physical_expr to scalar_function (#10387)
* rm old code

Signed-off-by: jayzhan211 <[email protected]>

* move to scalarfunction

Signed-off-by: jayzhan211 <[email protected]>

* fix import

Signed-off-by: jayzhan211 <[email protected]>

* Update datafusion/physical-expr/src/scalar_function.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
jayzhan211 and alamb authored May 7, 2024
1 parent 40a2055 commit b5cc6b9
Showing 6 changed files with 116 additions and 456 deletions.
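For callers, the upshot is that create_physical_expr for scalar UDFs is now defined in scalar_function.rs, and functions.rs keeps only a re-export (visible in the second file below). A minimal sketch of planning a UDF call through that re-exported path, assuming the moved function keeps the signature of the removed definition shown further down; the wrapper name and its arguments are illustrative only, not part of this commit:

use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{Expr, ScalarUDF};
// Re-export added by this commit; the defining code now lives in scalar_function.rs.
use datafusion_physical_expr::functions::create_physical_expr;
use datafusion_physical_expr::PhysicalExpr;

// Hypothetical helper: the call shape is unchanged, only the defining module moved.
fn plan_udf_call(
    udf: &ScalarUDF,
    physical_args: &[Arc<dyn PhysicalExpr>],
    schema: &Schema,
    logical_args: &[Expr],
    df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
    create_physical_expr(udf, physical_args, schema, logical_args, df_schema)
}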
3 changes: 2 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -44,6 +44,7 @@ use crate::{
     error::{DataFusionError, Result},
     execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
     logical_expr::AggregateUDF,
+    logical_expr::ScalarUDF,
     logical_expr::{
         CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
         CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
@@ -53,7 +54,7 @@ use crate::{
     optimizer::analyzer::{Analyzer, AnalyzerRule},
     optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
     physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
-    physical_plan::{udf::ScalarUDF, ExecutionPlan},
+    physical_plan::ExecutionPlan,
     physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
     variable::{VarProvider, VarType},
 };
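For downstream code that reached ScalarUDF through the old physical-plan path, the equivalent import after this change goes through logical_expr, mirroring the swap above. A one-line sketch; whether a deprecated re-export remains under physical_plan::udf is not shown by this diff:

// Import ScalarUDF from logical_expr instead of the physical_plan::udf re-export.
use datafusion::logical_expr::ScalarUDF;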
354 changes: 6 additions & 348 deletions datafusion/physical-expr/src/functions.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.

-//! Declaration of built-in (scalar) functions.
+//! Deprecated module. Add new feature in scalar_function.rs
+//!
 //! This module contains built-in functions' enumeration and metadata.
 //!
 //! Generally, a function has:
@@ -30,52 +31,15 @@
 //! an argument i32 is passed to a function that supports f64, the
 //! argument is automatically is coerced to f64.

-use std::ops::Neg;
 use std::sync::Arc;

-use arrow::{array::ArrayRef, datatypes::Schema};
+use arrow::array::ArrayRef;
 use arrow_array::Array;

-use datafusion_common::{DFSchema, Result, ScalarValue};
+pub use crate::scalar_function::create_physical_expr;
+use datafusion_common::{Result, ScalarValue};
 pub use datafusion_expr::FuncMonotonicity;
-use datafusion_expr::{
-    type_coercion::functions::data_types, ColumnarValue, ScalarFunctionImplementation,
-};
-use datafusion_expr::{Expr, ScalarFunctionDefinition, ScalarUDF};
-
-use crate::sort_properties::SortProperties;
-use crate::{PhysicalExpr, ScalarFunctionExpr};
-
-/// Create a physical (function) expression.
-/// This function errors when `args`' can't be coerced to a valid argument type of the function.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-    args: &[Expr],
-    input_dfschema: &DFSchema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    let input_expr_types = input_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    // verify that input data types is consistent with function's `TypeSignature`
-    data_types(&input_expr_types, fun.signature())?;
-
-    // Since we have arg_types, we don't need args and schema.
-    let return_type =
-        fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?;
-
-    let fun_def = ScalarFunctionDefinition::UDF(Arc::new(fun.clone()));
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        fun.name(),
-        fun_def,
-        input_phy_exprs.to_vec(),
-        return_type,
-        fun.monotonicity()?,
-    )))
-}
+use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};

 #[derive(Debug, Clone, Copy)]
 pub enum Hint {
@@ -164,309 +128,3 @@ where
         }
     })
 }
-
-/// Determines a [`ScalarFunctionExpr`]'s monotonicity for the given arguments
-/// and the function's behavior depending on its arguments.
-pub fn out_ordering(
-    func: &FuncMonotonicity,
-    arg_orderings: &[SortProperties],
-) -> SortProperties {
-    func.iter().zip(arg_orderings).fold(
-        SortProperties::Singleton,
-        |prev_sort, (item, arg)| {
-            let current_sort = func_order_in_one_dimension(item, arg);
-
-            match (prev_sort, current_sort) {
-                (_, SortProperties::Unordered) => SortProperties::Unordered,
-                (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort,
-                (SortProperties::Ordered(prev), SortProperties::Ordered(current))
-                    if prev.descending != current.descending =>
-                {
-                    SortProperties::Unordered
-                }
-                _ => prev_sort,
-            }
-        },
-    )
-}
-
-/// This function decides the monotonicity property of a [`ScalarFunctionExpr`] for a single argument (i.e. across a single dimension), given that argument's sort properties.
-fn func_order_in_one_dimension(
-    func_monotonicity: &Option<bool>,
-    arg: &SortProperties,
-) -> SortProperties {
-    if *arg == SortProperties::Singleton {
-        SortProperties::Singleton
-    } else {
-        match func_monotonicity {
-            None => SortProperties::Unordered,
-            Some(false) => {
-                if let SortProperties::Ordered(_) = arg {
-                    arg.neg()
-                } else {
-                    SortProperties::Unordered
-                }
-            }
-            Some(true) => {
-                if let SortProperties::Ordered(_) = arg {
-                    *arg
-                } else {
-                    SortProperties::Unordered
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use arrow::{
-        array::UInt64Array,
-        datatypes::{DataType, Field},
-    };
-    use arrow_schema::DataType::Utf8;
-
-    use datafusion_common::cast::as_uint64_array;
-    use datafusion_common::DataFusionError;
-    use datafusion_common::{internal_err, plan_err};
-    use datafusion_expr::{Signature, Volatility};
-
-    use crate::expressions::try_cast;
-    use crate::utils::tests::TestScalarUDF;
-
-    use super::*;
-
-    #[test]
-    fn test_empty_arguments_error() -> Result<()> {
-        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-        let udf = ScalarUDF::new_from_impl(TestScalarUDF {
-            signature: Signature::variadic(vec![Utf8], Volatility::Immutable),
-        });
-        let expr = create_physical_expr_with_type_coercion(
-            &udf,
-            &[],
-            &schema,
-            &[],
-            &DFSchema::empty(),
-        );
-
-        match expr {
-            Ok(..) => {
-                return plan_err!(
-                    "ScalarUDF function {udf:?} does not support empty arguments"
-                );
-            }
-            Err(DataFusionError::Plan(_)) => {
-                // Continue the loop
-            }
-            Err(..) => {
-                return internal_err!(
-                    "ScalarUDF function {udf:?} didn't got the right error with empty arguments");
-            }
-        }
-
-        Ok(())
-    }
-
-    // Helper function just for testing.
-    // Returns `expressions` coerced to types compatible with
-    // `signature`, if possible.
-    pub fn coerce(
-        expressions: &[Arc<dyn PhysicalExpr>],
-        schema: &Schema,
-        signature: &Signature,
-    ) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
-        if expressions.is_empty() {
-            return Ok(vec![]);
-        }
-
-        let current_types = expressions
-            .iter()
-            .map(|e| e.data_type(schema))
-            .collect::<Result<Vec<_>>>()?;
-
-        let new_types = data_types(&current_types, signature)?;
-
-        expressions
-            .iter()
-            .enumerate()
-            .map(|(i, expr)| try_cast(expr.clone(), schema, new_types[i].clone()))
-            .collect::<Result<Vec<_>>>()
-    }
-
-    // Helper function just for testing.
-    // The type coercion will be done in the logical phase, should do the type coercion for the test
-    fn create_physical_expr_with_type_coercion(
-        fun: &ScalarUDF,
-        input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-        input_schema: &Schema,
-        args: &[Expr],
-        input_dfschema: &DFSchema,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        let type_coerced_phy_exprs =
-            coerce(input_phy_exprs, input_schema, fun.signature()).unwrap();
-        create_physical_expr(
-            fun,
-            &type_coerced_phy_exprs,
-            input_schema,
-            args,
-            input_dfschema,
-        )
-    }
-
-    fn dummy_function(args: &[ArrayRef]) -> Result<ArrayRef> {
-        let result: UInt64Array =
-            args.iter().map(|array| Some(array.len() as u64)).collect();
-        Ok(Arc::new(result) as ArrayRef)
-    }
-
-    fn unpack_uint64_array(col: Result<ColumnarValue>) -> Result<Vec<u64>> {
-        if let ColumnarValue::Array(array) = col? {
-            Ok(as_uint64_array(&array)?.values().to_vec())
-        } else {
-            internal_err!("Unexpected scalar created by a test function")
-        }
-    }
-
-    #[test]
-    fn test_make_scalar_function() -> Result<()> {
-        let adapter_func = make_scalar_function_inner(dummy_function);
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
-        assert_eq!(result, vec![5, 5]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_no_hints() -> Result<()> {
-        let adapter_func = make_scalar_function_with_hints(dummy_function, vec![]);
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
-        assert_eq!(result, vec![5, 5]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_hints() -> Result<()> {
-        let adapter_func = make_scalar_function_with_hints(
-            dummy_function,
-            vec![Hint::Pad, Hint::AcceptsSingular],
-        );
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
-        assert_eq!(result, vec![5, 1]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_hints_on_arrays() -> Result<()> {
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let adapter_func = make_scalar_function_with_hints(
-            dummy_function,
-            vec![Hint::Pad, Hint::AcceptsSingular],
-        );
-
-        let result = unpack_uint64_array(adapter_func(&[array_arg.clone(), array_arg]))?;
-        assert_eq!(result, vec![5, 5]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_mixed_hints() -> Result<()> {
-        let adapter_func = make_scalar_function_with_hints(
-            dummy_function,
-            vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad],
-        );
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[
-            array_arg,
-            scalar_arg.clone(),
-            scalar_arg,
-        ]))?;
-        assert_eq!(result, vec![5, 1, 5]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_more_arguments_than_hints() -> Result<()> {
-        let adapter_func = make_scalar_function_with_hints(
-            dummy_function,
-            vec![Hint::Pad, Hint::AcceptsSingular, Hint::Pad],
-        );
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[
-            array_arg.clone(),
-            scalar_arg.clone(),
-            scalar_arg,
-            array_arg,
-        ]))?;
-        assert_eq!(result, vec![5, 1, 5, 5]);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_make_scalar_function_with_hints_than_arguments() -> Result<()> {
-        let adapter_func = make_scalar_function_with_hints(
-            dummy_function,
-            vec![
-                Hint::Pad,
-                Hint::AcceptsSingular,
-                Hint::Pad,
-                Hint::Pad,
-                Hint::AcceptsSingular,
-                Hint::Pad,
-            ],
-        );
-
-        let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
-        let array_arg = ColumnarValue::Array(
-            ScalarValue::Int64(Some(1))
-                .to_array_of_size(5)
-                .expect("Failed to convert to array of size"),
-        );
-        let result = unpack_uint64_array(adapter_func(&[array_arg, scalar_arg]))?;
-        assert_eq!(result, vec![5, 1]);
-
-        Ok(())
-    }
-}
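The adapter helpers themselves (the Hint enum, make_scalar_function_inner, make_scalar_function_with_hints) stay behind in functions.rs; only the old create_physical_expr, the monotonicity helpers, and the tests leave in this commit. A condensed version of one removed test, kept as a usage reminder for the hint-driven adapter; it assumes it lives in functions.rs's own test module, as the removed tests did:

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use datafusion_common::cast::as_uint64_array;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::ColumnarValue;

use super::{make_scalar_function_with_hints, Hint};

// Toy function from the removed tests: returns each argument's length.
fn dummy_function(args: &[ArrayRef]) -> Result<ArrayRef> {
    let result: UInt64Array =
        args.iter().map(|array| Some(array.len() as u64)).collect();
    Ok(Arc::new(result) as ArrayRef)
}

#[test]
fn pad_expands_scalars_but_accepts_singular_keeps_them() -> Result<()> {
    // Hint::Pad expands a scalar argument to the batch length (5 here);
    // Hint::AcceptsSingular hands it through as a length-1 array.
    let adapter_func = make_scalar_function_with_hints(
        dummy_function,
        vec![Hint::Pad, Hint::AcceptsSingular],
    );

    let array_arg = ColumnarValue::Array(ScalarValue::Int64(Some(1)).to_array_of_size(5)?);
    let scalar_arg = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));

    let ColumnarValue::Array(result) = adapter_func(&[array_arg, scalar_arg])? else {
        panic!("adapter should return an array");
    };
    assert_eq!(as_uint64_array(&result)?.values().to_vec(), vec![5, 1]);

    Ok(())
}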