diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 52dc8e9bce85d..355a98c90e954 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -71,8 +71,8 @@ pub struct ParquetExec { projected_schema: SchemaRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, - /// Optional predicate builder - predicate_builder: Option, + /// Optional predicate for pruning row groups + pruning_predicate: Option, } /// Stores metrics about the parquet execution for a particular parquet file @@ -95,12 +95,12 @@ impl ParquetExec { let predicate_creation_errors = MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors"); - let predicate_builder = predicate.and_then(|predicate_expr| { + let pruning_predicate = predicate.and_then(|predicate_expr| { match PruningPredicate::try_new( &predicate_expr, base_config.file_schema.clone(), ) { - Ok(predicate_builder) => Some(predicate_builder), + Ok(pruning_predicate) => Some(pruning_predicate), Err(e) => { debug!( "Could not create pruning predicate for {:?}: {}", @@ -119,7 +119,7 @@ impl ParquetExec { projected_schema, projected_statistics, metrics, - predicate_builder, + pruning_predicate, } } @@ -200,7 +200,7 @@ impl ExecutionPlan for ParquetExec { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; - let predicate_builder = self.predicate_builder.clone(); + let pruning_predicate = self.pruning_predicate.clone(); let batch_size = self.base_config.batch_size; let limit = self.base_config.limit; let object_store = Arc::clone(&self.base_config.object_store); @@ -216,7 +216,7 @@ impl ExecutionPlan for ParquetExec { partition, metrics, &projection, - &predicate_builder, + &pruning_predicate, batch_size, response_tx, limit, @@ -356,17 +356,17 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } fn build_row_group_predicate( - predicate_builder: &PruningPredicate, + pruning_predicate: &PruningPredicate, metrics: ParquetFileMetrics, row_group_metadata: &[RowGroupMetaData], ) -> Box bool> { - let parquet_schema = predicate_builder.schema().as_ref(); + let parquet_schema = pruning_predicate.schema().as_ref(); let pruning_stats = RowGroupPruningStatistics { row_group_metadata, parquet_schema, }; - let predicate_values = predicate_builder.prune(&pruning_stats); + let predicate_values = pruning_predicate.prune(&pruning_stats); match predicate_values { Ok(values) => { @@ -392,7 +392,7 @@ fn read_partition( partition: Vec, metrics: ExecutionPlanMetricsSet, projection: &[usize], - predicate_builder: &Option, + pruning_predicate: &Option, batch_size: usize, response_tx: Sender>, limit: Option, @@ -409,9 +409,9 @@ fn read_partition( object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; let mut file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?; - if let Some(predicate_builder) = predicate_builder { + if let Some(pruning_predicate) = pruning_predicate { let row_group_predicate = build_row_group_predicate( - predicate_builder, + pruning_predicate, file_metrics, file_reader.metadata().row_groups(), ); @@ -582,12 +582,12 @@ mod tests { } #[test] - fn row_group_predicate_builder_simple_expr() -> Result<()> { + fn row_group_pruning_predicate_simple_expr() -> Result<()> { use crate::logical_plan::{col, lit}; // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?; + let pruning_predicate = PruningPredicate::try_new(&expr, Arc::new(schema))?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -600,7 +600,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = build_row_group_predicate( - &predicate_builder, + &pruning_predicate, parquet_file_metrics(), &row_group_metadata, ); @@ -615,12 +615,12 @@ mod tests { } #[test] - fn row_group_predicate_builder_missing_stats() -> Result<()> { + fn row_group_pruning_predicate_missing_stats() -> Result<()> { use crate::logical_plan::{col, lit}; // int > 1 => c1_max > 1 let expr = col("c1").gt(lit(15)); let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let predicate_builder = PruningPredicate::try_new(&expr, Arc::new(schema))?; + let pruning_predicate = PruningPredicate::try_new(&expr, Arc::new(schema))?; let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); let rgm1 = get_row_group_meta_data( @@ -633,7 +633,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = build_row_group_predicate( - &predicate_builder, + &pruning_predicate, parquet_file_metrics(), &row_group_metadata, ); @@ -650,7 +650,7 @@ mod tests { } #[test] - fn row_group_predicate_builder_partial_expr() -> Result<()> { + fn row_group_pruning_predicate_partial_expr() -> Result<()> { use crate::logical_plan::{col, lit}; // test row group predicate with partially supported expression // int > 1 and int % 2 => c1_max > 1 and true @@ -659,7 +659,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Int32, false), ])); - let predicate_builder = PruningPredicate::try_new(&expr, schema.clone())?; + let pruning_predicate = PruningPredicate::try_new(&expr, schema.clone())?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -681,7 +681,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = build_row_group_predicate( - &predicate_builder, + &pruning_predicate, parquet_file_metrics(), &row_group_metadata, ); @@ -697,9 +697,9 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); - let predicate_builder = PruningPredicate::try_new(&expr, schema)?; + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; let row_group_predicate = build_row_group_predicate( - &predicate_builder, + &pruning_predicate, parquet_file_metrics(), &row_group_metadata, ); @@ -714,7 +714,7 @@ mod tests { } #[test] - fn row_group_predicate_builder_null_expr() -> Result<()> { + fn row_group_pruning_predicate_null_expr() -> Result<()> { use crate::logical_plan::{col, lit}; // test row group predicate with an unknown (Null) expr // @@ -726,7 +726,7 @@ mod tests { Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); - let predicate_builder = PruningPredicate::try_new(&expr, schema)?; + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), @@ -748,7 +748,7 @@ mod tests { ); let row_group_metadata = vec![rgm1, rgm2]; let row_group_predicate = build_row_group_predicate( - &predicate_builder, + &pruning_predicate, parquet_file_metrics(), &row_group_metadata, );