Simplify ParquetExec::new() #10643
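This PR replaces the four-argument `ParquetExec::new(config, predicate, metadata_size_hint, options)` constructor with a single-argument constructor plus builder-style setters. A minimal sketch of the call-site migration, using only names that appear in the diff below; the bindings (`base_config`, `predicate`, `options`) are assumed to already be in scope:

```rust
// Old constructor (removed by this PR): every optional setting was positional.
// let exec = ParquetExec::new(base_config, predicate, metadata_size_hint, options);

// New style: only the FileScanConfig is required; optional settings are chained.
let exec = ParquetExec::new_with_options(base_config, options)
    .with_predicate(predicate)                  // Option<Arc<dyn PhysicalExpr>>
    .with_metadata_size_hint(Some(64 * 1024));  // hypothetical 64 KiB footer hint

// When the defaults suffice, construction collapses to a single argument:
// let exec = ParquetExec::new(base_config);
```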

Closed · wants to merge 1 commit
11 changes: 5 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -258,12 +258,11 @@ impl FileFormat for ParquetFormat {
// will not prune data based on the statistics.
let predicate = self.enable_pruning().then(|| filters.cloned()).flatten();

Ok(Arc::new(ParquetExec::new(
conf,
predicate,
self.metadata_size_hint(),
self.options.clone(),
)))
Ok(Arc::new(
ParquetExec::new_with_options(conf, self.options.clone())
.with_predicate(predicate)
.with_metadata_size_hint(self.metadata_size_hint()),
))
}

async fn create_writer_physical_plan(
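For context on the predicate that feeds the new builder above: the `enable_pruning().then(|| filters.cloned()).flatten()` line collapses two optionals, so a predicate is only forwarded when pruning is enabled and a filter was actually supplied. A small self-contained sketch of that idiom (the function and types here are illustrative, not part of DataFusion):

```rust
fn forward_filter(enable_pruning: bool, filter: Option<&str>) -> Option<String> {
    // `bool::then` yields Some(closure result) only when the flag is true,
    // so the result is None unless pruning is enabled *and* a filter exists.
    enable_pruning.then(|| filter.map(str::to_owned)).flatten()
}

fn main() {
    assert_eq!(forward_filter(true, Some("x > 5")), Some("x > 5".to_string()));
    assert_eq!(forward_filter(false, Some("x > 5")), None);
    assert_eq!(forward_filter(true, None), None);
}
```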
256 changes: 144 additions & 112 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -102,48 +102,27 @@ pub struct ParquetExec {
}

impl ParquetExec {
/// Create a new Parquet reader execution plan provided file list and schema.
pub fn new(
/// Create a `ParquetExec` to read the provided file list.
///
/// # Example
/// ```
/// TODO
/// ```
pub fn new(base_config: FileScanConfig) -> Self {
Self::new_with_options(base_config, TableParquetOptions::default())
}

/// Create a `ParquetExec` to read the provided file list with options
pub fn new_with_options(
base_config: FileScanConfig,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
table_parquet_options: TableParquetOptions,
) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
debug!(
"Creating ParquetExec, files: {:?}, projection {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, base_config.limit
);

let metrics = ExecutionPlanMetricsSet::new();
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let file_schema = &base_config.file_schema;
let pruning_predicate = predicate
.clone()
.and_then(|predicate_expr| {
match PruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!("Could not create pruning predicate for: {e}");
predicate_creation_errors.add(1);
None
}
}
})
.filter(|p| !p.always_true());

let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!(
"Could not create page pruning predicate for '{:?}': {}",
pruning_predicate, e
);
predicate_creation_errors.add(1);
None
}
}
});

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
@@ -156,10 +135,10 @@ impl ParquetExec {
base_config,
projected_statistics,
metrics,
predicate,
pruning_predicate,
page_pruning_predicate,
metadata_size_hint,
predicate: None,
pruning_predicate: None,
page_pruning_predicate: None,
metadata_size_hint: None,
parquet_file_reader_factory: None,
cache,
table_parquet_options,
@@ -177,6 +156,78 @@ impl ParquetExec {
&self.table_parquet_options
}

/// Set the predicate (and [`PruningPredicate`]) that will be used to prune
/// row groups and pages
pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalExpr>>) -> Self {
debug!(" Setting ParquetExec predicate: {:?}", predicate);
let Some(predicate) = predicate else {
self.predicate = None;
self.pruning_predicate = None;
self.page_pruning_predicate = None;
return self;
};

let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");

let file_schema = &self.base_config.file_schema;

self.pruning_predicate =
match PruningPredicate::try_new(predicate.clone(), file_schema.clone()) {
Ok(pruning_predicate) if !pruning_predicate.always_true() => {
Some(Arc::new(pruning_predicate))
}
Ok(_pruning_predicate) => {
debug!(
"Pruning predicate is always true: {:?}, skipping",
predicate
);
None
}
Err(e) => {
debug!("Could not create pruning predicate for: {e}");
predicate_creation_errors.add(1);
None
}
};

self.page_pruning_predicate =
match PagePruningPredicate::try_new(&predicate, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!(
"Could not create page pruning predicate for '{:?}': {}",
self.pruning_predicate, e
);
predicate_creation_errors.add(1);
None
}
};

self.predicate = Some(predicate);

self
}

/// Set the options for reading Parquet files
pub fn with_table_parquet_options(
mut self,
table_parquet_options: TableParquetOptions,
) -> Self {
self.table_parquet_options = table_parquet_options;
self
}

/// Set the metadata size hint for the parquet reader
///
/// This value determines how many bytes at the end of the file the
/// ParquetExec will request in the initial IO. If this is too small, the
/// ParquetExec will need to make additional IO requests to read the footer.
pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
self.metadata_size_hint = metadata_size_hint;
self
}

/// Optional predicate.
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.predicate.as_ref()
@@ -931,21 +982,17 @@ mod tests {
let predicate = predicate.map(|p| logical2physical(&p, &file_schema));

// prepare the scan
let mut parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
},
predicate,
None,
Default::default(),
);
let mut parquet_exec = ParquetExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
})
.with_predicate(predicate);

if pushdown_predicate {
parquet_exec = parquet_exec
@@ -1589,21 +1636,16 @@ mod tests {
expected_row_num: Option<usize>,
file_schema: SchemaRef,
) -> Result<()> {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups,
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
},
None,
None,
Default::default(),
);
let parquet_exec = ParquetExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups,
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
assert_eq!(
parquet_exec
.properties()
@@ -1699,33 +1741,28 @@ mod tests {
),
]);

let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema: schema.clone(),
statistics: Statistics::new_unknown(&schema),
// file has 10 cols so index 12 should be month and 13 should be day
projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
Field::new(
"day",
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
false,
let parquet_exec = ParquetExec::new(FileScanConfig {
object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema: schema.clone(),
statistics: Statistics::new_unknown(&schema),
// file has 10 cols so index 12 should be month and 13 should be day
projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
Field::new(
"day",
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
],
output_ordering: vec![],
},
None,
None,
Default::default(),
);
false,
),
],
output_ordering: vec![],
});
assert_eq!(
parquet_exec.cache.output_partitioning().partition_count(),
1
@@ -1779,21 +1816,16 @@ mod tests {
extensions: None,
};

let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![partitioned_file]],
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::new_unknown(&Schema::empty()),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
},
None,
None,
Default::default(),
);
let parquet_exec = ParquetExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![partitioned_file]],
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::new_unknown(&Schema::empty()),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});

let mut results = parquet_exec.execute(0, state.task_ctx())?;
let batch = results.next().await.unwrap();
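The test migrations above follow the same pattern: call sites that previously passed `None, None, Default::default()` simply drop the trailing arguments, and anything optional is set through the new builder methods. A hedged sketch of the fuller chain, with `scan_config` and `physical_predicate` assumed to be in scope:

```rust
// `new` uses TableParquetOptions::default(); every setter below is optional.
let exec = ParquetExec::new(scan_config)
    // Builds both the row-group PruningPredicate and the page-level
    // PagePruningPredicate; failures increment `num_predicate_creation_errors`.
    .with_predicate(Some(physical_predicate))
    // How many trailing bytes to request in the first IO when locating the footer.
    .with_metadata_size_hint(Some(512 * 1024))
    .with_table_parquet_options(TableParquetOptions::default());

// Passing `None` clears the predicate and both derived pruning predicates.
let exec = exec.with_predicate(None);
```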
25 changes: 10 additions & 15 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -245,21 +245,16 @@ mod tests {
}

fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::new_unknown(schema),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
},
None,
None,
Default::default(),
))
Arc::new(ParquetExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::new_unknown(schema),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
}))
}

fn partial_aggregate_exec(