Commit 28e41c7

Use upstream StatisticsConverter
alamb committed Jul 16, 2024
1 parent f0a4ec2 commit 28e41c7
Showing 10 changed files with 22 additions and 5,137 deletions.
6 changes: 2 additions & 4 deletions datafusion-examples/examples/parquet_index.rs
@@ -24,13 +24,11 @@ use arrow::util::pretty::pretty_format_batches;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{
-    parquet::StatisticsConverter,
-    {FileScanConfig, ParquetExec},
-};
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use datafusion::parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
 };
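The example now imports `StatisticsConverter` through the `datafusion::parquet` re-export of the upstream `parquet` crate rather than from DataFusion's own `physical_plan::parquet` module. As a rough sketch of how the upstream converter is typically driven (using the `parquet` crate path directly; `try_new`, `row_group_mins`, and `row_group_maxes` are assumptions about the upstream API and do not appear in this diff):

```rust
use arrow::array::ArrayRef;
use arrow_schema::Schema;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Sketch: extract per-row-group min/max statistics for one column as Arrow
/// arrays, using the converter that now lives in the `parquet` crate.
fn row_group_min_max(
    arrow_schema: &Schema,
    metadata: &ParquetMetaData,
    column: &str,
) -> Result<(ArrayRef, ArrayRef)> {
    // Map the named Arrow field onto the corresponding Parquet column
    // (assumed constructor; see the hedge above).
    let converter = StatisticsConverter::try_new(
        column,
        arrow_schema,
        metadata.file_metadata().schema_descr(),
    )?;

    // One entry per row group, in file order.
    let mins = converter.row_group_mins(metadata.row_groups())?;
    let maxes = converter.row_group_maxes(metadata.row_groups())?;
    Ok((mins, maxes))
}
```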
4 changes: 0 additions & 4 deletions datafusion/core/Cargo.toml
@@ -212,7 +212,3 @@ name = "sort"
 [[bench]]
 harness = false
 name = "topk_aggregate"
-
-[[bench]]
-harness = false
-name = "parquet_statistic"
287 changes: 0 additions & 287 deletions datafusion/core/benches/parquet_statistic.rs

This file was deleted.

7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -75,12 +75,11 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::{
-    ParquetExecBuilder, StatisticsConverter,
-};
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 
 /// Initial writing buffer size. Note this is just a size hint for efficiency. It
 /// will grow beyond the set value if needed.
@@ -1310,7 +1309,7 @@ mod tests {
         .map(|i| i.to_string())
         .collect();
     let coll: Vec<_> = schema
-        .all_fields()
+        .flattened_fields()
         .into_iter()
         .map(|i| i.name().to_string())
         .collect();
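The test hunk also moves from `Schema::all_fields` to `Schema::flattened_fields` when collecting field names. A minimal, self-contained sketch of that pattern; the schema and the expected output in the comments are illustrative assumptions, not part of this commit:

```rust
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // Hypothetical schema with a nested struct column.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new(
            "point",
            DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Float64, true),
                Field::new("y", DataType::Float64, true),
            ])),
            true,
        ),
    ]);

    // The flattened view walks nested types, so the collected names cover
    // the struct's children as well as the top-level columns.
    let names: Vec<_> = schema
        .flattened_fields()
        .into_iter()
        .map(|f| f.name().to_string())
        .collect();

    println!("{names:?}"); // e.g. ["id", "point", "x", "y"]
}
```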
2 changes: 0 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -52,7 +52,6 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_groups;
-mod statistics;
 mod writer;
 
 use crate::datasource::schema_adapter::{
@@ -62,7 +61,6 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
-pub use statistics::StatisticsConverter;
 pub use writer::plan_to_parquet;
 
 /// Execution plan for reading one or more Parquet files.
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -17,15 +17,16 @@
 
 //! Contains code to filter entire pages
 
+use super::metrics::ParquetFileMetrics;
 use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
-use crate::datasource::physical_plan::parquet::StatisticsConverter;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use arrow::array::BooleanArray;
 use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use arrow_schema::Schema;
 use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
+use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
 use parquet::format::PageLocation;
 use parquet::schema::types::SchemaDescriptor;
@@ -36,8 +37,6 @@ use parquet::{
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use super::metrics::ParquetFileMetrics;
-
 /// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
 ///
 /// It does so by evaluating statistics from the [`ParquetColumnIndex`] and
@@ -376,7 +375,7 @@ impl<'a> PagesPruningStatistics<'a> {
         converter: StatisticsConverter<'a>,
         parquet_metadata: &'a ParquetMetaData,
     ) -> Option<Self> {
-        let Some(parquet_column_index) = converter.parquet_index() else {
+        let Some(parquet_column_index) = converter.parquet_column_index() else {
            trace!(
                "Column {:?} not in parquet file, skipping",
                converter.arrow_field()
@@ -431,7 +430,6 @@ impl<'a> PagesPruningStatistics<'a> {
         Some(vec)
     }
 }
-
 impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
     fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
         match self.converter.data_page_mins(
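With the upstream converter, the Parquet column lookup is `parquet_column_index` rather than the internal type's `parquet_index`. A small sketch of the same guard, using only the `parquet_column_index` and `arrow_field` accessors visible in this diff (the helper function itself is hypothetical):

```rust
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;

/// Hypothetical helper: reports whether the converter's Arrow field has a
/// matching column in this Parquet file, logging and bailing out when it does
/// not (the same early exit the hunk above performs with `let ... else`).
fn column_in_file(converter: &StatisticsConverter<'_>) -> bool {
    if converter.parquet_column_index().is_none() {
        // Mirrors the `trace!` in the diff: no matching Parquet column, so
        // pruning for this column is skipped.
        eprintln!(
            "Column {:?} not in parquet file, skipping",
            converter.arrow_field()
        );
        return false;
    }
    true
}
```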
