Pushdown RowFilter in ParquetExec #3380 (Merged)
Commits (14):
- 3ff4d40 Initial implementation (thinkharderdev)
- f78627c Remove debugging cruft (thinkharderdev)
- d4b9f01 Add license header (thinkharderdev)
- ad1920f PR comments (thinkharderdev)
- b945298 no need to prep null mask (thinkharderdev)
- 3b969b0 PR comments (thinkharderdev)
- 285f978 unit tests (thinkharderdev)
- 5c711dc Fix comment (thinkharderdev)
- d2e4d70 fix doc string (thinkharderdev)
- 14d022b Update datafusion/core/src/physical_plan/file_format/row_filter.rs (thinkharderdev)
- a8ca802 Update datafusion/expr/src/expr_fn.rs (thinkharderdev)
- a9efaf8 Update datafusion/core/src/physical_plan/file_format/parquet.rs (thinkharderdev)
- 06223ae PR comments (thinkharderdev)
- c46975c Fix compiler error after rebase (thinkharderdev)
Diff: datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
    FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::row_filter::build_row_filter;
use crate::physical_plan::file_format::FileMeta;
use crate::{
    error::{DataFusionError, Result},
@@ -67,6 +68,30 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;

#[derive(Debug, Clone, Default)]
/// Specify options for the parquet scan
pub struct ParquetScanOptions {
    /// If true, any available `pruning_predicate` will be converted to a `RowFilter`
    /// and pushed down to the `ParquetRecordBatchStream`. This enables row-level
    /// filtering at the decoder level. Defaults to false.
    pushdown_filters: bool,
    /// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try to
    /// minimize the cost of filter evaluation.
    reorder_predicates: bool,
}

impl ParquetScanOptions {
    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
        self.pushdown_filters = pushdown_filters;
        self
    }

    pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self {
        self.reorder_predicates = reorder_predicates;
        self
    }
}

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
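For illustration, a minimal sketch of constructing these options with the builder methods above (both flags start as false via the derived Default):

// Both flags default to false; the builder methods flip them on.
let scan_options = ParquetScanOptions::default()
    .with_pushdown_filters(true)
    .with_reorder_predicates(true);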
@@ -81,6 +106,8 @@ pub struct ParquetExec {
    metadata_size_hint: Option<usize>,
    /// Optional user defined parquet file reader factory
    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
    /// Options to specify behavior of parquet scan
    scan_options: ParquetScanOptions,
}

impl ParquetExec {
@@ -121,6 +148,7 @@ impl ParquetExec {
            pruning_predicate,
            metadata_size_hint,
            parquet_file_reader_factory: None,
            scan_options: ParquetScanOptions::default(),
        }
    }
@@ -148,6 +176,12 @@ impl ParquetExec {
        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
        self
    }

    /// Configure `ParquetScanOptions`
    pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> Self {
        self.scan_options = scan_options;
        self
    }
}

/// Stores metrics about the parquet execution for a particular parquet file.
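Combined with the options sketch above, enabling pushdown on a plan might look like the following; `config` and `predicate` are hypothetical stand-ins for a prepared `FileScanConfig` and optional filter `Expr`, as built in the `round_trip_to_parquet` test helper further down:

// Sketch: attach the scan options to a ParquetExec.
// `config: FileScanConfig` and `predicate: Option<Expr>` are assumed
// to be prepared elsewhere (see the test helper below).
let parquet_exec =
    ParquetExec::new(config, predicate, None).with_scan_options(scan_options);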
@@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec {
            metadata_size_hint: self.metadata_size_hint,
            metrics: self.metrics.clone(),
            parquet_file_reader_factory,
            scan_options: self.scan_options.clone(),
        };

        let stream = FileStream::new(
@@ -319,6 +354,7 @@ struct ParquetOpener {
    metadata_size_hint: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
    scan_options: ParquetScanOptions,
}

impl FileOpener for ParquetOpener {
@@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener {
        let batch_size = self.batch_size;
        let projection = self.projection.clone();
        let pruning_predicate = self.pruning_predicate.clone();
        let table_schema = self.table_schema.clone();
        let reorder_predicates = self.scan_options.reorder_predicates;
        let pushdown_filters = self.scan_options.pushdown_filters;

        Ok(Box::pin(async move {
            let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
            let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
            let adapted_projections =
                schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -358,6 +397,21 @@ impl FileOpener for ParquetOpener {
                adapted_projections.iter().cloned(),
            );

            if let Some(predicate) = pushdown_filters
                .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
                .flatten()
            {
                if let Ok(Some(filter)) = build_row_filter(
                    predicate.clone(),
                    builder.schema().as_ref(),
                    table_schema.as_ref(),
                    builder.metadata(),
                    reorder_predicates,
                ) {
                    builder = builder.with_row_filter(filter);
                }
            };

            let groups = builder.metadata().row_groups();
            let row_groups =
                prune_row_groups(groups, file_range, pruning_predicate, &metrics);
@@ -839,6 +893,7 @@ mod tests {
        projection: Option<Vec<usize>>,
        schema: Option<SchemaRef>,
        predicate: Option<Expr>,
        pushdown_predicate: bool,
    ) -> Result<Vec<RecordBatch>> {
        let file_schema = match schema {
            Some(schema) => schema,

[Review comment on `pushdown_predicate`] Maybe in a follow-on PR we can adjust the tests to do the same scan both with and without pushdown.
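As a sketch of that follow-on (hypothetical, not part of this PR), the helper could be wrapped so each test exercises both modes:

// Hypothetical wrapper (assumed name): run the same scan with and
// without pushdown so tests can assert on both result sets.
async fn round_trip_both_modes(
    batches: Vec<RecordBatch>,
    projection: Option<Vec<usize>>,
    schema: Option<SchemaRef>,
    predicate: Option<Expr>,
) -> Result<(Vec<RecordBatch>, Vec<RecordBatch>)> {
    let without_pushdown = round_trip_to_parquet(
        batches.clone(),
        projection.clone(),
        schema.clone(),
        predicate.clone(),
        false,
    )
    .await?;
    let with_pushdown =
        round_trip_to_parquet(batches, projection, schema, predicate, true).await?;
    Ok((without_pushdown, with_pushdown))
}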
@@ -851,7 +906,7 @@
        let file_groups = meta.into_iter().map(Into::into).collect();

        // prepare the scan
        let parquet_exec = ParquetExec::new(
        let mut parquet_exec = ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::local_filesystem(),
                file_groups: vec![file_groups],
@@ -865,6 +920,14 @@
            None,
        );

        if pushdown_predicate {
            parquet_exec = parquet_exec.with_scan_options(
                ParquetScanOptions::default()
                    .with_pushdown_filters(true)
                    .with_reorder_predicates(true),
            );
        }

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        collect(Arc::new(parquet_exec), task_ctx).await
@@ -912,9 +975,10 @@
        let batch3 = add_to_batch(&batch1, "c3", c3);

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None)
            .await
            .unwrap();
        let read =
            round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None, false)
                .await
                .unwrap();
        let expected = vec![
            "+-----+----+----+",
            "| c1 | c2 | c3 |",
@@ -953,7 +1017,7 @@
        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
            .await
            .unwrap();
        let expected = vec![
@@ -987,7 +1051,7 @@
        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
            .await
            .unwrap();
        let expected = vec![
@@ -1020,24 +1084,60 @@
        // batch2: c3(int8), c2(int64)
        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

        let filter = col("c2").eq(lit(0_i64));
        let filter = col("c2").eq(lit(2_i64));

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
            .await
            .unwrap();
        let read =
            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
                .await
                .unwrap();
        let expected = vec![
            "+-----+----+----+",
            "| c1 | c3 | c2 |",
            "+-----+----+----+",
            "| Foo | 10 | |",
            "| | | |",
            "| | 10 | 1 |",
            "| | 20 | |",
            "| | 20 | 2 |",
            "| Foo | 10 | |",
            "| bar | | |",
            "+-----+----+----+",
        ];
        assert_batches_sorted_eq!(expected, &read);
    }

    #[tokio::test]
    async fn evolved_schema_intersection_filter_with_filter_pushdown() {
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));

        // batch1: c1(string), c3(int8)
        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);

        // batch2: c3(int8), c2(int64)
        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

        let filter = col("c2").eq(lit(2_i64));

        // read/write them files:
        let read =
            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
                .await
                .unwrap();
        let expected = vec![
            "+----+----+----+",
            "| c1 | c3 | c2 |",
            "+----+----+----+",
            "| | 20 | 2 |",
            "+----+----+----+",
        ];
        assert_batches_sorted_eq!(expected, &read);
    }

    #[tokio::test]
    async fn evolved_schema_projection() {
        let c1: ArrayRef =
@@ -1061,10 +1161,15 @@
        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);

        // read/write them files:
        let read =
            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None, None)
                .await
                .unwrap();
        let read = round_trip_to_parquet(
            vec![batch1, batch2],
            Some(vec![0, 3]),
            None,
            None,
            false,
        )
        .await
        .unwrap();
        let expected = vec![
            "+-----+-----+",
            "| c1 | c4 |",
@@ -1102,9 +1207,10 @@
        let filter = col("c3").eq(lit(0_i8));

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
            .await
            .unwrap();
        let read =
            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
                .await
                .unwrap();

        // Predicate should prune all row groups
        assert_eq!(read.len(), 0);
@@ -1123,12 +1229,13 @@
        // batch2: c2(int64)
        let batch2 = create_batch(vec![("c2", c2)]);

        let filter = col("c2").eq(lit(0_i64));
        let filter = col("c2").eq(lit(1_i64));

        // read/write them files:
        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
            .await
            .unwrap();
        let read =
            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
                .await
                .unwrap();

        // This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
        // but parquet pruning is not exact. If the min/max values are not defined (which they are not in this case since it is
@@ -1139,14 +1246,48 @@
            "+-----+----+",
            "| c1 | c2 |",
            "+-----+----+",
            "| Foo | |",
            "| | |",
            "| | |",
            "| | 1 |",
            "| | 2 |",
            "| Foo | |",
            "| bar | |",
            "+-----+----+",
        ];
        assert_batches_sorted_eq!(expected, &read);
    }

    #[tokio::test]
    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        // batch1: c1(string)
        let batch1 = create_batch(vec![("c1", c1.clone())]);

        // batch2: c2(int64)
        let batch2 = create_batch(vec![("c2", c2)]);

        let filter = col("c2").eq(lit(1_i64));

        // read/write them files:
        let read =
            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
                .await
                .unwrap();

        let expected = vec![
            "+----+----+",
            "| c1 | c2 |",
            "+----+----+",
            "| | 1 |",
            "+----+----+",
        ];
        assert_batches_sorted_eq!(expected, &read);
    }

    #[tokio::test]
    async fn evolved_schema_incompatible_types() {
        let c1: ArrayRef =
@@ -1181,6 +1322,7 @@
            None,
            Some(Arc::new(schema)),
            None,
            false,
        )
        .await;
        assert_contains!(read.unwrap_err().to_string(),
[Review comment] If there is an error when building the row filter, why not throw the error out?
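One way to do that (a sketch only; the merged PR keeps the `if let Ok(Some(filter))` form and silently skips pushdown on failure) would be to match on the result and propagate the error, assuming the error type converts into the one returned by the opener's future:

// Sketch: propagate build_row_filter errors instead of discarding them.
match build_row_filter(
    predicate.clone(),
    builder.schema().as_ref(),
    table_schema.as_ref(),
    builder.metadata(),
    reorder_predicates,
) {
    Ok(Some(filter)) => builder = builder.with_row_filter(filter),
    Ok(None) => {}
    // Assumes the surrounding future's error type implements From<_>
    // for build_row_filter's error.
    Err(e) => return Err(e.into()),
}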