diff --git a/rust/arrow/src/util/display.rs b/rust/arrow/src/util/display.rs index bf0cade562f..87c18d26629 100644 --- a/rust/arrow/src/util/display.rs +++ b/rust/arrow/src/util/display.rs @@ -44,6 +44,22 @@ macro_rules! make_string { }}; } +macro_rules! make_string_from_list { + ($column: ident, $row: ident) => {{ + let list = $column + .as_any() + .downcast_ref::() + .ok_or(ArrowError::InvalidArgumentError(format!( + "Repl error: could not convert list column to list array." + )))? + .value($row); + let string_values = (0..list.len()) + .map(|i| array_value_to_string(&list.clone(), i)) + .collect::>>()?; + Ok(format!("[{}]", string_values.join(", "))) + }}; +} + /// Get the value at the given row in an array as a String. /// /// Note this function is quite inefficient and is unlikely to be @@ -89,6 +105,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result { make_string!(array::Time64NanosecondArray, column, row) } + DataType::List(_) => make_string_from_list!(column, row), DataType::Dictionary(index_type, _value_type) => match **index_type { DataType::Int8 => dict_array_value_to_string::(column, row), DataType::Int16 => dict_array_value_to_string::(column, row), diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 52027a4080b..7322b63994d 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. +use std::convert::TryFrom; use std::env; use std::sync::Arc; extern crate arrow; extern crate datafusion; -use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::TimeUnit}; +use arrow::{datatypes::Int64Type, record_batch::RecordBatch}; use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, util::display::array_value_to_string, @@ -128,6 +129,100 @@ async fn parquet_single_nan_schema() { } } +#[tokio::test] +async fn parquet_list_columns() { + let mut ctx = ExecutionContext::new(); + let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); + ctx.register_parquet( + "list_columns", + &format!("{}/list_columns.parquet", testdata), + ) + .unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "int64_list", + DataType::List(Box::new(DataType::Int64)), + true, + ), + Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)), true), + ])); + + let sql = "SELECT int64_list, utf8_list FROM list_columns"; + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan).unwrap(); + let results = ctx.collect(plan).await.unwrap(); + + // int64_list utf8_list + // 0 [1, 2, 3] [abc, efg, hij] + // 1 [None, 1] None + // 2 [4] [efg, None, hij, xyz] + + assert_eq!(1, results.len()); + let batch = &results[0]; + assert_eq!(3, batch.num_rows()); + assert_eq!(2, batch.num_columns()); + assert_eq!(schema, batch.schema()); + + let int_list_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let utf8_list_array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!( + int_list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), Some(2), Some(3),]) + ); + + assert_eq!( + utf8_list_array + .value(0) + .as_any() + .downcast_ref::() + .unwrap(), + &StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap() + ); + + assert_eq!( + int_list_array + .value(1) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![None, Some(1),]) + ); + + assert!(utf8_list_array.is_null(1)); + + assert_eq!( + int_list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(4),]) + ); + + let result = utf8_list_array.value(2); + let result = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(result.value(0), "efg"); + assert!(result.is_null(1)); + assert_eq!(result.value(2), "hij"); + assert_eq!(result.value(3), "xyz"); +} + #[tokio::test] async fn csv_count_star() -> Result<()> { let mut ctx = ExecutionContext::new(); diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 40df2840523..b4434e54d4b 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -25,13 +25,35 @@ use std::sync::Arc; use std::vec::Vec; use arrow::array::{ - ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, - Int16BufferBuilder, StructArray, + Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray, + BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait, FixedSizeBinaryArray, + FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, ListBuilder, + OffsetSizeTrait, PrimitiveArray, PrimitiveArrayOps, PrimitiveBuilder, StringArray, + StringBuilder, StructArray, }; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ - DataType as ArrowType, DateUnit, Field, IntervalUnit, Schema, TimeUnit, + BooleanType as ArrowBooleanType, DataType as ArrowType, + Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DateUnit, + DurationMicrosecondType as ArrowDurationMicrosecondType, + DurationMillisecondType as ArrowDurationMillisecondType, + DurationNanosecondType as ArrowDurationNanosecondType, + DurationSecondType as ArrowDurationSecondType, Field, + Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, + Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type, + Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema, + Time32MillisecondType as ArrowTime32MillisecondType, + Time32SecondType as ArrowTime32SecondType, + Time64MicrosecondType as ArrowTime64MicrosecondType, + Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit, + TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as ArrowTimestampMicrosecondType, + TimestampMillisecondType as ArrowTimestampMillisecondType, + TimestampNanosecondType as ArrowTimestampNanosecondType, + TimestampSecondType as ArrowTimestampSecondType, ToByteSlice, + UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type, + UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type, }; +use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, @@ -441,6 +463,400 @@ where } } +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, + _marker: PhantomData, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + _marker: PhantomData, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_array_indices_custom_builder { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = $item_builder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec, +) -> Result { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) + } + ArrowType::Binary => { + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported. +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + let next_batch_array = self.item_reader.next_batch(batch_size)?; + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self + .item_reader + .get_def_levels() + .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?; + let rep_levels = self + .item_reader + .get_rep_levels() + .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?; + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(), + )); + } + + // Need to remove from the values array the nulls that represent null lists rather than null items + // null lists have def_level = 0 + let mut null_list_indices: Vec = Vec::new(); + for i in 0..def_levels.len() { + if def_levels[i] == 0 { + null_list_indices.push(i); + } + } + let batch_values = match null_list_indices.len() { + 0 => next_batch_array.clone(), + _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, + }; + + // null list has def_level = 0 + // empty list has def_level = 1 + // null item in a list has def_level = 2 + // non-null item has def_level = 3 + // first item in each list has rep_level = 0, subsequent items have rep_level = 1 + + let mut offsets: Vec = Vec::new(); + let mut cur_offset = OffsetSize::zero(); + for i in 0..rep_levels.len() { + if rep_levels[i] == 0 { + offsets.push(cur_offset) + } + if def_levels[i] > 0 { + cur_offset = cur_offset + OffsetSize::one(); + } + } + offsets.push(cur_offset); + + let num_bytes = bit_util::ceil(offsets.len(), 8); + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let null_slice = null_buf.data_mut(); + let mut list_index = 0; + for i in 0..rep_levels.len() { + if rep_levels[i] == 0 && def_levels[i] != 0 { + bit_util::set_bit(null_slice, list_index); + } + if rep_levels[i] == 0 { + list_index += 1; + } + } + let value_offsets = Buffer::from(&offsets.to_byte_slice()); + + // null list has def_level = 0 + let null_count = def_levels.iter().filter(|x| x == &&0).count(); + + let list_data = ArrayData::builder(self.get_data_type().clone()) + .len(offsets.len() - 1) + .add_buffer(value_offsets) + .add_child_data(batch_values.data()) + .null_bit_buffer(null_buf.freeze()) + .null_count(null_count) + .offset(next_batch_array.offset()) + .build(); + + let result_array = GenericListArray::::from(list_data); + Ok(Arc::new(result_array)) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + /// Implementation of struct array reader. pub struct StructArrayReader { children: Vec>, @@ -784,16 +1200,94 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } /// Build array reader for list type. - /// Currently this is not supported. fn visit_list_with_item( &mut self, - _list_type: Rc, - _item_type: &Type, - _context: &'a ArrayReaderBuilderContext, + list_type: Rc, + item_type: Rc, + context: &'a ArrayReaderBuilderContext, ) -> Result>> { - Err(ArrowError( - "Reading parquet list array into arrow is not supported yet!".to_string(), - )) + let list_child = &list_type + .get_fields() + .first() + .ok_or_else(|| ArrowError("List field must have a child.".to_string()))?; + let mut new_context = context.clone(); + + new_context.path.append(vec![list_type.name().to_string()]); + + match list_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + match list_child.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + let item_reader = self + .dispatch(item_type.clone(), &new_context) + .unwrap() + .unwrap(); + + let item_reader_type = item_reader.get_data_type().clone(); + + match item_reader_type { + ArrowType::List(_) + | ArrowType::FixedSizeList(_, _) + | ArrowType::Struct(_) + | ArrowType::Dictionary(_, _) => Err(ArrowError(format!( + "reading List({:?}) into arrow not supported yet", + item_type + ))), + _ => { + let arrow_type = self + .arrow_schema + .field_with_name(list_type.name()) + .ok() + .map(|f| f.data_type().to_owned()) + .unwrap_or_else(|| { + ArrowType::List(Box::new(item_reader_type.clone())) + }); + + let list_array_reader: Box = match arrow_type { + ArrowType::List(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_reader_type, + new_context.def_level, + new_context.rep_level, + )), + ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_reader_type, + new_context.def_level, + new_context.rep_level, + )), + + _ => { + return Err(ArrowError(format!( + "creating ListArrayReader with type {:?} should be unreachable", + arrow_type + ))) + } + }; + + Ok(Some(list_array_reader)) + } + } } } @@ -903,25 +1397,23 @@ impl<'a> ArrayReaderBuilder { page_iterator, column_desc, converter )?)) } + } else if let Some(ArrowType::LargeBinary) = arrow_type { + let converter = + LargeBinaryConverter::new(LargeBinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + LargeBinaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) } else { - if let Some(ArrowType::LargeBinary) = arrow_type { - let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeBinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } + let converter = BinaryConverter::new(BinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + BinaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) } } PhysicalType::FIXED_LEN_BYTE_ARRAY => { @@ -1002,7 +1494,10 @@ mod tests { DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, }; use crate::util::test_common::{get_test_file, make_pages}; - use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; + use arrow::array::{ + Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray, + StructArray, + }; use arrow::datatypes::{ ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, Int32Type as ArrowInt32, Int64Type as ArrowInt64, @@ -1661,4 +2156,113 @@ mod tests { assert_eq!(array_reader.get_data_type(), &arrow_type); } + + #[test] + fn test_list_array_reader() { + // [[1, null, 2], null, [3, 4]] + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + ArrowType::List(Box::new(ArrowType::Int32)), + ArrowType::Int32, + 1, + 1, + ); + + let next_batch = list_array_reader.next_batch(1024).unwrap(); + let list_array = next_batch.as_any().downcast_ref::().unwrap(); + + assert_eq!(3, list_array.len()); + // This passes as I expect + assert_eq!(1, list_array.null_count()); + + assert_eq!( + list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + ); + + assert!(list_array.is_null(1)); + + assert_eq!( + list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(3), Some(4)]) + ); + } + + #[test] + fn test_large_list_array_reader() { + // [[1, null, 2], null, [3, 4]] + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + ArrowType::LargeList(Box::new(ArrowType::Int32)), + ArrowType::Int32, + 1, + 1, + ); + + let next_batch = list_array_reader.next_batch(1024).unwrap(); + let list_array = next_batch + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(3, list_array.len()); + + assert_eq!( + list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + ); + + assert!(list_array.is_null(1)); + + assert_eq!( + list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(3), Some(4)]) + ); + } } diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index a17e4244d35..66412c0313b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -527,6 +527,7 @@ mod tests { } #[test] + #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] fn arrow_writer_list() { // define schema let schema = Schema::new(vec![Field::new( @@ -539,7 +540,7 @@ mod tests { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); // Construct a buffer for value offsets, for the nested array: - // [[false], [true, false], null, [true, false, true], [false, true, false, true]] + // [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]] let a_value_offsets = arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); @@ -555,6 +556,9 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + // I think this setup is incorrect because this should pass + assert_eq!(batch.column(0).data().null_count(), 1); + let file = get_temp_file("test_arrow_writer_list.parquet", &[]); let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); writer.write(&batch).unwrap(); @@ -1049,9 +1053,7 @@ mod tests { } #[test] - #[should_panic( - expected = "Reading parquet list array into arrow is not supported yet!" - )] + #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] fn list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = @@ -1061,16 +1063,18 @@ mod tests { .add_buffer(a_value_offsets) .add_child_data(a_values.data()) .build(); - let a = ListArray::from(a_list_data); + // I think this setup is incorrect because this should pass + assert_eq!(a_list_data.null_count(), 1); + + let a = ListArray::from(a_list_data); let values = Arc::new(a); + one_column_roundtrip("list_single_column", values, false); } #[test] - #[should_panic( - expected = "Reading parquet list array into arrow is not supported yet!" - )] + #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] fn large_list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = @@ -1081,14 +1085,17 @@ mod tests { .add_buffer(a_value_offsets) .add_child_data(a_values.data()) .build(); - let a = LargeListArray::from(a_list_data); + // I think this setup is incorrect because this should pass + assert_eq!(a_list_data.null_count(), 1); + + let a = LargeListArray::from(a_list_data); let values = Arc::new(a); + one_column_roundtrip("large_list_single_column", values, false); } #[test] - #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match fn struct_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let struct_field_a = Field::new("f", DataType::Int32, false); diff --git a/rust/parquet/src/schema/visitor.rs b/rust/parquet/src/schema/visitor.rs index 6d712ce441f..a1866fb1471 100644 --- a/rust/parquet/src/schema/visitor.rs +++ b/rust/parquet/src/schema/visitor.rs @@ -50,7 +50,7 @@ pub trait TypeVisitor { { self.visit_list_with_item( list_type.clone(), - list_item, + list_item.clone(), context, ) } else { @@ -70,13 +70,13 @@ pub trait TypeVisitor { { self.visit_list_with_item( list_type.clone(), - fields.first().unwrap(), + fields.first().unwrap().clone(), context, ) } else { self.visit_list_with_item( list_type.clone(), - list_item, + list_item.clone(), context, ) } @@ -114,7 +114,7 @@ pub trait TypeVisitor { fn visit_list_with_item( &mut self, list_type: TypePtr, - item_type: &Type, + item_type: TypePtr, context: C, ) -> Result; } @@ -125,7 +125,7 @@ mod tests { use crate::basic::Type as PhysicalType; use crate::errors::Result; use crate::schema::parser::parse_message_type; - use crate::schema::types::{Type, TypePtr}; + use crate::schema::types::TypePtr; use std::rc::Rc; struct TestVisitorContext {} @@ -174,7 +174,7 @@ mod tests { fn visit_list_with_item( &mut self, list_type: TypePtr, - item_type: &Type, + item_type: TypePtr, _context: TestVisitorContext, ) -> Result { assert_eq!(