diff --git a/datafusion-examples/examples/parquet_index_advanced.rs b/datafusion-examples/examples/parquet_index_advanced.rs
index 6ed86147d703..8e3144dd5866 100644
--- a/datafusion-examples/examples/parquet_index_advanced.rs
+++ b/datafusion-examples/examples/parquet_index_advanced.rs
@@ -366,9 +366,6 @@ impl ParquetScanBuilder {
             .with_limit(limit)
             .with_projection(projection);
 
-        // need a set of files that matches the file scan config groups exactly
-        let mut row_group_sets = vec![];
-
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
         for (file_name, scanned_file) in files {
@@ -379,12 +376,12 @@ impl ParquetScanBuilder {
 
             let path = dir.join(file_name);
             let canonical_path = fs::canonicalize(path)?;
-            // TODO add the row group indexes somehow
-            file_scan_config = file_scan_config.with_file(PartitionedFile::new(
-                canonical_path.display().to_string(),
-                file_size,
-            ));
-            row_group_sets.push(vec![row_group_set]);
+            let partitioned_file = PartitionedFile::new(canonical_path.display().to_string(), file_size)
+                // add the row group set as an extension
+                .with_extensions(Arc::new(row_group_set) as _);
+
+
+            file_scan_config = file_scan_config.with_file(partitioned_file);
         }
 
         let Some(parquet_file_reader_factory) = parquet_file_reader_factory else {
@@ -393,8 +390,7 @@ impl ParquetScanBuilder {
 
         // build the actual parquet exec
         let mut builder = ParquetExec::builder(file_scan_config)
-            .with_parquet_file_reader_factory(parquet_file_reader_factory)
-            .with_row_groups(row_group_sets);
+            .with_parquet_file_reader_factory(parquet_file_reader_factory);
 
         if let Some(predicate) = predicate {
             builder = builder.with_predicate(predicate);
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 04aec9d77d58..5dde6aa88951 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -134,6 +134,12 @@ impl PartitionedFile {
         self.range = Some(FileRange { start, end });
         self
     }
+
+    /// Update the file with optional user metadata
+    pub fn with_extensions(mut self, extensions: Arc<dyn std::any::Any + Send + Sync>) -> Self {
+        self.extensions = Some(extensions);
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 07c40f2e98eb..745f3b3f3a11 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -62,6 +62,7 @@ use parquet::basic::{ConvertedType, LogicalType};
 use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
 use tokio::task::JoinSet;
+use datafusion_common::internal_err;
 
 mod metrics;
 mod page_filter;
@@ -199,10 +200,6 @@ pub struct ParquetExec {
     table_parquet_options: TableParquetOptions,
     /// Optional user defined schema adapter
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    /// Optional starting RowGroupSets for each file in the file groups
-    /// TODO encapsulate into some sort of struct that can also have
-    /// page filters / selections
-    row_groups: Vec<Vec<RowGroupSet>>,
 }
 
 /// [`ParquetExecBuilder`]`, builder for [`ParquetExec`].
@@ -215,8 +212,6 @@ pub struct ParquetExecBuilder {
     table_parquet_options: TableParquetOptions,
     parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    /// Optional starting RowGroupSets for each file in the file groups
-    row_groups: Vec<Vec<RowGroupSet>>,
 }
 
 impl ParquetExecBuilder {
@@ -238,7 +233,6 @@ impl ParquetExecBuilder {
             table_parquet_options,
             parquet_file_reader_factory: None,
             schema_adapter_factory: None,
-            row_groups: vec![],
         }
     }
 
@@ -295,20 +289,6 @@ impl ParquetExecBuilder {
         self
     }
 
-    /// Set the row group filter for the scan
-    ///
-    /// The ParquetExec will only scan row groups specified
-    /// the format is a vec of of row group indexes
-    /// for each file in the file groups
-    /// For example
-    /// ```
-    /// fooo
-    /// ```
-    pub fn with_row_groups(mut self, row_groups: Vec<Vec<RowGroupSet>>) -> Self {
-        self.row_groups = row_groups;
-        self
-    }
-
     /// Set optional schema adapter factory.
     ///
     /// [`SchemaAdapterFactory`] allows user to specify how fields from the
@@ -338,7 +318,6 @@ impl ParquetExecBuilder {
             table_parquet_options,
             parquet_file_reader_factory,
             schema_adapter_factory,
-            row_groups,
         } = self;
 
         let base_config = file_scan_config;
@@ -397,7 +376,6 @@ impl ParquetExecBuilder {
             cache,
             table_parquet_options,
             schema_adapter_factory,
-            row_groups,
         }
     }
 }
@@ -749,6 +727,7 @@ impl FileOpener for ParquetOpener {
             file_meta.location().as_ref(),
             &self.metrics,
         );
+        let extensions = file_meta.extensions.clone();
 
         let reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
@@ -823,7 +802,7 @@ impl FileOpener for ParquetOpener {
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
-            let mut row_groups = RowGroupSet::new(rg_metadata.len());
+            let mut row_groups = create_row_group_set(extensions, rg_metadata.len())?;
             // if there is a range restricting what parts of the file to read
             if let Some(range) = file_range.as_ref() {
                 row_groups.prune_by_range(rg_metadata, range);
@@ -890,6 +869,26 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+/// Return a `RowGroupSet` to read from a parquet file. If there is a
+/// RowGroupSet on the metadata, uses that, otherwise creates a new one.
+fn create_row_group_set(extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, num_row_groups: usize) -> Result<RowGroupSet> {
+    if let Some(extensions) = extensions {
+        println!("Had extensions");
+        if let Some(initial_row_group_set) = extensions.downcast_ref::<RowGroupSet>() {
+            // use the row group set from the metadata
+            println!("using row group set from metadata: {:?}", initial_row_group_set);
+            if initial_row_group_set.len() != num_row_groups {
+                return internal_err!(
+                    "Provided RowGroupSet length ({}) does not match number of row groups in file: {num_row_groups}",
+                    initial_row_group_set.len());
+            }
+            return Ok(initial_row_group_set.clone());
+        }
+    }
+    // default to scanning all row groups
+    Ok(RowGroupSet::new(num_row_groups))
+}
+
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index d6c4eb78655d..06c0cab3a46d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -64,6 +64,11 @@ impl RowGroupSet {
         }
     }
 
+    /// Provide a reference to this struct as an `Any` reference
+    pub fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
     /// Set the i-th row group to true (should scan)
     pub fn do_scan(&mut self, idx: usize) {
         self.row_groups[idx] = true;