Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Arrays for the Map scalar functions #11712

Merged
merged 24 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
70e46e2
crude impl to support array
dharanad Jul 29, 2024
05c8ebe
++improvement
dharanad Jul 29, 2024
03b3b4d
uncomment logic test
dharanad Jul 29, 2024
119b71d
Merge branch 'refs/heads/main' into support-map-array
dharanad Jul 29, 2024
ea76397
Merge branch 'refs/heads/main' into support-map-array
dharanad Jul 31, 2024
9f43c5e
working impl
dharanad Jul 31, 2024
b97ff6d
leverage return_type_from_exprs
dharanad Aug 1, 2024
0b2afc9
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 1, 2024
c503287
add documentation
dharanad Aug 2, 2024
436ebf4
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 3, 2024
e4da847
remove unwrap method
dharanad Aug 3, 2024
aa52b76
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 5, 2024
85a2754
add more slt tests
dharanad Aug 5, 2024
4b083b6
typos
dharanad Aug 5, 2024
5e824d6
typos
dharanad Aug 5, 2024
9e93537
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 7, 2024
6f126fc
remove extract based on dt
dharanad Aug 8, 2024
8998cd8
few more tests
dharanad Aug 8, 2024
12e6834
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 8, 2024
d8d5012
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 8, 2024
3f9d336
move back to return_type
dharanad Aug 8, 2024
aaf7e4f
Merge branch 'refs/heads/main' into support-map-array
dharanad Aug 9, 2024
857bed6
improve error & tests
dharanad Aug 9, 2024
247c93a
Update datafusion/functions-nested/src/map.rs
dharanad Aug 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 160 additions & 11 deletions datafusion/functions-nested/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use crate::make_array::make_array;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::ArrayData;
use arrow_array::{Array, ArrayRef, MapArray, StructArray};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
use arrow_buffer::{Buffer, ToByteSlice};
use arrow_schema::{DataType, Field, SchemaBuilder};
use datafusion_common::{exec_err, ScalarValue};

use datafusion_common::{exec_err, internal_err, ExprSchema, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use crate::make_array::make_array;

/// Returns a map created from a key list and a value list
pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
Expand Down Expand Up @@ -56,11 +60,11 @@ fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarV
);
}

let data_type = args[0].data_type();
let can_evaluate_to_const = can_evaluate_to_const(args);

let key = get_first_array_ref(&args[0])?;
let value = get_first_array_ref(&args[1])?;
make_map_batch_internal(key, value, can_evaluate_to_const)
make_map_batch_internal(key, value, can_evaluate_to_const, data_type)
}

fn get_first_array_ref(
Expand All @@ -73,14 +77,15 @@ fn get_first_array_ref(
ScalarValue::FixedSizeList(array) => Ok(array.value(0)),
_ => exec_err!("Expected array, got {:?}", value),
},
ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array),
ColumnarValue::Array(array) => Ok(array.to_owned()),
}
}

