Skip to content

Commit

Permalink
Support array flatten sql function (#7239)
Browse files Browse the repository at this point in the history
* Support array flatten sql function

Signed-off-by: jayzhan211 <[email protected]>

* add null and float

Signed-off-by: jayzhan211 <[email protected]>

* add alias, 1d test and docs

Signed-off-by: jayzhan211 <[email protected]>

* pretty

Signed-off-by: jayzhan211 <[email protected]>

* rename

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Aug 9, 2023
1 parent 026c1d0 commit ee59dcc
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 2 deletions.
34 changes: 34 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ AS VALUES
(NULL, NULL, NULL, NULL)
;

statement ok
CREATE TABLE flatten_table
AS VALUES
(make_array([1], [2], [3]), make_array([[1, 2, 3]], [[4, 5]], [[6]]), make_array([[[1]]], [[[2, 3]]]), make_array([1.0], [2.1, 2.2], [3.2, 3.3, 3.4])),
(make_array([1, 2], [3, 4], [5, 6]), make_array([[8]]), make_array([[[1,2]]], [[[3]]]), make_array([1.0, 2.0], [3.0, 4.0], [5.0, 6.0]))
;

statement ok
CREATE TABLE array_has_table_1D
AS VALUES
Expand Down Expand Up @@ -2330,6 +2337,30 @@ select array_concat(column1, [7]) from arrays_values_v2;
[11, 12, 7]
[7]

# flatten
query ???
select flatten(make_array(1, 2, 1, 3, 2)),
flatten(make_array([1], [2, 3], [null], make_array(4, null, 5))),
flatten(make_array([[1.1]], [[2.2]], [[3.3], [4.4]]));
----
[1, 2, 1, 3, 2] [1, 2, 3, , 4, , 5] [1.1, 2.2, 3.3, 4.4]

query ????
select column1, column2, column3, column4 from flatten_table;
----
[[1], [2], [3]] [[[1, 2, 3]], [[4, 5]], [[6]]] [[[[1]]], [[[2, 3]]]] [[1.0], [2.1, 2.2], [3.2, 3.3, 3.4]]
[[1, 2], [3, 4], [5, 6]] [[[8]]] [[[[1, 2]]], [[[3]]]] [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

query ????
select flatten(column1),
flatten(column2),
flatten(column3),
flatten(column4)
from flatten_table;
----
[1, 2, 3] [1, 2, 3, 4, 5, 6] [1, 2, 3] [1.0, 2.1, 2.2, 3.2, 3.3, 3.4]
[1, 2, 3, 4, 5, 6] [8] [1, 2, 3] [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

### Delete tables

statement ok
Expand Down Expand Up @@ -2382,3 +2413,6 @@ drop table arrays_with_repeating_elements;

statement ok
drop table nested_arrays_with_repeating_elements;

statement ok
drop table flatten_table;
25 changes: 23 additions & 2 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub enum BuiltinScalarFunction {
Cardinality,
/// construct an array from columns
MakeArray,
/// Flatten
Flatten,

// struct functions
/// struct
Expand Down Expand Up @@ -368,6 +370,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceN => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceAll => Volatility::Immutable,
BuiltinScalarFunction::Flatten => Volatility::Immutable,
BuiltinScalarFunction::ArraySlice => Volatility::Immutable,
BuiltinScalarFunction::ArrayToString => Volatility::Immutable,
BuiltinScalarFunction::Cardinality => Volatility::Immutable,
Expand Down Expand Up @@ -501,6 +504,22 @@ impl BuiltinScalarFunction {
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match self {
BuiltinScalarFunction::Flatten => {
fn get_base_type(data_type: &DataType) -> Result<DataType> {
match data_type {
DataType::List(field) => match field.data_type() {
DataType::List(_) => get_base_type(field.data_type()),
_ => Ok(data_type.to_owned()),
},
_ => Err(DataFusionError::Internal(
"Not reachable, data_type should be List".to_string(),
)),
}
}

let data_type = get_base_type(&input_expr_types[0])?;
Ok(data_type)
}
BuiltinScalarFunction::ArrayAppend => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayConcat => {
let mut expr_type = Null;
Expand Down Expand Up @@ -819,11 +838,12 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayConcat => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayDims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayElement => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Flatten => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayHasAll
| BuiltinScalarFunction::ArrayHasAny
| BuiltinScalarFunction::ArrayHas => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayDims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayElement => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayLength => {
Signature::variadic_any(self.volatility())
}
Expand Down Expand Up @@ -1307,6 +1327,7 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
"list_element",
"list_extract",
],
BuiltinScalarFunction::Flatten => &["flatten"],
BuiltinScalarFunction::ArrayHasAll => &["array_has_all", "list_has_all"],
BuiltinScalarFunction::ArrayHasAny => &["array_has_any", "list_has_any"],
BuiltinScalarFunction::ArrayHas => {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,12 @@ scalar_expr!(
first_array second_array,
"Returns true if at least one element of the second array appears in the first array; otherwise, it returns false."
);
scalar_expr!(
Flatten,
flatten,
array,
"flattens an array of arrays into a single array."
);
scalar_expr!(
ArrayDims,
array_dims,
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl ExprSchemable for Expr {
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;

fun.return_type(&data_types)
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
Expand Down
47 changes: 47 additions & 0 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,53 @@ pub fn cardinality(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(result) as ArrayRef)
}

// Create new offsets that are euqiavlent to `flatten` the array.
fn get_offsets_for_flatten(
offsets: OffsetBuffer<i32>,
indexes: OffsetBuffer<i32>,
) -> OffsetBuffer<i32> {
let buffer = offsets.into_inner();
let offsets: Vec<i32> = indexes.iter().map(|i| buffer[*i as usize]).collect();
OffsetBuffer::new(offsets.into())
}

fn flatten_internal(
array: &dyn Array,
indexes: Option<OffsetBuffer<i32>>,
) -> Result<ListArray> {
let list_arr = as_list_array(array)?;
let (field, offsets, values, nulls) = list_arr.clone().into_parts();
let data_type = field.data_type();

match data_type {
// Recursively get the base offsets for flattened array
DataType::List(_) => {
if let Some(indexes) = indexes {
let offsets = get_offsets_for_flatten(offsets, indexes);
flatten_internal(&values, Some(offsets))
} else {
flatten_internal(&values, Some(offsets))
}
}
// Reach the base level, create a new list array
_ => {
if let Some(indexes) = indexes {
let offsets = get_offsets_for_flatten(offsets, indexes);
let list_arr = ListArray::new(field, offsets, values, nulls);
Ok(list_arr)
} else {
Ok(list_arr.clone())
}
}
}
}

/// Flatten SQL function
pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
let flattened_array = flatten_internal(&args[0], None)?;
Ok(Arc::new(flattened_array) as ArrayRef)
}

/// Array_length SQL function
pub fn array_length(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayLength => {
Arc::new(|args| make_scalar_function(array_expressions::array_length)(args))
}
BuiltinScalarFunction::Flatten => {
Arc::new(|args| make_scalar_function(array_expressions::flatten)(args))
}

BuiltinScalarFunction::ArrayNdims => {
Arc::new(|args| make_scalar_function(array_expressions::array_ndims)(args))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ enum ScalarFunction {
ArrayRemoveAll = 109;
ArrayReplaceAll = 110;
Nanvl = 111;
Flatten = 112;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ArrayHas => Self::ArrayHas,
ScalarFunction::ArrayDims => Self::ArrayDims,
ScalarFunction::ArrayElement => Self::ArrayElement,
ScalarFunction::Flatten => Self::Flatten,
ScalarFunction::ArrayLength => Self::ArrayLength,
ScalarFunction::ArrayNdims => Self::ArrayNdims,
ScalarFunction::ArrayPosition => Self::ArrayPosition,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ArrayHas => Self::ArrayHas,
BuiltinScalarFunction::ArrayDims => Self::ArrayDims,
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
BuiltinScalarFunction::Flatten => Self::Flatten,
BuiltinScalarFunction::ArrayLength => Self::ArrayLength,
BuiltinScalarFunction::ArrayNdims => Self::ArrayNdims,
BuiltinScalarFunction::ArrayPosition => Self::ArrayPosition,
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ Unlike to some databases the math functions in Datafusion works the same way as
| array_has_any(array, sub-array) | Returns true if any elements exist in both arrays `array_has_any([1,2,3], [1,4]) -> true` |
| array_dims(array) | Returns an array of the array's dimensions. `array_dims([[1, 2, 3], [4, 5, 6]]) -> [2, 3]` |
| array_element(array, index) | Extracts the element with the index n from the array `array_element([1, 2, 3, 4], 3) -> 3` |
| flatten(array) | Converts an array of arrays to a flat array `flatten([[1], [2, 3], [4, 5, 6]]) -> [1, 2, 3, 4, 5, 6]` |
| array_length(array, dimension) | Returns the length of the array dimension. `array_length([1, 2, 3, 4, 5]) -> 5` |
| array_ndims(array) | Returns the number of dimensions of the array. `array_ndims([[1, 2, 3], [4, 5, 6]]) -> 2` |
| array_position(array, element) | Searches for an element in the array, returns first occurrence. `array_position([1, 2, 2, 3, 4], 2) -> 2` |
Expand Down
18 changes: 18 additions & 0 deletions docs/source/user-guide/sql/scalar_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,24 @@ array_fill(element, array)
Can be a constant, column, or function, and any combination of array operators.
- **element**: Element to copy to the array.

### `flatten`

Converts an array of arrays to a flat array

- Applies to any depth of nested arrays
- Does not change arrays that are already flat

The flattened array contains all the elements from all source arrays.

#### Arguments

- **array**: Array expression
Can be a constant, column, or function, and any combination of array operators.

```
flatten(array)
```

### `array_indexof`

_Alias of [array_position](#array_position)._
Expand Down

0 comments on commit ee59dcc

Please sign in to comment.