Skip to content

Commit

Permalink
Initial Extract parquet data page statistics API (#10852)
Browse files Browse the repository at this point in the history
* feat: enable page statistics

* feat: prototype int64 data_page_min

* feat: prototype MinInt64DataPageStatsIterator

* feat: add make_data_page_stats_iterator macro

* feat: add get_data_page_statistics macro

* feat: add MaxInt64DataPageStatsIterator

* feat: add test_data_page_stats param

* chore: add testcase int64_with_nulls

* feat: add data page null_counts

* fix: clippy

* chore: rename column_page_index

* feat: add data page row counts

* feat: add num_data_pages to iterator

* chore: update docs

* fix: use colum_offset len in data_page_null_counts

* fix: docs

* tweak comments

* update test helper

* Add explicit multi-data page tests to statistics test

* Add explicit data page test

* remove duplicate test

* update coverage

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
marvinlanhenke and alamb authored Jun 15, 2024
1 parent ebca681 commit 2f43476
Show file tree
Hide file tree
Showing 3 changed files with 657 additions and 140 deletions.
315 changes: 312 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use half::f16;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use parquet::file::page_index::index::Index;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
Expand Down Expand Up @@ -517,6 +518,74 @@ macro_rules! get_statistics {
}}}
}

/// Generates an iterator adapter named `$iterator_type` that turns
/// `(num_data_pages, &Index)` pairs into per-page statistics.
///
/// # Macro parameters
///
/// * `$iterator_type`: name of the generated struct
/// * `$func`: name of the per-page field to read (e.g. `min` or `max`)
/// * `$index_type`: the `Index` variant that carries the typed statistics
/// * `$stat_value_type`: the native value type stored in that variant
///
/// Each call to `next` consumes one pair from the wrapped iterator and
/// yields a `Vec<Option<$stat_value_type>>`:
///
/// * if the `Index` is the `$index_type` variant, one entry per element of
///   its `indexes`, holding that element's `$func` value
/// * for any other variant, `num_data_pages` entries that are all `None`
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                // Stop as soon as the wrapped iterator is exhausted.
                let (num_pages, page_index) = self.iter.next()?;

                let stats: Vec<Option<$stat_value_type>> =
                    if let $index_type(typed_index) = page_index {
                        typed_index.indexes.iter().map(|page| page.$func).collect()
                    } else {
                        // The `Index` is not of the expected variant, so no
                        // statistics can be extracted. Emit one `None` per
                        // data page so that the resulting arrow array gets a
                        // null for every page of this row group / column.
                        vec![None; num_pages]
                    };

                Some(stats)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}

// Iterators over the per-page `min` / `max` values of `Index::INT64` page
// indexes. The `Min` / `Max` name prefixes are what
// `get_data_page_statistics!` pastes together to select the iterator.
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);

/// Expands to an expression of type `Result<ArrayRef>` that extracts data
/// page statistics for the requested arrow data type.
///
/// # Macro parameters
///
/// * `$stat_type_prefix`: `Min` or `Max`; pasted in front of
///   `Int64DataPageStatsIterator` to select the iterator type
/// * `$data_type`: identifier bound to an `Option<&DataType>`
/// * `$iterator`: identifier bound to an iterator of
///   `(num_data_pages, &Index)` pairs
///
/// Only `DataType::Int64` is currently supported. Any other data type yields
/// an internal error rather than a panic, because this macro is expanded
/// inside functions that return `Result`.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
        paste! {
            match $data_type {
                Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                unsupported => internal_err!(
                    "Extracting data page statistics is not supported for data type {:?}",
                    unsupported
                ),
            }
        }
    };
}

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
Expand Down Expand Up @@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
get_statistics!(Max, data_type, iterator)
}

/// Builds an [`ArrayRef`] holding the minimum value of every data page
/// described by a sequence of parquet page [`Index`]es.
///
/// `iterator` yields `(num_data_pages, index)` pairs; `data_type` selects
/// the arrow type of the resulting array. The extraction itself is
/// delegated to `get_data_page_statistics!`, which currently only handles
/// `DataType::Int64`.
pub(crate) fn min_page_statistics<'a, I: Iterator<Item = (usize, &'a Index)>>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef> {
    get_data_page_statistics!(Min, data_type, iterator)
}

/// Builds an [`ArrayRef`] holding the maximum value of every data page
/// described by a sequence of parquet page [`Index`]es.
///
/// `iterator` yields `(num_data_pages, index)` pairs; `data_type` selects
/// the arrow type of the resulting array. The extraction itself is
/// delegated to `get_data_page_statistics!`, which currently only handles
/// `DataType::Int64`.
pub(crate) fn max_page_statistics<'a, I: Iterator<Item = (usize, &'a Index)>>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef> {
    get_data_page_statistics!(Max, data_type, iterator)
}

