Skip to content

Commit

Permalink
Add ParquetExec::builder(), deprecate ParquetExec::new (#10636)
Browse files Browse the repository at this point in the history
* Add `ParquetExec::builder()`, deprecate `ParquetExec::new`

* Add a `#[must_use]` attribute
  • Loading branch information
alamb authored May 29, 2024
1 parent 2796e01 commit 77352b2
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 142 deletions.
22 changes: 14 additions & 8 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::arrow::array::{
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec};
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
Expand Down Expand Up @@ -75,6 +75,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
Expand Down Expand Up @@ -253,17 +254,22 @@ impl FileFormat for ParquetFormat {
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut builder =
ParquetExecBuilder::new_with_options(conf, self.options.clone());

// If pruning is enabled, combine the filters to build the predicate.
// If pruning is disabled, no predicate is set, so readers
// will not prune data based on the statistics.
let predicate = self.enable_pruning().then(|| filters.cloned()).flatten();
if self.enable_pruning() {
if let Some(predicate) = filters.cloned() {
builder = builder.with_predicate(predicate);
}
}
if let Some(metadata_size_hint) = self.metadata_size_hint() {
builder = builder.with_metadata_size_hint(metadata_size_hint);
}

Ok(Arc::new(ParquetExec::new(
conf,
predicate,
self.metadata_size_hint(),
self.options.clone(),
)))
Ok(builder.build_arc())
}

async fn create_writer_physical_plan(
Expand Down
Loading

0 comments on commit 77352b2

Please sign in to comment.