Skip to content

Commit

Permalink
Simplify parquet_index example
Browse files: browse the repository at this point in the history
  • Loading branch information
alamb committed May 24, 2024
1 parent 82f8f76 commit 5b112a1
Showing 1 changed file with 17 additions and 39 deletions.
56 changes: 17 additions & 39 deletions datafusion-examples/examples/parquet_index.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -36,9 +36,8 @@ use datafusion::parquet::arrow::ArrowWriter;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics,
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -245,45 +244,24 @@ impl TableProvider for IndexTableProvider {

// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
let file_groups = files
.into_iter()
.map(|(file_name, file_size)| {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
Ok(vec![PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
)])
})
.collect::<Result<Vec<_>>>()?;

// for now, simply use ParquetExec
// TODO make a builder for FileScanConfig
let object_store_url = ObjectStoreUrl::parse("file://")?;
let base_config = FileScanConfig {
object_store_url,
file_schema: self.schema(),
file_groups,
statistics: Statistics::new_unknown(self.index.schema()),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: vec![],
};

let metadata_size_hint = None;

let table_parquet_options = TableParquetOptions::default();

// TODO make a builder for parquet exec
let exec = ParquetExec::new(
base_config,
Some(predicate),
metadata_size_hint,
table_parquet_options,
);
let mut base_config = FileScanConfig::new(object_store_url, self.schema())
.with_projection(projection.cloned())
.with_limit(limit);

for (file_name, file_size) in files {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
base_config = base_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
}

Ok(Arc::new(exec))
// Create the actual ParquetExec
Ok(ParquetExec::builder(base_config)
.with_predicate(predicate)
.build_arc())
}

/// Tell DataFusion to push filters down to the scan method
Expand Down

0 comments on commit 5b112a1

Please sign in to comment.