From 086d6d208dffd628f2aad5ec72eb287fe54cc327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=BCller?= Date: Wed, 9 Dec 2020 16:47:31 +0100 Subject: [PATCH 1/4] feat: add parquet arrow reader for decimal --- rust/parquet/src/arrow/array_reader.rs | 35 +++++++++++++++-- rust/parquet/src/arrow/arrow_reader.rs | 27 +++++++++++++ rust/parquet/src/arrow/converter.rs | 52 ++++++++++++++++++++++++-- rust/parquet/src/arrow/schema.rs | 15 ++++++++ 4 files changed, 121 insertions(+), 8 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 145f3156dca..037a881c7e1 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -55,10 +55,10 @@ use arrow::datatypes::{ use arrow::util::bit_util; use crate::arrow::converter::{ - BinaryArrayConverter, BinaryConverter, Converter, FixedLenBinaryConverter, - FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, - LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, - LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, + BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter, + DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, + Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, + LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -1542,6 +1542,33 @@ impl<'a> ArrayReaderBuilder { )?)) } } + PhysicalType::FIXED_LEN_BYTE_ARRAY + if cur_type.get_basic_info().logical_type() == LogicalType::DECIMAL => + { + let (precision, scale) = match *cur_type { + Type::PrimitiveType { + ref precision, + ref scale, + .. 
+ } => (*precision, *scale), + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + let converter = + DecimalConverter::new(DecimalArrayConverter::new(precision, scale)); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + DecimalConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } PhysicalType::FIXED_LEN_BYTE_ARRAY => { let byte_width = match *cur_type { Type::PrimitiveType { diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index 7d5b5b864bb..f25c53b4e76 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -381,6 +381,33 @@ mod tests { >(2, message_type, &converter); } + #[test] + fn test_read_decimal_file() { + use arrow::array::DecimalArray; + let testdata = + env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); + let path = format!("{}/fixed_length_decimal.parquet", testdata); + let parquet_reader = + SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader)); + + let mut record_reader = arrow_reader.get_record_reader(32).unwrap(); + + let batch = record_reader.next().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 24); + let col = batch + .column(0) + .as_any() + .downcast_ref::<DecimalArray>() + .unwrap(); + + let expected = 1..25; + + for (i, v) in expected.enumerate() { + assert_eq!(col.value(i), v * 100 as i128); + } + } + /// Parameters for single_column_reader_test #[derive(Debug)] struct TestOptions { diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 94d6987a033..449f6d45492 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -18,9 +18,9 @@ use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96}; // TODO: clean up imports (best done when there are few moving parts) 
use arrow::array::{ - Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder, - LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, - StringDictionaryBuilder, TimestampNanosecondBuilder, + Array, ArrayRef, BinaryBuilder, DecimalBuilder, FixedSizeBinaryBuilder, + LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, + StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, }; use arrow::compute::cast; use std::convert::From; @@ -30,7 +30,7 @@ use crate::errors::Result; use arrow::datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType}; use arrow::array::{ - BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, + BinaryArray, DecimalArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, }; use std::marker::PhantomData; @@ -76,6 +76,47 @@ impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray> } } +pub struct DecimalArrayConverter { + precision: i32, + scale: i32, +} + +impl DecimalArrayConverter { + pub fn new(precision: i32, scale: i32) -> Self { + Self { precision, scale } + } + + fn from_bytes_to_i128(b: &[u8]) -> i128 { + assert!(b.len() <= 16, "DecimalArray supports only up to size 16"); + let first_bit = b[0] & 128u8 == 128u8; + let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; + for (i, v) in b.iter().enumerate() { + result[i + (16 - b.len())] = *v; + } + i128::from_be_bytes(result) + } +} + +impl Converter<Vec<Option<ByteArray>>, DecimalArray> for DecimalArrayConverter { + fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<DecimalArray> { + let mut builder = DecimalBuilder::new( + source.len(), + self.precision as usize, + self.scale as usize, + ); + for v in source { + match v { + Some(array) => { + builder.append_value(Self::from_bytes_to_i128(array.data())) + } + None => builder.append_null(), + }? 
+ } + + Ok(builder.finish()) + } +} + pub struct Int96ArrayConverter {} impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter { @@ -287,6 +328,9 @@ pub type FixedLenBinaryConverter = ArrayRefConverter< FixedSizeArrayConverter, >; +pub type DecimalConverter = + ArrayRefConverter<Vec<Option<ByteArray>>, DecimalArray, DecimalArrayConverter>; + pub struct FromConverter<S, T> { _source: PhantomData<S>, _dest: PhantomData<T>, diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index e11dc45fa75..f8bda5e5b59 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -606,6 +606,21 @@ impl ParquetTypeConverter<'_> { } fn from_fixed_len_byte_array(&self) -> Result<DataType> { + if self.schema.get_basic_info().logical_type() == LogicalType::DECIMAL { + let (precision, scale) = match self.schema { + Type::PrimitiveType { + ref precision, + ref scale, + .. + } => (*precision, *scale), + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + return Ok(DataType::Decimal(precision as usize, scale as usize)); + } let byte_width = match self.schema { Type::PrimitiveType { ref type_length, .. 
From 41fee883c9ad09af29cf85398ab296ecd13b8dab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=BCller?= Date: Thu, 10 Dec 2020 16:36:45 +0100 Subject: [PATCH 2/4] fix: avoid unnecessary cast --- rust/parquet/src/arrow/arrow_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index f25c53b4e76..ee8ff67571f 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -404,7 +404,7 @@ mod tests { let expected = 1..25; for (i, v) in expected.enumerate() { - assert_eq!(col.value(i), v * 100 as i128); + assert_eq!(col.value(i), v * 100_i128); } } From f2fc2131f5336eb44f360c85cc760cda9a8e5b65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=BCller?= Date: Thu, 17 Dec 2020 08:05:17 +0100 Subject: [PATCH 3/4] fix: replace ByteArray with FixedLenByteArray --- rust/parquet/src/arrow/converter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 449f6d45492..c23c774b94f 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -97,8 +97,8 @@ impl DecimalArrayConverter { } } -impl Converter<Vec<Option<ByteArray>>, DecimalArray> for DecimalArrayConverter { - fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<DecimalArray> { +impl Converter<Vec<Option<FixedLenByteArray>>, DecimalArray> for DecimalArrayConverter { + fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<DecimalArray> { let mut builder = DecimalBuilder::new( source.len(), self.precision as usize, @@ -329,7 +329,7 @@ pub type FixedLenBinaryConverter = ArrayRefConverter< >; pub type DecimalConverter = - ArrayRefConverter<Vec<Option<ByteArray>>, DecimalArray, DecimalArrayConverter>; + ArrayRefConverter<Vec<Option<FixedLenByteArray>>, DecimalArray, DecimalArrayConverter>; pub struct FromConverter<S, T> { 
_source: PhantomData<S>, From 56b643b0817c0a7cce27f71280bf4ad111539b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20M=C3=BCller?= Date: Thu, 17 Dec 2020 08:23:58 +0100 Subject: 
[PATCH 4/4] fix: formatting --- rust/parquet/src/arrow/converter.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index c23c774b94f..2c109bcfbb5 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -328,8 +328,11 @@ pub type FixedLenBinaryConverter = ArrayRefConverter< FixedSizeArrayConverter, >; -pub type DecimalConverter = - ArrayRefConverter<Vec<Option<FixedLenByteArray>>, DecimalArray, DecimalArrayConverter>; +pub type DecimalConverter = ArrayRefConverter< + Vec<Option<FixedLenByteArray>>, + DecimalArray, + DecimalArrayConverter, +>; pub struct FromConverter<S, T> { _source: PhantomData<S>,