feat: API for collecting statistics/index for metadata of a parquet file + tests #10537

Merged
merged 13 commits on May 20, 2024
@@ -72,6 +72,7 @@ mod statistics;

pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -482,7 +483,6 @@ struct ParquetOpener
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();

let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
156 changes: 153 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -20,11 +20,15 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
-use arrow_array::new_empty_array;
-use arrow_schema::{FieldRef, Schema};
-use datafusion_common::{Result, ScalarValue};
+use arrow_array::{new_empty_array, new_null_array, UInt64Array};
+use arrow_schema::{Field, FieldRef, Schema};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
+};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
+use std::sync::Arc;

// Convert the byte array to i128.
// The input byte array must be big-endian.
@@ -210,6 +214,152 @@ fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
}
}

/// What type of statistics should be extracted?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RequestedStatistics {
/// Minimum Value
Min,
/// Maximum Value
Max,
/// Null Count, returned as a [`UInt64Array`]
NullCount,
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
/// conversions. This information can be used for pruning parquet files or row
/// groups based on the statistics embedded in parquet files.
///
/// # Schemas
///
/// The schema of the parquet file and the arrow schema are used to convert the
/// underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files.
///
/// The parquet schema and arrow schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the parquet file to the column in the arrow schema.
///
/// # Multiple parquet files
///
/// This API is designed to support efficiently extracting statistics from
/// multiple parquet files (which is why the parquet schema is passed in as an
/// argument). This is useful when building an index for a directory of parquet
/// files.
///
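/// # Example
///
/// A minimal sketch of the intended call pattern (here `metadata` is assumed
/// to be a [`ParquetMetaData`] from an already-opened reader, `arrow_schema`
/// the Arrow schema of the query, and `"c1"` an illustrative column name):
///
/// ```ignore
/// let converter = StatisticsConverter::try_new(
///     "c1",
///     RequestedStatistics::Min,
///     &arrow_schema,
/// )?;
/// // one entry per row group, in order; an entry is null where the
/// // statistic could not be extracted
/// let min_values = converter.extract(&metadata)?;
/// ```
///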
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// The name of the column to extract statistics for
column_name: &'a str,
/// The type of statistics to extract
statistics_type: RequestedStatistics,
/// The arrow schema of the query
arrow_schema: &'a Schema,
/// The field (with data type) of the column in the arrow schema
arrow_field: &'a Field,
}

impl<'a> StatisticsConverter<'a> {
/// Returns a [`UInt64Array`] with counts for each row group
///
/// The returned array has no nulls, and has one value for each row group.
/// Each value is the number of rows in the row group.
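///
/// A sketch of expected use, assuming `metadata` is a [`ParquetMetaData`]
/// loaded elsewhere:
///
/// ```ignore
/// let row_counts = StatisticsConverter::row_counts(&metadata)?;
/// assert_eq!(row_counts.len(), metadata.row_groups().len());
/// ```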
pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
let row_groups = metadata.row_groups();
let mut builder = UInt64Array::builder(row_groups.len());
for row_group in row_groups {
let row_count = row_group.num_rows();
let row_count: u64 = row_count.try_into().map_err(|e| {
internal_datafusion_err!(
"Parquet row count {row_count} too large to convert to u64: {e}"
)
})?;
builder.append_value(row_count);
}
Ok(builder.finish())
}

/// Create a new statistics converter
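///
/// Returns a plan error if `column_name` is not present in `arrow_schema`.
/// A sketch (the schema and column names here are illustrative):
///
/// ```ignore
/// let schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);
/// // "c2" is not in the schema, so construction fails
/// assert!(
///     StatisticsConverter::try_new("c2", RequestedStatistics::Min, &schema).is_err()
/// );
/// ```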
pub fn try_new(
column_name: &'a str,
statistics_type: RequestedStatistics,
arrow_schema: &'a Schema,
) -> Result<Self> {
// ensure the requested column is in the arrow schema
let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
return plan_err!(
"Column '{}' not found in schema for statistics conversion",
column_name
);
};

Ok(Self {
column_name,
statistics_type,
arrow_schema,
arrow_field,
})
}

/// Extract the statistics from a parquet file, given the parquet file's metadata
///
/// The returned array contains one value for each row group in the parquet
/// file, in order.
///
/// Each value is either
/// * the requested statistic for the column, or
/// * a null value, if the statistic cannot be extracted
///
/// Note that a null value does NOT mean the min or max value was actually
/// `null`; it means the requested statistic is unknown
///
/// Reasons for not being able to extract the statistics include:
/// * the column is not present in the parquet file
/// * statistics for the column are not present in the row group
/// * the stored statistic value cannot be converted to the requested type
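///
/// For example, extracting null counts might look like this (under the same
/// assumptions as the struct-level example above):
///
/// ```ignore
/// let converter = StatisticsConverter::try_new(
///     "c1",
///     RequestedStatistics::NullCount,
///     &arrow_schema,
/// )?;
/// // a UInt64Array with one null count per row group; a null entry means
/// // the row group stored no statistics for this column
/// let null_counts = converter.extract(&metadata)?;
/// ```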
pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
let data_type = self.arrow_field.data_type();
let num_row_groups = metadata.row_groups().len();

let parquet_schema = metadata.file_metadata().schema_descr();
let row_groups = metadata.row_groups();

// find the column in the parquet schema; if it is absent, return a null array
let Some((parquet_idx, matched_field)) =
parquet_column(parquet_schema, self.arrow_schema, self.column_name)
else {
// column was in the arrow schema but not in the parquet schema, so return a null array
return Ok(new_null_array(data_type, num_row_groups));
};

// sanity check that matching field matches the arrow field
if matched_field.as_ref() != self.arrow_field {
return internal_err!(
"Matched column '{:?}' does not match original matched column '{:?}'",
matched_field,
self.arrow_field
);
}

// Get an iterator over the column statistics
let iter = row_groups
.iter()
.map(|x| x.column(parquet_idx).statistics());

match self.statistics_type {
RequestedStatistics::Min => min_statistics(data_type, iter),
RequestedStatistics::Max => max_statistics(data_type, iter),
RequestedStatistics::NullCount => {
let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}
}
}
}

#[cfg(test)]
mod test {
use super::*;