
Commit

reorganize
alamb committed May 29, 2024
1 parent 7195b37 commit 2f1b23f
Showing 1 changed file with 74 additions and 24 deletions.
datafusion-examples/examples/parquet_index_advanced.rs
@@ -46,7 +46,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
+    internal_datafusion_err, internal_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -259,27 +259,15 @@ impl TableProvider for IndexTableProvider {
         // Use the index to find the row groups that might have data that matches the
         // predicate. Any file that can not have data that matches the predicate
         // will not be returned.
-        let scan_builder = self.index.get_row_groups(predicate.clone())?;
-        let file_scan_config = scan_builder
-            .build(self.schema(), &self.dir)?
+        let exec = self
+            .index
+            .get_row_groups(predicate.clone())?
             .with_limit(limit)
-            .with_projection(projection.cloned());
+            .with_projection(projection.cloned())
+            .with_predicate(predicate)
+            .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory))
+            .build(self.schema(), &self.dir)?;
 
-        // build the actual parquet exec
-        let metadata_size_hint = None;
-        let table_parquet_options = TableParquetOptions::default();
-
-        // configure a Parquet opener that can provide the metadata for the
-        // files that are being scanned
-
-        // TODO make a builder for parquet exec
-        let exec = ParquetExec::new(
-            file_scan_config,
-            Some(predicate),
-            metadata_size_hint,
-            table_parquet_options,
-        )
-        .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory));
         Ok(Arc::new(exec))
     }
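
The rewritten scan() is now a single builder chain. For readability, an annotated restatement of the added lines (a sketch, not compiler-checked; ParquetScanBuilder is the struct introduced below, and the factory comment paraphrases the one removed above):

    let exec = self
        .index
        // consult the index: keep only files/row groups that might match
        .get_row_groups(predicate.clone())?
        // maximum number of rows to return
        .with_limit(limit)
        // columns the scan should produce
        .with_projection(projection.cloned())
        // row-level filtering while scanning parquet
        .with_predicate(predicate)
        // factory that supplies metadata for the files being scanned
        .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory))
        // assemble the ParquetExec
        .build(self.schema(), &self.dir)?;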

@@ -300,6 +288,16 @@ impl TableProvider for IndexTableProvider {
 struct ParquetScanBuilder {
     /// Files to scan. Use btree map for deterministic order
     files: BTreeMap<String, ScannedFile>,
+    /// Columns on which to project the data. Indexes that are higher than the
+    /// number of columns of `file_schema` refer to `table_partition_cols`.
+    projection: Option<Vec<usize>>,
+    /// The maximum number of records to read from this plan. If `None`,
+    /// all records after filtering are returned.
+    limit: Option<usize>,
+    /// Optional predicate for row filtering during parquet scan
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// user defined parquet file reader factory
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl ParquetScanBuilder {
@@ -319,18 +317,56 @@ impl ParquetScanBuilder {
         }
     }
 
+    /// Set the projection of the scan being built
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the scan being built
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set the predicate of the scan being built
+    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+
     /// Creates a ParquetExec that only scans the files and row groups specified in this builder
     ///
     /// # Parameters
-    /// * `file_scan_config` - the base configuration for the scan (e.g. projection, limit)
     /// * `dir` - the directory containing the files
-    fn build(self, schema: SchemaRef, dir: &Path) -> Result<FileScanConfig> {
+    ///
+    /// # Returns
+    /// * a ParquetExec that scans only the files and row groups specified in this builder
+    fn build(self, schema: SchemaRef, dir: &Path) -> Result<ParquetExec> {
+        let Self {
+            files,
+            projection,
+            limit,
+            predicate,
+            parquet_file_reader_factory,
+        } = self;
+
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let mut file_scan_config = FileScanConfig::new(object_store_url, schema);
+        let mut file_scan_config = FileScanConfig::new(object_store_url, schema)
+            .with_limit(limit)
+            .with_projection(projection);
 
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
-        for (file_name, scanned_file) in self.files {
+        for (file_name, scanned_file) in files {
             let ScannedFile {
                 file_size,
                 row_groups,
@@ -345,7 +381,21 @@ impl ParquetScanBuilder {
             ));
         }
 
-        Ok(file_scan_config)
+        let Some(parquet_file_reader_factory) = parquet_file_reader_factory else {
+            return internal_err!("Parquet file reader factory not set");
+        };
+
+        // build the actual parquet exec
+        let metadata_size_hint = None;
+        let table_parquet_options = TableParquetOptions::default();
+        let exec = ParquetExec::new(
+            file_scan_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+        )
+        .with_parquet_file_reader_factory(parquet_file_reader_factory);
+        Ok(exec)
     }
 }
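
Taken together, the reorganization moves ParquetExec assembly behind a consuming builder: the with_* setters take `mut self` and return `Self`, and build() validates the one required piece (the reader factory) before constructing the ParquetExec. A minimal self-contained sketch of that pattern, with illustrative names (ScanBuilder and Scan are stand-ins, not DataFusion APIs):

    // Consuming-builder pattern as used by ParquetScanBuilder above;
    // all names here are made up for illustration.
    #[derive(Default)]
    struct ScanBuilder {
        limit: Option<usize>,
        projection: Option<Vec<usize>>,
    }

    struct Scan {
        limit: Option<usize>,
        projection: Vec<usize>,
    }

    impl ScanBuilder {
        // Setters take `mut self` and return `Self` so calls chain.
        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
            self.projection = projection;
            self
        }

        // Required inputs are checked once here, mirroring the
        // `let Some(..) else { return internal_err!(..) }` check in build().
        fn build(self) -> Result<Scan, String> {
            let Some(projection) = self.projection else {
                return Err("projection not set".to_string());
            };
            Ok(Scan { limit: self.limit, projection })
        }
    }

    fn main() {
        let scan = ScanBuilder::default()
            .with_limit(Some(10))
            .with_projection(Some(vec![0, 2]))
            .build()
            .expect("valid scan");
        assert_eq!(scan.projection, vec![0, 2]);
        println!("limit: {:?}", scan.limit);
    }

One consequence of returning ParquetExec (rather than FileScanConfig) from build() is that call sites such as scan() no longer need to know about ParquetExec::new, metadata size hints, or TableParquetOptions; those defaults now live in one place.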

