From 77352b2411b5d9340374c30e21b861b0d0d46f82 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 29 May 2024 16:46:18 -0400
Subject: [PATCH] Add `ParquetExec::builder()`, deprecate `ParquetExec::new`
 (#10636)

* Add `ParquetExec::builder()`, deprecate `ParquetExec::new`

* Add a #[must_use]
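For illustration, a minimal sketch of the migration this change enables. The
file name, file size, `lit(true)` predicate, and metadata size hint below are
placeholders for this sketch, not values taken from this patch:

```rust
use std::sync::Arc;
use arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::lit;

fn main() {
    // Before this change:
    //   ParquetExec::new(config, Some(predicate), None, Default::default())
    // After this change, the same scan is built with the builder:
    let file_schema = Arc::new(Schema::empty());
    let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
        .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024));
    let predicate = lit(true);
    let exec = ParquetExec::builder(config)
        // used to prune row groups / pages where the statistics allow it
        .with_predicate(predicate)
        // bytes read from the end of the file for the footer (placeholder value)
        .with_metadata_size_hint(512 * 1024)
        .build();
    let _ = exec;
}
```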
---
 .../src/datasource/file_format/parquet.rs     |  22 +-
 .../datasource/physical_plan/parquet/mod.rs   | 239 ++++++++++++++----
 .../core/src/datasource/schema_adapter.rs     |   6 +-
 .../combine_partial_final_agg.rs              |   8 +-
 .../enforce_distribution.rs                   |  16 +-
 .../core/src/physical_optimizer/test_utils.rs |  16 +-
 datafusion/core/src/test_util/parquet.rs      |  22 +-
 .../core/tests/parquet/custom_reader.rs       |   6 +-
 datafusion/core/tests/parquet/page_pruning.rs |  11 +-
 .../core/tests/parquet/schema_coercion.rs     |  16 +-
 datafusion/proto/src/physical_plan/mod.rs     |  11 +-
 .../tests/cases/roundtrip_physical_plan.rs    |  18 +-
 .../substrait/src/physical_plan/consumer.rs   |   8 +-
 .../tests/cases/roundtrip_physical_plan.rs    |   8 +-
 14 files changed, 265 insertions(+), 142 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index e102cfc372dd..39e6900ed53a 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -30,7 +30,7 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec};
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
@@ -75,6 +75,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -253,17 +254,22 @@ impl FileFormat for ParquetFormat {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(conf, self.options.clone());
+
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = self.enable_pruning().then(|| filters.cloned()).flatten();
+        if self.enable_pruning() {
+            if let Some(predicate) = filters.cloned() {
+                builder = builder.with_predicate(predicate);
+            }
+        }
+        if let Some(metadata_size_hint) = self.metadata_size_hint() {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
 
-        Ok(Arc::new(ParquetExec::new(
-            conf,
-            predicate,
-            self.metadata_size_hint(),
-            self.options.clone(),
-        )))
+        Ok(builder.build_arc())
     }
 
     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 6655125ea876..ac7c39bbdb94 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -104,6 +104,27 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// `───────────────────'
 ///
 /// ```
+///
+/// # Example: Create a `ParquetExec`
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # let file_schema = Arc::new(Schema::empty());
+/// # let object_store_url = ObjectStoreUrl::local_filesystem();
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # use datafusion_physical_expr::expressions::lit;
+/// # let predicate = lit(true);
+/// // Create a ParquetExec for reading `file1.parquet` with a file size of 100MB
+/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
+///     .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024));
+/// let exec = ParquetExec::builder(file_scan_config)
+///     // Provide a predicate for filtering row groups/pages
+///     .with_predicate(predicate)
+///     .build();
+/// ```
+///
 /// # Features
 ///
 /// Supports the following optimizations:
@@ -131,7 +152,7 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// * metadata_size_hint: controls the number of bytes read from the end of the
 /// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
 /// custom reader is used, it supplies the metadata directly and this parameter
-/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
+/// is ignored. See [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
 ///
 /// # Execution Overview
 ///
@@ -141,9 +162,9 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
 /// the file.
 ///
-/// * Step 3: The `ParquetOpener` gets the file metadata via
-/// [`ParquetFileReaderFactory`] and applies any predicates
-/// and projections to determine what pages must be read.
+/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
+/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
+/// to determine what pages must be read.
 ///
 /// * Step 4: The stream begins reading data, fetching the required pages
 /// and incrementally decoding them.
@@ -154,6 +175,7 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 ///
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
 /// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
+/// [`ParquetMetaData`]: parquet::file::metadata::ParquetMetaData
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -179,14 +201,125 @@ pub struct ParquetExec {
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
-impl ParquetExec {
-    /// Create a new Parquet reader execution plan provided file list and schema.
-    pub fn new(
-        base_config: FileScanConfig,
-        predicate: Option<Arc<dyn PhysicalExpr>>,
-        metadata_size_hint: Option<usize>,
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
+///
+/// See example on [`ParquetExec`].
+pub struct ParquetExecBuilder {
+    file_scan_config: FileScanConfig,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    metadata_size_hint: Option<usize>,
+    table_parquet_options: TableParquetOptions,
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl ParquetExecBuilder {
+    /// Create a new builder to read the provided file scan configuration
+    pub fn new(file_scan_config: FileScanConfig) -> Self {
+        Self::new_with_options(file_scan_config, TableParquetOptions::default())
+    }
+
+    /// Create a new builder to read the data specified in the file scan
+    /// configuration with the provided `TableParquetOptions`.
+    pub fn new_with_options(
+        file_scan_config: FileScanConfig,
         table_parquet_options: TableParquetOptions,
     ) -> Self {
+        Self {
+            file_scan_config,
+            predicate: None,
+            metadata_size_hint: None,
+            table_parquet_options,
+            parquet_file_reader_factory: None,
+            schema_adapter_factory: None,
+        }
+    }
+
+    /// Set the predicate for the scan.
+    ///
+    /// The ParquetExec uses this predicate to filter row groups and data pages
+    /// using the Parquet statistics and bloom filters.
+    ///
+    /// If the predicate cannot be used to prune the scan, it is ignored (no
+    /// error is raised).
+    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    /// Set the metadata size hint
+    ///
+    /// This value determines how many bytes at the end of the file the default
+    /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
+    /// too small, the ParquetExec will need to make additional IO requests to
+    /// read the footer.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
+    /// Set the table parquet options that control how the ParquetExec reads.
+    ///
+    /// See also [`Self::new_with_options`]
+    pub fn with_table_parquet_options(
+        mut self,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        self.table_parquet_options = table_parquet_options;
+        self
+    }
+
+    /// Set optional user defined parquet file reader factory.
+    ///
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
+    /// I/O operations, etc).
+    ///
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and each page.
+    ///
+    /// If a custom `ParquetFileReaderFactory` is provided, then data access
+    /// operations will be routed to this factory instead of `ObjectStore`.
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+
+    /// Set optional schema adapter factory.
+    ///
+    /// [`SchemaAdapterFactory`] allows users to specify how fields from the
+    /// parquet file get mapped to that of the table schema. The default schema
+    /// adapter uses arrow's cast library to map the parquet fields to the table
+    /// schema.
+    pub fn with_schema_adapter_factory(
+        mut self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        self.schema_adapter_factory = Some(schema_adapter_factory);
+        self
+    }
+
+    /// Convenience: build an `Arc`d `ParquetExec` from this builder
+    pub fn build_arc(self) -> Arc<ParquetExec> {
+        Arc::new(self.build())
+    }
+
+    /// Build a [`ParquetExec`]
+    #[must_use]
+    pub fn build(self) -> ParquetExec {
+        let Self {
+            file_scan_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+            parquet_file_reader_factory,
+            schema_adapter_factory,
+        } = self;
+
+        let base_config = file_scan_config;
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -225,12 +358,12 @@ impl ParquetExec {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
 
-        let cache = Self::compute_properties(
+        let cache = ParquetExec::compute_properties(
             projected_schema,
             &projected_output_ordering,
             &base_config,
         );
-        Self {
+        ParquetExec {
             base_config,
             projected_statistics,
             metrics,
@@ -238,12 +371,44 @@ impl ParquetExec {
             pruning_predicate,
             page_pruning_predicate,
             metadata_size_hint,
-            parquet_file_reader_factory: None,
+            parquet_file_reader_factory,
             cache,
             table_parquet_options,
-            schema_adapter_factory: None,
+            schema_adapter_factory,
         }
     }
+}
+
+impl ParquetExec {
+    /// Create a new Parquet reader execution plan provided file list and schema.
+    #[deprecated(
+        since = "39.0.0",
+        note = "use `ParquetExec::builder` or `ParquetExecBuilder`"
+    )]
+    pub fn new(
+        base_config: FileScanConfig,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+        metadata_size_hint: Option<usize>,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(base_config, table_parquet_options);
+        if let Some(predicate) = predicate {
+            builder = builder.with_predicate(predicate);
+        }
+        if let Some(metadata_size_hint) = metadata_size_hint {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
+        builder.build()
+    }
+
+    /// Return a [`ParquetExecBuilder`].
+    ///
+    /// See example on [`ParquetExec`] and [`ParquetExecBuilder`] for specifying
+    /// parquet table options.
+    pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder {
+        ParquetExecBuilder::new(file_scan_config)
+    }
 
     /// [`FileScanConfig`] that controls this scan (such as which files to read)
     pub fn base_config(&self) -> &FileScanConfig {
@@ -267,13 +432,7 @@ impl ParquetExec {
 
     /// Optional user defined parquet file reader factory.
     ///
-    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
-    /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
-    /// I/O operations, etc).
-    ///
-    /// The default reader factory reads directly from an [`ObjectStore`]
-    /// instance using individual I/O operations for the footer and then for
-    /// each page.
+    /// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`]
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -284,9 +443,7 @@ impl ParquetExec {
 
     /// Optional schema adapter factory.
     ///
-    /// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to
-    /// that of the table schema. The default schema adapter uses arrow's cast library to map
-    /// the parquet fields to the table schema.
+    /// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`]
     pub fn with_schema_adapter_factory(
         mut self,
         schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
@@ -1033,15 +1190,17 @@ mod tests {
         let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
 
         // prepare the scan
-        let mut parquet_exec = ParquetExec::new(
+        let mut builder = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file_group(file_group)
                 .with_projection(projection),
-            predicate,
-            None,
-            Default::default(),
         );
+        if let Some(predicate) = predicate {
+            builder = builder.with_predicate(predicate);
+        }
+
+        let mut parquet_exec = builder.build();
+
         if pushdown_predicate {
             parquet_exec = parquet_exec
                 .with_pushdown_filters(true)
@@ -1684,13 +1843,11 @@ mod tests {
         expected_row_num: Option<usize>,
         file_schema: SchemaRef,
     ) -> Result<()> {
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file_groups(file_groups),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
         assert_eq!(
             parquet_exec
                 .properties()
@@ -1786,7 +1943,7 @@ mod tests {
             ),
         ]);
 
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(object_store_url, schema.clone())
                 .with_file(partitioned_file)
                 // file has 10 cols so index 12 should be month and 13 should be day
@@ -1803,10 +1960,8 @@ mod tests {
                     false,
                 ),
             ]),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
         assert_eq!(
             parquet_exec.cache.output_partitioning().partition_count(),
             1
@@ -1861,13 +2016,11 @@ mod tests {
         };
 
         let file_schema = Arc::new(Schema::empty());
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file(partitioned_file),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
 
         let mut results = parquet_exec.execute(0, state.task_ctx())?;
         let batch = results.next().await.unwrap();
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 1838a3354b9c..77fde608fd05 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -258,13 +258,11 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
 
         // prepare the scan
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
                 .with_file(partitioned_file),
-            None,
-            None,
-            Default::default(),
         )
+        .build()
         .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
 
         let session_ctx = SessionContext::new();
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index b93f4012b093..909c8acdb816 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -245,16 +245,14 @@ mod tests {
     }
 
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(
                 ObjectStoreUrl::parse("test:///").unwrap(),
                 schema.clone(),
             )
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn partial_aggregate_exec(
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9eb5aafd81a2..88fa3a978af7 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1431,14 +1431,12 @@ pub(crate) mod tests {
     pub(crate) fn parquet_exec_with_sort(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                 .with_file(PartitionedFile::new("x".to_string(), 100))
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn parquet_exec_multiple() -> Arc<ParquetExec> {
@@ -1449,17 +1447,15 @@ pub(crate) mod tests {
     fn parquet_exec_multiple_sorted(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                 .with_file_groups(vec![
                     vec![PartitionedFile::new("x".to_string(), 100)],
                     vec![PartitionedFile::new("y".to_string(), 100)],
                 ])
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn csv_exec() -> Arc<CsvExec> {
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 4d926847e465..cfd0312f813d 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -274,13 +274,11 @@ pub fn sort_preserving_merge_exec(
 
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 // Created a sorted parquet exec
@@ -290,14 +288,12 @@ pub fn parquet_exec_sorted(
 ) -> Arc<ParquetExec> {
     let sort_exprs = sort_exprs.into_iter().collect();
 
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100))
            .with_output_ordering(vec![sort_exprs]),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index ed539d29bd26..9f06ad9308ab 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -163,22 +164,19 @@ impl TestParquetFile {
             let filter = simplifier.coerce(filter, &df_schema).unwrap();
             let physical_filter_expr =
                create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
-            let parquet_exec = Arc::new(ParquetExec::new(
-                scan_config,
-                Some(physical_filter_expr.clone()),
-                None,
-                parquet_options,
-            ));
+
+            let parquet_exec =
+                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
+                    .with_predicate(physical_filter_expr.clone())
+                    .build_arc();
 
             let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
             Ok(exec)
         } else {
-            Ok(Arc::new(ParquetExec::new(
-                scan_config,
-                None,
-                None,
-                parquet_options,
-            )))
+            Ok(
+                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
+                    .build_arc(),
+            )
         }
     }
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 4f50c55c627c..0e515fd4647b 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -75,17 +75,15 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         .collect();
 
     // prepare the scan
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(
             // just any url that doesn't point to in memory object store
            ObjectStoreUrl::local_filesystem(),
            file_schema,
        )
        .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
     )
+    .build()
     .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
         Arc::clone(&in_memory_object_store),
     )));
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 2e9cda40c330..15efd4bcd9dd 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -70,13 +70,12 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
     let execution_props = ExecutionProps::new();
     let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap();
 
-    let parquet_exec = ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(object_store_url, schema).with_file(partitioned_file),
-        Some(predicate),
-        None,
-        Default::default(),
-    );
-    parquet_exec.with_enable_page_index(true)
+    )
+    .with_predicate(predicate)
+    .build()
+    .with_enable_page_index(true)
 }
 
 #[tokio::test]
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index ac51b4f71201..af9411f40ecb 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -59,13 +59,11 @@ async fn multi_parquet_coercion() {
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
@@ -115,14 +113,12 @@ async fn multi_parquet_coercion_projection() {
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group)
            .with_projection(Some(vec![1, 0, 2])),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index a9965e1c8151..550176a42e66 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -224,12 +224,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                     )
                 })
                 .transpose()?;
-                Ok(Arc::new(ParquetExec::new(
-                    base_config,
-                    predicate,
-                    None,
-                    Default::default(),
-                )))
+                let mut builder = ParquetExec::builder(base_config);
+                if let Some(predicate) = predicate {
+                    builder = builder.with_predicate(predicate)
+                }
+                Ok(builder.build_arc())
             }
             PhysicalPlanType::AvroScan(scan) => {
                 Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 55b346a482d3..df1995f46533 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -582,12 +582,11 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         Operator::Eq,
         lit("1"),
     ));
-    roundtrip_test(Arc::new(ParquetExec::new(
-        scan_config,
-        Some(predicate),
-        None,
-        Default::default(),
-    )))
+    roundtrip_test(
+        ParquetExec::builder(scan_config)
+            .with_predicate(predicate)
+            .build_arc(),
+    )
 }
 
 #[tokio::test]
@@ -613,12 +612,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
         output_ordering: vec![],
     };
 
-    roundtrip_test(Arc::new(ParquetExec::new(
-        scan_config,
-        None,
-        None,
-        Default::default(),
-    )))
+    roundtrip_test(ParquetExec::builder(scan_config).build_arc())
 }
 
 #[test]
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 68f8b02b0f09..39b38c94ec18 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -121,12 +121,8 @@ pub async fn from_substrait_rel(
                     }
                 }
 
-                Ok(Arc::new(ParquetExec::new(
-                    base_config,
-                    None,
-                    None,
-                    Default::default(),
-                )) as Arc<dyn ExecutionPlan>)
+                Ok(ParquetExec::builder(base_config).build_arc()
+                    as Arc<dyn ExecutionPlan>)
             }
             _ => not_impl_err!(
                 "Only LocalFile reads are supported when parsing physical"
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index aca044319406..4014670a7cbc 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -45,12 +45,8 @@ async fn parquet_exec() -> Result<()> {
             123,
         )],
     ]);
-    let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
-        scan_config,
-        None,
-        None,
-        Default::default(),
-    ));
+    let parquet_exec: Arc<dyn ExecutionPlan> =
+        ParquetExec::builder(scan_config).build_arc();
 
     let mut extension_info: (
         Vec,