From 22fd1b54436032daf99ddd0f54b25b6b15c91340 Mon Sep 17 00:00:00 2001 From: Devan Date: Tue, 3 Sep 2024 21:09:51 -0500 Subject: [PATCH 01/21] Update concat_ws scalar function to support Utf8View Signed-off-by: Devan --- datafusion/functions/src/string/concat.rs | 10 +- datafusion/functions/src/string/concat_ws.rs | 238 +++++++++++++++--- .../sqllogictest/test_files/string_view.slt | 79 +++++- 3 files changed, 279 insertions(+), 48 deletions(-) diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index 00fe69b0bd33..98f57efef90d 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -136,13 +136,9 @@ impl ScalarUDFImpl for ConcatFunc { for arg in args { match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { - if let Some(s) = maybe_value { - data_size += s.len() * len; - columns.push(ColumnarValueRef::Scalar(s.as_bytes())); - } - } - ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_value)) => { + ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(maybe_value)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_value)) => { if let Some(s) = maybe_value { data_size += s.len() * len; columns.push(ColumnarValueRef::Scalar(s.as_bytes())); diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 4d05f4e707b1..d87056d4afa0 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::StringArray; +use arrow::array::{as_largestring_array, Array, StringArray}; use std::any::Any; use std::sync::Arc; use arrow::datatypes::DataType; -use arrow::datatypes::DataType::Utf8; -use datafusion_common::cast::as_string_array; -use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; +use datafusion_common::cast::{as_string_array, as_string_view_array}; +use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{lit, ColumnarValue, Expr, Volatility}; @@ -46,9 +45,10 @@ impl Default for ConcatWsFunc { impl ConcatWsFunc { pub fn new() -> Self { - use DataType::*; Self { - signature: Signature::variadic(vec![Utf8], Volatility::Immutable), + signature: Signature::variadic_any( + Volatility::Immutable, + ), } } } @@ -66,8 +66,19 @@ impl ScalarUDFImpl for ConcatWsFunc { &self.signature } - fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(Utf8) + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + let mut dt = &Utf8; + arg_types.iter().for_each(|data_type| { + if data_type == &Utf8View { + dt = data_type; + } + if data_type == &LargeUtf8 && dt != &Utf8View { + dt = data_type; + } + }); + + Ok(dt.to_owned()) } /// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored. @@ -89,6 +100,18 @@ impl ScalarUDFImpl for ConcatWsFunc { }) .next(); + let mut return_datatype = DataType::Utf8; + args.iter().for_each(|col| { + if col.data_type() == DataType::Utf8View { + return_datatype = col.data_type(); + } + if col.data_type() == DataType::LargeUtf8 + && return_datatype != DataType::Utf8View + { + return_datatype = col.data_type(); + } + }); + // Scalar if array_len.is_none() { let sep = match &args[0] { @@ -104,27 +127,43 @@ impl ScalarUDFImpl for ConcatWsFunc { for arg in iter.by_ref() { match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) + | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => { result.push_str(s); break; } - ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + ColumnarValue::Scalar(ScalarValue::Utf8(None)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(None)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)) => {} _ => unreachable!(), } } for arg in iter.by_ref() { match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) + | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => { result.push_str(sep); result.push_str(s); } - ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + ColumnarValue::Scalar(ScalarValue::Utf8(None)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(None)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)) => {} _ => unreachable!(), } } - return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))); + return match return_datatype { + DataType::Utf8View => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))) + } + DataType::LargeUtf8 => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))) + } + _ => Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))), + }; } // Array @@ -155,52 +194,145 @@ impl ScalarUDFImpl for ConcatWsFunc { let mut columns = Vec::with_capacity(args.len() - 1); for arg in &args[1..] { match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { + ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(maybe_value)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(maybe_value)) => { if let Some(s) = maybe_value { data_size += s.len() * len; columns.push(ColumnarValueRef::Scalar(s.as_bytes())); } } ColumnarValue::Array(array) => { - let string_array = as_string_array(array)?; - data_size += string_array.values().len(); - let column = if array.is_nullable() { - ColumnarValueRef::NullableArray(string_array) - } else { - ColumnarValueRef::NonNullableArray(string_array) + match array.data_type() { + DataType::Utf8 => { + let string_array = as_string_array(array)?; + + data_size += string_array.values().len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableArray(string_array) + } else { + ColumnarValueRef::NonNullableArray(string_array) + }; + columns.push(column); + }, + DataType::LargeUtf8 => { + let string_array = as_largestring_array(array); + + data_size += string_array.values().len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableLargeStringArray(string_array) + } else { + ColumnarValueRef::NonNullableLargeStringArray(string_array) + }; + columns.push(column); + }, + DataType::Utf8View => { + let string_array = as_string_view_array(array)?; + + data_size += string_array.len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableStringViewArray(string_array) + } else { + ColumnarValueRef::NonNullableStringViewArray(string_array) + }; + columns.push(column); + }, + other => { + return plan_err!("Input was {other} which is not a supported datatype for concat function") + } }; - columns.push(column); } _ => unreachable!(), } } - let mut builder = StringArrayBuilder::with_capacity(len, data_size); - for i in 0..len { - if !sep.is_valid(i) { - builder.append_offset(); - continue; - } + match return_datatype { + DataType::Utf8 => { + let mut builder = StringArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + if !sep.is_valid(i) { + builder.append_offset(); + continue; + } - let mut iter = columns.iter(); - for column in iter.by_ref() { - if column.is_valid(i) { - builder.write::(column, i); - break; + let mut iter = columns.iter(); + for column in iter.by_ref() { + if column.is_valid(i) { + builder.write::(column, i); + break; + } + } + + for column in iter { + if column.is_valid(i) { + builder.write::(&sep, i); + builder.write::(column, i); + } + } + + builder.append_offset(); } + + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) } + DataType::Utf8View => { + let mut builder = StringViewArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + if !sep.is_valid(i) { + builder.append_offset(); + continue; + } - for column in iter { - if column.is_valid(i) { - builder.write::(&sep, i); - builder.write::(column, i); + let mut iter = columns.iter(); + for column in iter.by_ref() { + if column.is_valid(i) { + builder.write::(column, i); + break; + } + } + + for column in iter { + if column.is_valid(i) { + builder.write::(&sep, i); + builder.write::(column, i); + } + } + + builder.append_offset(); } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } + DataType::LargeUtf8 => { + let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + if !sep.is_valid(i) { + builder.append_offset(); + continue; + } - builder.append_offset(); - } + let mut iter = columns.iter(); + for column in iter.by_ref() { + if column.is_valid(i) { + builder.write::(column, i); + break; + } + } - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + for column in iter { + if column.is_valid(i) { + builder.write::(&sep, i); + builder.write::(column, i); + } + } + + builder.append_offset(); + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + } + _ => unreachable!(), + } } /// Simply the `concat_ws` function by @@ -304,7 +436,7 @@ mod tests { use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; - use arrow::datatypes::DataType::Utf8; + use arrow::datatypes::DataType::{Utf8, LargeUtf8, Utf8View}; use crate::string::concat_ws::ConcatWsFunc; use datafusion_common::Result; @@ -365,6 +497,32 @@ mod tests { Utf8, StringArray ); + test_function!( + ConcatWsFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::from("|")), + ColumnarValue::Scalar(ScalarValue::from("aa")), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)), + ColumnarValue::Scalar(ScalarValue::from("cc")), + ], + Ok(Some("aa|cc")), + &str, + LargeUtf8, + StringArray + ); + test_function!( + ConcatWsFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::from("|")), + ColumnarValue::Scalar(ScalarValue::from("aa")), + ColumnarValue::Scalar(ScalarValue::Utf8View(None)), + ColumnarValue::Scalar(ScalarValue::from("cc")), + ], + Ok(Some("aa|cc")), + &str, + Utf8View, + StringArray + ); Ok(()) } diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt index eb625e530b66..2ff935351828 100644 --- a/datafusion/sqllogictest/test_files/string_view.slt +++ b/datafusion/sqllogictest/test_files/string_view.slt @@ -796,7 +796,7 @@ EXPLAIN SELECT FROM test; ---- logical_plan -01)Projection: concat_ws(Utf8(", "), CAST(test.column1_utf8view AS Utf8), CAST(test.column2_utf8view AS Utf8)) AS c +01)Projection: concat_ws(Utf8(", "), test.column1_utf8view, test.column2_utf8view) AS c 02)--TableScan: test projection=[column1_utf8view, column2_utf8view] ## Ensure no casts for CONTAINS @@ -927,6 +927,83 @@ XiangpengXiangpeng RaphaelR R +## Should run CONCAT successfully with utf8view +query T +SELECT + concat(column1_utf8view, column2_utf8view) as c +FROM test; +---- +AndrewX +XiangpengXiangpeng +RaphaelR +R + +## Should run CONCAT_WS successfully with utf8 +query T +SELECT + concat_ws(',', column1_utf8, column2_utf8) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + +## Should run CONCAT_WS successfully with utf8view +query T +SELECT + concat_ws(',', column1_utf8view, column2_utf8view) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + +## Should run CONCAT_WS successfully with largeutf8 +query T +SELECT + concat_ws(',', column1_large_utf8, column2_large_utf8) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + +## Should run CONCAT_WS successfully with utf8 and largeutf8 +query T +SELECT + concat_ws(',', column1_utf8, column2_large_utf8) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + +## Should run CONCAT_WS successfully with utf8 and utf8view +query T +SELECT + concat_ws(',', column1_utf8view, column2_utf8) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + +## Should run CONCAT_WS successfully with largeutf8 and utf8view +query T +SELECT + concat_ws(',', column1_utf8view, column2_large_utf8) as c +FROM test; +---- +Andrew,X +Xiangpeng,Xiangpeng +Raphael,R +R + ## Ensure no casts for LPAD query TT EXPLAIN SELECT From d60a75d2ae5d4592cac6a6dc3cb373b4e5b920e5 Mon Sep 17 00:00:00 2001 From: Devan Date: Tue, 3 Sep 2024 21:10:49 -0500 Subject: [PATCH 02/21] fmt Signed-off-by: Devan --- datafusion/functions/src/string/concat_ws.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index d87056d4afa0..faf0a4c829d0 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -46,9 +46,7 @@ impl Default for ConcatWsFunc { impl ConcatWsFunc { pub fn new() -> Self { Self { - signature: Signature::variadic_any( - Volatility::Immutable, - ), + signature: Signature::variadic_any(Volatility::Immutable), } } } @@ -436,7 +434,7 @@ mod tests { use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; - use arrow::datatypes::DataType::{Utf8, LargeUtf8, Utf8View}; + use arrow::datatypes::DataType::{LargeUtf8, Utf8, Utf8View}; use crate::string::concat_ws::ConcatWsFunc; use datafusion_common::Result; From 1383f8bef3f78d4b2fecb46a4c0a1be410228af5 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 10:58:40 -0500 Subject: [PATCH 03/21] adds one_of for type sig --- datafusion/functions/src/string/concat_ws.rs | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index faf0a4c829d0..94d7422023f8 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -21,17 +21,17 @@ use std::sync::Arc; use arrow::datatypes::DataType; +use crate::string::common::*; +use crate::string::concat::simplify_concat; +use crate::string::concat_ws; use datafusion_common::cast::{as_string_array, as_string_view_array}; use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; +use datafusion_expr::TypeSignature::Variadic; use datafusion_expr::{lit, ColumnarValue, Expr, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; -use crate::string::common::*; -use crate::string::concat::simplify_concat; -use crate::string::concat_ws; - #[derive(Debug)] pub struct ConcatWsFunc { signature: Signature, @@ -45,8 +45,16 @@ impl Default for ConcatWsFunc { impl ConcatWsFunc { pub fn new() -> Self { + use DataType::*; Self { - signature: Signature::variadic_any(Volatility::Immutable), + signature: Signature::one_of( + vec![ + Variadic(vec![Utf8View]), + Variadic(vec![Utf8, LargeUtf8]), + Variadic(vec![Utf8View, Utf8, LargeUtf8]), + ], + Volatility::Volatile, + ), } } } @@ -353,7 +361,7 @@ impl ScalarUDFImpl for ConcatWsFunc { fn simplify_concat_ws(delimiter: &Expr, args: &[Expr]) -> Result { match delimiter { Expr::Literal( - ScalarValue::Utf8(delimiter) | ScalarValue::LargeUtf8(delimiter), + ScalarValue::Utf8(delimiter) | ScalarValue::LargeUtf8(delimiter) | ScalarValue::Utf8View(delimiter), ) => { match delimiter { // when the delimiter is an empty string, @@ -366,8 +374,8 @@ fn simplify_concat_ws(delimiter: &Expr, args: &[Expr]) -> Result {} - Expr::Literal(ScalarValue::Utf8(Some(v)) | ScalarValue::LargeUtf8(Some(v))) => { + Expr::Literal(ScalarValue::Utf8(None) | ScalarValue::LargeUtf8(None) | ScalarValue::Utf8View(None)) => {} + Expr::Literal(ScalarValue::Utf8(Some(v)) | ScalarValue::LargeUtf8(Some(v)) | ScalarValue::Utf8View(Some(v))) => { match contiguous_scalar { None => contiguous_scalar = Some(v.to_string()), Some(mut pre) => { From 63fd5c6cad06dd37f5a4e307b8a337bef4c56a12 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 11:05:41 -0500 Subject: [PATCH 04/21] revert type sig, the problem was outside the actual signature xD --- datafusion/functions/src/string/concat_ws.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 94d7422023f8..848c48803537 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -28,7 +28,6 @@ use datafusion_common::cast::{as_string_array, as_string_view_array}; use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; -use datafusion_expr::TypeSignature::Variadic; use datafusion_expr::{lit, ColumnarValue, Expr, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -47,13 +46,9 @@ impl ConcatWsFunc { pub fn new() -> Self { use DataType::*; Self { - signature: Signature::one_of( - vec![ - Variadic(vec![Utf8View]), - Variadic(vec![Utf8, LargeUtf8]), - Variadic(vec![Utf8View, Utf8, LargeUtf8]), - ], - Volatility::Volatile, + signature: Signature::variadic( + vec![Utf8View, Utf8, LargeUtf8], + Volatility::Immutable ), } } From 92fbcfa786a00748c012036eb24fca973fbeef16 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 11:11:38 -0500 Subject: [PATCH 05/21] fmt and clippy --- datafusion/functions/src/string/concat_ws.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 848c48803537..b29ec8d044b2 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -48,7 +48,7 @@ impl ConcatWsFunc { Self { signature: Signature::variadic( vec![Utf8View, Utf8, LargeUtf8], - Volatility::Immutable + Volatility::Immutable, ), } } @@ -356,7 +356,9 @@ impl ScalarUDFImpl for ConcatWsFunc { fn simplify_concat_ws(delimiter: &Expr, args: &[Expr]) -> Result { match delimiter { Expr::Literal( - ScalarValue::Utf8(delimiter) | ScalarValue::LargeUtf8(delimiter) | ScalarValue::Utf8View(delimiter), + ScalarValue::Utf8(delimiter) + | ScalarValue::LargeUtf8(delimiter) + | ScalarValue::Utf8View(delimiter), ) => { match delimiter { // when the delimiter is an empty string, From 1d50a6292eb7fe1cee720e33135782f81c515dae Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 11:27:11 -0500 Subject: [PATCH 06/21] add utf8view --- datafusion/functions/src/string/concat_ws.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index b29ec8d044b2..417e3bd9c373 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -116,10 +116,18 @@ impl ScalarUDFImpl for ConcatWsFunc { // Scalar if array_len.is_none() { let sep = match &args[0] { - ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s, + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) + | ColumnarValue::Scalar(ScalarValue::Utf8View(s)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(s)) => s, ColumnarValue::Scalar(ScalarValue::Utf8(None)) => { return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); } + ColumnarValue::Scalar(ScalarValue::Utf8View(None)) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(None))); + } + ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)) => { + return Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(None))); + } _ => unreachable!(), }; From 31576e35e07d459835b437a2eb8651841961ff76 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 11:33:54 -0500 Subject: [PATCH 07/21] fix match --- datafusion/functions/src/string/concat_ws.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 417e3bd9c373..685a02299fee 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -117,8 +117,8 @@ impl ScalarUDFImpl for ConcatWsFunc { if array_len.is_none() { let sep = match &args[0] { ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) - | ColumnarValue::Scalar(ScalarValue::Utf8View(s)) - | ColumnarValue::Scalar(ScalarValue::LargeUtf8(s)) => s, + | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => s, ColumnarValue::Scalar(ScalarValue::Utf8(None)) => { return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); } From b5f64f3a4666fb92f3e6e4116e89b7858e001a26 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 12:35:33 -0500 Subject: [PATCH 08/21] fix match --- datafusion/functions/src/string/concat_ws.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 685a02299fee..5421e7bc5248 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -47,8 +47,8 @@ impl ConcatWsFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8View, Utf8, LargeUtf8], - Volatility::Immutable, + vec![Utf8, Utf8View, LargeUtf8, Null], + Volatility::Volatile, ), } } @@ -69,7 +69,7 @@ impl ScalarUDFImpl for ConcatWsFunc { fn return_type(&self, arg_types: &[DataType]) -> Result { use DataType::*; - let mut dt = &Utf8; + let mut dt = &Null; arg_types.iter().for_each(|data_type| { if data_type == &Utf8View { dt = data_type; @@ -77,6 +77,9 @@ impl ScalarUDFImpl for ConcatWsFunc { if data_type == &LargeUtf8 && dt != &Utf8View { dt = data_type; } + if data_type == &Utf8 { + dt = data_type; + } }); Ok(dt.to_owned()) @@ -85,7 +88,7 @@ impl ScalarUDFImpl for ConcatWsFunc { /// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored. /// concat_ws(',', 'abcde', 2, NULL, 22) = 'abcde,2,22' fn invoke(&self, args: &[ColumnarValue]) -> Result { - // do not accept 0 or 1 arguments. + // do not accept 0 arguments. if args.len() < 2 { return exec_err!( "concat_ws was called with {} arguments. It requires at least 2.", From fec232c58bad0614b3e37b2724f0aa99af24750b Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 12:43:24 -0500 Subject: [PATCH 09/21] :thinking: why nulls why --- datafusion/functions/src/string/concat_ws.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 5421e7bc5248..9bfc78ad347e 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -47,8 +47,8 @@ impl ConcatWsFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8, Utf8View, LargeUtf8, Null], - Volatility::Volatile, + vec![Utf8, Utf8View, LargeUtf8], + Volatility::Immutable, ), } } From 5e0c4c61057b547c291699e10386c57f6e6a19cf Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 12:55:44 -0500 Subject: [PATCH 10/21] small fix --- datafusion/functions/src/string/concat_ws.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 9bfc78ad347e..97f80a402f43 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -69,7 +69,7 @@ impl ScalarUDFImpl for ConcatWsFunc { fn return_type(&self, arg_types: &[DataType]) -> Result { use DataType::*; - let mut dt = &Null; + let mut dt = &Utf8; arg_types.iter().for_each(|data_type| { if data_type == &Utf8View { dt = data_type; @@ -77,9 +77,6 @@ impl ScalarUDFImpl for ConcatWsFunc { if data_type == &LargeUtf8 && dt != &Utf8View { dt = data_type; } - if data_type == &Utf8 { - dt = data_type; - } }); Ok(dt.to_owned()) From a660a31aa55f939c4d02cee87003a74b81613a72 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 12:58:21 -0500 Subject: [PATCH 11/21] log ms --- datafusion/functions/src/string/concat_ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 97f80a402f43..33562f473c57 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -247,7 +247,7 @@ impl ScalarUDFImpl for ConcatWsFunc { columns.push(column); }, other => { - return plan_err!("Input was {other} which is not a supported datatype for concat function") + return plan_err!("Input was {other} which is not a supported datatype for concat_ws function.") } }; } From a9e3c4abb042af1f7aac0540ab582da65ffaaa97 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 6 Sep 2024 20:10:22 -0500 Subject: [PATCH 12/21] pushing up -- wip --- datafusion/functions/src/string/concat_ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 33562f473c57..100b8c62d575 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -47,7 +47,7 @@ impl ConcatWsFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8, Utf8View, LargeUtf8], + vec![Utf8View, Utf8, LargeUtf8], Volatility::Immutable, ), } From cc2e03fa5b571b3f9b964f55032d9ba82477b60a Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:45:02 -0500 Subject: [PATCH 13/21] make it so the return type is just Utf8 Signed-off-by: Devan --- .../expr/src/type_coercion/functions.rs | 1 - datafusion/functions/src/string/concat_ws.rs | 145 +++--------------- datafusion/optimizer/src/optimizer.rs | 1 - 3 files changed, 25 insertions(+), 122 deletions(-) diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index d30d202df050..7d6fe227d16c 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -43,7 +43,6 @@ pub fn data_types_with_scalar_udf( func: &ScalarUDF, ) -> Result> { let signature = func.signature(); - if current_types.is_empty() { if signature.type_signature.supports_zero_argument() { return Ok(vec![]); diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 100b8c62d575..77f944658bca 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -47,7 +47,7 @@ impl ConcatWsFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8View, Utf8, LargeUtf8], + vec![Utf8View, LargeUtf8, Utf8], Volatility::Immutable, ), } @@ -67,19 +67,9 @@ impl ScalarUDFImpl for ConcatWsFunc { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { + fn return_type(&self, _arg_types: &[DataType]) -> Result { use DataType::*; - let mut dt = &Utf8; - arg_types.iter().for_each(|data_type| { - if data_type == &Utf8View { - dt = data_type; - } - if data_type == &LargeUtf8 && dt != &Utf8View { - dt = data_type; - } - }); - - Ok(dt.to_owned()) + Ok(Utf8) } /// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored. @@ -101,33 +91,17 @@ impl ScalarUDFImpl for ConcatWsFunc { }) .next(); - let mut return_datatype = DataType::Utf8; - args.iter().for_each(|col| { - if col.data_type() == DataType::Utf8View { - return_datatype = col.data_type(); - } - if col.data_type() == DataType::LargeUtf8 - && return_datatype != DataType::Utf8View - { - return_datatype = col.data_type(); - } - }); - // Scalar if array_len.is_none() { let sep = match &args[0] { ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => s, - ColumnarValue::Scalar(ScalarValue::Utf8(None)) => { + ColumnarValue::Scalar(ScalarValue::Utf8(None)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(None)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)) => { return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); } - ColumnarValue::Scalar(ScalarValue::Utf8View(None)) => { - return Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(None))); - } - ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)) => { - return Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(None))); - } _ => unreachable!(), }; @@ -164,15 +138,7 @@ impl ScalarUDFImpl for ConcatWsFunc { } } - return match return_datatype { - DataType::Utf8View => { - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))) - } - DataType::LargeUtf8 => { - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))) - } - _ => Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))), - }; + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))); } // Array @@ -255,93 +221,32 @@ impl ScalarUDFImpl for ConcatWsFunc { } } - match return_datatype { - DataType::Utf8 => { - let mut builder = StringArrayBuilder::with_capacity(len, data_size); - for i in 0..len { - if !sep.is_valid(i) { - builder.append_offset(); - continue; - } - - let mut iter = columns.iter(); - for column in iter.by_ref() { - if column.is_valid(i) { - builder.write::(column, i); - break; - } - } - - for column in iter { - if column.is_valid(i) { - builder.write::(&sep, i); - builder.write::(column, i); - } - } - - builder.append_offset(); - } - - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) + let mut builder = StringArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + if !sep.is_valid(i) { + builder.append_offset(); + continue; } - DataType::Utf8View => { - let mut builder = StringViewArrayBuilder::with_capacity(len, data_size); - for i in 0..len { - if !sep.is_valid(i) { - builder.append_offset(); - continue; - } - - let mut iter = columns.iter(); - for column in iter.by_ref() { - if column.is_valid(i) { - builder.write::(column, i); - break; - } - } - for column in iter { - if column.is_valid(i) { - builder.write::(&sep, i); - builder.write::(column, i); - } - } - - builder.append_offset(); + let mut iter = columns.iter(); + for column in iter.by_ref() { + if column.is_valid(i) { + builder.write::(column, i); + break; } - - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } - DataType::LargeUtf8 => { - let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size); - for i in 0..len { - if !sep.is_valid(i) { - builder.append_offset(); - continue; - } - let mut iter = columns.iter(); - for column in iter.by_ref() { - if column.is_valid(i) { - builder.write::(column, i); - break; - } - } - - for column in iter { - if column.is_valid(i) { - builder.write::(&sep, i); - builder.write::(column, i); - } - } - - builder.append_offset(); + for column in iter { + if column.is_valid(i) { + builder.write::(&sep, i); + builder.write::(column, i); } - - Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) } - _ => unreachable!(), + + builder.append_offset(); } + + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) } /// Simply the `concat_ws` function by diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 2fc560cceeb4..d43731f0d3b2 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -365,7 +365,6 @@ impl Optimizer { let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; - let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); From 0d07f25d2107f2497bc7f521c94b45f1911b5ef0 Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:49:37 -0500 Subject: [PATCH 14/21] fmt Signed-off-by: Devan --- .../expr/src/type_coercion/functions.rs | 1 + datafusion/functions/src/string/concat_ws.rs | 28 +------------------ datafusion/optimizer/src/optimizer.rs | 1 + 3 files changed, 3 insertions(+), 27 deletions(-) diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 7d6fe227d16c..d30d202df050 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -43,6 +43,7 @@ pub fn data_types_with_scalar_udf( func: &ScalarUDF, ) -> Result> { let signature = func.signature(); + if current_types.is_empty() { if signature.type_signature.supports_zero_argument() { return Ok(vec![]); diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 77f944658bca..7cc6ebbf142e 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -352,7 +352,7 @@ mod tests { use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; - use arrow::datatypes::DataType::{LargeUtf8, Utf8, Utf8View}; + use arrow::datatypes::DataType::Utf8; use crate::string::concat_ws::ConcatWsFunc; use datafusion_common::Result; @@ -413,32 +413,6 @@ mod tests { Utf8, StringArray ); - test_function!( - ConcatWsFunc::new(), - &[ - ColumnarValue::Scalar(ScalarValue::from("|")), - ColumnarValue::Scalar(ScalarValue::from("aa")), - ColumnarValue::Scalar(ScalarValue::LargeUtf8(None)), - ColumnarValue::Scalar(ScalarValue::from("cc")), - ], - Ok(Some("aa|cc")), - &str, - LargeUtf8, - StringArray - ); - test_function!( - ConcatWsFunc::new(), - &[ - ColumnarValue::Scalar(ScalarValue::from("|")), - ColumnarValue::Scalar(ScalarValue::from("aa")), - ColumnarValue::Scalar(ScalarValue::Utf8View(None)), - ColumnarValue::Scalar(ScalarValue::from("cc")), - ], - Ok(Some("aa|cc")), - &str, - Utf8View, - StringArray - ); Ok(()) } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d43731f0d3b2..1ffc9a18a920 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -366,6 +366,7 @@ impl Optimizer { let options = config.options(); let mut new_plan = plan; let mut previous_plans = HashSet::with_capacity(16); + previous_plans.insert(LogicalPlanSignature::new(&new_plan)); let mut i = 0; From b6967f0713e8dd51463177f8f884843a321ec885 Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:50:29 -0500 Subject: [PATCH 15/21] fmt Signed-off-by: Devan --- datafusion/optimizer/src/optimizer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1ffc9a18a920..f3d4e1c095a4 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -365,8 +365,8 @@ impl Optimizer { let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; + let mut previous_plans = HashSet::with_capacity(16); - previous_plans.insert(LogicalPlanSignature::new(&new_plan)); let mut i = 0; From f256b45f1436ecdd1c3aa694fa8ceeee57020348 Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:50:58 -0500 Subject: [PATCH 16/21] fmt Signed-off-by: Devan --- datafusion/optimizer/src/optimizer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index f3d4e1c095a4..d43731f0d3b2 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -365,7 +365,6 @@ impl Optimizer { let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; - let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); From c7d35e824a44811ab646beafc2c4d1a00ec81872 Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:51:52 -0500 Subject: [PATCH 17/21] fmt Signed-off-by: Devan --- datafusion/optimizer/src/optimizer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d43731f0d3b2..f3d4e1c095a4 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -365,6 +365,7 @@ impl Optimizer { let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; + let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); From 2926d0b5903f45db2bf92ae4fa178740f24ff5dc Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 11:52:06 -0500 Subject: [PATCH 18/21] fmt Signed-off-by: Devan --- datafusion/optimizer/src/optimizer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index f3d4e1c095a4..2fc560cceeb4 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -365,7 +365,7 @@ impl Optimizer { let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; - + let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); From bc32c904f3be289e4994a11ca5eda076ca67d114 Mon Sep 17 00:00:00 2001 From: Devan Date: Sun, 8 Sep 2024 12:05:00 -0500 Subject: [PATCH 19/21] order matters Signed-off-by: Devan --- datafusion/functions/src/string/concat_ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 7cc6ebbf142e..3d2859af8294 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -47,7 +47,7 @@ impl ConcatWsFunc { use DataType::*; Self { signature: Signature::variadic( - vec![Utf8View, LargeUtf8, Utf8], + vec![Utf8View, Utf8, LargeUtf8], Volatility::Immutable, ), } From 66a576df61501a97125c96441ad2e1deecdcd552 Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 9 Sep 2024 16:36:55 -0500 Subject: [PATCH 20/21] sum inner buffer for stringviewarray data_size --- datafusion/functions/src/string/concat_ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 3d2859af8294..0e4c6db84053 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -204,7 +204,7 @@ impl ScalarUDFImpl for ConcatWsFunc { DataType::Utf8View => { let string_array = as_string_view_array(array)?; - data_size += string_array.len(); + data_size += string_array.data_buffers().iter().map(|buf| buf.len()).sum(); let column = if array.is_nullable() { ColumnarValueRef::NullableStringViewArray(string_array) } else { From cef69b783cb26c8b7981c4e4a5eb218d4fa75363 Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 9 Sep 2024 16:38:08 -0500 Subject: [PATCH 21/21] need turbo fish --- datafusion/functions/src/string/concat_ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index 0e4c6db84053..1134c525cfca 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -204,7 +204,7 @@ impl ScalarUDFImpl for ConcatWsFunc { DataType::Utf8View => { let string_array = as_string_view_array(array)?; - data_size += string_array.data_buffers().iter().map(|buf| buf.len()).sum(); + data_size += string_array.data_buffers().iter().map(|buf| buf.len()).sum::(); let column = if array.is_nullable() { ColumnarValueRef::NullableStringViewArray(string_array) } else {