Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Extract parquet data page statistics API #10852

Merged
merged 23 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ed6a0f3
feat: enable page statistics
marvinlanhenke Jun 10, 2024
e4729e8
feat: prototype int64 data_page_min
marvinlanhenke Jun 10, 2024
5e6280a
feat: prototype MinInt64DataPageStatsIterator
marvinlanhenke Jun 10, 2024
5293d29
feat: add make_data_page_stats_iterator macro
marvinlanhenke Jun 10, 2024
c771504
feat: add get_data_page_statistics macro
marvinlanhenke Jun 10, 2024
72a2d4a
feat: add MaxInt64DataPageStatsIterator
marvinlanhenke Jun 10, 2024
6cedd47
feat: add test_data_page_stats param
marvinlanhenke Jun 10, 2024
3e2ffef
chore: add testcase int64_with_nulls
marvinlanhenke Jun 10, 2024
596913a
feat: add data page null_counts
marvinlanhenke Jun 10, 2024
23f1430
fix: clippy
marvinlanhenke Jun 10, 2024
8ccfe89
chore: rename column_page_index
marvinlanhenke Jun 10, 2024
fff96c4
feat: add data page row counts
marvinlanhenke Jun 10, 2024
5ade0be
feat: add num_data_pages to iterator
marvinlanhenke Jun 11, 2024
a302f4e
chore: update docs
marvinlanhenke Jun 11, 2024
96c99ce
fix: use column_offset len in data_page_null_counts
marvinlanhenke Jun 11, 2024
d731f44
fix: docs
marvinlanhenke Jun 11, 2024
307e2ef
Merge remote-tracking branch 'apache/main' into extract_data_stats
alamb Jun 14, 2024
6f7e856
tweak comments
alamb Jun 14, 2024
8d1b99c
update test helper
alamb Jun 14, 2024
830f662
Add explicit multi-data page tests to statistics test
alamb Jun 14, 2024
d25dd9a
Add explicit data page test
alamb Jun 14, 2024
a381407
remove duplicate test
alamb Jun 14, 2024
a5b6b9b
update coverage
alamb Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 312 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use half::f16;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use parquet::file::page_index::index::Index;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
Expand Down Expand Up @@ -517,6 +518,74 @@ macro_rules! get_statistics {
}}}
}

macro_rules! make_data_page_stats_iterator {
($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
struct $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
iter: I,
}

impl<'a, I> $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
fn new(iter: I) -> Self {
Self { iter }
}
}

impl<'a, I> Iterator for $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
type Item = Vec<Option<$stat_value_type>>;

fn next(&mut self) -> Option<Self::Item> {
let next = self.iter.next();
match next {
Some((len, index)) => match index {
$index_type(native_index) => Some(
native_index
.indexes
.iter()
.map(|x| x.$func)
.collect::<Vec<_>>(),
alamb marked this conversation as resolved.
Show resolved Hide resolved
),
// No matching `Index` found;
// thus no statistics that can be extracted.
// We return vec![None; len] to effectively
// create an arrow null-array with the length
// corresponding to the number of entries in
// `ParquetOffsetIndex` per row group per column.
_ => Some(vec![None; len]),
},
_ => None,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}
};
}

// Iterators over the per-data-page minimum / maximum values of Int64 columns.
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);

/// Dispatches on the arrow `DataType` to the matching data page statistics
/// iterator and collects the extracted values into an `ArrayRef`.
///
/// `$stat_type_prefix` (`Min` or `Max`) is pasted onto the iterator type
/// name via [`paste`], e.g. `Min` + `Int64DataPageStatsIterator`.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
        paste! {
            match $data_type {
                Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                // Data page statistics are not yet implemented for other types.
                _ => unimplemented!()
            }
        }
    }
}

/// Lookups up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
Expand Down Expand Up @@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
get_statistics!(Max, data_type, iterator)
}

/// Extracts the min statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// Each iterator item is `(num_data_pages, index)` for one row group;
/// the returned array contains one entry per data page.
///
/// # Panics
/// Currently `unimplemented!()` for data types other than `Int64`.
pub(crate) fn min_page_statistics<'a, I>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Min, data_type, iterator)
}

/// Extracts the max statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// Each iterator item is `(num_data_pages, index)` for one row group;
/// the returned array contains one entry per data page.
///
/// # Panics
/// Currently `unimplemented!()` for data types other than `Int64`.
pub(crate) fn max_page_statistics<'a, I>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Max, data_type, iterator)
}

/// Extracts the null count statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// The returned Array is an [`UInt64Array`]
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    let null_counts = iterator.flat_map(|(len, index)| {
        // One `Option<u64>` per data page of this row group.
        let per_page: Vec<Option<u64>> = match index {
            // No index present: emit `len` nulls so the output still has
            // one entry per data page.
            Index::NONE => vec![None; len],
            Index::INT64(native_index) => native_index
                .indexes
                .iter()
                .map(|page| page.null_count.map(|count| count as u64))
                .collect(),
            // Other index types are not yet supported.
            _ => unimplemented!(),
        };
        per_page
    });

    Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
Expand Down Expand Up @@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> {
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}

