Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions datafusion/functions-nested/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! [`ScalarUDFImpl`] definitions for array_except function.
//! [`ScalarUDFImpl`] definition for array_except function.

use crate::utils::{check_datatypes, make_scalar_function};
use arrow::array::new_null_array;
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
use arrow::buffer::OffsetBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType, FieldRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::utils::{ListCoercion, take_function_args};
Expand All @@ -28,6 +29,7 @@ use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_macros::user_doc;
use itertools::Itertools;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -104,8 +106,11 @@ impl ScalarUDFImpl for ArrayExcept {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match (&arg_types[0].clone(), &arg_types[1].clone()) {
(DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
match (&arg_types[0], &arg_types[1]) {
(DataType::Null, DataType::Null) => {
Ok(DataType::new_list(DataType::Null, true))
}
(DataType::Null, dt) | (dt, DataType::Null) => Ok(dt.clone()),
(dt, _) => Ok(dt.clone()),
}
}
Expand All @@ -129,8 +134,16 @@ impl ScalarUDFImpl for ArrayExcept {
fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let [array1, array2] = take_function_args("array_except", args)?;

let len = array1.len();
match (array1.data_type(), array2.data_type()) {
(DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
(DataType::Null, DataType::Null) => Ok(new_null_array(
&DataType::new_list(DataType::Null, true),
len,
)),
(DataType::Null, dt @ DataType::List(_))
| (DataType::Null, dt @ DataType::LargeList(_))
| (dt @ DataType::List(_), DataType::Null)
| (dt @ DataType::LargeList(_), DataType::Null) => Ok(new_null_array(dt, len)),
(DataType::List(field), DataType::List(_)) => {
check_datatypes("array_except", &[array1, array2])?;
let list1 = array1.as_list::<i32>();
Expand Down Expand Up @@ -169,14 +182,25 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
let mut rows = Vec::with_capacity(l_values.num_rows());
let mut dedup = HashSet::new();

for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
for i in r_slice {
let nulls = NullBuffer::union(l.nulls(), r.nulls());

for (i, ((l_start, l_end), (r_start, r_end))) in l
.offsets()
.iter()
.tuple_windows()
.zip(r.offsets().iter().tuple_windows())
.enumerate()
{
if nulls.as_ref().is_some_and(|nulls| nulls.is_null(i)) {
offsets.push(OffsetSize::usize_as(rows.len()));
continue;
}

for i in r_start.as_usize()..r_end.as_usize() {
let right_row = r_values.row(i);
dedup.insert(right_row);
}
for i in l_slice {
for i in l_start.as_usize()..l_end.as_usize() {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
rows.push(left_row);
Expand All @@ -192,7 +216,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
field.to_owned(),
OffsetBuffer::new(offsets.into()),
values.to_owned(),
l.nulls().cloned(),
nulls,
))
} else {
internal_err!("array_except failed to convert rows")
Expand Down
109 changes: 27 additions & 82 deletions datafusion/functions-nested/src/set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

use crate::utils::make_scalar_function;
use arrow::array::{
Array, ArrayRef, GenericListArray, LargeListArray, ListArray, OffsetSizeTrait,
new_null_array,
Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute;
Expand Down Expand Up @@ -69,7 +68,7 @@ make_udf_expr_and_func!(

#[user_doc(
doc_section(label = "Array Functions"),
description = "Returns an array of elements that are present in both arrays (all elements from both arrays) with out duplicates.",
description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.",
Comment on lines 69 to +71
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Documentation describes intersection semantics instead of union.

The description "Returns an array of elements that are present in both arrays" describes intersection behavior, but this is array_union. The parenthetical "(all elements from both arrays)" is correct, but the initial phrase is misleading.

📝 Suggested fix
-    description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.",
+    description = "Returns an array of all unique elements from both arrays without duplicates.",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[user_doc(
doc_section(label = "Array Functions"),
description = "Returns an array of elements that are present in both arrays (all elements from both arrays) with out duplicates.",
description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.",
#[user_doc(
doc_section(label = "Array Functions"),
description = "Returns an array of all unique elements from both arrays without duplicates.",
🤖 Prompt for AI Agents
In @datafusion/functions-nested/src/set_ops.rs around lines 69 - 71, The
docstring for array_union incorrectly describes intersection; update the
user_doc description on the array_union function to reflect union semantics by
replacing "Returns an array of elements that are present in both arrays" with
wording like "Returns an array of elements that are present in either array (all
elements from both arrays) without duplicates," ensuring the parenthetical note
remains and the description unambiguously states union behavior.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback:The CodeRabbit AI reviewer is correct! The documentation for array_union() is a bit confusing and it could be improved to not sound like it is about interception.

syntax_example = "array_union(array1, array2)",
sql_example = r#"```sql
> select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
Expand Down Expand Up @@ -136,8 +135,7 @@ impl ScalarUDFImpl for ArrayUnion {
let [array1, array2] = take_function_args(self.name(), arg_types)?;
match (array1, array2) {
(Null, Null) => Ok(DataType::new_list(Null, true)),
(Null, dt) => Ok(dt.clone()),
(dt, Null) => Ok(dt.clone()),
(Null, dt) | (dt, Null) => Ok(dt.clone()),
(dt, _) => Ok(dt.clone()),
}
}
Expand Down Expand Up @@ -221,8 +219,7 @@ impl ScalarUDFImpl for ArrayIntersect {
let [array1, array2] = take_function_args(self.name(), arg_types)?;
match (array1, array2) {
(Null, Null) => Ok(DataType::new_list(Null, true)),
(Null, dt) => Ok(dt.clone()),
(dt, Null) => Ok(dt.clone()),
(Null, dt) | (dt, Null) => Ok(dt.clone()),
(dt, _) => Ok(dt.clone()),
}
}
Expand Down Expand Up @@ -363,23 +360,19 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(

let mut offsets = vec![OffsetSize::usize_as(0)];
let mut new_arrays = vec![];
let mut new_null_buf = vec![];
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
for (first_arr, second_arr) in l.iter().zip(r.iter()) {
let mut ele_should_be_null = false;
for (l_arr, r_arr) in l.iter().zip(r.iter()) {
let last_offset = *offsets.last().unwrap();

let l_values = if let Some(first_arr) = first_arr {
converter.convert_columns(&[first_arr])?
} else {
ele_should_be_null = true;
converter.empty_rows(0, 0)
};

let r_values = if let Some(second_arr) = second_arr {
converter.convert_columns(&[second_arr])?
} else {
ele_should_be_null = true;
converter.empty_rows(0, 0)
let (l_values, r_values) = match (l_arr, r_arr) {
(Some(l_arr), Some(r_arr)) => (
converter.convert_columns(&[l_arr])?,
converter.convert_columns(&[r_arr])?,
),
_ => {
offsets.push(last_offset);
continue;
}
};

let l_iter = l_values.iter().sorted().dedup();
Expand All @@ -405,11 +398,6 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
}
}

let last_offset = match offsets.last() {
Some(offset) => *offset,
None => return internal_err!("offsets should not be empty"),
};

offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.first() {
Expand All @@ -419,18 +407,21 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
}
};

new_null_buf.push(!ele_should_be_null);
new_arrays.push(array);
}

let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
let values = compute::concat(&new_arrays_ref)?;
let values = if new_arrays_ref.is_empty() {
new_empty_array(&l.value_type())
} else {
compute::concat(&new_arrays_ref)?
};
let arr = GenericListArray::<OffsetSize>::try_new(
field,
offsets,
values,
Some(NullBuffer::new(new_null_buf.into())),
NullBuffer::union(l.nulls(), r.nulls()),
)?;
Ok(Arc::new(arr))
}
Expand All @@ -440,59 +431,13 @@ fn general_set_op(
array2: &ArrayRef,
set_op: SetOp,
) -> Result<ArrayRef> {
fn empty_array(data_type: &DataType, len: usize, large: bool) -> Result<ArrayRef> {
let field = Arc::new(Field::new_list_field(data_type.clone(), true));
let values = new_null_array(data_type, len);
if large {
Ok(Arc::new(LargeListArray::try_new(
field,
OffsetBuffer::new_zeroed(len),
values,
None,
)?))
} else {
Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new_zeroed(len),
values,
None,
)?))
}
}

let len = array1.len();
match (array1.data_type(), array2.data_type()) {
(Null, Null) => Ok(Arc::new(ListArray::new_null(
Arc::new(Field::new_list_field(Null, true)),
array1.len(),
))),
(Null, List(field)) => {
if set_op == SetOp::Intersect {
return empty_array(field.data_type(), array1.len(), false);
}
let array = as_list_array(&array2)?;
general_array_distinct::<i32>(array, field)
}
(List(field), Null) => {
if set_op == SetOp::Intersect {
return empty_array(field.data_type(), array1.len(), false);
}
let array = as_list_array(&array1)?;
general_array_distinct::<i32>(array, field)
}
(Null, LargeList(field)) => {
if set_op == SetOp::Intersect {
return empty_array(field.data_type(), array1.len(), true);
}
let array = as_large_list_array(&array2)?;
general_array_distinct::<i64>(array, field)
}
(LargeList(field), Null) => {
if set_op == SetOp::Intersect {
return empty_array(field.data_type(), array1.len(), true);
}
let array = as_large_list_array(&array1)?;
general_array_distinct::<i64>(array, field)
}
(Null, Null) => Ok(new_null_array(&DataType::new_list(Null, true), len)),
(Null, dt @ List(_))
| (Null, dt @ LargeList(_))
| (dt @ List(_), Null)
| (dt @ LargeList(_), Null) => Ok(new_null_array(dt, len)),
(List(field), List(_)) => {
let array1 = as_list_array(&array1)?;
let array2 = as_list_array(&array2)?;
Expand Down
Loading