From 2f1b23f61fa79aa1806dcb43cebb8b070a3f641c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 29 May 2024 16:45:06 -0400
Subject: [PATCH] reorganize

---
 .../examples/parquet_index_advanced.rs       | 98 ++++++++++++++-----
 1 file changed, 74 insertions(+), 24 deletions(-)

diff --git a/datafusion-examples/examples/parquet_index_advanced.rs b/datafusion-examples/examples/parquet_index_advanced.rs
index 370d6dc72fde..02966ab0ab51 100644
--- a/datafusion-examples/examples/parquet_index_advanced.rs
+++ b/datafusion-examples/examples/parquet_index_advanced.rs
@@ -46,7 +46,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
+    internal_datafusion_err, internal_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -259,27 +259,15 @@ impl TableProvider for IndexTableProvider {
         // Use the index to find the row groups that might have data that matches the
         // predicate. Any file that can not have data that matches the predicate
         // will not be returned.
-        let scan_builder = self.index.get_row_groups(predicate.clone())?;
-        let file_scan_config = scan_builder
-            .build(self.schema(), &self.dir)?
+        let exec = self
+            .index
+            .get_row_groups(predicate.clone())?
             .with_limit(limit)
-            .with_projection(projection.cloned());
+            .with_projection(projection.cloned())
+            .with_predicate(predicate)
+            .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory))
+            .build(self.schema(), &self.dir)?;
 
-        // build the actual parquet exec
-        let metadata_size_hint = None;
-        let table_parquet_options = TableParquetOptions::default();
-
-        // configure a Parquet opener that can provide the metadata for the
-        // files that are being scanned
-
-        // TODO make a builder for parquet exec
-        let exec = ParquetExec::new(
-            file_scan_config,
-            Some(predicate),
-            metadata_size_hint,
-            table_parquet_options,
-        )
-        .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory));
         Ok(Arc::new(exec))
     }
 
@@ -300,6 +288,16 @@
 struct ParquetScanBuilder {
     /// Files to scan. Use btree map for deterministic order
     files: BTreeMap<PathBuf, ScannedFile>,
+    /// Columns on which to project the data. Indexes that are higher than the
+    /// number of columns of `file_schema` refer to `table_partition_cols`.
+    projection: Option<Vec<usize>>,
+    /// The maximum number of records to read from this plan. If `None`,
+    /// all records after filtering are returned.
+    limit: Option<usize>,
+    /// Optional predicate for row filtering during parquet scan
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// user defined parquet file reader factory
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl ParquetScanBuilder {
@@ -319,18 +317,56 @@ impl ParquetScanBuilder {
         }
     }
 
+    /// Set the projection of the scan being built
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+    /// Set the limit of the scan being built
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set the predicate of the scan being built
+    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+
     /// Creates a ParquetExec that only scans the files and row groups specified in this builder
     ///
     /// # Parameters
     /// * `schema` - the schema of the files being scanned
     /// * `dir` - the directory containing the files
-    fn build(self, schema: SchemaRef, dir: &Path) -> Result<FileScanConfig> {
+    ///
+    /// # Returns
+    /// * a ParquetExec that scans only the files and row groups specified in this builder
+    fn build(self, schema: SchemaRef, dir: &Path) -> Result<ParquetExec> {
+        let Self {
+            files,
+            projection,
+            limit,
+            predicate,
+            parquet_file_reader_factory,
+        } = self;
+
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let mut file_scan_config = FileScanConfig::new(object_store_url, schema);
+        let mut file_scan_config = FileScanConfig::new(object_store_url, schema)
+            .with_limit(limit)
+            .with_projection(projection);
 
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
-        for (file_name, scanned_file) in self.files {
+        for (file_name, scanned_file) in files {
             let ScannedFile {
                 file_size,
                 row_groups,
@@ -345,7 +381,21 @@ impl ParquetScanBuilder {
             ));
         }
 
-        Ok(file_scan_config)
+        let Some(parquet_file_reader_factory) = parquet_file_reader_factory else {
+            return internal_err!("Parquet file reader factory not set");
+        };
+
+        // build the actual parquet exec
+        let metadata_size_hint = None;
+        let table_parquet_options = TableParquetOptions::default();
+        let exec = ParquetExec::new(
+            file_scan_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+        )
+        .with_parquet_file_reader_factory(parquet_file_reader_factory);
+        Ok(exec)
     }
 }
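
For reference, after this patch the whole scan in `IndexTableProvider::scan` is
driven through the builder in a single chain. A minimal annotated sketch of that
pipeline (the types in the comments come from the new builder fields above;
`predicate`, `projection`, and `limit` are the `scan` arguments):

    // `get_row_groups` consults the index and returns a ParquetScanBuilder
    // holding only the files / row groups that might match the predicate.
    let exec = self
        .index
        .get_row_groups(predicate.clone())?
        .with_limit(limit)                    // Option<usize>
        .with_projection(projection.cloned()) // Option<Vec<usize>>
        .with_predicate(predicate)            // Arc<dyn PhysicalExpr>
        .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory))
        // build() assembles the FileScanConfig and wraps it in a ParquetExec
        .build(self.schema(), &self.dir)?;

Moving `ParquetExec` construction into `build` keeps `FileScanConfig` an
implementation detail of the builder, and a missing reader factory now surfaces
as an explicit `internal_err!` rather than silently falling back to the default
object-store reader.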