Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions datafusion/functions-nested/src/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl ScalarUDFImpl for Flatten {
let data_type = match &arg_types[0] {
List(field) => match field.data_type() {
List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
LargeList(field) => LargeList(Arc::clone(field)),
_ => arg_types[0].clone(),
},
LargeList(field) => match field.data_type() {
Expand Down Expand Up @@ -143,7 +144,8 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
List(_) => {
let (inner_field, inner_offsets, inner_values, _) =
as_list_array(&values)?.clone().into_parts();
let offsets = get_offsets_for_flatten::<i32>(inner_offsets, offsets);
let offsets =
get_offsets_for_flatten::<i32, i32>(inner_offsets, offsets);
let flattened_array = GenericListArray::<i32>::new(
inner_field,
offsets,
Expand All @@ -154,7 +156,17 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
exec_err!("flatten does not support type '{:?}'", array.data_type())?
let (inner_field, inner_offsets, inner_values, _) =
as_large_list_array(&values)?.clone().into_parts();
let offsets =
get_offsets_for_flatten::<i64, i32>(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
inner_field,
offsets,
inner_values,
nulls,
);
Ok(Arc::new(flattened_array) as ArrayRef)
}
_ => Ok(Arc::clone(array) as ArrayRef),
}
Expand All @@ -179,9 +191,10 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
let (inner_field, inner_offsets, inner_values, nulls) =
let (inner_field, inner_offsets, inner_values, _) =
as_large_list_array(&values)?.clone().into_parts();
let offsets = get_offsets_for_flatten::<i64>(inner_offsets, offsets);
let offsets =
get_offsets_for_flatten::<i64, i64>(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
inner_field,
offsets,
Expand All @@ -202,12 +215,12 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
}

// Create new offsets that are equivalent to flattening the array.
fn get_offsets_for_flatten<O: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<O>,
fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<P>,
) -> OffsetBuffer<O> {
let buffer = offsets.into_inner();
let offsets: Vec<O> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<O> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.collect();
Expand All @@ -216,11 +229,11 @@ fn get_offsets_for_flatten<O: OffsetSizeTrait>(

// Create new large offsets that are equivalent to flattening the array.
fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<P>,
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<P>,
) -> OffsetBuffer<i64> {
let buffer = offsets.into_inner();
let offsets: Vec<i64> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<i64> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
.collect();
Expand Down
12 changes: 10 additions & 2 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7757,10 +7757,10 @@ select flatten(make_array(1, 2, 1, 3, 2)),

query ???
select flatten(arrow_cast(make_array(1, 2, 1, 3, 2), 'LargeList(Int64)')),
flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'LargeList(LargeList(Int64))')),
flatten(arrow_cast(make_array([1], null, [2, 3], [null], make_array(4, null, 5)), 'LargeList(LargeList(Int64))')),
flatten(arrow_cast(make_array([[1.1]], [[2.2]], [[3.3], [4.4]]), 'LargeList(LargeList(LargeList(Float64)))'));
----
[1, 2, 1, 3, 2] [1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]
[1, 2, 1, 3, 2] [1, NULL, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]

query ???
select flatten(arrow_cast(make_array(1, 2, 1, 3, 2), 'FixedSizeList(5, Int64)')),
Expand All @@ -7769,6 +7769,14 @@ select flatten(arrow_cast(make_array(1, 2, 1, 3, 2), 'FixedSizeList(5, Int64)'))
----
[1, 2, 1, 3, 2] [1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]

query ??TT
select flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))')),
flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))')),
arrow_typeof(flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))'))),
arrow_typeof(flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))')));
----
[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]] LargeList(nullable Int64) LargeList(nullable FixedSizeList(1 x nullable Float64))

# flatten with column values
query ????
select flatten(column1),
Expand Down
Loading