/// Extract the minimum values from Data Page statistics.
///
/// In Parquet files, in addition to the Column Chunk level statistics
/// (stored for each column for each row group) there are also
/// optional statistics stored for each data page, as part of
/// the [`ParquetColumnIndex`].
///
/// Since a single Column Chunk is stored as one or more pages,
/// page level statistics can prune at a finer granularity.
///
/// However since they are stored in a separate metadata
/// structure ([`Index`]) there is different code to extract them as
/// compared to arrow statistics.
///
/// # Parameters:
///
/// * `column_page_index`: The parquet column page indices, read from
/// `ParquetMetaData` column_index
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
///
/// * `row_group_indices`: The indices of the row groups, that are used to
/// extract the column page index and offset index on a per row group
/// per column basis.
///
/// # Return Value
///
/// The returned array contains 1 value for each `NativeIndex`
/// in the underlying `Index`es, in the same order as the row groups
/// appear in `row_group_indices`.
///
/// For example, if there are two `Index`es:
/// 1. the first having `3` `PageIndex` entries
/// 2. the second having `2` `PageIndex` entries
///
/// The returned array would have 5 rows.
///
/// Each value is either:
/// * the minimum value for the page
/// * a null value, if the statistics can not be extracted
///
/// Note that a null value does NOT mean the min value was actually
/// `null` it means that the requested statistic is unknown
///
/// # Errors
///
/// Reasons for not being able to extract the statistics include:
/// * the column is not present in the parquet file
/// * statistics for the pages are not present in the row group
/// * the stored statistic value can not be converted to the requested type
pub fn data_page_mins<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    // Column exists in the arrow schema but not the parquet file:
    // return an all-null array.
    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    let iter = row_group_indices.into_iter().map(|rg_index| {
        let column_page_index_per_row_group_per_column =
            &column_page_index[*rg_index][parquet_index];
        // Number of data pages, taken from the offset index which is
        // always present when the page index is.
        let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

        (*num_data_pages, column_page_index_per_row_group_per_column)
    });

    min_page_statistics(Some(data_type), iter)
}

/// Extract the maximum values from Data Page statistics.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_maxes<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    // Column exists in the arrow schema but not the parquet file:
    // return an all-null array.
    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    // Pair each row group's page index with its data page count
    // (taken from the offset index).
    let iter = row_group_indices.into_iter().map(|rg_index| {
        let page_index = &column_page_index[*rg_index][parquet_index];
        let page_count = column_offset_index[*rg_index][parquet_index].len();
        (page_count, page_index)
    });

    max_page_statistics(Some(data_type), iter)
}

/// Extract the null counts from Data Page statistics.
///
/// The returned Array is an [`UInt64Array`]
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_null_counts<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    // Column exists in the arrow schema but not the parquet file:
    // return an all-null array.
    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    // Pair each row group's page index with its data page count
    // (taken from the offset index).
    let iter = row_group_indices.into_iter().map(|rg_index| {
        let page_index = &column_page_index[*rg_index][parquet_index];
        let page_count = column_offset_index[*rg_index][parquet_index].len();
        (page_count, page_index)
    });

    null_counts_page_statistics(iter)
}

/// Returns an [`ArrayRef`] with the row counts of each data page.
///
/// This function iterates over the given row group indexes and computes
/// the row count for each page in the specified column.
///
/// # Parameters:
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
///
/// * `row_group_metadatas`: The metadata slice of the row groups, read
/// from `ParquetMetaData` row_groups
///
/// * `row_group_indices`: The indices of the row groups, that are used to
/// extract the column offset index on a per row group per column basis.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_row_counts<I>(
    &self,
    column_offset_index: &ParquetOffsetIndex,
    row_group_metadatas: &[RowGroupMetaData],
    row_group_indices: I,
) -> Result<ArrayRef>
where
    I: IntoIterator<Item = &'a usize>,
{
    let data_type = self.arrow_field.data_type();

    // Column exists in the arrow schema but not the parquet file:
    // return an all-null array.
    let Some(parquet_index) = self.parquet_index else {
        return Ok(self.make_null_array(data_type, row_group_indices));
    };

    // `offset_index[row_group_number][column_number][page_number]` holds
    // the [`PageLocation`] corresponding to page `page_number` of column
    // `column_number` of row group `row_group_number`.
    let mut row_count_total = Vec::new();
    for rg_idx in row_group_indices {
        let page_locations = &column_offset_index[*rg_idx][parquet_index];

        // Defensive: an empty offset index has no pages to report.
        // (the original code would panic on `last().unwrap()` here)
        let Some(last_page) = page_locations.last() else {
            continue;
        };

        // The row count of page `i` is the distance between the first
        // rows of page `i` and page `i + 1`.
        let row_count_per_page = page_locations.windows(2).map(|loc| {
            Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
        });

        // The last page extends to the end of the row group, so its row
        // count is derived from the row group's total row count.
        let num_rows_in_row_group = row_group_metadatas[*rg_idx].num_rows() as u64;
        let last_page_row_count =
            Some(num_rows_in_row_group - last_page.first_row_index as u64);

        row_count_total.extend(row_count_per_page.chain(std::iter::once(last_page_row_count)));
    }

    Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
}

/// Returns a null array of data_type with one element per row group
fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
I: IntoIterator<Item = A>,
{
// column was in the arrow schema but not in the parquet schema, so return a null array
let num_row_groups = metadatas.into_iter().count();
Expand Down
Loading