Skip to content

Commit

Permalink
Use upstream StatisticsConverter
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 29, 2024
1 parent ea8c287 commit 409023e
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 5,554 deletions.
6 changes: 2 additions & 4 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
parquet::StatisticsConverter,
{FileScanConfig, ParquetExec},
};
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
};
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ name = "sort"
harness = false
name = "topk_aggregate"

[[bench]]
harness = false
name = "parquet_statistic"

[[bench]]
harness = false
name = "map_query_sql"
Expand Down
287 changes: 0 additions & 287 deletions datafusion/core/benches/parquet_statistic.rs

This file was deleted.

5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::{
ParquetExecBuilder, StatisticsConverter,
};
use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
/// will grow beyond the set value if needed.
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ mod page_filter;
mod reader;
mod row_filter;
mod row_group_filter;
mod statistics;
mod writer;

use crate::datasource::schema_adapter::{
Expand All @@ -62,7 +61,6 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use statistics::StatisticsConverter;
pub use writer::plan_to_parquet;

/// Execution plan for reading one or more Parquet files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

//! Contains code to filter entire pages

use super::metrics::ParquetFileMetrics;
use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use crate::datasource::physical_plan::parquet::StatisticsConverter;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use arrow::array::BooleanArray;
use arrow::{array::ArrayRef, datatypes::SchemaRef};
use arrow_schema::Schema;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::format::PageLocation;
use parquet::schema::types::SchemaDescriptor;
Expand All @@ -36,8 +37,6 @@ use parquet::{
use std::collections::HashSet;
use std::sync::Arc;

use super::metrics::ParquetFileMetrics;

/// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
///
/// It does so by evaluating statistics from the [`ParquetColumnIndex`] and
Expand Down Expand Up @@ -377,7 +376,7 @@ impl<'a> PagesPruningStatistics<'a> {
converter: StatisticsConverter<'a>,
parquet_metadata: &'a ParquetMetaData,
) -> Option<Self> {
let Some(parquet_column_index) = converter.parquet_index() else {
let Some(parquet_column_index) = converter.parquet_column_index() else {
trace!(
"Column {:?} not in parquet file, skipping",
converter.arrow_field()
Expand Down Expand Up @@ -432,7 +431,6 @@ impl<'a> PagesPruningStatistics<'a> {
Some(vec)
}
}

impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
match self.converter.data_page_mins(
Expand Down
Loading

0 comments on commit 409023e

Please sign in to comment.