diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ed0fc5f0169e..1eea4eab8ba2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::file_stream::FileStream; use crate::datasource::physical_plan::{ - parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, + parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner, FileScanConfig, }; use crate::{ @@ -39,13 +39,11 @@ use crate::{ }, }; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::SchemaRef; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; use itertools::Itertools; use log::debug; -use parquet::basic::{ConvertedType, LogicalType}; -use parquet::schema::types::ColumnDescriptor; mod access_plan; mod metrics; @@ -225,7 +223,7 @@ pub struct ParquetExec { /// Optional predicate for pruning row groups (derived from `predicate`) pruning_predicate: Option>, /// Optional predicate for pruning pages (derived from `predicate`) - page_pruning_predicate: Option>, + page_pruning_predicate: Option>, /// Optional hint for the size of the parquet metadata metadata_size_hint: Option, /// Optional user defined parquet file reader factory @@ -381,19 +379,12 @@ impl ParquetExecBuilder { }) .filter(|p| !p.always_true()); - let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| { - match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) { - Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)), - Err(e) => { - debug!( - "Could not create page pruning predicate for '{:?}': {}", - pruning_predicate, e - ); - predicate_creation_errors.add(1); - None - } - } - }); + let page_pruning_predicate = predicate + .as_ref() + .map(|predicate_expr| { + PagePruningAccessPlanFilter::new(predicate_expr, file_schema.clone()) + }) + .map(Arc::new); let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); @@ -739,7 +730,7 @@ impl ExecutionPlan for ParquetExec { fn should_enable_page_index( enable_page_index: bool, - page_pruning_predicate: &Option>, + page_pruning_predicate: &Option>, ) -> bool { enable_page_index && page_pruning_predicate.is_some() @@ -749,26 +740,6 @@ fn should_enable_page_index( .unwrap_or(false) } -// Convert parquet column schema to arrow data type, and just consider the -// decimal data type. -pub(crate) fn parquet_to_arrow_decimal_type( - parquet_column: &ColumnDescriptor, -) -> Option { - let type_ptr = parquet_column.self_type_ptr(); - match type_ptr.get_basic_info().logical_type() { - Some(LogicalType::Decimal { scale, precision }) => { - Some(DataType::Decimal128(precision as u8, scale as i8)) - } - _ => match type_ptr.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Some(DataType::Decimal128( - type_ptr.get_precision() as u8, - type_ptr.get_scale() as i8, - )), - _ => None, - }, - } -} - #[cfg(test)] mod tests { // See also `parquet_exec` integration test @@ -798,7 +769,7 @@ mod tests { }; use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; - use arrow_schema::Fields; + use arrow_schema::{DataType, Fields}; use datafusion_common::{assert_contains, ScalarValue}; use datafusion_expr::{col, lit, when, Expr}; use datafusion_physical_expr::planner::logical2physical; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index c97b0282626a..ffe879eb8de0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -17,7 +17,7 @@ //! [`ParquetOpener`] for opening Parquet files -use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; +use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ row_filter, should_enable_page_index, ParquetAccessPlan, @@ -46,7 +46,7 @@ pub(super) struct ParquetOpener { pub limit: Option, pub predicate: Option>, pub pruning_predicate: Option>, - pub page_pruning_predicate: Option>, + pub page_pruning_predicate: Option>, pub table_schema: SchemaRef, pub metadata_size_hint: Option, pub metrics: ExecutionPlanMetricsSet, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 7429ca593820..d658608ab4f1 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -17,40 +17,33 @@ //! Contains code to filter entire pages -use arrow::array::{ - BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array, - StringArray, -}; -use arrow::datatypes::DataType; +use crate::datasource::physical_plan::parquet::ParquetAccessPlan; +use crate::datasource::physical_plan::parquet::StatisticsConverter; +use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use arrow::array::BooleanArray; use arrow::{array::ArrayRef, datatypes::SchemaRef}; use arrow_schema::Schema; -use datafusion_common::{Result, ScalarValue}; -use datafusion_physical_expr::expressions::Column; +use datafusion_common::ScalarValue; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; -use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; +use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex}; +use parquet::format::PageLocation; +use parquet::schema::types::SchemaDescriptor; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, - file::{ - metadata::{ParquetMetaData, RowGroupMetaData}, - page_index::index::Index, - }, - format::PageLocation, + file::metadata::{ParquetMetaData, RowGroupMetaData}, }; use std::collections::HashSet; use std::sync::Arc; -use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::statistics::{ - from_bytes_to_i128, parquet_column, -}; -use crate::datasource::physical_plan::parquet::ParquetAccessPlan; -use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; - use super::metrics::ParquetFileMetrics; -/// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`] -/// based on parquet page level statistics, if any +/// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present +/// +/// It does so by evaluating statistics from the [`ParquetColumnIndex`] and +/// [`ParquetOffsetIndex`] and converting them to [`RowSelection`]. +/// +/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md /// /// For example, given a row group with two column (chunks) for `A` /// and `B` with the following with page level statistics: @@ -103,30 +96,52 @@ use super::metrics::ParquetFileMetrics; /// /// So we can entirely skip rows 0->199 and 250->299 as we know they /// can not contain rows that match the predicate. +/// +/// # Implementation notes +/// +/// Single column predicates are evaluated using the PageIndex information +/// for that column to determine which row ranges can be skipped based. +/// +/// The resulting [`RowSelection`]'s are combined into a final +/// row selection that is added to the [`ParquetAccessPlan`]. #[derive(Debug)] -pub struct PagePruningPredicate { +pub struct PagePruningAccessPlanFilter { + /// single column predicates (e.g. (`col = 5`) extracted from the overall + /// predicate. Must all be true for a row to be included in the result. predicates: Vec, } -impl PagePruningPredicate { - /// Create a new [`PagePruningPredicate`] - // TODO: this is infallaible -- it can not return an error - pub fn try_new(expr: &Arc, schema: SchemaRef) -> Result { +impl PagePruningAccessPlanFilter { + /// Create a new [`PagePruningAccessPlanFilter`] from a physical + /// expression. + pub fn new(expr: &Arc, schema: SchemaRef) -> Self { + // extract any single column predicates let predicates = split_conjunction(expr) .into_iter() .filter_map(|predicate| { - match PruningPredicate::try_new(predicate.clone(), schema.clone()) { - Ok(p) - if (!p.always_true()) - && (p.required_columns().n_columns() < 2) => - { - Some(Ok(p)) - } - _ => None, + let pp = + match PruningPredicate::try_new(predicate.clone(), schema.clone()) { + Ok(pp) => pp, + Err(e) => { + debug!("Ignoring error creating page pruning predicate: {e}"); + return None; + } + }; + + if pp.always_true() { + debug!("Ignoring always true page pruning predicate: {predicate}"); + return None; + } + + if pp.required_columns().single_column().is_none() { + debug!("Ignoring multi-column page pruning predicate: {predicate}"); + return None; } + + Some(pp) }) - .collect::>>()?; - Ok(Self { predicates }) + .collect::>(); + Self { predicates } } /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the @@ -136,7 +151,7 @@ impl PagePruningPredicate { mut access_plan: ParquetAccessPlan, arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, - file_metadata: &ParquetMetaData, + parquet_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, ) -> ParquetAccessPlan { // scoped timer updates on drop @@ -146,18 +161,18 @@ impl PagePruningPredicate { } let page_index_predicates = &self.predicates; - let groups = file_metadata.row_groups(); + let groups = parquet_metadata.row_groups(); if groups.is_empty() { return access_plan; } - let (Some(file_offset_indexes), Some(file_page_indexes)) = - (file_metadata.offset_index(), file_metadata.column_index()) - else { - trace!( - "skip page pruning due to lack of indexes. Have offset: {}, column index: {}", - file_metadata.offset_index().is_some(), file_metadata.column_index().is_some() + if parquet_metadata.offset_index().is_none() + || parquet_metadata.column_index().is_none() + { + debug!( + "Can not prune pages due to lack of indexes. Have offset: {}, column index: {}", + parquet_metadata.offset_index().is_some(), parquet_metadata.column_index().is_some() ); return access_plan; }; @@ -165,33 +180,39 @@ impl PagePruningPredicate { // track the total number of rows that should be skipped let mut total_skip = 0; + // for each row group specified in the access plan let row_group_indexes = access_plan.row_group_indexes(); - for r in row_group_indexes { + for row_group_index in row_group_indexes { // The selection for this particular row group let mut overall_selection = None; for predicate in page_index_predicates { - // find column index in the parquet schema - let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); - let row_group_metadata = &groups[r]; - - let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = ( - file_page_indexes.get(r), - file_offset_indexes.get(r), - col_idx, - ) else { - trace!( - "Did not have enough metadata to prune with page indexes, \ - falling back to all rows", - ); - continue; + let column = predicate + .required_columns() + .single_column() + .expect("Page pruning requires single column predicates"); + + let converter = StatisticsConverter::try_new( + column.name(), + arrow_schema, + parquet_schema, + ); + + let converter = match converter { + Ok(converter) => converter, + Err(e) => { + debug!( + "Could not create statistics converter for column {}: {e}", + column.name() + ); + continue; + } }; let selection = prune_pages_in_one_row_group( - row_group_metadata, + row_group_index, predicate, - rg_offset_indexes.get(col_idx), - rg_page_indexes.get(col_idx), - groups[r].column(col_idx).column_descr(), + converter, + parquet_metadata, file_metrics, ); @@ -224,15 +245,15 @@ impl PagePruningPredicate { let rows_skipped = rows_skipped(&overall_selection); trace!("Overall selection from predicate skipped {rows_skipped}: {overall_selection:?}"); total_skip += rows_skipped; - access_plan.scan_selection(r, overall_selection) + access_plan.scan_selection(row_group_index, overall_selection) } else { // Selection skips all rows, so skip the entire row group - let rows_skipped = groups[r].num_rows() as usize; - access_plan.skip(r); + let rows_skipped = groups[row_group_index].num_rows() as usize; + access_plan.skip(row_group_index); total_skip += rows_skipped; trace!( "Overall selection from predicate is empty, \ - skipping all {rows_skipped} rows in row group {r}" + skipping all {rows_skipped} rows in row group {row_group_index}" ); } } @@ -242,7 +263,7 @@ impl PagePruningPredicate { access_plan } - /// Returns the number of filters in the [`PagePruningPredicate`] + /// Returns the number of filters in the [`PagePruningAccessPlanFilter`] pub fn filter_number(&self) -> usize { self.predicates.len() } @@ -266,97 +287,53 @@ fn update_selection( } } -/// Returns the column index in the row parquet schema for the single -/// column of a single column pruning predicate. -/// -/// For example, give the predicate `y > 5` +/// Returns a [`RowSelection`] for the rows in this row group to scan. /// -/// And columns in the RowGroupMetadata like `['x', 'y', 'z']` will -/// return 1. +/// This Row Selection is formed from the page index and the predicate skips row +/// ranges that can be ruled out based on the predicate. /// -/// Returns `None` if the column is not found, or if there are no -/// required columns, which is the case for predicate like `abs(i) = -/// 1` which are rewritten to `lit(true)` -/// -/// Panics: -/// -/// If the predicate contains more than one column reference (assumes -/// that `extract_page_index_push_down_predicates` only returns -/// predicate with one col) -fn find_column_index( - predicate: &PruningPredicate, - arrow_schema: &Schema, - parquet_schema: &SchemaDescriptor, -) -> Option { - let mut found_required_column: Option<&Column> = None; - - for required_column_details in predicate.required_columns().iter() { - let column = &required_column_details.0; - if let Some(found_required_column) = found_required_column.as_ref() { - // make sure it is the same name we have seen previously - assert_eq!( - column.name(), - found_required_column.name(), - "Unexpected multi column predicate" - ); - } else { - found_required_column = Some(column); - } - } - - let Some(column) = found_required_column.as_ref() else { - trace!("No column references in pruning predicate"); - return None; - }; - - parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) -} - -/// Returns a `RowSelection` for the pages in this RowGroup if any -/// rows can be pruned based on the page index +/// Returns `None` if there is an error evaluating the predicate or the required +/// page information is not present. fn prune_pages_in_one_row_group( - group: &RowGroupMetaData, - predicate: &PruningPredicate, - col_offset_indexes: Option<&Vec>, - col_page_indexes: Option<&Index>, - col_desc: &ColumnDescriptor, + row_group_index: usize, + pruning_predicate: &PruningPredicate, + converter: StatisticsConverter<'_>, + parquet_metadata: &ParquetMetaData, metrics: &ParquetFileMetrics, ) -> Option { - let num_rows = group.num_rows() as usize; - let (Some(col_offset_indexes), Some(col_page_indexes)) = - (col_offset_indexes, col_page_indexes) - else { - return None; - }; - - let target_type = parquet_to_arrow_decimal_type(col_desc); - let pruning_stats = PagesPruningStatistics { - col_page_indexes, - col_offset_indexes, - target_type: &target_type, - num_rows_in_row_group: group.num_rows(), - }; + let pruning_stats = + PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?; - let values = match predicate.prune(&pruning_stats) { + // Each element in values is a boolean indicating whether the page may have + // values that match the predicate (true) or could not possibly have values + // that match the predicate (false). + let values = match pruning_predicate.prune(&pruning_stats) { Ok(values) => values, Err(e) => { - // stats filter array could not be built - // return a result which will not filter out any pages debug!("Error evaluating page index predicate values {e}"); metrics.predicate_evaluation_errors.add(1); return None; } }; + // Convert the information of which pages to skip into a RowSelection + // that describes the ranges of rows to skip. + let Some(page_row_counts) = pruning_stats.page_row_counts() else { + debug!( + "Can not determine page row counts for row group {row_group_index}, skipping" + ); + metrics.predicate_evaluation_errors.add(1); + return None; + }; + let mut vec = Vec::with_capacity(values.len()); - let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); - assert_eq!(row_vec.len(), values.len()); - let mut sum_row = *row_vec.first().unwrap(); + assert_eq!(page_row_counts.len(), values.len()); + let mut sum_row = *page_row_counts.first().unwrap(); let mut selected = *values.first().unwrap(); trace!("Pruned to {:?} using {:?}", values, pruning_stats); for (i, &f) in values.iter().enumerate().skip(1) { if f == selected { - sum_row += *row_vec.get(i).unwrap(); + sum_row += *page_row_counts.get(i).unwrap(); } else { let selector = if selected { RowSelector::select(sum_row) @@ -364,7 +341,7 @@ fn prune_pages_in_one_row_group( RowSelector::skip(sum_row) }; vec.push(selector); - sum_row = *row_vec.get(i).unwrap(); + sum_row = *page_row_counts.get(i).unwrap(); selected = f; } } @@ -378,206 +355,143 @@ fn prune_pages_in_one_row_group( Some(RowSelection::from(vec)) } -fn create_row_count_in_each_page( - location: &[PageLocation], - num_rows: usize, -) -> Vec { - let mut vec = Vec::with_capacity(location.len()); - location.windows(2).for_each(|x| { - let start = x[0].first_row_index as usize; - let end = x[1].first_row_index as usize; - vec.push(end - start); - }); - vec.push(num_rows - location.last().unwrap().first_row_index as usize); - vec -} - -/// Wraps one col page_index in one rowGroup statistics in a way -/// that implements [`PruningStatistics`] +/// Implement [`PruningStatistics`] for one column's PageIndex (column_index + offset_index) #[derive(Debug)] struct PagesPruningStatistics<'a> { - col_page_indexes: &'a Index, - col_offset_indexes: &'a Vec, - // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the - // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` - target_type: &'a Option, - num_rows_in_row_group: i64, + row_group_index: usize, + row_group_metadatas: &'a [RowGroupMetaData], + converter: StatisticsConverter<'a>, + column_index: &'a ParquetColumnIndex, + offset_index: &'a ParquetOffsetIndex, + page_offsets: &'a Vec, } -// Extract the min or max value calling `func` from page idex -macro_rules! get_min_max_values_for_page_index { - ($self:expr, $func:ident) => {{ - match $self.col_page_indexes { - Index::NONE => None, - Index::INT32(index) => { - match $self.target_type { - // int32 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - let vec = &index.indexes; - let vec: Vec> = vec - .iter() - .map(|x| x.$func().and_then(|x| Some(*x as i128))) - .collect(); - Decimal128Array::from(vec) - .with_precision_and_scale(*precision, *scale) - .ok() - .map(|arr| Arc::new(arr) as ArrayRef) - } - _ => { - let vec = &index.indexes; - Some(Arc::new(Int32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - } - } - Index::INT64(index) => { - match $self.target_type { - // int64 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - let vec = &index.indexes; - let vec: Vec> = vec - .iter() - .map(|x| x.$func().and_then(|x| Some(*x as i128))) - .collect(); - Decimal128Array::from(vec) - .with_precision_and_scale(*precision, *scale) - .ok() - .map(|arr| Arc::new(arr) as ArrayRef) - } - _ => { - let vec = &index.indexes; - Some(Arc::new(Int64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - } - } - Index::FLOAT(index) => { - let vec = &index.indexes; - Some(Arc::new(Float32Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::DOUBLE(index) => { - let vec = &index.indexes; - Some(Arc::new(Float64Array::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::BOOLEAN(index) => { - let vec = &index.indexes; - Some(Arc::new(BooleanArray::from_iter( - vec.iter().map(|x| x.$func().cloned()), - ))) - } - Index::BYTE_ARRAY(index) => match $self.target_type { - Some(DataType::Decimal128(precision, scale)) => { - let vec = &index.indexes; - Decimal128Array::from( - vec.iter() - .map(|x| { - x.$func() - .and_then(|x| Some(from_bytes_to_i128(x.as_ref()))) - }) - .collect::>>(), - ) - .with_precision_and_scale(*precision, *scale) - .ok() - .map(|arr| Arc::new(arr) as ArrayRef) - } - _ => { - let vec = &index.indexes; - let array: StringArray = vec - .iter() - .map(|x| x.$func()) - .map(|x| x.and_then(|x| std::str::from_utf8(x.as_ref()).ok())) - .collect(); - Some(Arc::new(array)) - } - }, - Index::INT96(_) => { - //Todo support these type - None - } - Index::FIXED_LEN_BYTE_ARRAY(index) => match $self.target_type { - Some(DataType::Decimal128(precision, scale)) => { - let vec = &index.indexes; - Decimal128Array::from( - vec.iter() - .map(|x| { - x.$func() - .and_then(|x| Some(from_bytes_to_i128(x.as_ref()))) - }) - .collect::>>(), - ) - .with_precision_and_scale(*precision, *scale) - .ok() - .map(|arr| Arc::new(arr) as ArrayRef) - } - _ => None, - }, - } - }}; +impl<'a> PagesPruningStatistics<'a> { + /// Creates a new [`PagesPruningStatistics`] for a column in a row group, if + /// possible. + /// + /// Returns None if the `parquet_metadata` does not have sufficient + /// information to create the statistics. + fn try_new( + row_group_index: usize, + converter: StatisticsConverter<'a>, + parquet_metadata: &'a ParquetMetaData, + ) -> Option { + let Some(parquet_column_index) = converter.parquet_index() else { + trace!( + "Column {:?} not in parquet file, skipping", + converter.arrow_field() + ); + return None; + }; + + let column_index = parquet_metadata.column_index()?; + let offset_index = parquet_metadata.offset_index()?; + let row_group_metadatas = parquet_metadata.row_groups(); + + let Some(row_group_page_offsets) = offset_index.get(row_group_index) else { + trace!("No page offsets for row group {row_group_index}, skipping"); + return None; + }; + let Some(page_offsets) = row_group_page_offsets.get(parquet_column_index) else { + trace!( + "No page offsets for column {:?} in row group {row_group_index}, skipping", + converter.arrow_field() + ); + return None; + }; + + Some(Self { + row_group_index, + row_group_metadatas, + converter, + column_index, + offset_index, + page_offsets, + }) + } + + /// return the row counts in each data page, if possible. + fn page_row_counts(&self) -> Option> { + let row_group_metadata = self + .row_group_metadatas + .get(self.row_group_index) + // fail fast/panic if row_group_index is out of bounds + .unwrap(); + + let num_rows_in_row_group = row_group_metadata.num_rows() as usize; + + let page_offsets = self.page_offsets; + let mut vec = Vec::with_capacity(page_offsets.len()); + page_offsets.windows(2).for_each(|x| { + let start = x[0].first_row_index as usize; + let end = x[1].first_row_index as usize; + vec.push(end - start); + }); + vec.push(num_rows_in_row_group - page_offsets.last()?.first_row_index as usize); + Some(vec) + } } impl<'a> PruningStatistics for PagesPruningStatistics<'a> { fn min_values(&self, _column: &datafusion_common::Column) -> Option { - get_min_max_values_for_page_index!(self, min) + match self.converter.data_page_mins( + self.column_index, + self.offset_index, + [&self.row_group_index], + ) { + Ok(min_values) => Some(min_values), + Err(e) => { + debug!("Error evaluating data page min values {e}"); + None + } + } } fn max_values(&self, _column: &datafusion_common::Column) -> Option { - get_min_max_values_for_page_index!(self, max) + match self.converter.data_page_maxes( + self.column_index, + self.offset_index, + [&self.row_group_index], + ) { + Ok(min_values) => Some(min_values), + Err(e) => { + debug!("Error evaluating data page max values {e}"); + None + } + } } fn num_containers(&self) -> usize { - self.col_offset_indexes.len() + self.page_offsets.len() } fn null_counts(&self, _column: &datafusion_common::Column) -> Option { - match self.col_page_indexes { - Index::NONE => None, - Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT32(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT64(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::INT96(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::BYTE_ARRAY(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), - Index::FIXED_LEN_BYTE_ARRAY(index) => Some(Arc::new(Int64Array::from_iter( - index.indexes.iter().map(|x| x.null_count), - ))), + match self.converter.data_page_null_counts( + self.column_index, + self.offset_index, + [&self.row_group_index], + ) { + Ok(null_counts) => Some(Arc::new(null_counts)), + Err(e) => { + debug!("Error evaluating data page null counts {e}"); + None + } } } fn row_counts(&self, _column: &datafusion_common::Column) -> Option { - // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982 - - let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| { - Some(location[1].first_row_index - location[0].first_row_index) - }); - - // append the last page row count - let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( - self.num_rows_in_row_group - - self.col_offset_indexes.last().unwrap().first_row_index, - ))); - - Some(Arc::new(Int64Array::from_iter(row_count_per_page))) + match self.converter.data_page_row_counts( + self.offset_index, + self.row_group_metadatas, + [&self.row_group_index], + ) { + Ok(row_counts) => row_counts.map(|a| Arc::new(a) as ArrayRef), + Err(e) => { + debug!("Error evaluating data page row counts {e}"); + None + } + } } fn contained( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 44e22f778075..3d250718f736 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -1136,6 +1136,16 @@ pub struct StatisticsConverter<'a> { } impl<'a> StatisticsConverter<'a> { + /// Return the index of the column in the parquet file, if any + pub fn parquet_index(&self) -> Option { + self.parquet_index + } + + /// Return the arrow field of the column in the arrow schema + pub fn arrow_field(&self) -> &'a Field { + self.arrow_field + } + /// Returns a [`UInt64Array`] with row counts for each row group /// /// # Return Value diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index a1ace229985e..3c18e53497fd 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -609,6 +609,8 @@ impl PruningPredicate { /// /// This happens if the predicate is a literal `true` and /// literal_guarantees is empty. + /// + /// This can happen when a predicate is simplified to a constant `true` pub fn always_true(&self) -> bool { is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty() } @@ -736,12 +738,25 @@ impl RequiredColumns { Self::default() } - /// Returns number of unique columns - pub(crate) fn n_columns(&self) -> usize { - self.iter() - .map(|(c, _s, _f)| c) - .collect::>() - .len() + /// Returns Some(column) if this is a single column predicate. + /// + /// Returns None if this is a multi-column predicate. + /// + /// Examples: + /// * `a > 5 OR a < 10` returns `Some(a)` + /// * `a > 5 OR b < 10` returns `None` + /// * `true` returns None + pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> { + if self.columns.windows(2).all(|w| { + // check if all columns are the same (ignoring statistics and field) + let c1 = &w[0].0; + let c2 = &w[1].0; + c1 == c2 + }) { + self.columns.first().map(|r| &r.0) + } else { + None + } } /// Returns an iterator over items in columns (see doc on