fn make_map_batch_internal(
keys: ArrayRef,
values: ArrayRef,
can_evaluate_to_const: bool,
data_type: DataType,
) -> datafusion_common::Result<ColumnarValue> {
if keys.null_count() > 0 {
return exec_err!("map key cannot be null");
Expand All @@ -90,6 +95,14 @@ fn make_map_batch_internal(
return exec_err!("map requires key and value lists to have the same length");
}

if !can_evaluate_to_const {
return if let DataType::LargeList(..) = data_type {
make_map_array_internal::<i64>(keys, values)
} else {
make_map_array_internal::<i32>(keys, values)
};
}

let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
let value_field = Arc::new(Field::new("value", values.data_type().clone(), true));
let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new();
Expand Down Expand Up @@ -161,7 +174,19 @@ impl ScalarUDFImpl for MapFunc {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
fn return_type(
&self,
_arg_types: &[DataType],
) -> datafusion_common::Result<DataType> {
internal_err!("map: return_type called instead of return_type_from_exprs")
}

fn return_type_from_exprs(
&self,
_args: &[Expr],
_schema: &dyn ExprSchema,
arg_types: &[DataType],
) -> datafusion_common::Result<DataType> {
dharanad marked this conversation as resolved.
Show resolved Hide resolved
if arg_types.len() % 2 != 0 {
return exec_err!(
"map requires an even number of arguments, got {} instead",
Expand Down Expand Up @@ -190,7 +215,6 @@ impl ScalarUDFImpl for MapFunc {
make_map_batch(args)
}
}

fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> {
match data_type {
DataType::List(element) => Ok(element.data_type()),
Expand All @@ -202,3 +226,128 @@ fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType
),
}
}

/// Helper function to create MapArray from array of values to support arrays for Map scalar function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍

///
/// ``` text
/// Format of input KEYS and VALUES column
/// keys values
/// +---------------------+ +---------------------+
/// | +-----------------+ | | +-----------------+ |
/// | | [k11, k12, k13] | | | | [v11, v12, v13] | |
/// | +-----------------+ | | +-----------------+ |
/// | | | |
/// | +-----------------+ | | +-----------------+ |
/// | | [k21, k22, k23] | | | | [v21, v22, v23] | |
/// | +-----------------+ | | +-----------------+ |
/// | | | |
/// | +-----------------+ | | +-----------------+ |
/// | |[k31, k32, k33] | | | |[v31, v32, v33] | |
/// | +-----------------+ | | +-----------------+ |
/// +---------------------+ +---------------------+
/// ```
/// Flattened keys and values array to user create `StructArray`,
/// which serves as inner child for `MapArray`
///
/// ``` text
/// Flattened Flattened
/// Keys Values
/// +-----------+ +-----------+
/// | +-------+ | | +-------+ |
/// | | k11 | | | | v11 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k12 | | | | v12 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k13 | | | | v13 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k21 | | | | v21 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k22 | | | | v22 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k23 | | | | v23 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k31 | | | | v31 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k32 | | | | v32 | |
/// | +-------+ | | +-------+ |
/// | +-------+ | | +-------+ |
/// | | k33 | | | | v33 | |
/// | +-------+ | | +-------+ |
/// +-----------+ +-----------+
/// ```text

fn make_map_array_internal<O: OffsetSizeTrait>(
keys: ArrayRef,
values: ArrayRef,
) -> datafusion_common::Result<ColumnarValue> {
let mut offset_buffer = vec![O::usize_as(0)];
let mut running_offset = O::usize_as(0);
dharanad marked this conversation as resolved.
Show resolved Hide resolved

let keys = collect_array_ref::<O>(keys);
let values = collect_array_ref::<O>(values);

let mut key_array_vec = vec![];
let mut value_array_vec = vec![];
for (k, v) in keys.iter().zip(values.iter()) {
running_offset = running_offset.add(O::usize_as(k.len()));
offset_buffer.push(running_offset);
key_array_vec.push(k.as_ref());
value_array_vec.push(v.as_ref());
}

// concatenate all the arrays
let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap();
let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap();
dharanad marked this conversation as resolved.
Show resolved Hide resolved

let fields = vec![
Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)),
Arc::new(Field::new(
"value",
flattened_values.data_type().clone(),
true,
)),
];

let struct_data = ArrayData::builder(DataType::Struct(fields.into()))
.len(flattened_keys.len())
.add_child_data(flattened_keys.to_data())
.add_child_data(flattened_values.to_data())
.build()
.unwrap();
dharanad marked this conversation as resolved.
Show resolved Hide resolved

let map_data = ArrayData::builder(DataType::Map(
Arc::new(Field::new(
"entries",
struct_data.data_type().clone(),
false,
)),
false,
))
.len(keys.len())
.add_child_data(struct_data)
.add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice()))
.build()
.unwrap();
dharanad marked this conversation as resolved.
Show resolved Hide resolved
Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
}

pub fn collect_array_ref<O: OffsetSizeTrait>(a: ArrayRef) -> Vec<ArrayRef> {
dharanad marked this conversation as resolved.
Show resolved Hide resolved
a.as_list::<O>()
.iter()
.flatten()
.map(|x| match x.data_type() {
// if an element is a list
// [[1,2,3]] -> [1,2,3]
DataType::List(_) => x.as_list::<i32>().value(0),
DataType::LargeList(_) => x.as_list::<i64>().value(0),
_ => x,
})
.collect::<Vec<_>>()
}
22 changes: 10 additions & 12 deletions datafusion/sqllogictest/test_files/map.slt
dharanad marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,19 @@ create table t as values
('b', 2, 'k3', 30, ['k3'], [3]),
('d', 4, 'k5', 50, ['k5'], [5]);

query error
query ?
SELECT make_map(column1, column2, column3, column4) FROM t;
# TODO: support array value
# ----
# {a: 1, k1: 10}
# {b: 2, k3: 30}
# {d: 4, k5: 50}
----
{a: 1, k1: 10}
{b: 2, k3: 30}
{d: 4, k5: 50}

query error
query ?
SELECT map(column5, column6) FROM t;
# TODO: support array value
# ----
# {k1:1, k2:2}
# {k3: 3}
# {k5: 5}
----
{k1: 1, k2: 2}
{k3: 3}
{k5: 5}

query ?
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t;
Expand Down