/// Extracts the null count statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// The returned Array is an [`UInt64Array`]
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
let iter = iterator.flat_map(|(len, index)| match index {
Index::NONE => vec![None; len],
Index::INT64(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
_ => unimplemented!(),
});

Ok(Arc::new(UInt64Array::from_iter(iter)))
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
Expand Down Expand Up @@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> {
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}

/// Extract the minimum values from Data Page statistics.
///
/// Besides the Column Chunk level statistics (one set per column per
/// row group), Parquet files may optionally store statistics for each
/// individual data page, as part of the [`ParquetColumnIndex`].
///
/// A Column Chunk consists of one or more pages, so page level
/// statistics allow pruning at a finer granularity. They live in a
/// separate metadata structure ([`Index`]), which is why they are
/// extracted by different code than the row group statistics.
///
/// # Parameters:
///
/// * `column_page_index`: The parquet column page indices, read from
///   `ParquetMetaData` column_index
///
/// * `column_offset_index`: The parquet column offset indices, read from
///   `ParquetMetaData` offset_index
///
/// * `row_group_indices`: The indices of the row groups whose page index
///   and offset index entries are looked up for this converter's column.
///
/// # Return Value
///
/// The returned array contains the values of all requested row groups,
/// concatenated in the order given by `row_group_indices`.
///
/// For example, if two row groups are requested and
/// 1. the first has `3` data pages
/// 2. the second has `2` data pages
///
/// the returned array has 5 rows.
///
/// Each value is either:
/// * the minimum value for the page
/// * a null value, if the statistics can not be extracted
///
/// Note that a null value does NOT mean the min value was actually
/// `null`; it means the requested statistic is unknown.
///
/// If the column is not present in the parquet file, a null array with
/// one element per requested row group is returned instead.
///
/// # Errors
///
/// Any error reported by `min_page_statistics` while converting the page
/// statistics to the arrow type of the column is passed through.
pub fn data_page_mins<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    // One `(number of data pages, page index)` pair per requested row group.
    let pages_per_row_group = row_group_indices.into_iter().map(|&rg| {
        let page_index = &column_page_index[rg][parquet_index];
        let num_data_pages = column_offset_index[rg][parquet_index].len();
        (num_data_pages, page_index)
    });

    min_page_statistics(Some(data_type), pages_per_row_group)
}

/// Extract the maximum values from Data Page statistics.
///
/// Mirrors [`Self::data_page_mins`], but reports the maximum value of
/// each data page; see the docs there for the parameters and the layout
/// of the returned array.
pub fn data_page_maxes<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    // One `(number of data pages, page index)` pair per requested row group.
    let pages_per_row_group = row_group_indices.into_iter().map(|&rg| {
        let page_index = &column_page_index[rg][parquet_index];
        let num_data_pages = column_offset_index[rg][parquet_index].len();
        (num_data_pages, page_index)
    });

    max_page_statistics(Some(data_type), pages_per_row_group)
}

/// Extract the null counts from Data Page statistics.
///
/// The returned Array is an [`UInt64Array`]
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_null_counts<I>(
&self,
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
null_counts_page_statistics(iter)
}

/// Returns an [`ArrayRef`] with row counts for each row group.
///
/// This function iterates over the given row group indexes and computes
/// the row count for each page in the specified column.
///
/// # Parameters:
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
///
/// * `row_group_metadatas`: The metadata slice of the row groups, read
/// from `ParquetMetaData` row_groups
///
/// * `row_group_indices`: The indices of the row groups, that are used to
/// extract the column offset index on a per row group per column basis.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_row_counts<I>(
&self,
column_offset_index: &ParquetOffsetIndex,
row_group_metadatas: &[RowGroupMetaData],
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

// `offset_index[row_group_number][column_number][page_number]` holds
// the [`PageLocation`] corresponding to page `page_number` of column
// `column_number`of row group `row_group_number`.
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index];

let row_count_per_page = page_locations.windows(2).map(|loc| {
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
});

let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();

// append the last page row count
let row_count_per_page = row_count_per_page
.chain(std::iter::once(Some(
*num_rows_in_row_group as u64
- page_locations.last().unwrap().first_row_index as u64,
)))
.collect::<Vec<_>>();

row_count_total.extend(row_count_per_page);
}

Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
}

/// Returns a null array of data_type with one element per row group
fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
I: IntoIterator<Item = A>,
{
// column was in the arrow schema but not in the parquet schema, so return a null array
let num_row_groups = metadatas.into_iter().count();
Expand Down
Loading

0 comments on commit 2f43476

Please sign in to comment.