From 1bd1a6b08260f74ef3fe1d796bd2df8ea4ea9120 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 4 Oct 2025 11:20:31 -0700 Subject: [PATCH 01/10] chore: Extend backtrace coverage --- datafusion/spark/src/function/datetime/make_dt_interval.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs index bbfba44861344..ed6be91b20298 100644 --- a/datafusion/spark/src/function/datetime/make_dt_interval.rs +++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::datatypes::TimeUnit::Microsecond; use arrow::datatypes::{DataType, Float64Type, Int32Type}; use datafusion_common::{ - exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue, + exec_err, internal_datafusion_err, plan_datafusion_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, From 1e0d5d025f7a4d72f8ef70280a9602aaab275b91 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 4 Oct 2025 11:27:42 -0700 Subject: [PATCH 02/10] fmt --- datafusion/spark/src/function/datetime/make_dt_interval.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs index ed6be91b20298..bbfba44861344 100644 --- a/datafusion/spark/src/function/datetime/make_dt_interval.rs +++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::datatypes::TimeUnit::Microsecond; use arrow::datatypes::{DataType, Float64Type, Int32Type}; use datafusion_common::{ - exec_err, internal_datafusion_err, plan_datafusion_err, DataFusionError, Result, ScalarValue, + exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, From 7eda3bc946e66e55400e4df9bdf149633b5c47f6 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 4 Oct 2025 14:03:53 -0700 Subject: [PATCH 03/10] part2 --- datafusion/catalog/src/stream.rs | 6 ++++-- datafusion/core/src/datasource/file_format/arrow.rs | 3 ++- datafusion/datasource-parquet/src/file_format.rs | 12 ++++++------ datafusion/datasource/src/write/orchestration.rs | 4 ++-- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index f4a2338b8eecb..ad314e3977341 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -28,7 +28,9 @@ use std::sync::Arc; use crate::{Session, TableProvider, TableProviderFactory}; use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter}; use arrow::datatypes::SchemaRef; -use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result}; +use datafusion_common::{ + config_err, exec_datafusion_err, plan_err, Constraints, DataFusionError, Result, +}; use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; @@ -440,6 +442,6 @@ impl DataSink for StreamWrite { write_task .join_unwind() .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + .map_err(|e| exec_datafusion_err!("Write task failed: {}", e))? } } diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 25bc166d657a5..3ce8e66139120 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -42,6 +42,7 @@ use arrow::ipc::reader::FileReader; use arrow::ipc::writer::IpcWriteOptions; use arrow::ipc::{root_as_message, CompressionType}; use datafusion_catalog::Session; +use datafusion_common::exec_datafusion_err; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics, @@ -299,7 +300,7 @@ impl FileSink for ArrowFileSink { demux_task .join_unwind() .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + .map_err(|e| exec_datafusion_err!("ExecutionJoin error: {}", e))??; Ok(row_count as u64) } } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 963c1d77950c6..3210d8dc8ff65 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -43,8 +43,8 @@ use datafusion_common::encryption::map_config_decryption_to_decryption; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, - HashSet, Result, DEFAULT_PARQUET_EXTENSION, + exec_datafusion_err, internal_datafusion_err, internal_err, not_impl_err, + DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -1343,7 +1343,7 @@ impl FileSink for ParquetSink { demux_task .join_unwind() .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + .map_err(|e| exec_datafusion_err!("Join error in demux task: {e}"))??; Ok(row_count as u64) } @@ -1468,7 +1468,7 @@ fn spawn_rg_join_and_finalize_task( let (writer, _col_reservation) = task .join_unwind() .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + .map_err(|e| exec_datafusion_err!("Join error: {e}"))??; let encoded_size = writer.get_estimated_total_bytes(); rg_reservation.grow(encoded_size); finalized_rg.push(writer.close()?); @@ -1596,7 +1596,7 @@ async fn concatenate_parallel_row_groups( while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; let (serialized_columns, mut rg_reservation, _cnt) = - result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + result.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; let mut rg_out = parquet_writer.next_row_group()?; for chunk in serialized_columns { @@ -1678,7 +1678,7 @@ async fn output_single_parquet_file_parallelized( launch_serialization_task .join_unwind() .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + .map_err(|e| exec_datafusion_err!("Join error: {e}"))??; Ok(file_metadata) } diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs index ab836b7b7f388..933cbfd2089d3 100644 --- a/datafusion/datasource/src/write/orchestration.rs +++ b/datafusion/datasource/src/write/orchestration.rs @@ -285,8 +285,8 @@ pub async fn spawn_writer_tasks_and_join( write_coordinator_task.join_unwind(), demux_task.join_unwind() ); - r1.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; - r2.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + r1.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; + r2.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; // Return total row count: rx_row_cnt.await.map_err(|_| { From 0a926083b6d81de115bfc63ab612c62237f02c55 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 6 Oct 2025 08:04:17 -0700 Subject: [PATCH 04/10] feedback --- datafusion/catalog/src/stream.rs | 2 +- datafusion/core/src/datasource/file_format/arrow.rs | 2 +- datafusion/datasource-parquet/src/file_format.rs | 8 ++++---- datafusion/datasource/src/write/orchestration.rs | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index ad314e3977341..620f6d152a4fb 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -442,6 +442,6 @@ impl DataSink for StreamWrite { write_task .join_unwind() .await - .map_err(|e| exec_datafusion_err!("Write task failed: {}", e))? + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? } } diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 3ce8e66139120..d7e0f261b7b7b 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -300,7 +300,7 @@ impl FileSink for ArrowFileSink { demux_task .join_unwind() .await - .map_err(|e| exec_datafusion_err!("ExecutionJoin error: {}", e))??; + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; Ok(row_count as u64) } } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 3210d8dc8ff65..c4da058779786 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1343,7 +1343,7 @@ impl FileSink for ParquetSink { demux_task .join_unwind() .await - .map_err(|e| exec_datafusion_err!("Join error in demux task: {e}"))??; + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; Ok(row_count as u64) } @@ -1468,7 +1468,7 @@ fn spawn_rg_join_and_finalize_task( let (writer, _col_reservation) = task .join_unwind() .await - .map_err(|e| exec_datafusion_err!("Join error: {e}"))??; + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; let encoded_size = writer.get_estimated_total_bytes(); rg_reservation.grow(encoded_size); finalized_rg.push(writer.close()?); @@ -1596,7 +1596,7 @@ async fn concatenate_parallel_row_groups( while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; let (serialized_columns, mut rg_reservation, _cnt) = - result.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; + result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; let mut rg_out = parquet_writer.next_row_group()?; for chunk in serialized_columns { @@ -1678,7 +1678,7 @@ async fn output_single_parquet_file_parallelized( launch_serialization_task .join_unwind() .await - .map_err(|e| exec_datafusion_err!("Join error: {e}"))??; + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; Ok(file_metadata) } diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs index 933cbfd2089d3..ab836b7b7f388 100644 --- a/datafusion/datasource/src/write/orchestration.rs +++ b/datafusion/datasource/src/write/orchestration.rs @@ -285,8 +285,8 @@ pub async fn spawn_writer_tasks_and_join( write_coordinator_task.join_unwind(), demux_task.join_unwind() ); - r1.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; - r2.map_err(|e| exec_datafusion_err!("Join error: {e}"))??; + r1.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + r2.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; // Return total row count: rx_row_cnt.await.map_err(|_| { From 9e39d6c223e5ccafb34a1b2e0a5514f10363f1d3 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 6 Oct 2025 08:27:01 -0700 Subject: [PATCH 05/10] clippy --- datafusion/catalog/src/stream.rs | 4 +--- datafusion/core/src/datasource/file_format/arrow.rs | 1 - datafusion/datasource-parquet/src/file_format.rs | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 620f6d152a4fb..f4a2338b8eecb 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -28,9 +28,7 @@ use std::sync::Arc; use crate::{Session, TableProvider, TableProviderFactory}; use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter}; use arrow::datatypes::SchemaRef; -use datafusion_common::{ - config_err, exec_datafusion_err, plan_err, Constraints, DataFusionError, Result, -}; +use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index d7e0f261b7b7b..25bc166d657a5 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -42,7 +42,6 @@ use arrow::ipc::reader::FileReader; use arrow::ipc::writer::IpcWriteOptions; use arrow::ipc::{root_as_message, CompressionType}; use datafusion_catalog::Session; -use datafusion_common::exec_datafusion_err; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics, diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c4da058779786..963c1d77950c6 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -43,8 +43,8 @@ use datafusion_common::encryption::map_config_decryption_to_decryption; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - exec_datafusion_err, internal_datafusion_err, internal_err, not_impl_err, - DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION, + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, + HashSet, Result, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; From 3f59d812aa1bdd10d5df4b40ead0c551edd9151b Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 14 Oct 2025 15:26:09 -0700 Subject: [PATCH 06/10] feat: support Spark `concat` --- .../spark/src/function/string/concat.rs | 280 ++++++++++++++++++ .../test_files/spark/string/concat.slt | 41 +++ 2 files changed, 321 insertions(+) create mode 100644 datafusion/spark/src/function/string/concat.rs create mode 100644 datafusion/sqllogictest/test_files/spark/string/concat.slt diff --git a/datafusion/spark/src/function/string/concat.rs b/datafusion/spark/src/function/string/concat.rs new file mode 100644 index 0000000000000..a9be6df8dfea0 --- /dev/null +++ b/datafusion/spark/src/function/string/concat.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::Array; +use arrow::datatypes::DataType; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; +use datafusion_functions::string::concat::ConcatFunc; +use std::{any::Any, sync::Arc}; + +/// Spark-compatible `concat` expression +/// +/// +/// Concatenates multiple input strings into a single string. +/// Returns NULL if any input is NULL. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkConcat { + signature: Signature, +} + +impl Default for SparkConcat { + fn default() -> Self { + Self::new() + } +} + +impl SparkConcat { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![TypeSignature::UserDefined, TypeSignature::Nullary], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkConcat { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "concat" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + spark_concat(args) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + // Accept any string types, including zero arguments + Ok(arg_types.to_vec()) + } +} + +/// Concatenates strings, returning NULL if any input is NULL +/// This is a Spark-specific wrapper around DataFusion's concat that returns NULL +/// if any argument is NULL (Spark behavior), whereas DataFusion's concat ignores NULLs. +fn spark_concat(args: ScalarFunctionArgs) -> Result { + let ScalarFunctionArgs { + args: arg_values, + arg_fields, + number_rows, + return_field, + config_options, + } = args; + + if arg_values.is_empty() { + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8( + Some(String::new()), + ))); + } + + // Check if all arguments are scalars + let all_scalars = arg_values + .iter() + .all(|arg| matches!(arg, ColumnarValue::Scalar(_))); + + if all_scalars { + // For scalars, check if any is NULL + for arg in &arg_values { + if let ColumnarValue::Scalar(scalar) = arg { + if scalar.is_null() { + // Return NULL if any argument is NULL (Spark behavior) + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); + } + } + } + // No NULLs found, delegate to DataFusion's concat + let concat_func = ConcatFunc::new(); + let func_args = ScalarFunctionArgs { + args: arg_values, + arg_fields, + number_rows, + return_field, + config_options, + }; + concat_func.invoke_with_args(func_args) + } else { + // For arrays, we need to check each row for NULLs and return NULL for that row + // Get array length + let array_len = arg_values + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(array) => Some(array.len()), + _ => None, + }) + .unwrap_or(number_rows); + + // Convert all scalars to arrays + let arrays: Result> = arg_values + .iter() + .map(|arg| match arg { + ColumnarValue::Array(array) => Ok(Arc::clone(array)), + ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(array_len), + }) + .collect(); + let arrays = arrays?; + + // Check for NULL values in each row + let mut null_mask = vec![false; array_len]; + for array in &arrays { + for (i, null_flag) in null_mask.iter_mut().enumerate().take(array_len) { + if array.is_null(i) { + *null_flag = true; + } + } + } + + // Delegate to DataFusion's concat + let concat_func = ConcatFunc::new(); + let func_args = ScalarFunctionArgs { + args: arg_values, + arg_fields, + number_rows, + return_field, + config_options, + }; + + let result = concat_func.invoke_with_args(func_args)?; + + // Apply NULL mask to the result + match result { + ColumnarValue::Array(array) => { + let return_type = array.data_type(); + let mut builder: Box = match return_type { + DataType::Utf8 => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::StringBuilder::with_capacity(array_len, 0); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) + { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); + } + } + Box::new(builder) + } + DataType::LargeUtf8 => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::LargeStringBuilder::with_capacity(array_len, 0); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) + { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); + } + } + Box::new(builder) + } + DataType::Utf8View => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::StringViewBuilder::with_capacity(array_len); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) + { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); + } + } + Box::new(builder) + } + _ => { + return datafusion_common::exec_err!( + "Unsupported return type for concat: {:?}", + return_type + ); + } + }; + + Ok(ColumnarValue::Array(builder.finish())) + } + other => Ok(other), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::function::utils::test::test_scalar_function; + use arrow::array::StringArray; + use arrow::datatypes::DataType; + use datafusion_common::Result; + use datafusion_expr::ColumnarValue; + + #[test] + fn test_concat_basic() -> Result<()> { + test_scalar_function!( + SparkConcat::new(), + vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("Spark".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("SQL".to_string()))), + ], + Ok(Some("SparkSQL")), + &str, + DataType::Utf8, + StringArray + ); + Ok(()) + } + + #[test] + fn test_concat_with_null() -> Result<()> { + test_scalar_function!( + SparkConcat::new(), + vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("Spark".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("SQL".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(None)), + ], + Ok(None), + &str, + DataType::Utf8, + StringArray + ); + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/spark/string/concat.slt b/datafusion/sqllogictest/test_files/spark/string/concat.slt new file mode 100644 index 0000000000000..a2f0a2fb39126 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/string/concat.slt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +query T +SELECT concat('Spark', 'SQL'); +---- +SparkSQL + +query T +SELECT concat('Spark', 'SQL', NULL); +---- +NULL + +query T +SELECT concat('', '1', '', '2'); +---- +12 + +query T +SELECT concat(); +---- +(empty) + +query T +SELECT concat(''); +---- +(empty) \ No newline at end of file From 988ee87172aa5498809b10433c60958f25c3f9ce Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 14 Oct 2025 15:49:23 -0700 Subject: [PATCH 07/10] clippy --- .../spark/src/function/string/concat.rs | 210 ++++++++++-------- datafusion/spark/src/function/string/mod.rs | 8 + 2 files changed, 125 insertions(+), 93 deletions(-) diff --git a/datafusion/spark/src/function/string/concat.rs b/datafusion/spark/src/function/string/concat.rs index a9be6df8dfea0..2cc94b14d0a3b 100644 --- a/datafusion/spark/src/function/string/concat.rs +++ b/datafusion/spark/src/function/string/concat.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::Array; +use arrow::array::{Array, ArrayBuilder}; use arrow::datatypes::DataType; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ @@ -23,7 +23,8 @@ use datafusion_expr::{ Volatility, }; use datafusion_functions::string::concat::ConcatFunc; -use std::{any::Any, sync::Arc}; +use std::any::Any; +use std::sync::Arc; /// Spark-compatible `concat` expression /// @@ -97,35 +98,55 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { ))); } + // Step 1: Check for NULL mask in incoming args + let null_mask = compute_null_mask(&arg_values, number_rows)?; + + // If all scalars and any is NULL, return NULL immediately + if null_mask.is_none() { + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); + } + + // Step 2: Delegate to DataFusion's concat + let concat_func = ConcatFunc::new(); + let func_args = ScalarFunctionArgs { + args: arg_values, + arg_fields, + number_rows, + return_field, + config_options, + }; + let result = concat_func.invoke_with_args(func_args)?; + + // Step 3: Apply NULL mask to result + apply_null_mask(result, null_mask) +} + +/// Compute NULL mask for the arguments +/// Returns None if all scalars and any is NULL, or a Vec for arrays +fn compute_null_mask( + args: &[ColumnarValue], + number_rows: usize, +) -> Result>> { // Check if all arguments are scalars - let all_scalars = arg_values + let all_scalars = args .iter() .all(|arg| matches!(arg, ColumnarValue::Scalar(_))); if all_scalars { // For scalars, check if any is NULL - for arg in &arg_values { + for arg in args { if let ColumnarValue::Scalar(scalar) = arg { if scalar.is_null() { - // Return NULL if any argument is NULL (Spark behavior) - return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); + // Return None to indicate all values should be NULL + return Ok(None); } } } - // No NULLs found, delegate to DataFusion's concat - let concat_func = ConcatFunc::new(); - let func_args = ScalarFunctionArgs { - args: arg_values, - arg_fields, - number_rows, - return_field, - config_options, - }; - concat_func.invoke_with_args(func_args) + // No NULLs in scalars + Ok(Some(vec![])) } else { - // For arrays, we need to check each row for NULLs and return NULL for that row - // Get array length - let array_len = arg_values + // For arrays, compute NULL mask for each row + let array_len = args .iter() .find_map(|arg| match arg { ColumnarValue::Array(array) => Some(array.len()), @@ -133,8 +154,8 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { }) .unwrap_or(number_rows); - // Convert all scalars to arrays - let arrays: Result> = arg_values + // Convert all scalars to arrays for uniform processing + let arrays: Result> = args .iter() .map(|arg| match arg { ColumnarValue::Array(array) => Ok(Arc::clone(array)), @@ -143,7 +164,7 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { .collect(); let arrays = arrays?; - // Check for NULL values in each row + // Compute NULL mask let mut null_mask = vec![false; array_len]; for array in &arrays { for (i, null_flag) in null_mask.iter_mut().enumerate().take(array_len) { @@ -153,86 +174,90 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { } } - // Delegate to DataFusion's concat - let concat_func = ConcatFunc::new(); - let func_args = ScalarFunctionArgs { - args: arg_values, - arg_fields, - number_rows, - return_field, - config_options, - }; + Ok(Some(null_mask)) + } +} - let result = concat_func.invoke_with_args(func_args)?; +/// Apply NULL mask to the result +fn apply_null_mask( + result: ColumnarValue, + null_mask: Option>, +) -> Result { + match (result, null_mask) { + // Scalar with NULL mask means return NULL + (ColumnarValue::Scalar(_), None) => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))) + } + // Scalar without NULL mask, return as-is + (scalar @ ColumnarValue::Scalar(_), Some(mask)) if mask.is_empty() => Ok(scalar), + // Array with NULL mask + (ColumnarValue::Array(array), Some(null_mask)) if !null_mask.is_empty() => { + let array_len = array.len(); + let return_type = array.data_type(); - // Apply NULL mask to the result - match result { - ColumnarValue::Array(array) => { - let return_type = array.data_type(); - let mut builder: Box = match return_type { - DataType::Utf8 => { - let string_array = array - .as_any() - .downcast_ref::() - .unwrap(); - let mut builder = - arrow::array::StringBuilder::with_capacity(array_len, 0); - for (i, &is_null) in null_mask.iter().enumerate().take(array_len) - { - if is_null || string_array.is_null(i) { - builder.append_null(); - } else { - builder.append_value(string_array.value(i)); - } + let mut builder: Box = match return_type { + DataType::Utf8 => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::StringBuilder::with_capacity(array_len, 0); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); } - Box::new(builder) } - DataType::LargeUtf8 => { - let string_array = array - .as_any() - .downcast_ref::() - .unwrap(); - let mut builder = - arrow::array::LargeStringBuilder::with_capacity(array_len, 0); - for (i, &is_null) in null_mask.iter().enumerate().take(array_len) - { - if is_null || string_array.is_null(i) { - builder.append_null(); - } else { - builder.append_value(string_array.value(i)); - } + Box::new(builder) + } + DataType::LargeUtf8 => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::LargeStringBuilder::with_capacity(array_len, 0); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); } - Box::new(builder) } - DataType::Utf8View => { - let string_array = array - .as_any() - .downcast_ref::() - .unwrap(); - let mut builder = - arrow::array::StringViewBuilder::with_capacity(array_len); - for (i, &is_null) in null_mask.iter().enumerate().take(array_len) - { - if is_null || string_array.is_null(i) { - builder.append_null(); - } else { - builder.append_value(string_array.value(i)); - } + Box::new(builder) + } + DataType::Utf8View => { + let string_array = array + .as_any() + .downcast_ref::() + .unwrap(); + let mut builder = + arrow::array::StringViewBuilder::with_capacity(array_len); + for (i, &is_null) in null_mask.iter().enumerate().take(array_len) { + if is_null || string_array.is_null(i) { + builder.append_null(); + } else { + builder.append_value(string_array.value(i)); } - Box::new(builder) - } - _ => { - return datafusion_common::exec_err!( - "Unsupported return type for concat: {:?}", - return_type - ); } - }; + Box::new(builder) + } + _ => { + return datafusion_common::exec_err!( + "Unsupported return type for concat: {:?}", + return_type + ); + } + }; - Ok(ColumnarValue::Array(builder.finish())) - } - other => Ok(other), + Ok(ColumnarValue::Array(builder.finish())) } + // Array without NULL mask, return as-is + (array @ ColumnarValue::Array(_), _) => Ok(array), + // Shouldn't happen + (scalar, _) => Ok(scalar), } } @@ -243,7 +268,6 @@ mod tests { use arrow::array::StringArray; use arrow::datatypes::DataType; use datafusion_common::Result; - use datafusion_expr::ColumnarValue; #[test] fn test_concat_basic() -> Result<()> { diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index 3115c1e960fa8..480984f02159b 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -17,6 +17,7 @@ pub mod ascii; pub mod char; +pub mod concat; pub mod elt; pub mod format_string; pub mod ilike; @@ -30,6 +31,7 @@ use std::sync::Arc; make_udf_function!(ascii::SparkAscii, ascii); make_udf_function!(char::CharFunc, char); +make_udf_function!(concat::SparkConcat, concat); make_udf_function!(ilike::SparkILike, ilike); make_udf_function!(length::SparkLengthFunc, length); make_udf_function!(elt::SparkElt, elt); @@ -50,6 +52,11 @@ pub mod expr_fn { "Returns the ASCII character having the binary equivalent to col. If col is larger than 256 the result is equivalent to char(col % 256).", arg1 )); + export_functions!(( + concat, + "Concatenates multiple input strings into a single string. Returns NULL if any input is NULL.", + args + )); export_functions!(( elt, "Returns the n-th input (1-indexed), e.g. returns 2nd input when n is 2. The function returns NULL if the index is 0 or exceeds the length of the array.", @@ -86,6 +93,7 @@ pub fn functions() -> Vec> { vec![ ascii(), char(), + concat(), elt(), ilike(), length(), From e9ab10e151a3bd42d2de64a1e3ddc38cd868e27e Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 14 Oct 2025 15:49:52 -0700 Subject: [PATCH 08/10] comments --- datafusion/spark/src/function/string/concat.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/spark/src/function/string/concat.rs b/datafusion/spark/src/function/string/concat.rs index 2cc94b14d0a3b..c75d8824732c9 100644 --- a/datafusion/spark/src/function/string/concat.rs +++ b/datafusion/spark/src/function/string/concat.rs @@ -92,6 +92,7 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { config_options, } = args; + // Handle zero-argument case: return empty string if arg_values.is_empty() { return Ok(ColumnarValue::Scalar(ScalarValue::Utf8( Some(String::new()), From 4f2784a594ca75bd212b81fb94b5766e7906b9c1 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 14 Oct 2025 15:57:21 -0700 Subject: [PATCH 09/10] test --- .../sqllogictest/test_files/spark/string/concat.slt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/spark/string/concat.slt b/datafusion/sqllogictest/test_files/spark/string/concat.slt index a2f0a2fb39126..0b796a54a69e8 100644 --- a/datafusion/sqllogictest/test_files/spark/string/concat.slt +++ b/datafusion/sqllogictest/test_files/spark/string/concat.slt @@ -38,4 +38,11 @@ SELECT concat(); query T SELECT concat(''); ---- -(empty) \ No newline at end of file +(empty) + + +query T +SELECT concat(a, b, c) from (select 'a' a, 'b' b, 'c' c union all select null a, 'b', 'c') order by 1 nulls last; +---- +abc +NULL \ No newline at end of file From 2efea06142ffec878ba80ed5dd01a6fb53b6d4bb Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 15 Oct 2025 10:36:37 -0700 Subject: [PATCH 10/10] doc --- datafusion/spark/src/function/string/concat.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/string/concat.rs b/datafusion/spark/src/function/string/concat.rs index c75d8824732c9..0e981e7c37224 100644 --- a/datafusion/spark/src/function/string/concat.rs +++ b/datafusion/spark/src/function/string/concat.rs @@ -123,7 +123,8 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result { } /// Compute NULL mask for the arguments -/// Returns None if all scalars and any is NULL, or a Vec for arrays +/// Returns None if all scalars and any is NULL, or a Vector of +/// boolean representing the null mask for incoming arrays fn compute_null_mask( args: &[ColumnarValue], number_rows: usize,