From 084d41e8b857901b1b6262fe7e7dbe555f98f114 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Mar 2026 21:50:18 +0000 Subject: [PATCH 01/10] Support binary-to-binary concat operator --- datafusion/expr-common/src/type_coercion/binary.rs | 1 + datafusion/physical-expr/src/expressions/binary.rs | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index fa109e38a4382..5e7fa89e41d98 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1638,6 +1638,7 @@ fn string_concat_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { string_coercion(lhs_value_type, rhs_value_type).or(None) } + (Binary, Binary) => Some(Utf8), _ => None, }) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 02628b405ec6c..ecd8b9154793b 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -933,6 +933,18 @@ fn pre_selection_scatter( } fn concat_elements(left: &ArrayRef, right: &ArrayRef) -> Result { + if *left.data_type() == DataType::Binary && *right.data_type() == DataType::Binary { + // Cast Binary to Utf8 to validate UTF-8 encoding before concatenation + // Follow widespread approach of PostgreSQL, sqlite, DuckDB, Snowflake + // Spark does it in a different way by making a binary-to-binary concatenation + let left = cast(left.as_ref(), &DataType::Utf8)?; + let right = cast(right.as_ref(), &DataType::Utf8)?; + return Ok(Arc::new(concat_elements_utf8( + left.as_string::(), + right.as_string::(), + )?)); + } + Ok(match left.data_type() { DataType::Utf8 => Arc::new(concat_elements_utf8( left.as_string::(), From 5e82d5be7bb1130738dcbf2b0b8daccc501885f4 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Mar 2026 21:50:33 +0000 Subject: [PATCH 02/10] Support binary-to-binary concat UDF --- datafusion/functions/src/string/concat.rs | 78 +++++++++++++++++++---- datafusion/functions/src/strings.rs | 38 ++++++++++- 2 files changed, 100 insertions(+), 16 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index c65b990f42397..9a5f6cd9ede46 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -25,8 +25,10 @@ use crate::string::concat; use crate::strings::{ ColumnarValueRef, LargeStringArrayBuilder, StringArrayBuilder, StringViewArrayBuilder, }; -use datafusion_common::cast::{as_string_array, as_string_view_array}; -use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; +use datafusion_common::cast::{as_binary_array, as_string_array, as_string_view_array}; +use datafusion_common::{ + Result, ScalarValue, exec_datafusion_err, internal_err, plan_err, +}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ColumnarValue, Documentation, Expr, Volatility, lit}; @@ -68,7 +70,7 @@ impl ConcatFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8View, Utf8, LargeUtf8], + vec![Utf8View, Utf8, LargeUtf8, Binary], Volatility::Immutable, ), } @@ -123,19 +125,25 @@ impl ScalarUDFImpl for ConcatFunc { // Scalar if array_len.is_none() { - let mut values = Vec::with_capacity(args.len()); + let mut values: Vec<&str> = Vec::with_capacity(args.len()); for arg in &args { let ColumnarValue::Scalar(scalar) = arg else { return internal_err!("concat expected scalar value, got {arg:?}"); }; - - match scalar.try_as_str() { - Some(Some(v)) => values.push(v), - Some(None) => {} // null literal - None => plan_err!( - "Concat function does not support scalar type {}", - scalar - )?, + if let ScalarValue::Binary(Some(value)) = scalar { + let s: &str = std::str::from_utf8(value).map_err(|_| { + exec_datafusion_err!("invalid UTF-8 in binary literal: {value:?}") + })?; + values.push(s); + } else { + match scalar.try_as_str() { + Some(Some(v)) => values.push(v), + Some(None) => {} // null literal + None => plan_err!( + "Concat function does not support scalar type {}", + scalar + )?, + } } } let result = values.concat(); @@ -171,6 +179,13 @@ impl ScalarUDFImpl for ConcatFunc { columns.push(ColumnarValueRef::Scalar(s.as_bytes())); } } + ColumnarValue::Scalar(ScalarValue::Binary(maybe_value)) => { + if let Some(b) = maybe_value { + // data_size is a capacity hint, so doesn't matter if it is chars or bytes + data_size += b.len() * len; + columns.push(ColumnarValueRef::Scalar(b.as_slice())); + } + } ColumnarValue::Array(array) => { match array.data_type() { DataType::Utf8 => { @@ -210,6 +225,17 @@ impl ScalarUDFImpl for ConcatFunc { }; columns.push(column); } + DataType::Binary => { + let string_array = as_binary_array(array)?; + + data_size += string_array.values().len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableBinaryArray(string_array) + } else { + ColumnarValueRef::NonNullableBinaryArray(string_array) + }; + columns.push(column); + } other => { return plan_err!( "Input was {other} which is not a supported datatype for concat function" @@ -451,7 +477,33 @@ mod tests { Utf8View, StringViewArray ); - + test_function!( + ConcatFunc::new(), + vec![ + ColumnarValue::Scalar(ScalarValue::Binary(Some( + "Café".as_bytes().into() + ))), + ColumnarValue::Scalar(ScalarValue::Utf8(None)), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("cc".to_string()))), + ], + Ok(Some("Cafécc")), + &str, + Utf8, + StringArray + ); + test_function!( + ConcatFunc::new(), + vec![ + ColumnarValue::Scalar(ScalarValue::Binary(Some(Vec::from( + "Café".as_bytes() + )))), + ColumnarValue::Scalar(ScalarValue::Binary(Some("cc".as_bytes().into()))), + ], + Ok(Some("Cafécc")), + &str, + Utf8, + StringArray + ); Ok(()) } diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index 8e25a45cf62dd..8d3b8ca7a625f 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -18,7 +18,7 @@ use std::mem::size_of; use arrow::array::{ - Array, ArrayAccessor, ArrayDataBuilder, ByteView, LargeStringArray, + Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray, NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder, make_view, }; use arrow::buffer::{MutableBuffer, NullBuffer}; @@ -75,6 +75,11 @@ impl StringArrayBuilder { .extend_from_slice(array.value(i).as_bytes()); } } + ColumnarValueRef::NullableBinaryArray(array) => { + if !CHECK_VALID || array.is_valid(i) { + self.value_buffer.extend_from_slice(array.value(i)); + } + } ColumnarValueRef::NonNullableArray(array) => { self.value_buffer .extend_from_slice(array.value(i).as_bytes()); @@ -87,6 +92,9 @@ impl StringArrayBuilder { self.value_buffer .extend_from_slice(array.value(i).as_bytes()); } + ColumnarValueRef::NonNullableBinaryArray(array) => { + self.value_buffer.extend_from_slice(array.value(i)); + } } } @@ -165,6 +173,12 @@ impl StringViewArrayBuilder { self.block.push_str(array.value(i)); } } + ColumnarValueRef::NullableBinaryArray(array) => { + if !CHECK_VALID || array.is_valid(i) { + self.block + .push_str(std::str::from_utf8(array.value(i)).unwrap()); + } + } ColumnarValueRef::NonNullableArray(array) => { self.block.push_str(array.value(i)); } @@ -174,6 +188,10 @@ impl StringViewArrayBuilder { ColumnarValueRef::NonNullableStringViewArray(array) => { self.block.push_str(array.value(i)); } + ColumnarValueRef::NonNullableBinaryArray(array) => { + self.block + .push_str(std::str::from_utf8(array.value(i)).unwrap()); + } } } @@ -243,6 +261,11 @@ impl LargeStringArrayBuilder { .extend_from_slice(array.value(i).as_bytes()); } } + ColumnarValueRef::NullableBinaryArray(array) => { + if !CHECK_VALID || array.is_valid(i) { + self.value_buffer.extend_from_slice(array.value(i)); + } + } ColumnarValueRef::NonNullableArray(array) => { self.value_buffer .extend_from_slice(array.value(i).as_bytes()); @@ -255,6 +278,9 @@ impl LargeStringArrayBuilder { self.value_buffer .extend_from_slice(array.value(i).as_bytes()); } + ColumnarValueRef::NonNullableBinaryArray(array) => { + self.value_buffer.extend_from_slice(array.value(i)); + } } } @@ -340,6 +366,8 @@ pub enum ColumnarValueRef<'a> { NonNullableLargeStringArray(&'a LargeStringArray), NullableStringViewArray(&'a StringViewArray), NonNullableStringViewArray(&'a StringViewArray), + NullableBinaryArray(&'a BinaryArray), + NonNullableBinaryArray(&'a BinaryArray), } impl ColumnarValueRef<'_> { @@ -349,10 +377,12 @@ impl ColumnarValueRef<'_> { Self::Scalar(_) | Self::NonNullableArray(_) | Self::NonNullableLargeStringArray(_) - | Self::NonNullableStringViewArray(_) => true, + | Self::NonNullableStringViewArray(_) + | Self::NonNullableBinaryArray(_) => true, Self::NullableArray(array) => array.is_valid(i), Self::NullableStringViewArray(array) => array.is_valid(i), Self::NullableLargeStringArray(array) => array.is_valid(i), + Self::NullableBinaryArray(array) => array.is_valid(i), } } @@ -362,10 +392,12 @@ impl ColumnarValueRef<'_> { Self::Scalar(_) | Self::NonNullableArray(_) | Self::NonNullableStringViewArray(_) - | Self::NonNullableLargeStringArray(_) => None, + | Self::NonNullableLargeStringArray(_) + | Self::NonNullableBinaryArray(_) => None, Self::NullableArray(array) => array.nulls().cloned(), Self::NullableStringViewArray(array) => array.nulls().cloned(), Self::NullableLargeStringArray(array) => array.nulls().cloned(), + Self::NullableBinaryArray(array) => array.nulls().cloned(), } } } From bde377210c41a65331ab647a9e7535eeb1f23b00 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Mar 2026 21:50:49 +0000 Subject: [PATCH 03/10] Add SLT for binary-to-binary concat UDF and operator --- datafusion/sqllogictest/test_files/scalar.slt | 19 +++++++++++++++ .../test_files/spark/string/concat.slt | 24 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/datafusion/sqllogictest/test_files/scalar.slt b/datafusion/sqllogictest/test_files/scalar.slt index e91ec1cb848ba..6eebc8ef2ea63 100644 --- a/datafusion/sqllogictest/test_files/scalar.slt +++ b/datafusion/sqllogictest/test_files/scalar.slt @@ -1739,6 +1739,25 @@ SELECT 'a' || 42 || 23.3 ---- a4223.3 +# concat of binary and text provides a text output +query T +select arrow_cast('Café', 'Utf8') || arrow_cast('Foobar', 'Binary'); +---- +CaféFoobar + +query T +select arrow_cast('Café', 'Binary') || arrow_cast('Foobar', 'Utf8'); +---- +CaféFoobar + +# Concat of two binaries should cast arguments to text and produce a text output, +# following common behaviour of PostreSQL. However, Spark is providing binary +query T +select arrow_cast('Café', 'Binary') || arrow_cast('Foobar', 'Binary'); +---- +CaféFoobar + + # test_not_expressions() query BB diff --git a/datafusion/sqllogictest/test_files/spark/string/concat.slt b/datafusion/sqllogictest/test_files/spark/string/concat.slt index 97e7b57f7d06e..373e670658100 100644 --- a/datafusion/sqllogictest/test_files/spark/string/concat.slt +++ b/datafusion/sqllogictest/test_files/spark/string/concat.slt @@ -70,3 +70,27 @@ query TT SELECT concat('a', arrow_cast('b', 'LargeUtf8'), arrow_cast('c', 'Utf8View')), arrow_typeof(concat('a', arrow_cast('b', 'LargeUtf8'), arrow_cast('c', 'Utf8View'))); ---- abc Utf8View + +# Test mixed types: Utf8 + Binary +query TT +SELECT concat(arrow_cast('hello', 'Utf8'), arrow_cast(' world', 'Binary')), arrow_typeof(concat(arrow_cast('hello', 'Utf8'), arrow_cast(' world', 'Binary'))); +---- +hello world Utf8 + +# Test mixed types: Utf8View + Binary +query TT +SELECT concat(arrow_cast('hello', 'Utf8View'), arrow_cast(' world', 'Binary')), arrow_typeof(concat(arrow_cast('hello', 'Utf8View'), arrow_cast(' world', 'Binary'))); +---- +hello world Utf8View + +# Test mixed types: Binary + Binary +query TT +SELECT concat(arrow_cast('hello', 'Binary'), arrow_cast(' world', 'Binary')), arrow_typeof(concat(arrow_cast('hello', 'Binary'), arrow_cast(' world', 'Binary'))); +---- +hello world Utf8 + +# Test mixed types with ws: Binary + Binary +query TT +SELECT concat_ws('|', arrow_cast('hello', 'Binary'), arrow_cast('world', 'Binary')), arrow_typeof(concat_ws('|', arrow_cast('hello', 'Binary'), arrow_cast('world', 'Binary'))); +---- +hello|world Utf8 From 417471f39a3a88ee021e4d13e3bf6f3173eb923d Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Mar 2026 21:57:52 +0000 Subject: [PATCH 04/10] Fixup schema test --- datafusion/sqllogictest/test_files/information_schema.slt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index aeeb3481c76b9..fdce8b7ac6411 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -846,8 +846,10 @@ datafusion public string_agg 1 OUT NULL String NULL false 1 query TTTBI rowsort select specific_name, data_type, parameter_mode, is_variadic, rid from information_schema.parameters where specific_name = 'concat'; ---- -concat String IN true 0 +concat Binary IN true 0 +concat String IN true 1 concat String OUT false 0 +concat String OUT false 1 # test ceorcion signature query TTITI rowsort From a10287ca614cca6ecb8b470d23295dc94a6bb774 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 21 Mar 2026 16:04:11 +0000 Subject: [PATCH 05/10] Tests for invalid binaries --- .../test_files/spark/string/concat.slt | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/datafusion/sqllogictest/test_files/spark/string/concat.slt b/datafusion/sqllogictest/test_files/spark/string/concat.slt index 373e670658100..428ce394195f1 100644 --- a/datafusion/sqllogictest/test_files/spark/string/concat.slt +++ b/datafusion/sqllogictest/test_files/spark/string/concat.slt @@ -94,3 +94,37 @@ query TT SELECT concat_ws('|', arrow_cast('hello', 'Binary'), arrow_cast('world', 'Binary')), arrow_typeof(concat_ws('|', arrow_cast('hello', 'Binary'), arrow_cast('world', 'Binary'))); ---- hello|world Utf8 + +# Invalid UTF8 binaries for concatenation, scalar case +# 636166c3a9 = café , where c3a9 is a char é +# 68656c6c6f = hello +query error Execution error: invalid UTF-8 in binary literal +SELECT concat(x'636166c3', x'68656c6c6f'); + +statement ok +create table t as values (x'636166c3', x'68656c6c6f'); + +# Invalid UTF8 sequence for concatenation, array case +query error Arrow error: Invalid argument error: Invalid UTF8 sequence at string +SELECT concat(column1, column2) from t; + +statement ok +drop table t + +statement ok +create table t as values (x'636166c3', x'a968656c6c6f'); + +# Invalid UTF8 binaries make a valid UTF8 sequence after concatenation, array case +query T +SELECT concat(column1, column2) from t; +---- +caféhello + +statement ok +drop table t + +# Invalid UTF8 binaries make a valid UTF8 sequence after concatenation, scalar case +query T +SELECT concat(x'636166c3', x'a968656c6c6f'); +---- +caféhello From 2d43e661a0eb605327225fc7ed0e2c4809bf02ec Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 21 Mar 2026 16:02:45 +0000 Subject: [PATCH 06/10] Make finish return error # Conflicts: # datafusion/functions/src/string/concat_ws.rs --- datafusion/functions/src/string/concat.rs | 4 ++-- datafusion/functions/src/string/concat_ws.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index 9a5f6cd9ede46..e0de5292a2cdd 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -257,7 +257,7 @@ impl ScalarUDFImpl for ConcatFunc { builder.append_offset(); } - let string_array = builder.finish(None); + let string_array = builder.finish(None)?; Ok(ColumnarValue::Array(Arc::new(string_array))) } DataType::Utf8View => { @@ -281,7 +281,7 @@ impl ScalarUDFImpl for ConcatFunc { builder.append_offset(); } - let string_array = builder.finish(None); + let string_array = builder.finish(None)?; Ok(ColumnarValue::Array(Arc::new(string_array))) } _ => unreachable!(), diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 80e11d286ed87..369c9789e30ab 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -355,7 +355,7 @@ impl ScalarUDFImpl for ConcatWsFunc { } builder.append_offset(); } - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?))) } _ => { let mut builder = StringArrayBuilder::with_capacity(len, data_size); @@ -376,7 +376,7 @@ impl ScalarUDFImpl for ConcatWsFunc { } builder.append_offset(); } - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?))) } } } From d1f04838377bf53aafcd753d724b356e343bbb2e Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 21 Mar 2026 16:03:56 +0000 Subject: [PATCH 07/10] Flag binary array operations as tainted --- datafusion/functions/src/string/concat.rs | 14 ++--- datafusion/functions/src/strings.rs | 70 ++++++++++++++++------- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index e0de5292a2cdd..d13b2820e9d85 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -125,19 +125,16 @@ impl ScalarUDFImpl for ConcatFunc { // Scalar if array_len.is_none() { - let mut values: Vec<&str> = Vec::with_capacity(args.len()); + let mut values: Vec<&[u8]> = Vec::with_capacity(args.len()); for arg in &args { let ColumnarValue::Scalar(scalar) = arg else { return internal_err!("concat expected scalar value, got {arg:?}"); }; if let ScalarValue::Binary(Some(value)) = scalar { - let s: &str = std::str::from_utf8(value).map_err(|_| { - exec_datafusion_err!("invalid UTF-8 in binary literal: {value:?}") - })?; - values.push(s); + values.push(value); } else { match scalar.try_as_str() { - Some(Some(v)) => values.push(v), + Some(Some(v)) => values.push(v.as_bytes()), Some(None) => {} // null literal None => plan_err!( "Concat function does not support scalar type {}", @@ -146,7 +143,10 @@ impl ScalarUDFImpl for ConcatFunc { } } } - let result = values.concat(); + let concat_bytes = values.concat(); + let result = std::str::from_utf8(&concat_bytes) + .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))? + .to_string(); return match return_datatype { DataType::Utf8View => { diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index 8d3b8ca7a625f..2c6a5f36025ca 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -17,6 +17,8 @@ use std::mem::size_of; +use datafusion_common::{Result, internal_err}; + use arrow::array::{ Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray, NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder, make_view, @@ -30,6 +32,8 @@ use arrow::datatypes::DataType; pub struct StringArrayBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, + /// If true, a safety check is required during the `finish` call + tainted: bool, } impl StringArrayBuilder { @@ -45,6 +49,7 @@ impl StringArrayBuilder { Self { offsets_buffer, value_buffer: MutableBuffer::with_capacity(data_capacity), + tainted: false, } } @@ -79,6 +84,7 @@ impl StringArrayBuilder { if !CHECK_VALID || array.is_valid(i) { self.value_buffer.extend_from_slice(array.value(i)); } + self.tainted = true; } ColumnarValueRef::NonNullableArray(array) => { self.value_buffer @@ -94,6 +100,7 @@ impl StringArrayBuilder { } ColumnarValueRef::NonNullableBinaryArray(array) => { self.value_buffer.extend_from_slice(array.value(i)); + self.tainted = true; } } } @@ -109,17 +116,17 @@ impl StringArrayBuilder { /// Finalize the builder into a concrete [`StringArray`]. /// - /// # Panics + /// # Errors /// - /// This method can panic when: + /// Returns an error when: /// /// - the provided `null_buffer` is not the same length as the `offsets_buffer`. - pub fn finish(self, null_buffer: Option) -> StringArray { + pub fn finish(self, null_buffer: Option) -> Result { let row_count = self.offsets_buffer.len() / size_of::() - 1; - if let Some(ref null_buffer) = null_buffer { - assert_eq!( - null_buffer.len(), - row_count, + if let Some(ref null_buffer) = null_buffer + && null_buffer.len() != row_count + { + return internal_err!( "Null buffer and offsets buffer must be the same length" ); } @@ -128,10 +135,17 @@ impl StringArrayBuilder { .add_buffer(self.offsets_buffer.into()) .add_buffer(self.value_buffer.into()) .nulls(null_buffer); - // SAFETY: all data that was appended was valid UTF8 and the values - // and offsets were created correctly - let array_data = unsafe { array_builder.build_unchecked() }; - StringArray::from(array_data) + if self.tainted { + // Raw binary arrays with possible invalid utf-8 were used, + // so let ArrayDataBuilder perform validation + let array_data = array_builder.build()?; + Ok(StringArray::from(array_data)) + } else { + // SAFETY: all data that was appended was valid UTF8 and the values + // and offsets were created correctly + let array_data = unsafe { array_builder.build_unchecked() }; + Ok(StringArray::from(array_data)) + } } } @@ -216,6 +230,8 @@ impl StringViewArrayBuilder { pub struct LargeStringArrayBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, + /// If true, a safety check is required during the `finish` call + tainted: bool, } impl LargeStringArrayBuilder { @@ -231,6 +247,7 @@ impl LargeStringArrayBuilder { Self { offsets_buffer, value_buffer: MutableBuffer::with_capacity(data_capacity), + tainted: false, } } @@ -265,6 +282,7 @@ impl LargeStringArrayBuilder { if !CHECK_VALID || array.is_valid(i) { self.value_buffer.extend_from_slice(array.value(i)); } + self.tainted = true; } ColumnarValueRef::NonNullableArray(array) => { self.value_buffer @@ -280,6 +298,7 @@ impl LargeStringArrayBuilder { } ColumnarValueRef::NonNullableBinaryArray(array) => { self.value_buffer.extend_from_slice(array.value(i)); + self.tainted = true; } } } @@ -295,17 +314,17 @@ impl LargeStringArrayBuilder { /// Finalize the builder into a concrete [`LargeStringArray`]. /// - /// # Panics + /// # Errors /// - /// This method can panic when: + /// Returns an error when: /// /// - the provided `null_buffer` is not the same length as the `offsets_buffer`. - pub fn finish(self, null_buffer: Option) -> LargeStringArray { + pub fn finish(self, null_buffer: Option) -> Result { let row_count = self.offsets_buffer.len() / size_of::() - 1; - if let Some(ref null_buffer) = null_buffer { - assert_eq!( - null_buffer.len(), - row_count, + if let Some(ref null_buffer) = null_buffer + && null_buffer.len() != row_count + { + return internal_err!( "Null buffer and offsets buffer must be the same length" ); } @@ -314,10 +333,17 @@ impl LargeStringArrayBuilder { .add_buffer(self.offsets_buffer.into()) .add_buffer(self.value_buffer.into()) .nulls(null_buffer); - // SAFETY: all data that was appended was valid Large UTF8 and the values - // and offsets were created correctly - let array_data = unsafe { array_builder.build_unchecked() }; - LargeStringArray::from(array_data) + if self.tainted { + // Raw binary arrays with possible invalid utf-8 were used, + // so let ArrayDataBuilder perform validation + let array_data = array_builder.build()?; + Ok(LargeStringArray::from(array_data)) + } else { + // SAFETY: all data that was appended was valid Large UTF8 and the values + // and offsets were created correctly + let array_data = unsafe { array_builder.build_unchecked() }; + Ok(LargeStringArray::from(array_data)) + } } } From 469f79070c578d8c8a67eb25b40166a37eb131b2 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 21 Mar 2026 20:55:27 +0000 Subject: [PATCH 08/10] Refactor return type code --- datafusion/functions/src/string/concat.rs | 30 +++++++++++------------ 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index d13b2820e9d85..8d04cfdf7841e 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -77,6 +77,17 @@ impl ConcatFunc { } } +fn deduce_return_type(arg_types: &[DataType]) -> DataType { + use DataType::*; + if arg_types.contains(&Utf8View) { + Utf8View + } else if arg_types.contains(&LargeUtf8) { + LargeUtf8 + } else { + Utf8 + } +} + impl ScalarUDFImpl for ConcatFunc { fn as_any(&self) -> &dyn Any { self @@ -94,14 +105,7 @@ impl ScalarUDFImpl for ConcatFunc { /// mixed inputs, prefer Utf8View; prefer LargeUtf8 over Utf8 to avoid /// potential overflow on LargeUtf8 input. fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - if arg_types.contains(&Utf8View) { - Ok(Utf8View) - } else if arg_types.contains(&LargeUtf8) { - Ok(LargeUtf8) - } else { - Ok(Utf8) - } + Ok(deduce_return_type(arg_types)) } /// Concatenates the text representations of all the arguments. NULL arguments are ignored. @@ -109,14 +113,8 @@ impl ScalarUDFImpl for ConcatFunc { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, .. } = args; - let return_datatype = if args.iter().any(|c| c.data_type() == DataType::Utf8View) - { - DataType::Utf8View - } else if args.iter().any(|c| c.data_type() == DataType::LargeUtf8) { - DataType::LargeUtf8 - } else { - DataType::Utf8 - }; + let arg_types: Vec = args.iter().map(|c| c.data_type()).collect(); + let return_datatype = deduce_return_type(&arg_types); let array_len = args.iter().find_map(|x| match x { ColumnarValue::Array(array) => Some(array.len()), From a3d33bd6460fc1c4c065a814ee0266d4f4ecd784 Mon Sep 17 00:00:00 2001 From: theirix Date: Mon, 23 Mar 2026 21:43:48 +0000 Subject: [PATCH 09/10] Handle invalid utf-8 in StringViewArrayBuilder --- datafusion/functions/src/string/concat.rs | 4 +- datafusion/functions/src/string/concat_ws.rs | 6 +- datafusion/functions/src/strings.rs | 69 +++++++++++++------ .../test_files/spark/string/concat.slt | 7 ++ 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index 8d04cfdf7841e..40b5b45857a9a 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -264,10 +264,10 @@ impl ScalarUDFImpl for ConcatFunc { columns .iter() .for_each(|column| builder.write::(column, i)); - builder.append_offset(); + builder.append_offset()?; } - let string_array = builder.finish(None); + let string_array = builder.finish(None)?; Ok(ColumnarValue::Array(Arc::new(string_array))) } DataType::LargeUtf8 => { diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 369c9789e30ab..8aa3c623c7fff 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -319,7 +319,7 @@ impl ScalarUDFImpl for ConcatWsFunc { let mut builder = StringViewArrayBuilder::with_capacity(len, data_size); for i in 0..len { if !sep.is_valid(i) { - builder.append_offset(); + builder.append_offset()?; continue; } let mut first = true; @@ -332,9 +332,9 @@ impl ScalarUDFImpl for ConcatWsFunc { first = false; } } - builder.append_offset(); + builder.append_offset()?; } - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?))) } DataType::LargeUtf8 => { let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size); diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index 2c6a5f36025ca..b0e988a3a8727 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -17,7 +17,7 @@ use std::mem::size_of; -use datafusion_common::{Result, internal_err}; +use datafusion_common::{Result, exec_datafusion_err, internal_err}; use arrow::array::{ Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray, @@ -61,6 +61,7 @@ impl StringArrayBuilder { match column { ColumnarValueRef::Scalar(s) => { self.value_buffer.extend_from_slice(s); + self.tainted = true; } ColumnarValueRef::NullableArray(array) => { if !CHECK_VALID || array.is_valid(i) { @@ -151,15 +152,18 @@ impl StringArrayBuilder { pub struct StringViewArrayBuilder { builder: StringViewBuilder, - block: String, + block: Vec, + /// If true, a safety check is required during the `append_offset` call + tainted: bool, } impl StringViewArrayBuilder { - pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self { - let builder = StringViewBuilder::with_capacity(data_capacity); + pub fn with_capacity(item_capacity: usize, _data_capacity: usize) -> Self { + let builder = StringViewBuilder::with_capacity(item_capacity); Self { builder, - block: String::new(), + block: vec![], + tainted: false, } } @@ -170,59 +174,79 @@ impl StringViewArrayBuilder { ) { match column { ColumnarValueRef::Scalar(s) => { - self.block.push_str(std::str::from_utf8(s).unwrap()); + self.block.extend_from_slice(s); + self.tainted = true; } ColumnarValueRef::NullableArray(array) => { if !CHECK_VALID || array.is_valid(i) { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } } ColumnarValueRef::NullableLargeStringArray(array) => { if !CHECK_VALID || array.is_valid(i) { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } } ColumnarValueRef::NullableStringViewArray(array) => { if !CHECK_VALID || array.is_valid(i) { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } } ColumnarValueRef::NullableBinaryArray(array) => { if !CHECK_VALID || array.is_valid(i) { - self.block - .push_str(std::str::from_utf8(array.value(i)).unwrap()); + self.block.extend_from_slice(array.value(i)); } + self.tainted = true; } ColumnarValueRef::NonNullableArray(array) => { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } ColumnarValueRef::NonNullableLargeStringArray(array) => { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } ColumnarValueRef::NonNullableStringViewArray(array) => { - self.block.push_str(array.value(i)); + self.block.extend_from_slice(array.value(i).as_bytes()); } ColumnarValueRef::NonNullableBinaryArray(array) => { - self.block - .push_str(std::str::from_utf8(array.value(i)).unwrap()); + self.block.extend_from_slice(array.value(i)); + self.tainted = true; } } } - pub fn append_offset(&mut self) { - self.builder.append_value(&self.block); + pub fn append_offset(&mut self) -> Result<()> { + let block_str = if self.tainted { + std::str::from_utf8(&self.block) + .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))? + } else { + // SAFETY: all data that was appended was valid UTF8 + unsafe { std::str::from_utf8_unchecked(&self.block) } + }; + self.builder.append_value(block_str); self.block.clear(); + Ok(()) } - pub fn finish(mut self, null_buffer: Option) -> StringViewArray { + /// Finalize the builder into a concrete [`StringViewArray`]. + /// + /// # Errors + /// + /// Returns an error when: + /// + /// - the provided `null_buffer` does not match amount of `append_offset` calls. + pub fn finish(mut self, null_buffer: Option) -> Result { let array = self.builder.finish(); match null_buffer { + Some(nulls) if nulls.len() != array.len() => { + internal_err!("Null buffer and views buffer must be the same length") + } Some(nulls) => { - let array_data = array.into_data().into_builder().nulls(Some(nulls)); + let array_builder = array.into_data().into_builder().nulls(Some(nulls)); // SAFETY: the underlying data is valid; we are only adding a null buffer - StringViewArray::from(unsafe { array_data.build_unchecked() }) + let array_data = unsafe { array_builder.build_unchecked() }; + Ok(StringViewArray::from(array_data)) } - None => array, + None => Ok(array), } } } @@ -259,6 +283,7 @@ impl LargeStringArrayBuilder { match column { ColumnarValueRef::Scalar(s) => { self.value_buffer.extend_from_slice(s); + self.tainted = true; } ColumnarValueRef::NullableArray(array) => { if !CHECK_VALID || array.is_valid(i) { diff --git a/datafusion/sqllogictest/test_files/spark/string/concat.slt b/datafusion/sqllogictest/test_files/spark/string/concat.slt index 428ce394195f1..df539a1c7a159 100644 --- a/datafusion/sqllogictest/test_files/spark/string/concat.slt +++ b/datafusion/sqllogictest/test_files/spark/string/concat.slt @@ -101,6 +101,9 @@ hello|world Utf8 query error Execution error: invalid UTF-8 in binary literal SELECT concat(x'636166c3', x'68656c6c6f'); +query error Execution error: invalid UTF-8 in binary literal +SELECT concat(x'636166c3', arrow_cast(x'68656c6c6f', 'Utf8View')); + statement ok create table t as values (x'636166c3', x'68656c6c6f'); @@ -108,6 +111,10 @@ create table t as values (x'636166c3', x'68656c6c6f'); query error Arrow error: Invalid argument error: Invalid UTF8 sequence at string SELECT concat(column1, column2) from t; +# Invalid UTF8 sequence for concatenation, array case +query error DataFusion error: Execution error: invalid UTF-8 in binary literal +SELECT concat(column1, arrow_cast(column2, 'Utf8View')) from t; + statement ok drop table t From eff396118ea16d0732eeff820e863287ceed9513 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Sat, 4 Apr 2026 13:08:19 +0800 Subject: [PATCH 10/10] trigger ci