From 971e2273234c357fe51d105d3eca302a9579277d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 1 Oct 2020 14:09:18 -0400 Subject: [PATCH 1/8] Add roundtrip Arrow -> Parquet tests for all Arrow DataTypes Inspired by tests in cpp/src/parquet/arrow/arrow_reader_writer_test.cc Tests that currently fail are either marked with `#[should_panic]` (if the reason they fail is because of a panic) or `#[ignore]` (if the reason they fail is because the values don't match). --- rust/parquet/src/arrow/arrow_writer.rs | 409 +++++++++++++++++++++++++ 1 file changed, 409 insertions(+) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index e0ad207b4dc..27cbe1e3480 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -688,4 +688,413 @@ mod tests { writer.write(&batch).unwrap(); writer.close().unwrap(); } + + const SMALL_SIZE: usize = 100; + + fn roundtrip(filename: &str, expected_batch: RecordBatch) { + let file = get_temp_file(filename, &[]); + + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + expected_batch.schema(), + None, + ) + .unwrap(); + writer.write(&expected_batch).unwrap(); + writer.close().unwrap(); + + let reader = SerializedFileReader::new(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); + let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + + let actual_batch = record_batch_reader.next_batch().unwrap().unwrap(); + + assert_eq!(expected_batch.schema(), actual_batch.schema()); + assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); + assert_eq!(expected_batch.num_rows(), actual_batch.num_rows()); + for i in 0..expected_batch.num_columns() { + let expected_data = expected_batch.column(i).data(); + let actual_data = actual_batch.column(i).data(); + + assert_eq!(expected_data.data_type(), actual_data.data_type()); + assert_eq!(expected_data.len(), actual_data.len()); + assert_eq!(expected_data.null_count(), actual_data.null_count()); + assert_eq!(expected_data.offset(), actual_data.offset()); + assert_eq!(expected_data.buffers(), actual_data.buffers()); + assert_eq!(expected_data.child_data(), actual_data.child_data()); + assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap()); + } + } + + fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) { + let schema = Schema::new(vec![Field::new( + "col", + values.data_type().clone(), + nullable, + )]); + let expected_batch = + RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); + + roundtrip(filename, expected_batch); + } + + fn values_required(iter: I, filename: &str) + where + A: From> + Array + 'static, + I: IntoIterator, + { + let raw_values: Vec<_> = iter.into_iter().collect(); + let values = Arc::new(A::from(raw_values)); + one_column_roundtrip(filename, values, false); + } + + fn values_optional(iter: I, filename: &str) + where + A: From>> + Array + 'static, + I: IntoIterator, + { + let optional_raw_values: Vec<_> = iter + .into_iter() + .enumerate() + .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) }) + .collect(); + let optional_values = Arc::new(A::from(optional_raw_values)); + one_column_roundtrip(filename, optional_values, true); + } + + fn required_and_optional(iter: I, filename: &str) + where + A: From> + From>> + Array + 'static, + I: IntoIterator + Clone, + { + values_required::(iter.clone(), filename); + values_optional::(iter, filename); + } + + #[test] + #[should_panic(expected = 
"Null arrays not supported")] + fn null_single_column() { + let values = Arc::new(NullArray::new(SMALL_SIZE)); + one_column_roundtrip("null_single_column", values.clone(), true); + one_column_roundtrip("null_single_column", values, false); + } + + #[test] + #[should_panic( + expected = "Attempting to write an Arrow type that is not yet implemented" + )] + fn bool_single_column() { + required_and_optional::( + [true, false].iter().cycle().copied().take(SMALL_SIZE), + "bool_single_column", + ); + } + + #[test] + fn i8_single_column() { + required_and_optional::(0..SMALL_SIZE as i8, "i8_single_column"); + } + + #[test] + fn i16_single_column() { + required_and_optional::(0..SMALL_SIZE as i16, "i16_single_column"); + } + + #[test] + fn i32_single_column() { + required_and_optional::(0..SMALL_SIZE as i32, "i32_single_column"); + } + + #[test] + fn i64_single_column() { + required_and_optional::(0..SMALL_SIZE as i64, "i64_single_column"); + } + + #[test] + fn u8_single_column() { + required_and_optional::(0..SMALL_SIZE as u8, "u8_single_column"); + } + + #[test] + fn u16_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u16, + "u16_single_column", + ); + } + + #[test] + fn u32_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u32, + "u32_single_column", + ); + } + + #[test] + fn u64_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u64, + "u64_single_column", + ); + } + + // How to create Float16 values that aren't supported in Rust? + + #[test] + fn f32_single_column() { + required_and_optional::( + (0..SMALL_SIZE).map(|i| i as f32), + "f32_single_column", + ); + } + + #[test] + fn f64_single_column() { + required_and_optional::( + (0..SMALL_SIZE).map(|i| i as f64), + "f64_single_column", + ); + } + + // The timestamp array types don't implement From> because they need the timezone + // argument, and they also doesn't support building from a Vec>, so call + // one_column_roundtrip manually instead of calling required_and_optional for these tests. 
+ + #[test] + #[ignore] // Timestamp support isn't correct yet + fn timestamp_second_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_second_single_column", values, false); + } + + #[test] + fn timestamp_millisecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_millisecond_single_column", values, false); + } + + #[test] + fn timestamp_microsecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_microsecond_single_column", values, false); + } + + #[test] + #[ignore] // Timestamp support isn't correct yet + fn timestamp_nanosecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_nanosecond_single_column", values, false); + } + + #[test] + fn date32_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "date32_single_column", + ); + } + + #[test] + #[ignore] // Date support isn't correct yet + fn date64_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "date64_single_column", + ); + } + + #[test] + #[ignore] // Time support isn't correct yet + fn time32_second_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "time32_second_single_column", + ); + } + + #[test] + #[ignore] // Time support isn't correct yet + fn time32_millisecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "time32_millisecond_single_column", + ); + } + + #[test] + #[ignore] // Time support isn't correct yet + fn time64_microsecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "time64_microsecond_single_column", + ); + } + + #[test] + #[ignore] // Time support isn't correct yet + fn time64_nanosecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "time64_nanosecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_second_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_second_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_millisecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_millisecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_microsecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_microsecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_nanosecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_nanosecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Currently unreachable because data type not supported")] + fn interval_year_month_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "interval_year_month_single_column", + ); + } + + #[test] + #[should_panic(expected = "Currently unreachable because data type not supported")] + 
fn interval_day_time_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "interval_day_time_single_column", + ); + } + + #[test] + #[ignore] // Binary support isn't correct yet - null_bitmap doesn't match + fn binary_single_column() { + let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); + let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); + let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); + + // BinaryArrays can't be built from Vec>, so only call `values_required` + values_required::(many_vecs_iter, "binary_single_column"); + } + + #[test] + #[ignore] // Large Binary support isn't correct yet + fn large_binary_single_column() { + let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); + let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); + let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); + + // LargeBinaryArrays can't be built from Vec>, so only call `values_required` + values_required::( + many_vecs_iter, + "large_binary_single_column", + ); + } + + #[test] + #[ignore] // String support isn't correct yet - null_bitmap doesn't match + fn string_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); + let raw_strs = raw_values.iter().map(|s| s.as_str()); + + required_and_optional::(raw_strs, "string_single_column"); + } + + #[test] + #[ignore] // Large String support isn't correct yet - null_bitmap and buffers don't match + fn large_string_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); + let raw_strs = raw_values.iter().map(|s| s.as_str()); + + required_and_optional::( + raw_strs, + "large_string_single_column", + ); + } + + #[test] + #[should_panic( + expected = "Reading parquet list array into arrow is not supported yet!" + )] + fn list_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let a_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = ListArray::from(a_list_data); + + let values = Arc::new(a); + one_column_roundtrip("list_single_column", values, false); + } + + #[test] + #[should_panic( + expected = "Reading parquet list array into arrow is not supported yet!" 
+ )] + fn large_list_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let a_value_offsets = + arrow::buffer::Buffer::from(&[0i64, 1, 3, 3, 6, 10].to_byte_slice()); + let a_list_data = + ArrayData::builder(DataType::LargeList(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = LargeListArray::from(a_list_data); + + let values = Arc::new(a); + one_column_roundtrip("large_list_single_column", values, false); + } + + #[test] + #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match + fn struct_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let struct_field_a = Field::new("f", DataType::Int32, false); + let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]); + + let values = Arc::new(s); + one_column_roundtrip("struct_single_column", values, false); + } } From 2563586ee1223519d15a600930f7291e337acf28 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 3 Oct 2020 09:06:10 +0200 Subject: [PATCH 2/8] fix a few failing roundtrip tests --- rust/arrow/src/compute/kernels/cast.rs | 20 ++++- rust/parquet/src/arrow/array_reader.rs | 102 ++++++++++++++++++------- rust/parquet/src/arrow/arrow_writer.rs | 6 +- rust/parquet/src/arrow/converter.rs | 25 ++++-- 4 files changed, 115 insertions(+), 38 deletions(-) diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index d8cb480a80c..a1142b481d5 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -327,11 +327,27 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { // temporal casts (Int32, Date32(_)) => cast_array_data::(array, to_type.clone()), - (Int32, Time32(_)) => cast_array_data::(array, to_type.clone()), + (Int32, Time32(unit)) => match unit { + TimeUnit::Second => { + cast_array_data::(array, to_type.clone()) + } + TimeUnit::Millisecond => { + cast_array_data::(array, to_type.clone()) + } + _ => unreachable!(), + }, (Date32(_), Int32) => cast_array_data::(array, to_type.clone()), (Time32(_), Int32) => cast_array_data::(array, to_type.clone()), (Int64, Date64(_)) => cast_array_data::(array, to_type.clone()), - (Int64, Time64(_)) => cast_array_data::(array, to_type.clone()), + (Int64, Time64(unit)) => match unit { + TimeUnit::Microsecond => { + cast_array_data::(array, to_type.clone()) + } + TimeUnit::Nanosecond => { + cast_array_data::(array, to_type.clone()) + } + _ => unreachable!(), + }, (Date64(_), Int64) => cast_array_data::(array, to_type.clone()), (Time64(_), Int64) => cast_array_data::(array, to_type.clone()), (Date32(DateUnit::Day), Date64(DateUnit::Millisecond)) => { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 14bf7d287a3..4fbc54d209d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -35,9 +35,10 @@ use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, Converter, Date32Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter, - Int8Converter, Int96ArrayConverter, Int96Converter, TimestampMicrosecondConverter, - TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter, - UInt8Converter, Utf8ArrayConverter, Utf8Converter, + Int8Converter, Int96ArrayConverter, Int96Converter, 
Time32MillisecondConverter, + Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter, + TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter, + UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -196,11 +197,27 @@ impl ArrayReader for PrimitiveArrayReader { .convert(self.record_reader.cast::()), _ => Err(general_err!("No conversion from parquet type to arrow type for date with unit {:?}", unit)), } - (ArrowType::Time32(_), PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::()) + (ArrowType::Time32(unit), PhysicalType::INT32) => { + match unit { + TimeUnit::Second => { + Time32SecondConverter::new().convert(self.record_reader.cast::()) + } + TimeUnit::Millisecond => { + Time32MillisecondConverter::new().convert(self.record_reader.cast::()) + } + _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) + } } - (ArrowType::Time64(_), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::()) + (ArrowType::Time64(unit), PhysicalType::INT64) => { + match unit { + TimeUnit::Microsecond => { + Time64MicrosecondConverter::new().convert(self.record_reader.cast::()) + } + TimeUnit::Nanosecond => { + Time64NanosecondConverter::new().convert(self.record_reader.cast::()) + } + _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) + } } (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { UInt32Converter::new().convert(self.record_reader.cast::()) @@ -941,10 +958,12 @@ mod tests { use crate::util::test_common::{get_test_file, make_pages}; use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; use arrow::datatypes::{ - DataType as ArrowType, Date32Type as ArrowDate32, Field, Int32Type as ArrowInt32, + ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, + Int32Type as ArrowInt32, Int64Type as ArrowInt64, + Time32MillisecondType as ArrowTime32MillisecondArray, + Time64MicrosecondType as ArrowTime64MicrosecondArray, TimestampMicrosecondType as ArrowTimestampMicrosecondType, TimestampMillisecondType as ArrowTimestampMillisecondType, - UInt32Type as ArrowUInt32, UInt64Type as ArrowUInt64, }; use rand::distributions::uniform::SampleUniform; use rand::{thread_rng, Rng}; @@ -1101,7 +1120,7 @@ mod tests { } macro_rules! 
test_primitive_array_reader_one_type { - ($arrow_parquet_type:ty, $physical_type:expr, $logical_type_str:expr, $result_arrow_type:ty, $result_primitive_type:ty) => {{ + ($arrow_parquet_type:ty, $physical_type:expr, $logical_type_str:expr, $result_arrow_type:ty, $result_arrow_cast_type:ty, $result_primitive_type:ty) => {{ let message_type = format!( " message test_schema {{ @@ -1112,7 +1131,7 @@ mod tests { ); let schema = parse_message_type(&message_type) .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) - .unwrap(); + .expect("Unable to parse message type into a schema descriptor"); let column_desc = schema.column(0); @@ -1142,24 +1161,48 @@ mod tests { Box::new(page_iterator), column_desc.clone(), ) - .unwrap(); + .expect("Unable to get array reader"); - let array = array_reader.next_batch(50).unwrap(); + let array = array_reader + .next_batch(50) + .expect("Unable to get batch from reader"); + let result_data_type = <$result_arrow_type>::get_data_type(); let array = array .as_any() .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::<$result_arrow_type>::from( - data[0..50] - .iter() - .map(|x| *x as $result_primitive_type) - .collect::>() - ), - array + .expect( + format!( + "Unable to downcast {:?} to {:?}", + array.data_type(), + result_data_type + ) + .as_str(), + ); + + // create expected array as primitive, and cast to result type + let expected = PrimitiveArray::<$result_arrow_cast_type>::from( + data[0..50] + .iter() + .map(|x| *x as $result_primitive_type) + .collect::>(), ); + let expected = Arc::new(expected) as ArrayRef; + let expected = arrow::compute::cast(&expected, &result_data_type) + .expect("Unable to cast expected array"); + assert_eq!(expected.data_type(), &result_data_type); + let expected = expected + .as_any() + .downcast_ref::>() + .expect( + format!( + "Unable to downcast expected {:?} to {:?}", + expected.data_type(), + result_data_type + ) + .as_str(), + ); + assert_eq!(expected, array); } }}; } @@ -1171,27 +1214,31 @@ mod tests { PhysicalType::INT32, "DATE", ArrowDate32, + ArrowInt32, i32 ); test_primitive_array_reader_one_type!( Int32Type, PhysicalType::INT32, "TIME_MILLIS", - ArrowUInt32, - u32 + ArrowTime32MillisecondArray, + ArrowInt32, + i32 ); test_primitive_array_reader_one_type!( Int64Type, PhysicalType::INT64, "TIME_MICROS", - ArrowUInt64, - u64 + ArrowTime64MicrosecondArray, + ArrowInt64, + i64 ); test_primitive_array_reader_one_type!( Int64Type, PhysicalType::INT64, "TIMESTAMP_MILLIS", ArrowTimestampMillisecondType, + ArrowInt64, i64 ); test_primitive_array_reader_one_type!( @@ -1199,6 +1246,7 @@ mod tests { PhysicalType::INT64, "TIMESTAMP_MICROS", ArrowTimestampMicrosecondType, + ArrowInt64, i64 ); } diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 27cbe1e3480..314a800c325 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -913,7 +913,7 @@ mod tests { } #[test] - #[ignore] // Time support isn't correct yet + #[ignore] // DateUnit resolution mismatch fn time32_second_single_column() { required_and_optional::( 0..SMALL_SIZE as i32, @@ -922,7 +922,6 @@ mod tests { } #[test] - #[ignore] // Time support isn't correct yet fn time32_millisecond_single_column() { required_and_optional::( 0..SMALL_SIZE as i32, @@ -931,7 +930,6 @@ mod tests { } #[test] - #[ignore] // Time support isn't correct yet fn time64_microsecond_single_column() { required_and_optional::( 0..SMALL_SIZE as i64, @@ -940,7 +938,7 @@ mod tests { } #[test] - #[ignore] // 
Time support isn't correct yet + #[ignore] // DateUnit resolution mismatch fn time64_nanosecond_single_column() { required_and_optional::( 0..SMALL_SIZE as i64, diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 9fbfa339168..c988aaeacfc 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -17,12 +17,19 @@ use crate::arrow::record_reader::RecordReader; use crate::data_type::{ByteArray, DataType, Int96}; -use arrow::array::{ - Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, - BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder, - TimestampNanosecondBuilder, +// TODO: clean up imports (best done when there are few moving parts) +use arrow::{ + array::{ + Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, + BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder, + TimestampNanosecondBuilder, + }, + datatypes::Time32MillisecondType, +}; +use arrow::{ + compute::cast, datatypes::Time32SecondType, datatypes::Time64MicrosecondType, + datatypes::Time64NanosecondType, }; -use arrow::compute::cast; use std::convert::From; use std::sync::Arc; @@ -226,6 +233,14 @@ pub type TimestampMillisecondConverter = CastConverter; pub type TimestampMicrosecondConverter = CastConverter; +pub type Time32SecondConverter = + CastConverter; +pub type Time32MillisecondConverter = + CastConverter; +pub type Time64MicrosecondConverter = + CastConverter; +pub type Time64NanosecondConverter = + CastConverter; pub type UInt64Converter = CastConverter; pub type Float32Converter = CastConverter; pub type Float64Converter = CastConverter; From cfd366bb63c23a8f7bec9f0ece233c8ec8c52626 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 5 Oct 2020 14:56:04 -0400 Subject: [PATCH 3/8] Explicitly don't support Float16 --- rust/parquet/src/arrow/arrow_writer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 314a800c325..6002f475614 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -136,7 +136,6 @@ fn write_leaves( | ArrowDataType::UInt16 | ArrowDataType::UInt32 | ArrowDataType::UInt64 - | ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64 | ArrowDataType::Timestamp(_, _) @@ -176,6 +175,9 @@ fn write_leaves( } Ok(()) } + ArrowDataType::Float16 => Err(ParquetError::ArrowError( + "Float16 arrays not supported".to_string(), + )), ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Null | ArrowDataType::Boolean @@ -839,8 +841,6 @@ mod tests { ); } - // How to create Float16 values that aren't supported in Rust? 
- #[test] fn f32_single_column() { required_and_optional::( From 8fe210b1f5d93aa14774917835588ab83d9f4e70 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 5 Oct 2020 15:15:42 -0400 Subject: [PATCH 4/8] Update to use new Iterator support by calling next instead of next_batch --- rust/parquet/src/arrow/arrow_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 6002f475614..0c8edea2bdd 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -599,7 +599,7 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); - let batch = record_batch_reader.next_batch().unwrap().unwrap(); + let batch = record_batch_reader.next().unwrap().unwrap(); let string_col = batch .column(0) .as_any() @@ -709,7 +709,7 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); - let actual_batch = record_batch_reader.next_batch().unwrap().unwrap(); + let actual_batch = record_batch_reader.next().unwrap().unwrap(); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); From c3f3597efc96641577bc84aac66120a22c169675 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 5 Oct 2020 15:33:50 -0400 Subject: [PATCH 5/8] Remove unused import --- rust/parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 0c8edea2bdd..cf7b9a22a5c 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -495,7 +495,7 @@ mod tests { use arrow::array::*; use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::{RecordBatch, RecordBatchReader}; + use arrow::record_batch::RecordBatch; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; use crate::file::{metadata::KeyValue, reader::SerializedFileReader}; From 75ebf01d31da8b75e933675f886c205029198ed6 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 5 Oct 2020 15:50:45 -0400 Subject: [PATCH 6/8] ARROW-10168: [Rust] [Parquet] Use Arrow schema by column too Previously, if an Arrow schema was present in the Parquet metadata, that schema would always be returned when requesting all columns via `parquet_to_arrow_schema` and would never be returned when requesting a subset of columns via `parquet_to_arrow_schema_by_columns`. Now, if a valid Arrow schema is present in the Parquet metadata and a subset of columns is requested by Parquet column index, the `parquet_to_arrow_schema_by_columns` function will try to find a column of the same name in the Arrow schema first, and then fall back to the Parquet schema for that column if there isn't an Arrow Field for that column. This is part of what is needed to be able to restore Arrow types like LargeUtf8 from Parquet. 
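
As a rough illustration of the lookup order this commit message describes, the standalone sketch below prefers a field of the same name from the embedded Arrow schema and only falls back to the Parquet-derived type when no such field exists. ArrowField, resolve_field, and the string type names are hypothetical stand-ins for illustration, not the parquet crate's API.

// Minimal, self-contained sketch (not the patch's actual code) of the lookup
// order described above: prefer the field from the serialized Arrow schema,
// fall back to the type derived from the Parquet column.
#[derive(Debug, Clone, PartialEq)]
struct ArrowField {
    name: String,
    data_type: String, // e.g. "LargeUtf8" from Arrow metadata vs. "Utf8" from Parquet
}

fn resolve_field(
    column_name: &str,
    arrow_schema: Option<&[ArrowField]>,
    parquet_fallback: impl Fn(&str) -> ArrowField,
) -> ArrowField {
    arrow_schema
        .and_then(|fields| fields.iter().find(|f| f.name == column_name).cloned())
        .unwrap_or_else(|| parquet_fallback(column_name))
}

fn main() {
    // The embedded Arrow schema knows "c33" is LargeUtf8; the Parquet schema
    // alone would only recover Utf8.
    let embedded = vec![ArrowField { name: "c33".into(), data_type: "LargeUtf8".into() }];
    let from_parquet = |name: &str| ArrowField { name: name.into(), data_type: "Utf8".into() };

    assert_eq!(resolve_field("c33", Some(&embedded), &from_parquet).data_type, "LargeUtf8");
    // A column missing from the embedded schema falls back to the Parquet-derived type.
    assert_eq!(resolve_field("c99", Some(&embedded), &from_parquet).data_type, "Utf8");
}

The actual change applies the same per-column preference inside parquet_to_arrow_schema_by_columns, as the schema.rs diff below shows.
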
--- rust/parquet/src/arrow/schema.rs | 51 +++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 4a92a4642ef..348b7c58f0d 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -65,27 +65,47 @@ pub fn parquet_to_arrow_schema_by_columns( where T: IntoIterator, { + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let arrow_schema_metadata = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|encoded| get_arrow_schema_from_metadata(&encoded)).unwrap_or_default(); + let mut base_nodes = Vec::new(); let mut base_nodes_set = HashSet::new(); let mut leaves = HashSet::new(); + enum FieldType<'a> { + Parquet(&'a Type), + Arrow(Field), + } + for c in column_indices { - let column = parquet_schema.column(c).self_type() as *const Type; - let root = parquet_schema.get_column_root(c); - let root_raw_ptr = root as *const Type; - - leaves.insert(column); - if !base_nodes_set.contains(&root_raw_ptr) { - base_nodes.push(root); - base_nodes_set.insert(root_raw_ptr); + let column = parquet_schema.column(c); + let name = column.name(); + + if let Some(field) = arrow_schema_metadata.as_ref().and_then(|schema| schema.field_with_name(name).ok().cloned()) { + base_nodes.push(FieldType::Arrow(field)); + } else { + let column = column.self_type() as *const Type; + let root = parquet_schema.get_column_root(c); + let root_raw_ptr = root as *const Type; + + leaves.insert(column); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(FieldType::Parquet(root)); + base_nodes_set.insert(root_raw_ptr); + } } } - let metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - base_nodes .into_iter() - .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) + .map(|t| { + match t { + FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(), + FieldType::Arrow(f) => Ok(Some(f)), + } + }) .collect::>>>() .map(|result| result.into_iter().filter_map(|f| f).collect::>()) .map(|fields| Schema::new_with_metadata(fields, metadata)) @@ -1436,6 +1456,15 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); let read_schema = arrow_reader.get_schema()?; assert_eq!(schema, read_schema); + + let partial_schema = Schema::new(vec![ + schema.field(22).clone(), + schema.field(23).clone(), + schema.field(24).clone(), + ]); + let partial_read_schema = arrow_reader.get_schema_by_columns(24..27)?; + assert_eq!(partial_schema, partial_read_schema); + Ok(()) } } From f65b2ba6f2ed7aa97b8513a435f3af0961bea0d1 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 6 Oct 2020 18:51:34 +0200 Subject: [PATCH 7/8] run cargo +stable fmt (and clippy) --- rust/arrow/src/ipc/convert.rs | 4 +++- rust/parquet/src/arrow/schema.rs | 16 +++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 8f429bf1eb2..a02b6c44dd9 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -334,7 +334,9 @@ pub(crate) fn build_field<'a: 'b, 'b>( let mut field_builder = ipc::FieldBuilder::new(fbb); field_builder.add_name(fb_field_name); - fb_dictionary.map(|dictionary| field_builder.add_dictionary(dictionary)); + if let Some(dictionary) = fb_dictionary { + field_builder.add_dictionary(dictionary) + } field_builder.add_type_type(field_type.type_type); 
field_builder.add_nullable(field.is_nullable()); match field_type.children { diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 348b7c58f0d..c07fe922566 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -68,7 +68,8 @@ where let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); let arrow_schema_metadata = metadata .remove(super::ARROW_SCHEMA_META_KEY) - .map(|encoded| get_arrow_schema_from_metadata(&encoded)).unwrap_or_default(); + .map(|encoded| get_arrow_schema_from_metadata(&encoded)) + .unwrap_or_default(); let mut base_nodes = Vec::new(); let mut base_nodes_set = HashSet::new(); @@ -83,7 +84,10 @@ where let column = parquet_schema.column(c); let name = column.name(); - if let Some(field) = arrow_schema_metadata.as_ref().and_then(|schema| schema.field_with_name(name).ok().cloned()) { + if let Some(field) = arrow_schema_metadata + .as_ref() + .and_then(|schema| schema.field_with_name(name).ok().cloned()) + { base_nodes.push(FieldType::Arrow(field)); } else { let column = column.self_type() as *const Type; @@ -100,11 +104,9 @@ where base_nodes .into_iter() - .map(|t| { - match t { - FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(), - FieldType::Arrow(f) => Ok(Some(f)), - } + .map(|t| match t { + FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(), + FieldType::Arrow(f) => Ok(Some(f)), }) .collect::>>>() .map(|result| result.into_iter().filter_map(|f| f).collect::>()) From 55a049b2463b2d7804f91e38af0d56bf8c146ad5 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 7 Oct 2020 20:12:19 +0200 Subject: [PATCH 8/8] add option to project root columns from schema --- rust/parquet/src/arrow/arrow_reader.rs | 35 +++-- rust/parquet/src/arrow/mod.rs | 3 +- rust/parquet/src/arrow/record_reader.rs | 1 + rust/parquet/src/arrow/schema.rs | 166 ++++++++++++++++++++---- 4 files changed, 169 insertions(+), 36 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index b654de1ad0a..d3922ecaba2 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -19,7 +19,9 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader}; use crate::arrow::schema::parquet_to_arrow_schema; -use crate::arrow::schema::parquet_to_arrow_schema_by_columns; +use crate::arrow::schema::{ + parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns, +}; use crate::errors::{ParquetError, Result}; use crate::file::reader::FileReader; use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; @@ -40,7 +42,12 @@ pub trait ArrowReader { /// Read parquet schema and convert it into arrow schema. /// This schema only includes columns identified by `column_indices`. - fn get_schema_by_columns(&mut self, column_indices: T) -> Result + /// To select leaf columns (i.e. 
`a.b.c` instead of `a`), set `leaf_columns = true` + fn get_schema_by_columns( + &mut self, + column_indices: T, + leaf_columns: bool, + ) -> Result where T: IntoIterator; @@ -84,16 +91,28 @@ impl ArrowReader for ParquetFileArrowReader { ) } - fn get_schema_by_columns(&mut self, column_indices: T) -> Result + fn get_schema_by_columns( + &mut self, + column_indices: T, + leaf_columns: bool, + ) -> Result where T: IntoIterator, { let file_metadata = self.file_reader.metadata().file_metadata(); - parquet_to_arrow_schema_by_columns( - file_metadata.schema_descr(), - column_indices, - file_metadata.key_value_metadata(), - ) + if leaf_columns { + parquet_to_arrow_schema_by_columns( + file_metadata.schema_descr(), + column_indices, + file_metadata.key_value_metadata(), + ) + } else { + parquet_to_arrow_schema_by_root_columns( + file_metadata.schema_descr(), + column_indices, + file_metadata.key_value_metadata(), + ) + } } fn get_record_reader( diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 2b012fb777e..979345722d2 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -35,7 +35,7 @@ //! //! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap()); //! println!("Arrow schema after projection is: {}", -//! arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap()); +//! arrow_reader.get_schema_by_columns(vec![2, 4, 6], true).unwrap()); //! //! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap(); //! @@ -61,6 +61,7 @@ pub use self::arrow_reader::ParquetFileArrowReader; pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + parquet_to_arrow_schema_by_root_columns, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index ccfdaf8f0e5..b30ab7760b2 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -86,6 +86,7 @@ impl<'a, T> FatPtr<'a, T> { self.ptr } + #[allow(clippy::wrong_self_convention)] fn to_slice_mut(&mut self) -> &mut [T] { self.ptr } diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index c07fe922566..0cd41fe5925 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -56,7 +56,61 @@ pub fn parquet_to_arrow_schema( } } -/// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns. +/// Convert parquet schema to arrow schema including optional metadata, +/// only preserving some root columns. +/// This is useful if we have columns `a.b`, `a.c.e` and `a.d`, +/// and want `a` with all its child fields +pub fn parquet_to_arrow_schema_by_root_columns( + parquet_schema: &SchemaDescriptor, + column_indices: T, + key_value_metadata: &Option>, +) -> Result +where + T: IntoIterator, +{ + // Reconstruct the index ranges of the parent columns + // An Arrow struct gets represented by 1+ columns based on how many child fields the + // struct has. This means that getting fields 1 and 2 might return the struct twice, + // if field 1 is the struct having say 3 fields, and field 2 is a primitive. 
+ // + // The below gets the parent columns, and counts the number of child fields in each parent, + // such that we would end up with: + // - field 1 - columns: [0, 1, 2] + // - field 2 - columns: [3] + let mut parent_columns = vec![]; + let mut curr_name = ""; + let mut prev_name = ""; + let mut indices = vec![]; + (0..(parquet_schema.num_columns())).for_each(|i| { + let p_type = parquet_schema.get_column_root(i); + curr_name = p_type.get_basic_info().name(); + if prev_name == "" { + // first index + indices.push(i); + prev_name = curr_name; + } else if curr_name != prev_name { + prev_name = curr_name; + parent_columns.push((curr_name.to_string(), indices.clone())); + indices = vec![i]; + } else { + indices.push(i); + } + }); + // push the last column if indices has values + if !indices.is_empty() { + parent_columns.push((curr_name.to_string(), indices)); + } + + // gather the required leaf columns + let leaf_columns = column_indices + .into_iter() + .flat_map(|i| parent_columns[i].1.clone()); + + parquet_to_arrow_schema_by_columns(parquet_schema, leaf_columns, key_value_metadata) +} + +/// Convert parquet schema to arrow schema including optional metadata, +/// only preserving some leaf columns. pub fn parquet_to_arrow_schema_by_columns( parquet_schema: &SchemaDescriptor, column_indices: T, @@ -71,6 +125,13 @@ where .map(|encoded| get_arrow_schema_from_metadata(&encoded)) .unwrap_or_default(); + // add the Arrow metadata to the Parquet metadata + if let Some(arrow_schema) = &arrow_schema_metadata { + arrow_schema.metadata().iter().for_each(|(k, v)| { + metadata.insert(k.clone(), v.clone()); + }); + } + let mut base_nodes = Vec::new(); let mut base_nodes_set = HashSet::new(); let mut leaves = HashSet::new(); @@ -1389,21 +1450,21 @@ mod tests { Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), - Field::new( - "c22", - DataType::FixedSizeList(Box::new(DataType::Boolean), 5), - false, - ), - Field::new( - "c23", - DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( - vec![ - Field::new("a", DataType::Int16, true), - Field::new("b", DataType::Float64, false), - ], - ))))), - true, - ), + // Field::new( + // "c22", + // DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + // false, + // ), + // Field::new( + // "c23", + // DataType::List(Box::new(DataType::LargeList(Box::new( + // DataType::Struct(vec![ + // Field::new("a", DataType::Int16, true), + // Field::new("b", DataType::Float64, false), + // ]), + // )))), + // true, + // ), Field::new( "c24", DataType::Struct(vec![ @@ -1430,12 +1491,66 @@ mod tests { ), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), + // Field::new( + // "c34", + // DataType::LargeList(Box::new(DataType::List(Box::new( + // DataType::Struct(vec![ + // Field::new("a", DataType::Int16, true), + // Field::new("b", DataType::Float64, true), + // ]), + // )))), + // true, + // ), + ], + metadata, + ); + + // write to an empty parquet file so that schema is serialized + let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; + writer.close()?; + + // read file back + let parquet_reader = SerializedFileReader::try_from(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); + let read_schema = 
arrow_reader.get_schema()?; + assert_eq!(schema, read_schema); + + // read all fields by columns + let partial_read_schema = + arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?; + assert_eq!(schema, partial_read_schema); + + Ok(()) + } + + #[test] + #[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"] + fn test_arrow_schema_roundtrip_lists() -> Result<()> { + let metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); + + let schema = Schema::new_with_metadata( + vec![ + Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), Field::new( - "c34", - DataType::LargeList(Box::new(DataType::LargeList(Box::new( + "c22", + DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + false, + ), + Field::new( + "c23", + DataType::List(Box::new(DataType::LargeList(Box::new( DataType::Struct(vec![ Field::new("a", DataType::Int16, true), - Field::new("b", DataType::Float64, true), + Field::new("b", DataType::Float64, false), ]), )))), true, @@ -1445,7 +1560,7 @@ mod tests { ); // write to an empty parquet file so that schema is serialized - let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let file = get_temp_file("test_arrow_schema_roundtrip_lists.parquet", &[]); let mut writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), @@ -1459,13 +1574,10 @@ mod tests { let read_schema = arrow_reader.get_schema()?; assert_eq!(schema, read_schema); - let partial_schema = Schema::new(vec![ - schema.field(22).clone(), - schema.field(23).clone(), - schema.field(24).clone(), - ]); - let partial_read_schema = arrow_reader.get_schema_by_columns(24..27)?; - assert_eq!(partial_schema, partial_read_schema); + // read all fields by columns + let partial_read_schema = + arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?; + assert_eq!(schema, partial_read_schema); Ok(()) }
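
As a closing illustration of the root-column projection added in the last patch, the standalone sketch below mirrors the grouping described in the parquet_to_arrow_schema_by_root_columns comment: leaf columns are grouped under their root column, and a requested root index expands to all of that root's leaf columns. The function and names are hypothetical stand-ins for illustration, not the crate's API.

// Minimal sketch of the "group leaves by root column" step described in the
// comment inside parquet_to_arrow_schema_by_root_columns above.
fn group_leaves_by_root(root_per_leaf: &[&str]) -> Vec<(String, Vec<usize>)> {
    let mut groups: Vec<(String, Vec<usize>)> = Vec::new();
    for (leaf_index, &root) in root_per_leaf.iter().enumerate() {
        match groups.last_mut() {
            // Consecutive leaves that share a root belong to the same group.
            Some((name, indices)) if name.as_str() == root => indices.push(leaf_index),
            _ => groups.push((root.to_string(), vec![leaf_index])),
        }
    }
    groups
}

fn main() {
    // A struct column `a` with three leaf columns, followed by a primitive `b`.
    let roots = ["a", "a", "a", "b"];
    let groups = group_leaves_by_root(&roots);
    assert_eq!(
        groups,
        vec![("a".to_string(), vec![0, 1, 2]), ("b".to_string(), vec![3])]
    );

    // Requesting root column 0 pulls in every leaf column under `a`.
    let leaves: Vec<usize> = [0usize].iter().flat_map(|&i| groups[i].1.clone()).collect();
    assert_eq!(leaves, vec![0, 1, 2]);
}

Expanding a root index to all of its leaf indices is what get_schema_by_columns(..., false) relies on when projecting whole struct columns rather than individual leaves.
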