Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter,
LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter,
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
Expand Down Expand Up @@ -1542,6 +1542,33 @@ impl<'a> ArrayReaderBuilder {
)?))
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY
if cur_type.get_basic_info().logical_type() == LogicalType::DECIMAL =>
{
let (precision, scale) = match *cur_type {
Type::PrimitiveType {
ref precision,
ref scale,
..
} => (*precision, *scale),
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
let converter =
DecimalConverter::new(DecimalArrayConverter::new(precision, scale));
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
DecimalConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
let byte_width = match *cur_type {
Type::PrimitiveType {
Expand Down
27 changes: 27 additions & 0 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,33 @@ mod tests {
>(2, message_type, &converter);
}

/// Reads `fixed_length_decimal.parquet` from the `PARQUET_TEST_DATA` directory and checks
/// that the first column of the first batch downcasts to a `DecimalArray` whose 24 rows
/// hold 100, 200, ..., 2400.
#[test]
fn test_read_decimal_file() {
    use arrow::array::DecimalArray;

    let data_dir = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
    let file = File::open(format!("{}/fixed_length_decimal.parquet", data_dir)).unwrap();
    let file_reader = SerializedFileReader::try_from(file).unwrap();
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

    let mut batches = arrow_reader.get_record_reader(32).unwrap();
    let first_batch = batches.next().unwrap().unwrap();
    assert_eq!(first_batch.num_rows(), 24);

    let decimals = first_batch
        .column(0)
        .as_any()
        .downcast_ref::<DecimalArray>()
        .unwrap();

    // Row `row` is expected to hold (row + 1) * 100.
    for (row, multiplier) in (1..25).enumerate() {
        assert_eq!(decimals.value(row), multiplier * 100_i128);
    }
}

/// Parameters for single_column_reader_test
#[derive(Debug)]
struct TestOptions {
Expand Down
55 changes: 51 additions & 4 deletions rust/parquet/src/arrow/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96};
// TODO: clean up imports (best done when there are few moving parts)
use arrow::array::{
Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder,
LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder,
StringDictionaryBuilder, TimestampNanosecondBuilder,
Array, ArrayRef, BinaryBuilder, DecimalBuilder, FixedSizeBinaryBuilder,
LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder,
StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
};
use arrow::compute::cast;
use std::convert::From;
Expand All @@ -30,7 +30,7 @@ use crate::errors::Result;
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType};

use arrow::array::{
BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray,
BinaryArray, DecimalArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray,
LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray,
};
use std::marker::PhantomData;
Expand Down Expand Up @@ -76,6 +76,47 @@ impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray>
}
}

/// Converts fixed-length byte array values into a `DecimalArray`.
///
/// Stores the decimal `precision` and `scale` that are handed to the array builder when
/// values are converted.
pub struct DecimalArrayConverter {
    precision: i32,
    scale: i32,
}

impl DecimalArrayConverter {
    /// Creates a converter for decimals with the given `precision` and `scale`.
    pub fn new(precision: i32, scale: i32) -> Self {
        Self { precision, scale }
    }

    /// Interprets `b` as a big-endian two's-complement integer and sign-extends it to an
    /// `i128`.
    ///
    /// An empty slice yields `0`.
    ///
    /// # Panics
    ///
    /// Panics if `b` is longer than 16 bytes.
    fn from_bytes_to_i128(b: &[u8]) -> i128 {
        assert!(b.len() <= 16, "DecimalArray supports only up to size 16");
        // The top bit of the first byte carries the sign. `first()` (rather than `b[0]`)
        // keeps an empty slice from panicking and treats it as non-negative.
        let is_negative = b.first().map_or(false, |byte| byte & 0x80 != 0);
        // Pre-fill with the sign so the unused leading bytes sign-extend the value.
        let mut result = if is_negative { [0xFF_u8; 16] } else { [0_u8; 16] };
        // Right-align the input bytes in the 16-byte big-endian buffer.
        result[16 - b.len()..].copy_from_slice(b);
        i128::from_be_bytes(result)
    }
}

impl Converter<Vec<Option<FixedLenByteArray>>, DecimalArray> for DecimalArrayConverter {
    /// Builds a `DecimalArray` from `source`, one slot per element.
    ///
    /// `Some` entries are decoded with `from_bytes_to_i128` and appended as values; `None`
    /// entries are appended as nulls. Any error returned by the builder is propagated.
    fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<DecimalArray> {
        let capacity = source.len();
        // NOTE(review): a negative precision or scale would wrap in these `as usize`
        // casts — confirm that callers only pass non-negative values.
        let mut decimals =
            DecimalBuilder::new(capacity, self.precision as usize, self.scale as usize);

        for entry in source {
            if let Some(bytes) = entry {
                decimals.append_value(Self::from_bytes_to_i128(bytes.data()))?;
            } else {
                decimals.append_null()?;
            }
        }

        Ok(decimals.finish())
    }
}

pub struct Int96ArrayConverter {}

impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter {
Expand Down Expand Up @@ -287,6 +328,12 @@ pub type FixedLenBinaryConverter = ArrayRefConverter<
FixedSizeArrayConverter,
>;

/// `ArrayRefConverter` instantiation that plugs in `DecimalArrayConverter`, taking
/// `Vec<Option<FixedLenByteArray>>` as its source and `DecimalArray` as the array type
/// produced by that converter.
pub type DecimalConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
DecimalArray,
DecimalArrayConverter,
>;

pub struct FromConverter<S, T> {
_source: PhantomData<S>,
_dest: PhantomData<T>,
Expand Down
15 changes: 15 additions & 0 deletions rust/parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,21 @@ impl ParquetTypeConverter<'_> {
}

fn from_fixed_len_byte_array(&self) -> Result<DataType> {
if self.schema.get_basic_info().logical_type() == LogicalType::DECIMAL {
let (precision, scale) = match self.schema {
Type::PrimitiveType {
ref precision,
ref scale,
..
} => (*precision, *scale),
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
return Ok(DataType::Decimal(precision as usize, scale as usize));
}
let byte_width = match self.schema {
Type::PrimitiveType {
ref type_length, ..
Expand Down