Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update parquet page pruning code to use the StatisticsExtractor #11483

Merged
merged 7 commits into from
Jul 18, 2024
51 changes: 11 additions & 40 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::file_stream::FileStream;
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
FileScanConfig,
};
use crate::{
Expand All @@ -39,13 +39,11 @@ use crate::{
},
};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use itertools::Itertools;
use log::debug;
use parquet::basic::{ConvertedType, LogicalType};
use parquet::schema::types::ColumnDescriptor;

mod access_plan;
mod metrics;
Expand Down Expand Up @@ -225,7 +223,7 @@ pub struct ParquetExec {
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this to be consistent with what this is -- it isn't a pruning predicate per se

/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
Expand Down Expand Up @@ -381,19 +379,12 @@ impl ParquetExecBuilder {
})
.filter(|p| !p.always_true());

let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!(
"Could not create page pruning predicate for '{:?}': {}",
pruning_predicate, e
);
predicate_creation_errors.add(1);
None
}
}
});
let page_pruning_predicate = predicate
.as_ref()
.map(|predicate_expr| {
PagePruningAccessPlanFilter::new(predicate_expr, file_schema.clone())
})
.map(Arc::new);

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
Expand Down Expand Up @@ -739,7 +730,7 @@ impl ExecutionPlan for ParquetExec {

fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
) -> bool {
enable_page_index
&& page_pruning_predicate.is_some()
Expand All @@ -749,26 +740,6 @@ fn should_enable_page_index(
.unwrap_or(false)
}

// Convert parquet column schema to arrow data type, and just consider the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now handled entirely in the StatisticsConverter

// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
parquet_column: &ColumnDescriptor,
) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Some(DataType::Decimal128(precision as u8, scale as i8))
}
_ => match type_ptr.get_basic_info().converted_type() {
ConvertedType::DECIMAL => Some(DataType::Decimal128(
type_ptr.get_precision() as u8,
type_ptr.get_scale() as i8,
)),
_ => None,
},
}
}

#[cfg(test)]
mod tests {
// See also `parquet_exec` integration test
Expand Down Expand Up @@ -798,7 +769,7 @@ mod tests {
};
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow_schema::Fields;
use arrow_schema::{DataType, Fields};
use datafusion_common::{assert_contains, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! [`ParquetOpener`] for opening Parquet files

use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
row_filter, should_enable_page_index, ParquetAccessPlan,
Expand Down Expand Up @@ -46,7 +46,7 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
pub predicate: Option<Arc<dyn PhysicalExpr>>,
pub pruning_predicate: Option<Arc<PruningPredicate>>,
pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
pub table_schema: SchemaRef,
pub metadata_size_hint: Option<usize>,
pub metrics: ExecutionPlanMetricsSet,
Expand Down
Loading
Loading