From 70e46e29aa68b7c562065985d1687fccc6b84d81 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 29 Jul 2024 14:03:16 +0530 Subject: [PATCH 01/15] crude impl to support array --- datafusion/functions-nested/src/map.rs | 116 ++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 10 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index e218b501dcf1..8e561039edce 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -15,17 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::make_array::make_array; -use arrow::array::ArrayData; -use arrow_array::{Array, ArrayRef, MapArray, StructArray}; -use arrow_buffer::{Buffer, ToByteSlice}; +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use arrow::array::{ArrayData, Capacities, MutableArrayData}; +use arrow_array::{Array, ArrayRef, GenericListArray, MapArray, StructArray}; +use arrow_buffer::{Buffer, OffsetBuffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; + +use datafusion_common::cast::as_list_array; use datafusion_common::{exec_err, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; -use std::collections::VecDeque; -use std::sync::Arc; + +use crate::make_array::make_array; /// Returns a map created from a key list and a value list pub fn map(keys: Vec, values: Vec) -> Expr { @@ -73,7 +77,12 @@ fn get_first_array_ref( ScalarValue::FixedSizeList(array) => Ok(array.value(0)), _ => exec_err!("Expected array, got {:?}", value), }, - ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array), + ColumnarValue::Array(array) => { + // return ListArray + // First element in array has all keys in first row, + // Same for values + Ok(array.to_owned()) + } } } @@ -90,6 +99,10 @@ fn 
make_map_batch_internal( return exec_err!("map requires key and value lists to have the same length"); } + if keys.data_type().is_nested() && values.data_type().is_nested() { + return handle_array(keys, values); + } + let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); let mut entry_struct_buffer: VecDeque<(Arc, ArrayRef)> = VecDeque::new(); @@ -129,6 +142,84 @@ fn make_map_batch_internal( }) } +fn create_map(keys: ArrayRef, values: ArrayRef) -> ArrayData { + let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); + let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); + let mut entry_struct_buffer: VecDeque<(Arc, ArrayRef)> = VecDeque::new(); + let mut entry_offsets_buffer = VecDeque::new(); + entry_offsets_buffer.push_back(0); + + entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys))); + entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(&values))); + entry_offsets_buffer.push_back(keys.len() as u32); + + let entry_struct: Vec<(Arc, ArrayRef)> = entry_struct_buffer.into(); + let entry_struct = StructArray::from(entry_struct); + + let map_data_type = DataType::Map( + Arc::new(Field::new( + "entries", + entry_struct.data_type().clone(), + false, + )), + false, + ); + + let entry_offsets: Vec = entry_offsets_buffer.into(); + let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice()); + + let map_data = ArrayData::builder(map_data_type) + .len(entry_offsets.len() - 1) + .add_buffer(entry_offsets_buffer) + .add_child_data(entry_struct.to_data()) + .build() + .unwrap(); + map_data +} + +fn handle_array( + keys: ArrayRef, + values: ArrayRef, +) -> datafusion_common::Result { + let keys = as_list_array(keys.as_ref())?; + let values = as_list_array(values.as_ref())?; + + let mut map_data_vec = vec![]; + for (k, v) in keys.iter().zip(values.iter()) { + 
map_data_vec.push(create_map(k.unwrap(), v.unwrap())); + } + + let total_len = map_data_vec.len(); + let mut offsets: Vec = Vec::with_capacity(total_len); + offsets.push(0); + + let capacity = Capacities::Array(total_len); + let map_data_vec_ref = map_data_vec.iter().collect::>(); + let mut mutable = MutableArrayData::with_capacities(map_data_vec_ref, true, capacity); + + // let num_rows = args[0].len(); + + for (arr_idx, arg) in map_data_vec.iter().enumerate() { + if !arg.is_null(0) { + mutable.extend(arr_idx, 0, arg.len()); + } else { + mutable.extend_nulls(1); + } + offsets.push(mutable.len() as i32); + } + let data = mutable.freeze(); + + let data_type = map_data_vec[0].data_type().to_owned(); + + let array = Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?); + Ok(ColumnarValue::Array(array)) +} + #[derive(Debug)] pub struct MapFunc { signature: Signature, @@ -180,10 +271,15 @@ impl ScalarUDFImpl for MapFunc { true, )); let fields = builder.finish().fields; - Ok(DataType::Map( + let map_dt = DataType::Map( Arc::new(Field::new("entries", DataType::Struct(fields), false)), false, - )) + ); + if !arg_types[0].is_nested() { + return Ok(map_dt); + } else { + Ok(DataType::List(Arc::new(Field::new("item", map_dt, true)))) + } } fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { From 05c8ebe90ccb94ae8cf79c78f2dd076ad70f18f4 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 29 Jul 2024 21:14:51 +0530 Subject: [PATCH 02/15] ++improvement --- datafusion/functions-nested/src/map.rs | 122 ++++++++++--------------- 1 file changed, 48 insertions(+), 74 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index 8e561039edce..ec86f7a3d3a5 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -19,9 +19,10 @@ use std::any::Any; use 
std::collections::VecDeque; use std::sync::Arc; -use arrow::array::{ArrayData, Capacities, MutableArrayData}; -use arrow_array::{Array, ArrayRef, GenericListArray, MapArray, StructArray}; -use arrow_buffer::{Buffer, OffsetBuffer, ToByteSlice}; +use arrow::array::ArrayData; +use arrow::compute::concat; +use arrow_array::{Array, ArrayRef, MapArray, StructArray}; +use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; use datafusion_common::cast::as_list_array; @@ -142,82 +143,60 @@ fn make_map_batch_internal( }) } -fn create_map(keys: ArrayRef, values: ArrayRef) -> ArrayData { - let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); - let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); - let mut entry_struct_buffer: VecDeque<(Arc, ArrayRef)> = VecDeque::new(); - let mut entry_offsets_buffer = VecDeque::new(); - entry_offsets_buffer.push_back(0); - - entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys))); - entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(&values))); - entry_offsets_buffer.push_back(keys.len() as u32); - - let entry_struct: Vec<(Arc, ArrayRef)> = entry_struct_buffer.into(); - let entry_struct = StructArray::from(entry_struct); - - let map_data_type = DataType::Map( - Arc::new(Field::new( - "entries", - entry_struct.data_type().clone(), - false, - )), - false, - ); - - let entry_offsets: Vec = entry_offsets_buffer.into(); - let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice()); - - let map_data = ArrayData::builder(map_data_type) - .len(entry_offsets.len() - 1) - .add_buffer(entry_offsets_buffer) - .add_child_data(entry_struct.to_data()) - .build() - .unwrap(); - map_data -} - fn handle_array( keys: ArrayRef, values: ArrayRef, ) -> datafusion_common::Result { + let key_count = keys.len(); let keys = as_list_array(keys.as_ref())?; let values = as_list_array(values.as_ref())?; - let mut map_data_vec = vec![]; 
- for (k, v) in keys.iter().zip(values.iter()) { - map_data_vec.push(create_map(k.unwrap(), v.unwrap())); - } - - let total_len = map_data_vec.len(); - let mut offsets: Vec = Vec::with_capacity(total_len); + let mut keys_ref = vec![]; + let mut values_ref = vec![]; + let mut offsets = vec![]; offsets.push(0); + let keys = keys.iter().map(|x| x.unwrap()).collect::>(); + let values = values.iter().map(|x| x.unwrap()).collect::>(); - let capacity = Capacities::Array(total_len); - let map_data_vec_ref = map_data_vec.iter().collect::>(); - let mut mutable = MutableArrayData::with_capacities(map_data_vec_ref, true, capacity); - - // let num_rows = args[0].len(); - - for (arr_idx, arg) in map_data_vec.iter().enumerate() { - if !arg.is_null(0) { - mutable.extend(arr_idx, 0, arg.len()); - } else { - mutable.extend_nulls(1); - } - offsets.push(mutable.len() as i32); + let mut o = 0; + for (k, v) in keys.iter().zip(values.iter()) { + o += k.len() as i32; + offsets.push(o); + keys_ref.push(k.as_ref()); + values_ref.push(v.as_ref()); } - let data = mutable.freeze(); - let data_type = map_data_vec[0].data_type().to_owned(); + let flattened_keys = concat(&keys_ref).unwrap(); + let flattened_values = concat(&values_ref).unwrap(); + + let fields = vec![ + Field::new("key", keys_ref[0].data_type().clone(), false), + Field::new("value", values_ref[0].data_type().clone(), true), + ]; + let struct_data = ArrayData::builder(DataType::Struct(fields.clone().into())) + .len(flattened_keys.len()) + .offset(0) + .add_child_data(flattened_keys.to_data()) + .add_child_data(flattened_values.to_data()) + .build() + .unwrap(); - let array = Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", data_type, true)), - OffsetBuffer::new(offsets.into()), - arrow_array::make_array(data), - None, - )?); - Ok(ColumnarValue::Array(array)) + let map_data = ArrayData::builder(DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(fields.clone().into()), + false, + )), + false, + 
)) + .len(key_count) + .offset(0) + .add_buffer(Buffer::from_slice_ref(offsets.as_slice())) + .add_child_data(struct_data.clone()) + .build() + .unwrap(); + let map_array = Arc::new(MapArray::from(map_data)); + Ok(ColumnarValue::Array(map_array)) } #[derive(Debug)] @@ -271,15 +250,10 @@ impl ScalarUDFImpl for MapFunc { true, )); let fields = builder.finish().fields; - let map_dt = DataType::Map( + Ok(DataType::Map( Arc::new(Field::new("entries", DataType::Struct(fields), false)), false, - ); - if !arg_types[0].is_nested() { - return Ok(map_dt); - } else { - Ok(DataType::List(Arc::new(Field::new("item", map_dt, true)))) - } + )) } fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { From 03b3b4d61f579f88318ffd963a413cf85527e019 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 29 Jul 2024 23:20:05 +0530 Subject: [PATCH 03/15] uncomment logic test --- datafusion/functions-nested/src/map.rs | 45 ++++++++++++---------- datafusion/sqllogictest/test_files/map.slt | 11 +++--- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index ec86f7a3d3a5..f2f17ad4a664 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use arrow::array::ArrayData; use arrow::compute::concat; -use arrow_array::{Array, ArrayRef, MapArray, StructArray}; +use arrow_array::cast::AsArray; +use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray}; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; -use datafusion_common::cast::as_list_array; use datafusion_common::{exec_err, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; @@ -61,11 +61,19 @@ fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result { + let can_evaluate_to_const = 
can_evaluate_to_const(args); + let key = get_first_array_ref(&args[0])?; + let value = get_first_array_ref(&args[1])?; + make_map_batch_internal(key, value, can_evaluate_to_const) + } + _ => { + let key = get_first_array_ref(&args[0])?; + let value = get_first_array_ref(&args[1])?; + return handle_array::(key, value); + } + } } fn get_first_array_ref( @@ -100,10 +108,6 @@ fn make_map_batch_internal( return exec_err!("map requires key and value lists to have the same length"); } - if keys.data_type().is_nested() && values.data_type().is_nested() { - return handle_array(keys, values); - } - let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); let mut entry_struct_buffer: VecDeque<(Arc, ArrayRef)> = VecDeque::new(); @@ -143,25 +147,25 @@ fn make_map_batch_internal( }) } -fn handle_array( +fn handle_array( keys: ArrayRef, values: ArrayRef, ) -> datafusion_common::Result { let key_count = keys.len(); - let keys = as_list_array(keys.as_ref())?; - let values = as_list_array(values.as_ref())?; + let keys = keys.as_list::(); + let values = values.as_list::(); let mut keys_ref = vec![]; let mut values_ref = vec![]; let mut offsets = vec![]; - offsets.push(0); + offsets.push(O::usize_as(0)); let keys = keys.iter().map(|x| x.unwrap()).collect::>(); let values = values.iter().map(|x| x.unwrap()).collect::>(); - let mut o = 0; + let mut current_offset: O = O::usize_as(0); for (k, v) in keys.iter().zip(values.iter()) { - o += k.len() as i32; - offsets.push(o); + current_offset = current_offset.add(O::usize_as(k.len())); + offsets.push(current_offset); keys_ref.push(k.as_ref()); values_ref.push(v.as_ref()); } @@ -173,6 +177,7 @@ fn handle_array( Field::new("key", keys_ref[0].data_type().clone(), false), Field::new("value", values_ref[0].data_type().clone(), true), ]; + let struct_data = ArrayData::builder(DataType::Struct(fields.clone().into())) .len(flattened_keys.len()) 
.offset(0) @@ -195,8 +200,8 @@ fn handle_array( .add_child_data(struct_data.clone()) .build() .unwrap(); - let map_array = Arc::new(MapArray::from(map_data)); - Ok(ColumnarValue::Array(map_array)) + + Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) } #[derive(Debug)] diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index e530e14df66e..010c70734884 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -203,13 +203,12 @@ create table t as values ('b', 2, 'k3', 30, ['k3'], [3]), ('d', 4, 'k5', 50, ['k5'], [5]); -query error +query ? SELECT make_map(column1, column2, column3, column4) FROM t; -# TODO: support array value -# ---- -# {a: 1, k1: 10} -# {b: 2, k3: 30} -# {d: 4, k5: 50} +---- +{a: 1, k1: 10} +{b: 2, k3: 30} +{d: 4, k5: 50} query error SELECT map(column5, column6) FROM t; From 9f43c5ee49be8bacb5b5945eabe7e6d330dc3498 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Wed, 31 Jul 2024 20:29:28 +0530 Subject: [PATCH 04/15] working impl --- datafusion/functions-nested/src/map.rs | 120 +++++++++++++-------- datafusion/sqllogictest/test_files/map.slt | 11 +- 2 files changed, 79 insertions(+), 52 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index f2f17ad4a664..68f9085525d8 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -20,7 +20,6 @@ use std::collections::VecDeque; use std::sync::Arc; use arrow::array::ArrayData; -use arrow::compute::concat; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray}; use arrow_buffer::{Buffer, ToByteSlice}; @@ -68,10 +67,10 @@ fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result { + ColumnarValue::Array(_) => { let key = get_first_array_ref(&args[0])?; let value = get_first_array_ref(&args[1])?; - return handle_array::(key, value); + 
make_map_array_internal::(key, value) } } } @@ -147,61 +146,77 @@ fn make_map_batch_internal( }) } -fn handle_array( +fn make_map_array_internal( keys: ArrayRef, values: ArrayRef, ) -> datafusion_common::Result { - let key_count = keys.len(); - let keys = keys.as_list::(); - let values = values.as_list::(); - - let mut keys_ref = vec![]; - let mut values_ref = vec![]; - let mut offsets = vec![]; - offsets.push(O::usize_as(0)); - let keys = keys.iter().map(|x| x.unwrap()).collect::>(); - let values = values.iter().map(|x| x.unwrap()).collect::>(); - - let mut current_offset: O = O::usize_as(0); + let mut offset_buffer = vec![O::usize_as(0)]; + let mut running_offset = O::usize_as(0); + // FIXME: Can we use ColumnarValue::values_to_arrays() over here + let keys = keys + .as_list::() + .iter() + .flatten() + .map(|x| match x.data_type() { + DataType::List(_) => x.as_list::().value(0), + _ => x, + }) + .collect::>(); + // each item is an array + let values = values + .as_list::() + .iter() + .flatten() + .map(|x| match x.data_type() { + DataType::List(_) => x.as_list::().value(0), + _ => x, + }) + .collect::>(); + assert_eq!(keys.len(), values.len()); + + let mut key_array_vec = vec![]; + let mut value_array_vec = vec![]; for (k, v) in keys.iter().zip(values.iter()) { - current_offset = current_offset.add(O::usize_as(k.len())); - offsets.push(current_offset); - keys_ref.push(k.as_ref()); - values_ref.push(v.as_ref()); + running_offset = running_offset.add(O::usize_as(k.len())); + offset_buffer.push(running_offset); + key_array_vec.push(k.as_ref()); + value_array_vec.push(v.as_ref()); } - let flattened_keys = concat(&keys_ref).unwrap(); - let flattened_values = concat(&values_ref).unwrap(); + // concatenate all the arrays in t + let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap(); + let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap(); let fields = vec![ - Field::new("key", keys_ref[0].data_type().clone(), false), - 
Field::new("value", values_ref[0].data_type().clone(), true), + Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)), + Arc::new(Field::new( + "value", + flattened_values.data_type().clone(), + true, + )), ]; - - let struct_data = ArrayData::builder(DataType::Struct(fields.clone().into())) + let struct_data = ArrayData::builder(DataType::Struct(fields.into())) .len(flattened_keys.len()) - .offset(0) .add_child_data(flattened_keys.to_data()) .add_child_data(flattened_values.to_data()) .build() .unwrap(); - - let map_data = ArrayData::builder(DataType::Map( + // Data should be struct array + // offer should be partition of the struct array + let data = ArrayData::builder(DataType::Map( Arc::new(Field::new( "entries", - DataType::Struct(fields.clone().into()), + struct_data.data_type().clone(), false, )), false, )) - .len(key_count) - .offset(0) - .add_buffer(Buffer::from_slice_ref(offsets.as_slice())) - .add_child_data(struct_data.clone()) + .len(keys.len()) + .add_child_data(struct_data) + .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice())) .build() .unwrap(); - - Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) + Ok(ColumnarValue::Array(Arc::new(MapArray::from(data)))) } #[derive(Debug)] @@ -246,12 +261,12 @@ impl ScalarUDFImpl for MapFunc { let mut builder = SchemaBuilder::new(); builder.push(Field::new( "key", - get_element_type(&arg_types[0])?.clone(), + get_element_type(&arg_types[0], 0)?.clone(), false, )); builder.push(Field::new( "value", - get_element_type(&arg_types[1])?.clone(), + get_element_type(&arg_types[1], 0)?.clone(), true, )); let fields = builder.finish().fields; @@ -266,14 +281,27 @@ impl ScalarUDFImpl for MapFunc { } } -fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> { +fn get_element_type( + data_type: &DataType, + level: u8, +) -> datafusion_common::Result<&DataType> { match data_type { - DataType::List(element) => Ok(element.data_type()), - DataType::LargeList(element) 
=> Ok(element.data_type()), - DataType::FixedSizeList(element, _) => Ok(element.data_type()), - _ => exec_err!( - "Expected list, large_list or fixed_size_list, got {:?}", - data_type - ), + DataType::List(element) => Ok(get_element_type(element.data_type(), level + 1)?), + DataType::LargeList(element) => { + Ok(get_element_type(element.data_type(), level + 1)?) + } + DataType::FixedSizeList(element, _) => { + Ok(get_element_type(element.data_type(), level + 1)?) + } + t => { + if level <= 2 { + Ok(t) + } else { + exec_err!( + "Expected list, large_list or fixed_size_list, got {:?}", + data_type + ) + } + } } } diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 010c70734884..5b1e07cb6a8b 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -210,13 +210,12 @@ SELECT make_map(column1, column2, column3, column4) FROM t; {b: 2, k3: 30} {d: 4, k5: 50} -query error +query ? SELECT map(column5, column6) FROM t; -# TODO: support array value -# ---- -# {k1:1, k2:2} -# {k3: 3} -# {k5: 5} +---- +{k1:1, k2:2} +{k3: 3} +{k5: 5} query ? 
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t; From b97ff6de9c24f2db9d748439ad258696622902a0 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 1 Aug 2024 20:23:27 +0530 Subject: [PATCH 05/15] leverage return_type_from_exprs --- datafusion/functions-nested/src/map.rs | 95 ++++++++++------------ datafusion/sqllogictest/test_files/map.slt | 2 +- 2 files changed, 45 insertions(+), 52 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index 68f9085525d8..ff9c58aec8cc 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -25,7 +25,7 @@ use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray}; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; -use datafusion_common::{exec_err, ScalarValue}; +use datafusion_common::{exec_err, internal_err, ExprSchema, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; @@ -60,17 +60,16 @@ fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result { - let can_evaluate_to_const = can_evaluate_to_const(args); - let key = get_first_array_ref(&args[0])?; - let value = get_first_array_ref(&args[1])?; - make_map_batch_internal(key, value, can_evaluate_to_const) - } - ColumnarValue::Array(_) => { - let key = get_first_array_ref(&args[0])?; - let value = get_first_array_ref(&args[1])?; - make_map_array_internal::(key, value) + let data_type = args[0].data_type(); + let can_evaluate_to_const = can_evaluate_to_const(args); + let key = get_first_array_ref(&args[0])?; + let value = get_first_array_ref(&args[1])?; + if can_evaluate_to_const { + make_map_batch_internal(key, value, can_evaluate_to_const) + } else { + match data_type { + DataType::LargeList(..) 
=> make_map_array_internal::(key, value), + _ => make_map_array_internal::(key, value), } } } @@ -85,12 +84,7 @@ fn get_first_array_ref( ScalarValue::FixedSizeList(array) => Ok(array.value(0)), _ => exec_err!("Expected array, got {:?}", value), }, - ColumnarValue::Array(array) => { - // return ListArray - // First element in array has all keys in first row, - // Same for values - Ok(array.to_owned()) - } + ColumnarValue::Array(array) => Ok(array.to_owned()), } } @@ -152,27 +146,28 @@ fn make_map_array_internal( ) -> datafusion_common::Result { let mut offset_buffer = vec![O::usize_as(0)]; let mut running_offset = O::usize_as(0); - // FIXME: Can we use ColumnarValue::values_to_arrays() over here + let keys = keys .as_list::() .iter() .flatten() .map(|x| match x.data_type() { + // To handle [[1,2,3]] DataType::List(_) => x.as_list::().value(0), + DataType::LargeList(_) => x.as_list::().value(0), _ => x, }) .collect::>(); - // each item is an array let values = values .as_list::() .iter() .flatten() .map(|x| match x.data_type() { DataType::List(_) => x.as_list::().value(0), + DataType::LargeList(_) => x.as_list::().value(0), _ => x, }) .collect::>(); - assert_eq!(keys.len(), values.len()); let mut key_array_vec = vec![]; let mut value_array_vec = vec![]; @@ -183,7 +178,7 @@ fn make_map_array_internal( value_array_vec.push(v.as_ref()); } - // concatenate all the arrays in t + // concatenate all the arrays let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap(); let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap(); @@ -195,15 +190,15 @@ fn make_map_array_internal( true, )), ]; + let struct_data = ArrayData::builder(DataType::Struct(fields.into())) .len(flattened_keys.len()) .add_child_data(flattened_keys.to_data()) .add_child_data(flattened_values.to_data()) .build() .unwrap(); - // Data should be struct array - // offer should be partition of the struct array - let data = ArrayData::builder(DataType::Map( + + let 
map_data = ArrayData::builder(DataType::Map( Arc::new(Field::new( "entries", struct_data.data_type().clone(), @@ -216,7 +211,7 @@ fn make_map_array_internal( .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice())) .build() .unwrap(); - Ok(ColumnarValue::Array(Arc::new(MapArray::from(data)))) + Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) } #[derive(Debug)] @@ -251,7 +246,19 @@ impl ScalarUDFImpl for MapFunc { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + fn return_type( + &self, + _arg_types: &[DataType], + ) -> datafusion_common::Result { + internal_err!("map: return_type called instead of return_type_from_exprs") + } + + fn return_type_from_exprs( + &self, + _args: &[Expr], + _schema: &dyn ExprSchema, + arg_types: &[DataType], + ) -> datafusion_common::Result { if arg_types.len() % 2 != 0 { return exec_err!( "map requires an even number of arguments, got {} instead", @@ -261,12 +268,12 @@ impl ScalarUDFImpl for MapFunc { let mut builder = SchemaBuilder::new(); builder.push(Field::new( "key", - get_element_type(&arg_types[0], 0)?.clone(), + get_element_type(&arg_types[0])?.clone(), false, )); builder.push(Field::new( "value", - get_element_type(&arg_types[1], 0)?.clone(), + get_element_type(&arg_types[1])?.clone(), true, )); let fields = builder.finish().fields; @@ -280,28 +287,14 @@ impl ScalarUDFImpl for MapFunc { make_map_batch(args) } } - -fn get_element_type( - data_type: &DataType, - level: u8, -) -> datafusion_common::Result<&DataType> { +fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> { match data_type { - DataType::List(element) => Ok(get_element_type(element.data_type(), level + 1)?), - DataType::LargeList(element) => { - Ok(get_element_type(element.data_type(), level + 1)?) - } - DataType::FixedSizeList(element, _) => { - Ok(get_element_type(element.data_type(), level + 1)?) 
- } - t => { - if level <= 2 { - Ok(t) - } else { - exec_err!( - "Expected list, large_list or fixed_size_list, got {:?}", - data_type - ) - } - } + DataType::List(element) => Ok(element.data_type()), + DataType::LargeList(element) => Ok(element.data_type()), + DataType::FixedSizeList(element, _) => Ok(element.data_type()), + _ => exec_err!( + "Expected list, large_list or fixed_size_list, got {:?}", + data_type + ), } } diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 5b1e07cb6a8b..36468b1f4d4d 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -213,7 +213,7 @@ SELECT make_map(column1, column2, column3, column4) FROM t; query ? SELECT map(column5, column6) FROM t; ---- -{k1:1, k2:2} +{k1: 1, k2: 2} {k3: 3} {k5: 5} From c5032878e730b9a5f4bcd3f6fa9a7d7cc9a320e6 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 2 Aug 2024 12:15:51 +0530 Subject: [PATCH 06/15] add documentation --- datafusion/functions-nested/src/map.rs | 217 +++++++++++++++---------- 1 file changed, 135 insertions(+), 82 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index ff9c58aec8cc..638a2dde72a2 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -64,14 +64,7 @@ fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result make_map_array_internal::(key, value), - _ => make_map_array_internal::(key, value), - } - } + make_map_batch_internal(key, value, can_evaluate_to_const, data_type) } fn get_first_array_ref( @@ -92,6 +85,7 @@ fn make_map_batch_internal( keys: ArrayRef, values: ArrayRef, can_evaluate_to_const: bool, + data_type: DataType, ) -> datafusion_common::Result { if keys.null_count() > 0 { return exec_err!("map key cannot be null"); @@ -101,6 +95,14 @@ fn make_map_batch_internal( return exec_err!("map requires key and value lists to have the same 
length"); } + if !can_evaluate_to_const { + return if let DataType::LargeList(..) = data_type { + make_map_array_internal::(keys, values) + } else { + make_map_array_internal::(keys, values) + }; + } + let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); let mut entry_struct_buffer: VecDeque<(Arc, ArrayRef)> = VecDeque::new(); @@ -140,80 +142,6 @@ fn make_map_batch_internal( }) } -fn make_map_array_internal( - keys: ArrayRef, - values: ArrayRef, -) -> datafusion_common::Result { - let mut offset_buffer = vec![O::usize_as(0)]; - let mut running_offset = O::usize_as(0); - - let keys = keys - .as_list::() - .iter() - .flatten() - .map(|x| match x.data_type() { - // To handle [[1,2,3]] - DataType::List(_) => x.as_list::().value(0), - DataType::LargeList(_) => x.as_list::().value(0), - _ => x, - }) - .collect::>(); - let values = values - .as_list::() - .iter() - .flatten() - .map(|x| match x.data_type() { - DataType::List(_) => x.as_list::().value(0), - DataType::LargeList(_) => x.as_list::().value(0), - _ => x, - }) - .collect::>(); - - let mut key_array_vec = vec![]; - let mut value_array_vec = vec![]; - for (k, v) in keys.iter().zip(values.iter()) { - running_offset = running_offset.add(O::usize_as(k.len())); - offset_buffer.push(running_offset); - key_array_vec.push(k.as_ref()); - value_array_vec.push(v.as_ref()); - } - - // concatenate all the arrays - let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap(); - let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap(); - - let fields = vec![ - Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)), - Arc::new(Field::new( - "value", - flattened_values.data_type().clone(), - true, - )), - ]; - - let struct_data = ArrayData::builder(DataType::Struct(fields.into())) - .len(flattened_keys.len()) - .add_child_data(flattened_keys.to_data()) - 
.add_child_data(flattened_values.to_data()) - .build() - .unwrap(); - - let map_data = ArrayData::builder(DataType::Map( - Arc::new(Field::new( - "entries", - struct_data.data_type().clone(), - false, - )), - false, - )) - .len(keys.len()) - .add_child_data(struct_data) - .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice())) - .build() - .unwrap(); - Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) -} - #[derive(Debug)] pub struct MapFunc { signature: Signature, @@ -298,3 +226,128 @@ fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType ), } } + +/// Helper function to create MapArray from array of values to support arrays for Map scalar function +/// +/// ``` text +/// Format of input KEYS and VALUES column +/// keys values +/// +---------------------+ +---------------------+ +/// | +-----------------+ | | +-----------------+ | +/// | | [k11, k12, k13] | | | | [v11, v12, v13] | | +/// | +-----------------+ | | +-----------------+ | +/// | | | | +/// | +-----------------+ | | +-----------------+ | +/// | | [k21, k22, k23] | | | | [v21, v22, v23] | | +/// | +-----------------+ | | +-----------------+ | +/// | | | | +/// | +-----------------+ | | +-----------------+ | +/// | |[k31, k32, k33] | | | |[v31, v32, v33] | | +/// | +-----------------+ | | +-----------------+ | +/// +---------------------+ +---------------------+ +/// ``` +/// Flattened keys and values array to user create `StructArray`, +/// which serves as inner child for `MapArray` +/// +/// ``` text +/// Flattened Flattened +/// Keys Values +/// +-----------+ +-----------+ +/// | +-------+ | | +-------+ | +/// | | k11 | | | | v11 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k12 | | | | v12 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k13 | | | | v13 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k21 | | | | v21 | | +/// | +-------+ | | +-------+ | +/// | 
+-------+ | | +-------+ | +/// | | k22 | | | | v22 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k23 | | | | v23 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k31 | | | | v31 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k32 | | | | v32 | | +/// | +-------+ | | +-------+ | +/// | +-------+ | | +-------+ | +/// | | k33 | | | | v33 | | +/// | +-------+ | | +-------+ | +/// +-----------+ +-----------+ +/// ``` + +fn make_map_array_internal( + keys: ArrayRef, + values: ArrayRef, +) -> datafusion_common::Result { + let mut offset_buffer = vec![O::usize_as(0)]; + let mut running_offset = O::usize_as(0); + + let keys = collect_array_ref::(keys); + let values = collect_array_ref::(values); + + let mut key_array_vec = vec![]; + let mut value_array_vec = vec![]; + for (k, v) in keys.iter().zip(values.iter()) { + running_offset = running_offset.add(O::usize_as(k.len())); + offset_buffer.push(running_offset); + key_array_vec.push(k.as_ref()); + value_array_vec.push(v.as_ref()); + } + + // concatenate all the arrays + let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap(); + let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap(); + + let fields = vec![ + Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)), + Arc::new(Field::new( + "value", + flattened_values.data_type().clone(), + true, + )), + ]; + + let struct_data = ArrayData::builder(DataType::Struct(fields.into())) + .len(flattened_keys.len()) + .add_child_data(flattened_keys.to_data()) + .add_child_data(flattened_values.to_data()) + .build() + .unwrap(); + + let map_data = ArrayData::builder(DataType::Map( + Arc::new(Field::new( + "entries", + struct_data.data_type().clone(), + false, + )), + false, + )) + .len(keys.len()) + .add_child_data(struct_data) + .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice())) + .build() + .unwrap(); + 
Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) +} + +pub fn collect_array_ref(a: ArrayRef) -> Vec { + a.as_list::() + .iter() + .flatten() + .map(|x| match x.data_type() { + // if an element is a list + // [[1,2,3]] -> [1,2,3] + DataType::List(_) => x.as_list::().value(0), + DataType::LargeList(_) => x.as_list::().value(0), + _ => x, + }) + .collect::>() +} From e4da847ccf6a9ed350b823eb1d09184cf5494f60 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 4 Aug 2024 00:41:56 +0530 Subject: [PATCH 07/15] remove unwrap method --- datafusion/functions-nested/src/map.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index 638a2dde72a2..44cb1c452efa 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -303,8 +303,8 @@ fn make_map_array_internal( } // concatenate all the arrays - let flattened_keys = arrow::compute::concat(key_array_vec.as_ref()).unwrap(); - let flattened_values = arrow::compute::concat(value_array_vec.as_ref()).unwrap(); + let flattened_keys = arrow::compute::concat(key_array_vec.as_ref())?; + let flattened_values = arrow::compute::concat(value_array_vec.as_ref())?; let fields = vec![ Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)), @@ -319,8 +319,7 @@ fn make_map_array_internal( .len(flattened_keys.len()) .add_child_data(flattened_keys.to_data()) .add_child_data(flattened_values.to_data()) - .build() - .unwrap(); + .build()?; let map_data = ArrayData::builder(DataType::Map( Arc::new(Field::new( @@ -333,8 +332,7 @@ fn make_map_array_internal( .len(keys.len()) .add_child_data(struct_data) .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice())) - .build() - .unwrap(); + .build()?; Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) } From 85a27549ed2325456627f00b4c9016bc3c51f504 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 5 Aug 2024 
20:01:32 +0530 Subject: [PATCH 08/15] add more slt tests --- datafusion/sqllogictest/test_files/map.slt | 24 +++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index b7ac456cbc2e..b343ed01c525 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -199,9 +199,9 @@ SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), a statement ok create table t as values -('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2]), -('b', 2, 'k3', 30, ['k3'], [3]), -('d', 4, 'k5', 50, ['k5'], [5]); +('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]]), +('b', 2, 'k3', 30, ['k3'], [3], 'PUT', [[4]]), +('d', 4, 'k5', 50, ['k5'], [5], null, [[1,2]]); query ? SELECT make_map(column1, column2, column3, column4) FROM t; @@ -217,6 +217,24 @@ SELECT map(column5, column6) FROM t; {k3: 3} {k5: 5} +# TODO: Make me work +query +SELECT map(column6, column7) FROM t; +#---- +#{[1,2]: 'POST'} +#{[3]: 'PUT'} +#{[5]: NULL} + +query error +SELECT map(column7, column6) FROM t; + +query +SELECT map(column8, column7) FROM t; +#---- +#{[[1,2,3]]: 'POST'} +#{[[4]]: 'PUT'} +#{[[1,2]]: NULL} + query ? 
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t; ---- From 4b083b6205ebcb3735d208756df8f2fab6578514 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 5 Aug 2024 20:01:54 +0530 Subject: [PATCH 09/15] typos --- datafusion/sqllogictest/test_files/map.slt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index b343ed01c525..24f32b3704f9 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -225,11 +225,11 @@ SELECT map(column6, column7) FROM t; #{[3]: 'PUT'} #{[5]: NULL} -query error -SELECT map(column7, column6) FROM t; +#query error +#SELECT map(column7, column6) FROM t; -query -SELECT map(column8, column7) FROM t; +#query +#SELECT map(column8, column7) FROM t; #---- #{[[1,2,3]]: 'POST'} #{[[4]]: 'PUT'} From 5e824d6ef7438638a0799f4f85dbf576487227cf Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 5 Aug 2024 20:02:08 +0530 Subject: [PATCH 10/15] typos --- datafusion/sqllogictest/test_files/map.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 24f32b3704f9..d5c53e946fc1 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -218,8 +218,8 @@ SELECT map(column5, column6) FROM t; {k5: 5} # TODO: Make me work -query -SELECT map(column6, column7) FROM t; +#query +#SELECT map(column6, column7) FROM t; #---- #{[1,2]: 'POST'} #{[3]: 'PUT'} From 6f126fc76adcfb84e5a285a1cfb63c446237b6ad Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 8 Aug 2024 22:52:41 +0530 Subject: [PATCH 11/15] remove extract based on dt --- datafusion/functions-nested/src/map.rs | 12 +--------- datafusion/sqllogictest/test_files/map.slt | 26 ++++++++-------------- 2 files 
changed, 10 insertions(+), 28 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index 44cb1c452efa..dbfa79fa73fd 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -337,15 +337,5 @@ fn make_map_array_internal( } pub fn collect_array_ref(a: ArrayRef) -> Vec { - a.as_list::() - .iter() - .flatten() - .map(|x| match x.data_type() { - // if an element is a list - // [[1,2,3]] -> [1,2,3] - DataType::List(_) => x.as_list::().value(0), - DataType::LargeList(_) => x.as_list::().value(0), - _ => x, - }) - .collect::>() + a.as_list::().iter().flatten().collect::>() } diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index a6ed77b1502d..382d682a9b98 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -217,23 +217,15 @@ SELECT map(column5, column6) FROM t; {k3: 3} {k5: 5} -# TODO: Make me work -#query -#SELECT map(column6, column7) FROM t; -#---- -#{[1,2]: 'POST'} -#{[3]: 'PUT'} -#{[5]: NULL} - -#query error -#SELECT map(column7, column6) FROM t; - -#query -#SELECT map(column8, column7) FROM t; -#---- -#{[[1,2,3]]: 'POST'} -#{[[4]]: 'PUT'} -#{[[1,2]]: NULL} + +query error +SELECT map(column6, column7) FROM t; + +query error +SELECT map(column7, column6) FROM t; + +query error +SELECT map(column8, column7) FROM t; query ? 
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t; From 8998cd8ab84619bd43c83b0f844c6830b82d15b8 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 8 Aug 2024 23:01:26 +0530 Subject: [PATCH 12/15] few more tests --- datafusion/sqllogictest/test_files/map.slt | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 382d682a9b98..722f4f0ef027 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -217,15 +217,22 @@ SELECT map(column5, column6) FROM t; {k3: 3} {k5: 5} - query error SELECT map(column6, column7) FROM t; -query error -SELECT map(column7, column6) FROM t; +query ? +select Map {column6: column7} from t; +---- +{[1, 2]: POST} +{[3]: PUT} +{[5]: } -query error -SELECT map(column8, column7) FROM t; +query ? +select Map {column8: column7} from t; +---- +{[[1, 2, 3]]: POST} +{[[4]]: PUT} +{[[1, 2]]: } query ? 
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t; From 3f9d336513405cbae70da9a6ebce5b114a7e9c3e Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 8 Aug 2024 23:16:17 +0530 Subject: [PATCH 13/15] move back to return_type --- datafusion/functions-nested/src/map.rs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index dbfa79fa73fd..45ad159d9c6d 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -25,7 +25,7 @@ use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray}; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; -use datafusion_common::{exec_err, internal_err, ExprSchema, ScalarValue}; +use datafusion_common::{exec_err, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; @@ -174,19 +174,7 @@ impl ScalarUDFImpl for MapFunc { &self.signature } - fn return_type( - &self, - _arg_types: &[DataType], - ) -> datafusion_common::Result { - internal_err!("map: return_type called instead of return_type_from_exprs") - } - - fn return_type_from_exprs( - &self, - _args: &[Expr], - _schema: &dyn ExprSchema, - arg_types: &[DataType], - ) -> datafusion_common::Result { + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { if arg_types.len() % 2 != 0 { return exec_err!( "map requires an even number of arguments, got {} instead", From 857bed6a08c868c194422cab4e20eb4cbc081bbf Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 9 Aug 2024 23:13:05 +0530 Subject: [PATCH 14/15] improve error & tests --- datafusion/common/src/utils/mod.rs | 9 ++++++++- datafusion/functions-nested/src/map.rs | 12 +++++------- datafusion/sqllogictest/test_files/map.slt | 16 +++++++++++++--- 
3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 58dc8f40b577..de522aba4e4f 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -29,8 +29,10 @@ use arrow::compute; use arrow::compute::{partition, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; +use arrow_array::cast::AsArray; use arrow_array::{ - Array, FixedSizeListArray, LargeListArray, ListArray, RecordBatchOptions, + Array, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait, + RecordBatchOptions, }; use arrow_schema::DataType; use sqlparser::ast::Ident; @@ -440,6 +442,11 @@ pub fn arrays_into_list_array( )) } +/// Helper function to convert a ListArray into a vector of ArrayRefs. +pub fn list_to_arrays(a: ArrayRef) -> Vec { + a.as_list::().iter().flatten().collect::>() +} + /// Get the base type of a data type. /// /// Example diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index 45ad159d9c6d..d0234a908e08 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -20,7 +20,6 @@ use std::collections::VecDeque; use std::sync::Arc; use arrow::array::ArrayData; -use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray}; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; @@ -278,8 +277,8 @@ fn make_map_array_internal( let mut offset_buffer = vec![O::usize_as(0)]; let mut running_offset = O::usize_as(0); - let keys = collect_array_ref::(keys); - let values = collect_array_ref::(values); + let keys = datafusion_common::utils::list_to_arrays::(keys); + let values = datafusion_common::utils::list_to_arrays::(values); let mut key_array_vec = vec![]; let mut value_array_vec = vec![]; @@ -292,6 +291,9 @@ fn make_map_array_internal( // 
concatenate all the arrays let flattened_keys = arrow::compute::concat(key_array_vec.as_ref())?; + if flattened_keys.null_count() > 0 { + return exec_err!("keys cannot be null"); + } let flattened_values = arrow::compute::concat(value_array_vec.as_ref())?; let fields = vec![ @@ -323,7 +325,3 @@ fn make_map_array_internal( .build()?; Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) } - -pub fn collect_array_ref(a: ArrayRef) -> Vec { - a.as_list::().iter().flatten().collect::>() -} diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 722f4f0ef027..0dc37c68bca4 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -199,9 +199,9 @@ SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), a statement ok create table t as values -('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]]), -('b', 2, 'k3', 30, ['k3'], [3], 'PUT', [[4]]), -('d', 4, 'k5', 50, ['k5'], [5], null, [[1,2]]); +('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]], ['a']), +('b', 2, 'k3', 30, ['k3'], [3], 'PUT', [[4]], ['b']), +('d', 4, 'k5', 50, ['k5'], [5], null, [[1,2]], ['c']); query ? SELECT make_map(column1, column2, column3, column4) FROM t; @@ -217,6 +217,13 @@ SELECT map(column5, column6) FROM t; {k3: 3} {k5: 5} +query ? +SELECT map(column8, column9) FROM t; +---- +{[1, 2, 3]: a} +{[4]: b} +{[1, 2]: c} + query error SELECT map(column6, column7) FROM t; @@ -234,6 +241,9 @@ select Map {column8: column7} from t; {[[4]]: PUT} {[[1, 2]]: } +query error +select Map {column7: column8} from t; + query ? 
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t; ---- From 247c93a6b4a380a4d0e191a6faa8f0df7885dd90 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 11 Aug 2024 20:44:14 +0530 Subject: [PATCH 15/15] Update datafusion/functions-nested/src/map.rs Co-authored-by: Alex Huang --- datafusion/functions-nested/src/map.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index d0234a908e08..b6068fdff0d5 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -274,8 +274,8 @@ fn make_map_array_internal( keys: ArrayRef, values: ArrayRef, ) -> datafusion_common::Result { - let mut offset_buffer = vec![O::usize_as(0)]; - let mut running_offset = O::usize_as(0); + let mut offset_buffer = vec![O::zero()]; + let mut running_offset = O::zero(); let keys = datafusion_common::utils::list_to_arrays::(keys); let values = datafusion_common::utils::list_to_arrays::(values);