Skip to content

Commit

Permalink
start wiring in row group index
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 29, 2024
1 parent e392afa commit cd0fb1b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
45 changes: 33 additions & 12 deletions datafusion-examples/examples/parquet_index_advanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
RequestedStatistics, StatisticsConverter,
RequestedStatistics, RowGroupSet, StatisticsConverter,
};
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetExec, ParquetFileReaderFactory,
Expand Down Expand Up @@ -302,13 +302,19 @@ impl ParquetScanBuilder {
}

/// Specify that a certain row group in a file should be scanned
fn add_row_group(&mut self, file_name: &str, file_size: u64, row_group_index: u64) {
fn add_row_group(
&mut self,
file_name: &str,
file_size: u64,
num_row_groups: usize,
row_group_index: u64,
) {
if let Some(scanned_file) = self.files.get_mut(file_name) {
scanned_file.row_groups.push(row_group_index)
scanned_file.row_group_set.do_scan(row_group_index as usize)
} else {
self.files.insert(
file_name.to_string(),
ScannedFile::new(file_size, row_group_index),
ScannedFile::new(file_size, num_row_groups, row_group_index),
);
}
}
Expand Down Expand Up @@ -360,12 +366,15 @@ impl ParquetScanBuilder {
.with_limit(limit)
.with_projection(projection);

// need a set of files that matches the file scan config groups exactly
let mut row_group_sets = vec![];

// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
for (file_name, scanned_file) in files {
let ScannedFile {
file_size,
row_groups,
row_group_set,
} = scanned_file;

let path = dir.join(file_name);
Expand All @@ -375,6 +384,7 @@ impl ParquetScanBuilder {
canonical_path.display().to_string(),
file_size,
));
row_group_sets.push(vec![row_group_set]);
}

let Some(parquet_file_reader_factory) = parquet_file_reader_factory else {
Expand All @@ -383,7 +393,8 @@ impl ParquetScanBuilder {

// build the actual parquet exec
let mut builder = ParquetExec::builder(file_scan_config)
.with_parquet_file_reader_factory(parquet_file_reader_factory);
.with_parquet_file_reader_factory(parquet_file_reader_factory)
.with_row_groups(row_group_sets);

if let Some(predicate) = predicate {
builder = builder.with_predicate(predicate);
Expand All @@ -392,19 +403,22 @@ impl ParquetScanBuilder {
}
}

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
struct ScannedFile {
/// the size of the file
file_size: u64,
// which row groups to scan
row_groups: Vec<u64>,
/// which row groups to scan
row_group_set: RowGroupSet,
}

impl ScannedFile {
fn new(file_size: u64, row_group_index: u64) -> Self {
fn new(file_size: u64, num_row_groups: usize, row_group_index: u64) -> Self {
let mut row_group_set = RowGroupSet::new_none(num_row_groups);
row_group_set.do_scan(row_group_index as usize);

Self {
file_size,
row_groups: vec![row_group_index],
row_group_set,
}
}
}
Expand Down Expand Up @@ -518,7 +532,14 @@ impl DetailedParquetMetadataIndex {
let file_size = self.file_size().value(file_offset);
let row_group_index = self.row_group_indexes().value(file_offset);

scan_builder.add_row_group(file_name, file_size, row_group_index);
// TODO make the actual number of row groups
let num_row_groups = 10;
scan_builder.add_row_group(
file_name,
file_size,
num_row_groups,
row_group_index,
);
});

Ok(scan_builder)
Expand Down
20 changes: 19 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ mod row_filter;
mod row_groups;
mod statistics;

use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
pub use metrics::ParquetFileMetrics;
pub use row_groups::RowGroupSet;
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for reading one or more Parquet files.
Expand Down Expand Up @@ -211,6 +211,8 @@ pub struct ParquetExecBuilder {
table_parquet_options: TableParquetOptions,
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional starting RowGroupSets for each file in the file groups
row_groups: Vec<Vec<RowGroupSet>>,
}

impl ParquetExecBuilder {
Expand All @@ -232,6 +234,7 @@ impl ParquetExecBuilder {
table_parquet_options,
parquet_file_reader_factory: None,
schema_adapter_factory: None,
row_groups: vec![],
}
}

Expand Down Expand Up @@ -288,6 +291,20 @@ impl ParquetExecBuilder {
self
}

/// Set the row group filter for the scan
///
/// The ParquetExec will only scan row groups specified
/// the format is a vec of of row group indexes
/// for each file in the file groups
/// For example
/// ```
/// fooo
/// ```
pub fn with_row_groups(mut self, row_groups: Vec<Vec<RowGroupSet>>) -> Self {
self.row_groups = row_groups;
self
}

/// Set optional schema adapter factory.
///
/// [`SchemaAdapterFactory`] allows user to specify how fields from the
Expand Down Expand Up @@ -317,6 +334,7 @@ impl ParquetExecBuilder {
table_parquet_options,
parquet_file_reader_factory,
schema_adapter_factory,
row_groups,
} = self;

let base_config = file_scan_config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::ParquetFileMetrics;
/// This struct encapsulates the various types of pruning that can be applied to
/// a set of row groups within a parquet file, progressively narrowing down the
/// set of row groups that should be scanned.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupSet {
/// `row_groups[i]` is true if the i-th row group should be scanned
row_groups: Vec<bool>,
Expand All @@ -57,6 +57,18 @@ impl RowGroupSet {
}
}

/// Create a new `RowGroupSet` with all row groups set to false (will not be scanned)
pub fn new_none(num_row_groups: usize) -> Self {
Self {
row_groups: vec![false; num_row_groups],
}
}

/// Set the i-th row group to true (should scan)
pub fn do_scan(&mut self, idx: usize) {
self.row_groups[idx] = true;
}

/// Set the i-th row group to false (should not be scanned)
pub fn do_not_scan(&mut self, idx: usize) {
self.row_groups[idx] = false;
Expand Down

0 comments on commit cd0fb1b

Please sign in to comment.