Skip to content

Commit

Permalink
wire it in for real!
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 29, 2024
1 parent 3221378 commit b7af342
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 35 deletions.
18 changes: 7 additions & 11 deletions datafusion-examples/examples/parquet_index_advanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,6 @@ impl ParquetScanBuilder {
.with_limit(limit)
.with_projection(projection);

// need a set of files that matches the file scan config groups exactly
let mut row_group_sets = vec![];

// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
for (file_name, scanned_file) in files {
Expand All @@ -379,12 +376,12 @@ impl ParquetScanBuilder {

let path = dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
// TODO add the row group indexes somehow
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
row_group_sets.push(vec![row_group_set]);
let partitioned_file = PartitionedFile::new(canonical_path.display().to_string(), file_size)
// add the row group set as an extension
.with_extensions(Arc::new(row_group_set) as _);


file_scan_config = file_scan_config.with_file(partitioned_file);
}

let Some(parquet_file_reader_factory) = parquet_file_reader_factory else {
Expand All @@ -393,8 +390,7 @@ impl ParquetScanBuilder {

// build the actual parquet exec
let mut builder = ParquetExec::builder(file_scan_config)
.with_parquet_file_reader_factory(parquet_file_reader_factory)
.with_row_groups(row_group_sets);
.with_parquet_file_reader_factory(parquet_file_reader_factory);

if let Some(predicate) = predicate {
builder = builder.with_predicate(predicate);
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ impl PartitionedFile {
self.range = Some(FileRange { start, end });
self
}

/// Attach opaque, user-defined metadata ("extensions") to this file,
/// returning the updated `PartitionedFile`.
///
/// The value is stored as `Some(extensions)`; any previously attached
/// extensions are replaced. Consumers can later recover the concrete
/// type via `Any::downcast_ref`.
pub fn with_extensions(mut self, extensions: Arc<dyn std::any::Any + Send + Sync>) -> Self {
    self.extensions = Some(extensions);
    self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
47 changes: 23 additions & 24 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use parquet::basic::{ConvertedType, LogicalType};
use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
use parquet::schema::types::ColumnDescriptor;
use tokio::task::JoinSet;
use datafusion_common::internal_err;

mod metrics;
mod page_filter;
Expand Down Expand Up @@ -199,10 +200,6 @@ pub struct ParquetExec {
table_parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional starting RowGroupSets for each file in the file groups
/// TODO encapsulate into some sort of struct that can also have
/// page filters / selections
row_groups: Vec<Vec<RowGroupSet>>,
}

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
Expand All @@ -215,8 +212,6 @@ pub struct ParquetExecBuilder {
table_parquet_options: TableParquetOptions,
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional starting RowGroupSets for each file in the file groups
row_groups: Vec<Vec<RowGroupSet>>,
}

impl ParquetExecBuilder {
Expand All @@ -238,7 +233,6 @@ impl ParquetExecBuilder {
table_parquet_options,
parquet_file_reader_factory: None,
schema_adapter_factory: None,
row_groups: vec![],
}
}

Expand Down Expand Up @@ -295,20 +289,6 @@ impl ParquetExecBuilder {
self
}

/// Set the row group filter for the scan
///
/// The ParquetExec will only scan row groups specified
/// the format is a vec of of row group indexes
/// for each file in the file groups
/// For example
/// ```
/// fooo
/// ```
pub fn with_row_groups(mut self, row_groups: Vec<Vec<RowGroupSet>>) -> Self {
self.row_groups = row_groups;
self
}

/// Set optional schema adapter factory.
///
/// [`SchemaAdapterFactory`] allows user to specify how fields from the
Expand Down Expand Up @@ -338,7 +318,6 @@ impl ParquetExecBuilder {
table_parquet_options,
parquet_file_reader_factory,
schema_adapter_factory,
row_groups,
} = self;

let base_config = file_scan_config;
Expand Down Expand Up @@ -397,7 +376,6 @@ impl ParquetExecBuilder {
cache,
table_parquet_options,
schema_adapter_factory,
row_groups,
}
}
}
Expand Down Expand Up @@ -749,6 +727,7 @@ impl FileOpener for ParquetOpener {
file_meta.location().as_ref(),
&self.metrics,
);
let extensions = file_meta.extensions.clone();

let reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
Expand Down Expand Up @@ -823,7 +802,7 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let mut row_groups = RowGroupSet::new(rg_metadata.len());
let mut row_groups = create_row_group_set(extensions, rg_metadata.len())?;
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
Expand Down Expand Up @@ -890,6 +869,26 @@ impl FileOpener for ParquetOpener {
}
}

/// Return the `RowGroupSet` describing which row groups to read from a
/// parquet file.
///
/// If the file's metadata `extensions` carry a user-provided
/// `RowGroupSet` (attached via `PartitionedFile::with_extensions`), that
/// set is used after validating it covers exactly `num_row_groups` row
/// groups. Otherwise a new set scanning all row groups is returned.
///
/// # Errors
/// Returns an internal error if a provided `RowGroupSet`'s length does
/// not match the number of row groups in the file.
fn create_row_group_set(
    extensions: Option<Arc<dyn Any + Send + Sync>>,
    num_row_groups: usize,
) -> Result<RowGroupSet> {
    // Prefer a caller-supplied RowGroupSet, if one was attached to the file
    if let Some(initial_row_group_set) = extensions
        .as_ref()
        .and_then(|e| e.downcast_ref::<RowGroupSet>())
    {
        // A stale/mismatched set would silently read the wrong row groups,
        // so reject it explicitly rather than trusting the caller
        if initial_row_group_set.len() != num_row_groups {
            return internal_err!(
                "Provided RowGroupSet length ({}) does not match number of row groups in file: {num_row_groups}",
                initial_row_group_set.len()
            );
        }
        return Ok(initial_row_group_set.clone());
    }
    // No (usable) extension: default to scanning all row groups
    Ok(RowGroupSet::new(num_row_groups))
}

fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ impl RowGroupSet {
}
}

/// Expose this `RowGroupSet` as a type-erased [`std::any::Any`]
/// reference, e.g. for storage in `PartitionedFile` extensions.
pub fn as_any(&self) -> &dyn std::any::Any {
    self as &dyn std::any::Any
}

/// Set the i-th row group to true (should scan)
pub fn do_scan(&mut self, idx: usize) {
self.row_groups[idx] = true;
Expand Down

0 comments on commit b7af342

Please sign in to comment.