Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 70 additions & 11 deletions datafusion/functions-nested/src/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
use crate::utils::make_scalar_function;
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::ArrowNativeType;
use arrow::datatypes::{
DataType,
DataType::{FixedSizeList, LargeList, List, Null},
};
use arrow::error::ArrowError;
use datafusion_common::cast::{as_large_list_array, as_list_array};
use datafusion_common::{exec_err, utils::take_function_args, Result};
use datafusion_expr::{
Expand Down Expand Up @@ -95,7 +97,9 @@ impl ScalarUDFImpl for Flatten {
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let data_type = match &arg_types[0] {
List(field) => match field.data_type() {
List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
List(field) | LargeList(field) | FixedSizeList(field, _) => {
List(Arc::clone(field))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this only supports arrays that can be converted from LargeList to List.

}
_ => arg_types[0].clone(),
},
LargeList(field) => match field.data_type() {
Expand Down Expand Up @@ -154,7 +158,32 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
exec_err!("flatten does not support type '{:?}'", array.data_type())?
let (inner_field, inner_offsets, inner_values, _) =
as_large_list_array(&values)?.clone().into_parts();
// Try to downcast the inner offsets to i32
match downcast_i64_inner_to_i32(&inner_offsets, &offsets) {
Ok(i32offsets) => {
let flattened_array = GenericListArray::<i32>::new(
inner_field,
i32offsets,
inner_values,
nulls,
);
Ok(Arc::new(flattened_array) as ArrayRef)
}
// If downcast fails we keep the offsets as is
Err(_) => {
// Fallback: keep i64 offsets → LargeList<i64>
let i64offsets = keep_offsets_i64(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
inner_field,
i64offsets,
inner_values,
nulls,
);
Ok(Arc::new(flattened_array) as ArrayRef)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are trying to downcast the inner array offsets from i64 to i32 and if fail we fallback to i64 offsets.
The fallback is not yet supported by the return_type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make sense to me as it means the return value of the function might not conform to the stated return_type

If return type says the return array will be ListArray for a LargeListArray input, then the invoke function must return that value (or an error)

SO I think we need to either update the implementation and return typetype to return LargeListArray for LargeListArray inputs or else return an error if we can't cast the offsets correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at @Jefffrey's original comment suggesting that we should try to downcast it i32 returning a ListArray and alternatively return a LargeListArray if downcast is not possible.

He also mentioned that upcasting the parent offsets to LargeListArray blindly would be inefficient and undesirable.

I wanted to see if it's possible to infer the possibility of downcasting the inner array offsets within the return_type function. Then return the matching output type.

We could do it by checking if: inner_offsets.last() <= i32::MAX

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I did mention that, however I did make a note that:

though this might be tricky considering return_type() wouldn't know this until execution

I think we make a choice, either always upcast to large if one of the inner lists is a large list, or just error.

I guess the former is more favourable/robust than the latter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, if the consensus is to always upcast to LargeListArray I'll proceed with it.

}
}
}
_ => Ok(Arc::clone(array) as ArrayRef),
}
Expand All @@ -179,7 +208,7 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
let (inner_field, inner_offsets, inner_values, nulls) =
let (inner_field, inner_offsets, inner_values, _) = // _ instead of nulls?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here flattened_array was generated using the inner nulls instead of the outer ones. I figured it should follow the same format and use nulls from the outer array.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting; so this might be a bug in current behaviour? Do you think you can mock up a test where it fails on main without this fix?

as_large_list_array(&values)?.clone().into_parts();
let offsets = get_offsets_for_flatten::<i64>(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
Expand All @@ -203,11 +232,11 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

// Create new offsets that are equivalent to `flatten` the array.
fn get_offsets_for_flatten<O: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<O>,
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<O>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little naming change because the previous one was confusing.

) -> OffsetBuffer<O> {
let buffer = offsets.into_inner();
let offsets: Vec<O> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<O> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.collect();
Expand All @@ -216,17 +245,47 @@ fn get_offsets_for_flatten<O: OffsetSizeTrait>(

// Create new large offsets that are equivalent to `flatten` the array.
fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<P>,
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<P>,
) -> OffsetBuffer<i64> {
let buffer = offsets.into_inner();
let offsets: Vec<i64> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<i64> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
.collect();
OffsetBuffer::new(offsets.into())
}

// Function for converting LargeList offsets into List offsets
fn downcast_i64_inner_to_i32(
inner_offsets: &OffsetBuffer<i64>,
outer_offsets: &OffsetBuffer<i32>,
) -> Result<OffsetBuffer<i32>, ArrowError> {
let buffer = inner_offsets.clone().into_inner();
let offsets: Result<Vec<i32>, _> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.map(|i| {
i32::try_from(i)
.map_err(|_| ArrowError::CastError(format!("Cannot downcast offset {i}")))
})
.collect();
Ok(OffsetBuffer::new(offsets?.into()))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is trying to downcast offsets to i32.

  • On success it returns an OffsetBuffer<i32>
  • If it fails it errors out.


// In case the conversion fails we convert the outer offsets into i64
fn keep_offsets_i64(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function takes the outer i32 and inner i64 offsets and keeps the inner i64

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should be renamed (and docstring adjusted) now? keep_offsets_i64 is a bit confusing to read for me

inner_offsets: OffsetBuffer<i64>,
outer_offsets: OffsetBuffer<i32>,
) -> OffsetBuffer<i64> {
let buffer = inner_offsets.into_inner();
let offsets: Vec<i64> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.collect();
OffsetBuffer::new(offsets.into())
}

fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
match array.data_type() {
FixedSizeList(field, _) => {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7737,6 +7737,12 @@ select flatten(arrow_cast(make_array(1, 2, 1, 3, 2), 'FixedSizeList(5, Int64)'))
----
[1, 2, 1, 3, 2] [1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]

query ??
select flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))')),
flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))'));
----
[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think should also add a check for the output type via arrow_typeof() to ensure we are indeed getting a large list


# flatten with column values
query ????
select flatten(column1),
Expand Down