-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: support array_compact builtin function
#21522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
08c5098
c0399a3
d280c88
5075c46
1cf1021
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! [`ScalarUDFImpl`] definitions for array_compact function. | ||
|
|
||
| use crate::utils::make_scalar_function; | ||
| use arrow::array::{ | ||
| Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder, | ||
| OffsetSizeTrait, make_array, | ||
| }; | ||
| use arrow::buffer::OffsetBuffer; | ||
| use arrow::datatypes::DataType; | ||
| use arrow::datatypes::DataType::{LargeList, List, Null}; | ||
| use datafusion_common::cast::{as_large_list_array, as_list_array}; | ||
| use datafusion_common::{Result, exec_err, utils::take_function_args}; | ||
| use datafusion_expr::{ | ||
| ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, | ||
| Volatility, | ||
| }; | ||
| use datafusion_macros::user_doc; | ||
| use std::sync::Arc; | ||
|
|
||
| // Invokes the `make_udf_expr_and_func!` macro (defined outside this file) for | ||
| // `ArrayCompact`, passing the `array_compact` name, its single `array` argument, | ||
| // a one-line description, and the `array_compact_udf` identifier. | ||
| // NOTE(review): presumably this generates the `array_compact(...)` expression | ||
| // helper and the `array_compact_udf()` accessor -- confirm against the macro. | ||
| make_udf_expr_and_func!( | ||
| ArrayCompact, | ||
| array_compact, | ||
| array, | ||
| "removes null values from the array.", | ||
| array_compact_udf | ||
| ); | ||
|
|
||
| /// Scalar UDF type for the `array_compact` SQL function. | ||
| /// | ||
| /// The user-facing description, syntax, SQL example and argument documentation | ||
| /// are supplied through the `user_doc` attribute below. | ||
| #[user_doc( | ||
| doc_section(label = "Array Functions"), | ||
| description = "Removes null values from the array.", | ||
| syntax_example = "array_compact(array)", | ||
| sql_example = r#"```sql | ||
| > select array_compact([1, NULL, 2, NULL, 3]) arr; | ||
| +-----------+ | ||
| | arr | | ||
| +-----------+ | ||
| | [1, 2, 3] | | ||
| +-----------+ | ||
| ```"#, | ||
| argument( | ||
| name = "array", | ||
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." | ||
| ) | ||
| )] | ||
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub struct ArrayCompact { | ||
| // Argument signature handed out by `ScalarUDFImpl::signature`. | ||
| signature: Signature, | ||
| // Alternative function names handed out by `ScalarUDFImpl::aliases`. | ||
| aliases: Vec<String>, | ||
| } | ||
|
|
||
| // `Default` is a thin wrapper around the `ArrayCompact::new` constructor. | ||
| impl Default for ArrayCompact { | ||
|     fn default() -> Self { | ||
|         ArrayCompact::new() | ||
|     } | ||
| } | ||
|
|
||
| impl ArrayCompact { | ||
|     /// Creates the UDF with an immutable array signature and the single | ||
|     /// alias `list_compact`. | ||
|     pub fn new() -> Self { | ||
|         let signature = Signature::array(Volatility::Immutable); | ||
|         let aliases = vec![String::from("list_compact")]; | ||
|         Self { signature, aliases } | ||
|     } | ||
| } | ||
|
|
||
| impl ScalarUDFImpl for ArrayCompact { | ||
|     /// Primary SQL name of the function. | ||
|     fn name(&self) -> &str { | ||
|         "array_compact" | ||
|     } | ||
|
|
||
|     /// Signature stored on the struct. | ||
|     fn signature(&self) -> &Signature { | ||
|         &self.signature | ||
|     } | ||
|
|
||
|     /// The result type is a copy of the first argument's type. | ||
|     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { | ||
|         let input_type = &arg_types[0]; | ||
|         Ok(input_type.clone()) | ||
|     } | ||
|
|
||
|     /// Wraps `array_compact_inner` with `make_scalar_function` and applies it | ||
|     /// to the call's arguments. | ||
|     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
|         let kernel = make_scalar_function(array_compact_inner); | ||
|         kernel(&args.args) | ||
|     } | ||
|
|
||
|     /// Alternative names stored on the struct. | ||
|     fn aliases(&self) -> &[String] { | ||
|         self.aliases.as_slice() | ||
|     } | ||
|
|
||
|     /// Documentation obtained from `self.doc()`. | ||
|     fn documentation(&self) -> Option<&Documentation> { | ||
|         self.doc() | ||
|     } | ||
| } | ||
|
|
||
| /// Kernel behind the `array_compact` SQL function. | ||
| /// | ||
| /// Takes exactly one argument. `List` and `LargeList` inputs are downcast and | ||
| /// handed to `compact_list`; a `Null`-typed input is returned unchanged; any | ||
| /// other data type yields an execution error. | ||
| pub fn array_compact_inner(arg: &[ArrayRef]) -> Result<ArrayRef> { | ||
|     let [input_array] = take_function_args("array_compact", arg)?; | ||
|
|
||
|     match input_array.data_type() { | ||
|         List(field) => as_list_array(input_array) | ||
|             .and_then(|list| compact_list::<i32>(list, field)), | ||
|         LargeList(field) => as_large_list_array(input_array) | ||
|             .and_then(|list| compact_list::<i64>(list, field)), | ||
|         Null => Ok(Arc::clone(input_array)), | ||
|         array_type => exec_err!("array_compact does not support type '{array_type}'."), | ||
|     } | ||
| } | ||
|
|
||
| /// Remove null elements from each row of a list array. | ||
| /// | ||
| /// Every non-null row keeps only its non-null elements, in their original | ||
| /// order. Null rows stay null and contribute no elements. `field` is reused as | ||
| /// the element field of the returned list array. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns an error if `GenericListArray::try_new` rejects the rebuilt offsets, | ||
| /// values and validity. | ||
| fn compact_list<O: OffsetSizeTrait>( | ||
|     list_array: &GenericListArray<O>, | ||
|     field: &Arc<arrow::datatypes::Field>, | ||
| ) -> Result<ArrayRef> { | ||
|     let values = list_array.values(); | ||
|     // `Array::is_null` only consults the physical validity buffer, so it reports | ||
|     // `false` for child arrays that encode nulls elsewhere (e.g. `NullArray` or | ||
|     // dictionary values). Use the logical null mask so those are dropped too. | ||
|     let value_nulls = values.logical_nulls(); | ||
|     let original_data = values.to_data(); | ||
|     let mut offsets = Vec::<O>::with_capacity(list_array.len() + 1); | ||
|     offsets.push(O::zero()); | ||
|     let mut mutable = MutableArrayData::with_capacities( | ||
|         vec![&original_data], | ||
|         false, | ||
|         Capacities::Array(original_data.len()), | ||
|     ); | ||
|     let mut valid = NullBufferBuilder::new(list_array.len()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can directly grab null buffer from input array for the output array to avoid needing a builder here |
||
|
|
||
|     for row_index in 0..list_array.len() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|         if list_array.is_null(row_index) { | ||
|             // A null row is empty in the output: repeat the previous offset. | ||
|             offsets.push(offsets[row_index]); | ||
|             valid.append_null(); | ||
|             continue; | ||
|         } | ||
|
|
||
|         let start = list_array.offsets()[row_index].as_usize(); | ||
|         let end = list_array.offsets()[row_index + 1].as_usize(); | ||
|
|
||
|         let copied = match &value_nulls { | ||
|             // The child array has no nulls at all: copy the row in one call. | ||
|             None => { | ||
|                 mutable.extend(0, start, end); | ||
|                 end - start | ||
|             } | ||
|             // Otherwise copy only the logically valid elements, one by one. | ||
|             Some(nulls) => { | ||
|                 let mut kept = 0usize; | ||
|                 for i in start..end { | ||
|                     if nulls.is_valid(i) { | ||
|                         mutable.extend(0, i, i + 1); | ||
|                         kept += 1; | ||
|                     } | ||
|                 } | ||
|                 kept | ||
|             } | ||
|         }; | ||
|
|
||
|         offsets.push(offsets[row_index] + O::usize_as(copied)); | ||
|         valid.append_non_null(); | ||
|     } | ||
|
|
||
|     let new_values = make_array(mutable.freeze()); | ||
|     Ok(Arc::new(GenericListArray::<O>::try_new( | ||
|         Arc::clone(field), | ||
|         OffsetBuffer::new(offsets.into()), | ||
|         new_values, | ||
|         valid.finish(), | ||
|     )?)) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7641,6 +7641,88 @@ from array_distinct_table_2D_fixed; | |
| [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] | ||
| [[5, 6], NULL] | ||
|
|
||
| ## array_compact (aliases: `list_compact`) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we also add a test for FixedSizeList? |
||
|
|
||
| # basic: remove nulls from integer array | ||
| query ? | ||
| select array_compact([1, NULL, 2, NULL, 3]); | ||
| ---- | ||
| [1, 2, 3] | ||
|
|
||
| # no nulls present | ||
| query ? | ||
| select array_compact([1, 2, 3]); | ||
| ---- | ||
| [1, 2, 3] | ||
|
|
||
| # all nulls | ||
| query ? | ||
| select array_compact(arrow_cast([NULL, NULL, NULL], 'List(Int64)')); | ||
| ---- | ||
| [] | ||
|
|
||
| # empty array | ||
| query ? | ||
| select array_compact([]); | ||
| ---- | ||
| [] | ||
|
|
||
| # NULL input returns NULL | ||
| query ? | ||
| select array_compact(NULL::INT[]); | ||
| ---- | ||
| NULL | ||
|
|
||
| # string array | ||
| query ? | ||
| select array_compact(['a', NULL, 'b', NULL, 'c']); | ||
| ---- | ||
| [a, b, c] | ||
|
|
||
| # float array | ||
| query ? | ||
| select array_compact([1.0, NULL, 2.0, NULL]); | ||
| ---- | ||
| [1.0, 2.0] | ||
|
|
||
| # nested array (2D) | ||
| query ? | ||
| select array_compact([make_array(1, 2), NULL, make_array(3, 4)]); | ||
| ---- | ||
| [[1, 2], [3, 4]] | ||
|
|
||
| # LargeList | ||
| query ? | ||
| select array_compact(arrow_cast([1, NULL, 2, NULL, 3], 'LargeList(Int64)')); | ||
| ---- | ||
| [1, 2, 3] | ||
|
|
||
| # alias list_compact | ||
| query ? | ||
| select list_compact([1, NULL, 2]); | ||
| ---- | ||
| [1, 2] | ||
|
|
||
| # table-based test | ||
| statement ok | ||
| CREATE TABLE array_compact_table AS VALUES | ||
| (make_array(1, NULL, 2, NULL, 3)), | ||
| (make_array(NULL, NULL, NULL)), | ||
| (make_array(4, 5, 6)), | ||
| (NULL::INT[]) | ||
| ; | ||
|
|
||
| query ? | ||
| select array_compact(column1) from array_compact_table; | ||
| ---- | ||
| [1, 2, 3] | ||
| [] | ||
| [4, 5, 6] | ||
| NULL | ||
|
|
||
| statement ok | ||
| DROP TABLE array_compact_table; | ||
|
|
||
| ## arrays_zip (aliases: `list_zip`) | ||
|
|
||
| # Spark example: arrays_zip(array(1, 2, 3), array(2, 3, 4)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.