Skip to content

Commit

Permalink
Support Decimal and Decimal256 Parquet Data Page Statistics (apache#1…
Browse files Browse the repository at this point in the history
…1138)

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and findepi committed Jul 16, 2024
1 parent 89d8756 commit 7b152f2
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
118 changes: 118 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,119 @@ make_data_page_stats_iterator!(
Index::DOUBLE,
f64
);

/// Generates `$iterator_type`, an adapter over an iterator of `(usize, &Index)`
/// pairs that yields, for each pair, a `Vec<Option<$stat_value_type>>` holding
/// the `$func` statistic (`min` or `max`) of every entry in that index.
///
/// Parameters:
/// * `$iterator_type`   - name of the struct to generate.
/// * `$func`            - statistic field to read from each index entry.
/// * `$stat_value_type` - native decimal representation to produce; must be
///   constructible with `From` from the `INT32` / `INT64` entry values.
/// * `$convert_func`    - function turning a byte slice into `$stat_value_type`,
///   used for `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY` indexes.
///
/// An entry whose statistic is absent yields `None`. Any other `Index` variant
/// yields `len` `None`s, where `len` is the `usize` paired with that index.
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;

                // A missing statistic must stay `None`. Substituting a default
                // value would report a fabricated bound of zero for that entry.
                // NOTE(review): for the byte-array variants the default value is
                // an empty parquet `ByteArray`, whose `data()` is believed to
                // panic -- confirm against the parquet crate.
                let values = match index {
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    // Borrow the stored bytes instead of cloning the whole entry.
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect::<Vec<_>>(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect::<Vec<_>>(),
                    // No usable decimal statistics in this index.
                    _ => vec![None; len],
                };

                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}

// Iterator over the `min` statistic of each index entry, produced as `i128`;
// byte-array-backed values are decoded with `from_bytes_to_i128`.
get_decimal_page_stats_iterator!(
MinDecimal128DataPageStatsIterator,
min,
i128,
from_bytes_to_i128
);

// Iterator over the `max` statistic of each index entry, produced as `i128`;
// byte-array-backed values are decoded with `from_bytes_to_i128`.
get_decimal_page_stats_iterator!(
MaxDecimal128DataPageStatsIterator,
max,
i128,
from_bytes_to_i128
);

// Iterator over the `min` statistic of each index entry, produced as `i256`;
// byte-array-backed values are decoded with `from_bytes_to_i256`.
get_decimal_page_stats_iterator!(
MinDecimal256DataPageStatsIterator,
min,
i256,
from_bytes_to_i256
);

// Iterator over the `max` statistic of each index entry, produced as `i256`;
// byte-array-backed values are decoded with `from_bytes_to_i256`.
get_decimal_page_stats_iterator!(
MaxDecimal256DataPageStatsIterator,
max,
i256,
from_bytes_to_i256
);
make_data_page_stats_iterator!(
MinByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.min.clone() },
Expand All @@ -612,6 +725,7 @@ make_data_page_stats_iterator!(
Index::BYTE_ARRAY,
ByteArray
);

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
Expand Down Expand Up @@ -755,6 +869,10 @@ macro_rules! get_data_page_statistics {
)
)
),
Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new(
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new(
Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
_ => unimplemented!()
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1683,7 +1683,7 @@ async fn test_decimal() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "decimal_col",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down Expand Up @@ -1721,7 +1721,7 @@ async fn test_decimal_256() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "decimal256_col",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down

0 comments on commit 7b152f2

Please sign in to comment.