From 8ead537c7db7d933f808d384f06d2fc0f3ad4a12 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 15 Oct 2025 18:57:59 +0800 Subject: [PATCH 1/7] Rewrote nvl2 to simplify into a CASE expression, added return-field nullability handling, and marked the evaluator as short-circuiting to avoid unreachable execution paths. Added a dataframe regression test that exercises nvl2 with a potentially failing branch to confirm lazy evaluation behaviour. --- .../tests/dataframe/dataframe_functions.rs | 27 ++++++ datafusion/functions/src/core/nvl2.rs | 93 +++++++++---------- 2 files changed, 72 insertions(+), 48 deletions(-) diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index b664fccdfa800..d95eb38c19e1a 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -274,6 +274,33 @@ async fn test_nvl2() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_nvl2_short_circuit() -> Result<()> { + let expr = nvl2( + col("a"), + arrow_cast(lit("1"), lit("Int32")), + arrow_cast(col("a"), lit("Int32")), + ); + + let batches = get_batches(expr).await?; + + assert_snapshot!( + batches_to_string(&batches), + @r#" + +-----------------------------------------------------------------------------------+ + | nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32"))) | + +-----------------------------------------------------------------------------------+ + | 1 | + | 1 | + | 1 | + | 1 | + +-----------------------------------------------------------------------------------+ + "# + ); + + Ok(()) +} #[tokio::test] async fn test_fn_arrow_typeof() -> Result<()> { let expr = arrow_typeof(col("l")); diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs index 82aa8d2a4cd54..ad463706c379c 100644 --- a/datafusion/functions/src/core/nvl2.rs +++ b/datafusion/functions/src/core/nvl2.rs @@ -15,17 +15,16 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::Array; -use arrow::compute::is_not_null; -use arrow::compute::kernels::zip::zip; -use arrow::datatypes::DataType; -use datafusion_common::{internal_err, utils::take_function_args, Result}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::{internal_err, plan_err, utils::take_function_args, Result}; use datafusion_expr::{ - type_coercion::binary::comparison_coercion, ColumnarValue, Documentation, - ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, + conditional_expressions::CaseBuilder, + simplify::{ExprSimplifyResult, SimplifyInfo}, + type_coercion::binary::comparison_coercion, + ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, + ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; -use std::sync::Arc; #[user_doc( doc_section(label = "Conditional Functions"), @@ -95,8 +94,45 @@ impl ScalarUDFImpl for NVL2Func { Ok(arg_types[1].clone()) } + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = + args.arg_fields[1].is_nullable() || args.arg_fields[2].is_nullable(); + let return_type = args.arg_fields[1].data_type().clone(); + Ok(Field::new(self.name(), return_type, nullable).into()) + } + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - nvl2_func(&args.args) + let _ = args; + internal_err!("nvl2 should have been simplified to case") + } + + fn simplify( + &self, + args: Vec, + _info: &dyn SimplifyInfo, + ) -> Result { + if args.len() != 3 { + return plan_err!("nvl2 must have exactly three arguments"); + } + + let mut args = args.into_iter(); + let test = args.next().unwrap(); + let if_non_null = args.next().unwrap(); + let if_null = args.next().unwrap(); + + let expr = CaseBuilder::new( + None, + vec![test.is_not_null()], + vec![if_non_null], + Some(Box::new(if_null)), + ) + .end()?; + + Ok(ExprSimplifyResult::Simplified(expr)) + } + + fn short_circuits(&self) -> bool { + true } fn coerce_types(&self, arg_types: &[DataType]) -> Result> { @@ -123,42 +159,3 @@ impl ScalarUDFImpl for NVL2Func { self.doc() } } - -fn nvl2_func(args: &[ColumnarValue]) -> Result { - let mut len = 1; - let mut is_array = false; - for arg in args { - if let ColumnarValue::Array(array) = arg { - len = array.len(); - is_array = true; - break; - } - } - if is_array { - let args = args - .iter() - .map(|arg| match arg { - ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len), - ColumnarValue::Array(array) => Ok(Arc::clone(array)), - }) - .collect::>>()?; - let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?; - let to_apply = is_not_null(&tested)?; - let value = zip(&to_apply, &if_non_null, &if_null)?; - Ok(ColumnarValue::Array(value)) - } else { - let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?; - match &tested { - ColumnarValue::Array(_) => { - internal_err!("except Scalar value, but got Array") - } - ColumnarValue::Scalar(scalar) => { - if scalar.is_null() { - Ok(if_null.clone()) - } else { - Ok(if_non_null.clone()) - } - } - } - } -} From a9048e9127cd66815d4ac244ada5fc53c072f3b1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 15 Oct 2025 19:34:12 +0800 Subject: [PATCH 2/7] Implement NVL2 execution for scalar and array inputs Handle NVL2 execution when simplifier is skipped using null masks to select between branch values. Add a regression test for expr_api to validate SessionContext::create_physical_expr with NVL2, ensuring successful evaluation without prior simplification. --- datafusion/core/tests/expr_api/mod.rs | 16 +++++++++++++ datafusion/functions/src/core/nvl2.rs | 33 +++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs index 4aee274de9083..466cc05f799d6 100644 --- a/datafusion/core/tests/expr_api/mod.rs +++ b/datafusion/core/tests/expr_api/mod.rs @@ -320,6 +320,22 @@ async fn test_create_physical_expr() { create_simplified_expr_test(lit(1i32) + lit(2i32), "3"); } +#[test] +fn test_create_physical_expr_nvl2() { + evaluate_expr_test( + nvl2(col("i"), lit(1i64), lit(0i64)), + vec![ + "+------+", + "| expr |", + "+------+", + "| 1 |", + "| 0 |", + "| 1 |", + "+------+", + ], + ); +} + #[tokio::test] async fn test_create_physical_expr_coercion() { // create_physical_expr does apply type coercion and unwrapping in cast diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs index ad463706c379c..604aca413bd77 100644 --- a/datafusion/functions/src/core/nvl2.rs +++ b/datafusion/functions/src/core/nvl2.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use arrow::compute::is_not_null; +use arrow::compute::kernels::zip::zip; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::{internal_err, plan_err, utils::take_function_args, Result}; use datafusion_expr::{ @@ -102,8 +104,35 @@ impl ScalarUDFImpl for NVL2Func { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let _ = args; - internal_err!("nvl2 should have been simplified to case") + let [test, if_non_null, if_null] = + take_function_args(self.name(), args.args)?; + + match test { + ColumnarValue::Scalar(test_scalar) => { + if test_scalar.is_null() { + Ok(if_null) + } else { + Ok(if_non_null) + } + } + ColumnarValue::Array(test_array) => { + let len = test_array.len(); + + let if_non_null_array = match if_non_null { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?, + }; + + let if_null_array = match if_null { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?, + }; + + let mask = is_not_null(&test_array)?; + let result = zip(&mask, &if_non_null_array, &if_null_array)?; + Ok(ColumnarValue::Array(result)) + } + } } fn simplify( From d85087f57ccf0c6763dd21839748c8068f0be39f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 15 Oct 2025 21:40:57 +0800 Subject: [PATCH 3/7] Add test for NVL2 expression evaluation with scalar inputs --- datafusion/core/tests/expr_api/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs index 466cc05f799d6..3d59a6623fef3 100644 --- a/datafusion/core/tests/expr_api/mod.rs +++ b/datafusion/core/tests/expr_api/mod.rs @@ -334,6 +334,19 @@ fn test_create_physical_expr_nvl2() { "+------+", ], ); + + evaluate_expr_test( + nvl2(lit(1i64), col("i"), lit(0i64)), + vec![ + "+------+", + "| expr |", + "+------+", + "| 10 |", + "| |", + "| 5 |", + "+------+", + ], + ); } #[tokio::test] From b4b3550ec3d1ba224e58adc9868711c04711b051 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 15 Oct 2025 22:19:38 +0800 Subject: [PATCH 4/7] cargo fmt --- datafusion/core/tests/expr_api/mod.rs | 14 ++------------ datafusion/functions/src/core/nvl2.rs | 3 +-- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs index 3d59a6623fef3..77ddd2826d47d 100644 --- a/datafusion/core/tests/expr_api/mod.rs +++ b/datafusion/core/tests/expr_api/mod.rs @@ -325,12 +325,7 @@ fn test_create_physical_expr_nvl2() { evaluate_expr_test( nvl2(col("i"), lit(1i64), lit(0i64)), vec![ - "+------+", - "| expr |", - "+------+", - "| 1 |", - "| 0 |", - "| 1 |", + "+------+", "| expr |", "+------+", "| 1 |", "| 0 |", "| 1 |", "+------+", ], ); @@ -338,12 +333,7 @@ fn test_create_physical_expr_nvl2() { evaluate_expr_test( nvl2(lit(1i64), col("i"), lit(0i64)), vec![ - "+------+", - "| expr |", - "+------+", - "| 10 |", - "| |", - "| 5 |", + "+------+", "| expr |", "+------+", "| 10 |", "| |", "| 5 |", "+------+", ], ); diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs index 604aca413bd77..2b063747a6c40 100644 --- a/datafusion/functions/src/core/nvl2.rs +++ b/datafusion/functions/src/core/nvl2.rs @@ -104,8 +104,7 @@ impl ScalarUDFImpl for NVL2Func { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let [test, if_non_null, if_null] = - take_function_args(self.name(), args.args)?; + let [test, if_non_null, if_null] = take_function_args(self.name(), args.args)?; match test { ColumnarValue::Scalar(test_scalar) => { From 10d78ac50c1a4995d5061d1874f50cd6de3ac5a6 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 20 Oct 2025 21:42:58 +0800 Subject: [PATCH 5/7] Improve formatting of NVL2 expression tests for better readability --- datafusion/core/tests/expr_api/mod.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs index 77ddd2826d47d..f01b92d38453f 100644 --- a/datafusion/core/tests/expr_api/mod.rs +++ b/datafusion/core/tests/expr_api/mod.rs @@ -322,18 +322,30 @@ async fn test_create_physical_expr() { #[test] fn test_create_physical_expr_nvl2() { + #[rustfmt::skip] evaluate_expr_test( nvl2(col("i"), lit(1i64), lit(0i64)), vec![ - "+------+", "| expr |", "+------+", "| 1 |", "| 0 |", "| 1 |", + "+------+", + "| expr |", + "+------+", + "| 1 |", + "| 0 |", + "| 1 |", "+------+", ], ); + #[rustfmt::skip] evaluate_expr_test( nvl2(lit(1i64), col("i"), lit(0i64)), vec![ - "+------+", "| expr |", "+------+", "| 10 |", "| |", "| 5 |", + "+------+", + "| expr |", + "+------+", + "| 10 |", + "| |", + "| 5 |", "+------+", ], ); From 4a3c6659596374fcd20c5cbacd9508e356da84d7 Mon Sep 17 00:00:00 2001 From: kosiew Date: Wed, 22 Oct 2025 11:13:50 +0800 Subject: [PATCH 6/7] Update datafusion/functions/src/core/nvl2.rs Co-authored-by: Jeffrey Vo --- datafusion/functions/src/core/nvl2.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs index 2b063747a6c40..b732dce0c607e 100644 --- a/datafusion/functions/src/core/nvl2.rs +++ b/datafusion/functions/src/core/nvl2.rs @@ -139,14 +139,7 @@ impl ScalarUDFImpl for NVL2Func { args: Vec, _info: &dyn SimplifyInfo, ) -> Result { - if args.len() != 3 { - return plan_err!("nvl2 must have exactly three arguments"); - } - - let mut args = args.into_iter(); - let test = args.next().unwrap(); - let if_non_null = args.next().unwrap(); - let if_null = args.next().unwrap(); + let [test, if_non_null, if_null] = take_function_args(self.name(), args)?; let expr = CaseBuilder::new( None, From a74bb365b932f9871c5390a3bd92b1d8816ac140 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 23 Oct 2025 15:26:15 +0800 Subject: [PATCH 7/7] Made the nvl2 scalar UDF return an internal error when it reaches execution without being simplified to a CASE expression, removing the eager evaluation helpers that previously enforced eager semantics. Updated the expr_api integration test to assert that unsimplified nvl2 evaluation now fails with the expected internal error message. --- datafusion/core/tests/expr_api/mod.rs | 41 ++++++++++----------------- datafusion/functions/src/core/nvl2.rs | 35 ++--------------------- 2 files changed, 18 insertions(+), 58 deletions(-) diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs index f01b92d38453f..84e644480a4fd 100644 --- a/datafusion/core/tests/expr_api/mod.rs +++ b/datafusion/core/tests/expr_api/mod.rs @@ -322,33 +322,22 @@ async fn test_create_physical_expr() { #[test] fn test_create_physical_expr_nvl2() { - #[rustfmt::skip] - evaluate_expr_test( - nvl2(col("i"), lit(1i64), lit(0i64)), - vec![ - "+------+", - "| expr |", - "+------+", - "| 1 |", - "| 0 |", - "| 1 |", - "+------+", - ], - ); + let batch = &TEST_BATCH; + let df_schema = DFSchema::try_from(batch.schema()).unwrap(); + let ctx = SessionContext::new(); - #[rustfmt::skip] - evaluate_expr_test( - nvl2(lit(1i64), col("i"), lit(0i64)), - vec![ - "+------+", - "| expr |", - "+------+", - "| 10 |", - "| |", - "| 5 |", - "+------+", - ], - ); + let expect_err = |expr| { + let physical_expr = ctx.create_physical_expr(expr, &df_schema).unwrap(); + let err = physical_expr.evaluate(batch).unwrap_err(); + assert!( + err.to_string() + .contains("nvl2 should have been simplified to case"), + "unexpected error: {err:?}" + ); + }; + + expect_err(nvl2(col("i"), lit(1i64), lit(0i64))); + expect_err(nvl2(lit(1i64), col("i"), lit(0i64))); } #[tokio::test] diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs index b732dce0c607e..45cb6760d062d 100644 --- a/datafusion/functions/src/core/nvl2.rs +++ b/datafusion/functions/src/core/nvl2.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow::compute::is_not_null; -use arrow::compute::kernels::zip::zip; use arrow::datatypes::{DataType, Field, FieldRef}; -use datafusion_common::{internal_err, plan_err, utils::take_function_args, Result}; +use datafusion_common::{internal_err, utils::take_function_args, Result}; use datafusion_expr::{ conditional_expressions::CaseBuilder, simplify::{ExprSimplifyResult, SimplifyInfo}, @@ -103,35 +101,8 @@ impl ScalarUDFImpl for NVL2Func { Ok(Field::new(self.name(), return_type, nullable).into()) } - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let [test, if_non_null, if_null] = take_function_args(self.name(), args.args)?; - - match test { - ColumnarValue::Scalar(test_scalar) => { - if test_scalar.is_null() { - Ok(if_null) - } else { - Ok(if_non_null) - } - } - ColumnarValue::Array(test_array) => { - let len = test_array.len(); - - let if_non_null_array = match if_non_null { - ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?, - }; - - let if_null_array = match if_null { - ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?, - }; - - let mask = is_not_null(&test_array)?; - let result = zip(&mask, &if_non_null_array, &if_null_array)?; - Ok(ColumnarValue::Array(result)) - } - } + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!("nvl2 should have been simplified to case") } fn simplify(