Skip to content
7 changes: 7 additions & 0 deletions rust/arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,13 @@ impl DecimalArray {
let data = builder.build();
Self::from(data)
}
pub fn precision(&self) -> usize {
self.precision
}

pub fn scale(&self) -> usize {
self.scale
}
}

impl From<ArrayDataRef> for DecimalArray {
Expand Down
20 changes: 16 additions & 4 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use std::vec::Vec;

use arrow::array::{
Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray,
BinaryBuilder, BooleanArray, BooleanBufferBuilder, FixedSizeBinaryArray,
FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, ListBuilder,
OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
StructArray,
BinaryBuilder, BooleanArray, BooleanBufferBuilder, DecimalBuilder,
FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
Int64Array, ListBuilder, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
StringArray, StringBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{
Expand Down Expand Up @@ -350,6 +350,18 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
let a = arrow::compute::cast(&array, &ArrowType::Date32(DateUnit::Day))?;
arrow::compute::cast(&a, target_type)?
}
ArrowType::Decimal(p, s) => {
let to_int64 = arrow::compute::cast(&array, &ArrowType::Int64)?;
let mut builder = DecimalBuilder::new(to_int64.len(), *p, *s);
let values = to_int64.as_any().downcast_ref::<Int64Array>().unwrap();
for maybe_value in values.iter() {
match maybe_value {
Some(value) => builder.append_value(value as i128)?,
None => builder.append_null()?,
}
}
Arc::new(builder.finish()) as ArrayRef
}
_ => arrow::compute::cast(&array, target_type)?,
};

Expand Down
44 changes: 25 additions & 19 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,25 +406,31 @@ mod tests {
fn test_read_decimal_file() {
use arrow::array::DecimalArray;
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/fixed_length_decimal.parquet", testdata);
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));

let mut record_reader = arrow_reader.get_record_reader(32).unwrap();

let batch = record_reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 24);
let col = batch
.column(0)
.as_any()
.downcast_ref::<DecimalArray>()
.unwrap();

let expected = 1..25;

for (i, v) in expected.enumerate() {
assert_eq!(col.value(i), v * 100_i128);
let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
for (prefix, target_precision) in file_variants {
let path = format!("{}/{}_decimal.parquet", testdata, prefix);
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));

let mut record_reader = arrow_reader.get_record_reader(32).unwrap();

let batch = record_reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 24);
let col = batch
.column(0)
.as_any()
.downcast_ref::<DecimalArray>()
.unwrap();

let expected = 1..25;

assert_eq!(col.precision(), target_precision);
assert_eq!(col.scale(), 2);

for (i, v) in expected.enumerate() {
assert_eq!(col.value(i), v * 100_i128);
}
}
}

Expand Down
30 changes: 15 additions & 15 deletions rust/parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ impl ParquetTypeConverter<'_> {
LogicalType::INT_32 => Ok(DataType::Int32),
LogicalType::DATE => Ok(DataType::Date32(DateUnit::Day)),
LogicalType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
LogicalType::DECIMAL => Ok(self.to_decimal()),
other => Err(ArrowError(format!(
"Unable to convert parquet INT32 logical type {}",
other
Expand All @@ -610,6 +611,7 @@ impl ParquetTypeConverter<'_> {
LogicalType::TIMESTAMP_MICROS => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
LogicalType::DECIMAL => Ok(self.to_decimal()),
other => Err(ArrowError(format!(
"Unable to convert parquet INT64 logical type {}",
other
Expand All @@ -619,21 +621,7 @@ impl ParquetTypeConverter<'_> {

fn from_fixed_len_byte_array(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::DECIMAL => {
let (precision, scale) = match self.schema {
Type::PrimitiveType {
ref precision,
ref scale,
..
} => (*precision, *scale),
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
Ok(DataType::Decimal(precision as usize, scale as usize))
}
LogicalType::DECIMAL => Ok(self.to_decimal()),
LogicalType::INTERVAL => {
// There is currently no reliable way of determining which IntervalUnit
// to return. Thus without the original Arrow schema, the results
Expand All @@ -657,6 +645,18 @@ impl ParquetTypeConverter<'_> {
}
}

fn to_decimal(&self) -> DataType {
assert!(self.schema.is_primitive());
if let Type::PrimitiveType {
precision, scale, ..
} = self.schema
{
DataType::Decimal(*precision as usize, *scale as usize)
} else {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this branch?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think I am not familiar enough with Rust yet. I was under the impression that I have to provide an implementation for all possible branches and I get a compiler error when I remove the else branch. Is it possible to unwrap the if part somehow?

error[E0317]: `if` may be missing an `else` clause
   --> parquet/src/arrow/schema.rs:650:9
    |
650 | /         if let Type::PrimitiveType {
651 | |             precision, scale, ..
652 | |         } = self.schema
653 | |         {
654 | |             DataType::Decimal(*precision as usize, *scale as usize)
    | |             ------------------------------------------------------- found here
655 | |         } 
    | |_________^ expected `()`, found enum `arrow::datatypes::DataType`
    |
    = note: `if` expressions without `else` evaluate to `()`
    = help: consider adding an `else` block that evaluates to the expected type

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops my bad - this looks good then.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may make sense to add get_scale and get_precision to Type as well, similar to other getters there, but it's just a good to have :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point! I added both methods which cleans up to_decimal quite a bit.

unreachable!()
}
}

fn from_byte_array(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Binary),
Expand Down