From 5b112a133477b25899c069e4d0d9ceb1ccc7d31b Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 24 May 2024 05:46:08 -0400
Subject: [PATCH] Simplify `parquet_index` example

---
 datafusion-examples/examples/parquet_index.rs | 56 ++++++-------------
 1 file changed, 17 insertions(+), 39 deletions(-)

diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index 0ed337611389c..a365a5843f193 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -36,9 +36,8 @@ use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
-use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics,
+    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -245,45 +244,24 @@ impl TableProvider for IndexTableProvider {
 
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
-        let file_groups = files
-            .into_iter()
-            .map(|(file_name, file_size)| {
-                let path = self.dir.join(file_name);
-                let canonical_path = fs::canonicalize(path)?;
-                Ok(vec![PartitionedFile::new(
-                    canonical_path.display().to_string(),
-                    file_size,
-                )])
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        // for now, simply use ParquetExec
-        // TODO make a builder for FileScanConfig
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let base_config = FileScanConfig {
-            object_store_url,
-            file_schema: self.schema(),
-            file_groups,
-            statistics: Statistics::new_unknown(self.index.schema()),
-            projection: projection.cloned(),
-            limit,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
-
-        let metadata_size_hint = None;
-
-        let table_parquet_options = TableParquetOptions::default();
-
-        // TODO make a builder for parquet exec
-        let exec = ParquetExec::new(
-            base_config,
-            Some(predicate),
-            metadata_size_hint,
-            table_parquet_options,
-        );
+        let mut base_config = FileScanConfig::new(object_store_url, self.schema())
+            .with_projection(projection.cloned())
+            .with_limit(limit);
+
+        for (file_name, file_size) in files {
+            let path = self.dir.join(file_name);
+            let canonical_path = fs::canonicalize(path)?;
+            base_config = base_config.with_file(PartitionedFile::new(
+                canonical_path.display().to_string(),
+                file_size,
+            ));
+        }
 
-        Ok(Arc::new(exec))
+        // Create the actual ParquetExec
+        Ok(ParquetExec::builder(base_config)
+            .with_predicate(predicate)
+            .build_arc())
     }
 
     /// Tell DataFusion to push filters down to the scan method
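
Note: the snippet below is a minimal, standalone sketch (not part of the patch) of the builder-style
construction the change above moves the example to. It relies only on the DataFusion calls that appear
in the diff (FileScanConfig::new, with_file, with_limit, ParquetExec::builder, build_arc); the schema,
file path, and file size are placeholders invented for illustration.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::object_store::ObjectStoreUrl;
    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
    use datafusion::error::Result;
    use datafusion::physical_plan::ExecutionPlan;

    /// Builds a ParquetExec over a single (hypothetical) file using the
    /// builder-style API the patch switches the example to.
    fn build_parquet_scan() -> Result<Arc<dyn ExecutionPlan>> {
        // Placeholder schema for the Parquet file(s) being scanned.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int32,
            false,
        )]));

        // Declarative FileScanConfig instead of filling struct fields by hand.
        let object_store_url = ObjectStoreUrl::parse("file://")?;
        let base_config = FileScanConfig::new(object_store_url, schema)
            // Hypothetical path and file size, for illustration only.
            .with_file(PartitionedFile::new(
                "/tmp/data/file1.parquet".to_string(),
                1234,
            ))
            .with_limit(Some(10));

        // ParquetExec::builder replaces the multi-argument ParquetExec::new call;
        // a pruning predicate can be chained via .with_predicate(...) as in the patch.
        Ok(ParquetExec::builder(base_config).build_arc())
    }

Compared with the removed code, options the example does not set (statistics, partition columns,
output ordering, metadata size hint, TableParquetOptions) now fall back to their defaults instead of
being spelled out, which is the point of the simplification.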