72 changes: 69 additions & 3 deletions rust/parquet/src/arrow/arrow_writer.rs
@@ -25,7 +25,9 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
 use super::levels::LevelInfo;
-use super::schema::add_encoded_arrow_schema_to_metadata;
+use super::schema::{
+    add_encoded_arrow_schema_to_metadata, decimal_length_from_precision,
+};
 
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
@@ -143,7 +145,8 @@ fn write_leaves(
         | ArrowDataType::LargeBinary
         | ArrowDataType::Binary
         | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8 => {
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Decimal(_, _) => {
             let mut col_writer = get_col_writer(&mut row_group_writer)?;
             write_leaf(
                 &mut col_writer,
@@ -188,7 +191,6 @@
         )),
         ArrowDataType::FixedSizeList(_, _)
         | ArrowDataType::FixedSizeBinary(_)
-        | ArrowDataType::Decimal(_, _)
         | ArrowDataType::Union(_) => Err(ParquetError::NYI(
             "Attempting to write an Arrow type that is not yet implemented".to_string(),
         )),
@@ -315,6 +317,13 @@ fn write_leaf(
                 .unwrap();
             get_fsb_array_slice(&array, &indices)
         }
+        ArrowDataType::Decimal(_, _) => {
+            let array = column
+                .as_any()
+                .downcast_ref::<arrow_array::DecimalArray>()
+                .unwrap();
+            get_decimal_array_slice(&array, &indices)
+        }
         _ => {
             return Err(ParquetError::NYI(
                 "Attempting to write an Arrow type that is not yet implemented"
@@ -416,6 +425,20 @@ fn get_interval_dt_array_slice(
     values
 }
 
+fn get_decimal_array_slice(
+    array: &arrow_array::DecimalArray,
+    indices: &[usize],
+) -> Vec<FixedLenByteArray> {
+    let mut values = Vec::with_capacity(indices.len());
+    let size = decimal_length_from_precision(array.precision());
+    for i in indices {
+        let as_be_bytes = array.value(*i).to_be_bytes();
+        let resized_value = as_be_bytes[(16 - size)..].to_vec();
+        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
+    }
+    values
+}
+
 fn get_fsb_array_slice(
     array: &arrow_array::FixedSizeBinaryArray,
     indices: &[usize],
@@ -633,6 +656,49 @@ mod tests {
         }
     }
 
+    #[test]
+    fn arrow_writer_decimal() {
+        let decimal_field = Field::new("a", DataType::Decimal(5, 2), false);
+        let schema = Schema::new(vec![decimal_field]);
+
+        let mut dec_builder = DecimalBuilder::new(4, 5, 2);
+        dec_builder.append_value(10_000).unwrap();
+        dec_builder.append_value(50_000).unwrap();
+        dec_builder.append_value(0).unwrap();
+        dec_builder.append_value(-100).unwrap();
+
+        let raw_decimal_i128_values: Vec<i128> = vec![10_000, 50_000, 0, -100];
+        let decimal_values = dec_builder.finish();
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(decimal_values)],
+        )
+        .unwrap();
+
+        let mut file = get_temp_file("test_arrow_writer_decimal.parquet", &[]);
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+                .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        file.seek(std::io::SeekFrom::Start(0)).unwrap();
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let batch = record_batch_reader.next().unwrap().unwrap();
+        let decimal_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<DecimalArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            assert_eq!(decimal_col.value(i), raw_decimal_i128_values[i]);
+        }
+    }
+
     #[test]
     #[ignore = "See ARROW-11294, data is correct but list field name is incorrect"]
     fn arrow_writer_complex() {
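Why the truncation in get_decimal_array_slice is safe: a two's-complement integer keeps its value when redundant leading sign bytes are dropped, so the trailing `size` bytes of the big-endian i128 are exactly the signed representation Parquet expects. A minimal standalone sketch of that invariant (the values mirror the -100 case from the test above; not part of the diff):

    fn main() {
        let value: i128 = -100;
        let be = value.to_be_bytes(); // 16 bytes, 0xFF-padded for a negative value
        let size = 3; // decimal_length_from_precision(5)
        // The trailing `size` bytes are the two's-complement big-endian encoding.
        assert_eq!(be[(16 - size)..], [0xFF, 0xFF, 0x9C]);
    }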
8 changes: 4 additions & 4 deletions rust/parquet/src/arrow/levels.rs
@@ -135,7 +135,8 @@ impl LevelInfo {
             | DataType::Duration(_)
             | DataType::Interval(_)
             | DataType::Binary
-            | DataType::LargeBinary => {
+            | DataType::LargeBinary
+            | DataType::Decimal(_, _) => {
                 // we return a vector of 1 value to represent the primitive
                 vec![self.calculate_child_levels(
                     array_offsets,
@@ -145,7 +146,6 @@
                 )]
             }
             DataType::FixedSizeBinary(_) => unimplemented!(),
-            DataType::Decimal(_, _) => unimplemented!(),
             DataType::List(list_field) | DataType::LargeList(list_field) => {
                 // Calculate the list level
                 let list_level = self.calculate_child_levels(
@@ -188,7 +188,8 @@
                     | DataType::LargeBinary
                     | DataType::Utf8
                     | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _) => {
+                    | DataType::Dictionary(_, _)
+                    | DataType::Decimal(_, _) => {
                         vec![list_level.calculate_child_levels(
                             child_offsets,
                             child_mask,
@@ -197,7 +198,6 @@
                         )]
                     }
                     DataType::FixedSizeBinary(_) => unimplemented!(),
-                    DataType::Decimal(_, _) => unimplemented!(),
                     DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
                         list_level.calculate_array_levels(&child_array, list_field)
                     }
20 changes: 13 additions & 7 deletions rust/parquet/src/arrow/schema.rs
@@ -306,6 +306,10 @@ pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field
         .map(|opt| opt.unwrap())
 }
 
+pub fn decimal_length_from_precision(precision: usize) -> usize {
+    (10.0_f64.powi(precision as i32).log2() / 8.0).ceil() as usize
+}
+
 /// Convert an arrow field to a parquet `Type`
 fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
     let name = field.name().as_str();
@@ -409,13 +413,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_length(*length)
                 .build()
         }
-        DataType::Decimal(precision, _) => Type::primitive_type_builder(
-            name,
-            PhysicalType::FIXED_LEN_BYTE_ARRAY,
-        )
-        .with_repetition(repetition)
-        .with_length((10.0_f64.powi(*precision as i32).log2() / 8.0).ceil() as i32)
-        .build(),
+        DataType::Decimal(precision, scale) => {
+            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+                .with_repetition(repetition)
+                .with_length(decimal_length_from_precision(*precision) as i32)
+                .with_logical_type(LogicalType::DECIMAL)
+                .with_precision(*precision as i32)
+                .with_scale(*scale as i32)
+                .build()
+        }
         DataType::Utf8 | DataType::LargeUtf8 => {
             Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                 .with_logical_type(LogicalType::UTF8)

Review conversation on the new DataType::Decimal arm:

Contributor: Are we taking the option of always writing decimals as FLBA? I'm fine with that, just confirming. I don't know under what conditions one would want to use i32 or i64 — is it related to precision and scale?

Contributor: Fixed Length Byte Array makes sense to me, as the interpretation of the bytes for decimal is different from either i32 or i64. Reasons I could imagine for using i32 or i64 to write decimals into Parquet would be:

1. ecosystem compatibility (e.g. the pandas parquet reader assuming decimals were stored using those types);
2. possibly so that better / more performant encodings could be used.

But I am just SWAG'ing it here.
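On the reviewers' question above: the Parquet format also permits DECIMAL to annotate INT32 (precision <= 9) and INT64 (precision <= 18), which is what some readers in the ecosystem expect and what enables the plain integer encodings. A hypothetical dispatch, assuming the `Type as PhysicalType` alias already used by schema.rs (this helper is illustrative and not part of the PR):

    use crate::basic::Type as PhysicalType;

    // Smallest Parquet physical type whose DECIMAL annotation covers `precision`.
    fn decimal_physical_type(precision: usize) -> PhysicalType {
        match precision {
            1..=9 => PhysicalType::INT32,   // up to 9 decimal digits
            10..=18 => PhysicalType::INT64, // up to 18 decimal digits
            _ => PhysicalType::FIXED_LEN_BYTE_ARRAY,
        }
    }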
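One caveat on decimal_length_from_precision: it sizes the byte array from the unsigned magnitude 10^precision, while Parquet reads a FIXED_LEN_BYTE_ARRAY decimal as a signed two's-complement value, which costs one extra bit. At precision 7 the formula gives ceil(log2(10^7) / 8) = 3 bytes, but three signed bytes top out at 8_388_607 < 9_999_999. A sign-aware variant, sketched under that assumption (not part of the PR; the precision-5 test above is unaffected):

    fn decimal_length_with_sign_bit(precision: usize) -> usize {
        // Reserve one bit for the two's-complement sign before rounding up to bytes.
        ((10.0_f64.powi(precision as i32).log2() + 1.0) / 8.0).ceil() as usize
    }

    fn main() {
        assert_eq!(decimal_length_with_sign_bit(5), 3); // same as the PR formula
        assert_eq!(decimal_length_with_sign_bit(7), 4); // the PR formula gives 3
    }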