[SPARK-17091][SQL] ParquetFilters rewrite IN to OR of Eq #14671
Conversation
```scala
    df: DataFrame,
    filterClass: Class[_ <: FilterPredicate],
    checker: (DataFrame, Seq[Row]) => Unit,
    expected: Seq[Row]): Unit = {
```
Sorry about the indentation changes; I can revert all of these.
Test build #63867 has finished for PR 14671 at commit
cc @HyukjinKwon @rdblue for Parquet-related change
Test build #63877 has finished for PR 14671 at commit
Thanks for cc'ing me! As you might already know, I think it makes sense to allow filtering row groups, but this will also be applied row-by-row for the normal Parquet reader, and that was removed by SPARK-16400. So, let me please cc @rxin and @liancheng here.

IMHO, I remember there is a concern (sorry, I can't find the reference) that Spark-side codegen row-by-row filtering might be faster than Parquet's in general due to the type-boxing and virtual function calls that Spark's version tries to avoid. So, actually, I was thinking of bringing this back after (maybe) Parquet row-by-row filtering is disabled in Spark, so that row groups can be filtered properly. I am pretty sure filtering row groups makes sense, but I am a bit hesitant about the row-by-row one because it seems it was removed for better performance, and bringing it back might be a performance regression even though the implementation is different. Do we maybe need a benchmark? (Or just avoid the row-by-row path for this case if possible?) Otherwise, maybe we should experiment to check whether Spark's codegen filtering is actually faster than Parquet's, so that we can decide to disable row-by-row filtering in Parquet first (although I am not sure if this was already done somewhere or offline).

EDITED: If it was removed only to clean up the codebase, I agree with this change.
| test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { | ||
| import testImplicits._ | ||
| Seq("true", "false").map { vectorized => | ||
| Seq("true", "false").foreach { vectorized => |
Yes, I remember I was told that this case is even essential in some cases; see #14416 (comment)
Yep, maybe this belongs in a separate PR as a follow-up to the previous one. I just changed it because IntelliJ was yelling at me :P
Yea, unfortunately the row-by-row filtering doesn't make much sense in Parquet.
Thanks for the comments, guys! I had to search through some code, but I think I understand the current state of things. Correct me if I'm wrong, but it seems that record-by-record filtering only occurs if the vectorized reader is disabled, since there is no logic in SpecificParquetRecordReaderBase to perform individual record filtering; currently this will only happen if we fall back to the Parquet-provided ParquetRecordReader. After doing some digging into ParquetRecordReader, it pushes the "record-by-record" filter down to InternalParquetRecordReader. However, it appears that inside InternalParquetRecordReader you can actually disable row-by-row filtering with the magic conf. So in theory, if we set this configuration to false when we construct ParquetRecordReader in ParquetFileFormat, we wouldn't have to worry about row-by-row filtering being applied along either code path (I think).
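To make that concrete, here is a rough sketch of what that could look like when setting up the Hadoop configuration in ParquetFileFormat; this is not this PR's code, and the property name is my assumption about the flag InternalParquetRecordReader consults (what parquet-mr's ParquetInputFormat exposes as RECORD_FILTERING_ENABLED), so it is worth verifying against the Parquet version Spark actually bundles:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat

// Sketch only: keep the pushed predicate so row-group pruning still happens,
// but turn off record-by-record evaluation inside InternalParquetRecordReader.
def configureParquetFilter(hadoopConf: Configuration, pushed: FilterPredicate): Unit = {
  ParquetInputFormat.setFilterPredicate(hadoopConf, pushed)
  // Assumed property name; not taken from this PR.
  hadoopConf.setBoolean("parquet.filter.record-level.enabled", false)
}
```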
Yea, that is all true. Actually, it would be okay just not to pass the filter here for the normal Parquet reader, because we are setting the filter for row groups here for both the normal one and the vectorized one. (BTW, you might already notice the actual filter2 for each row is related in …) However, if my understanding is correct, the point is that it is not even clear whether we should disable Parquet's row-by-row filtering or not. In my point of view, we need a benchmark to see whether Spark's codegen filtering is really faster than Parquet's for row-by-row. I guess we are assuming Spark's one is faster than Parquet's.
I mean, maybe we should disable the row-by-row one in Parquet, with a proper benchmark first, before handling …
That is true, but currently all filters are being pushed down to the row-by-row path anyway when not using the vectorized reader, so I'm unclear why the IN filter is special.
@andreweduffy @rxin Maybe I can do the simple benchmark quickly (maybe within this weekend) and open a PR to disable Parquet row-by-row filtering if it makes sense, and that can be the reason to hold this PR?
Yeah, benchmarking is definitely a great idea, as it is likely Spark will be better than Parquet at filtering individual records, but I'm still not quite understanding why this filter is any different and should block on the row-by-row filtering decision. To my understanding, all filters are being processed row-by-row using ParquetRecordReader, and this one is no different from any of the others in ParquetFilters.
@andreweduffy's comments about this make sense to me. Improving the filters that are pushed down is a good idea, even if we decide to disable Parquet's row-by-row filtering. The option to disable row-level filtering, …
@rxin Did you get the chance to take a closer look at this?
cc @davies @cloud-fan for the Parquet change; it seems I got @rdblue's stamp of approval.
@HyukjinKwon do you have time to work on that benchmark over the next week?
@ash211 I am happy to do so, but I would like to make sure whether an offline benchmark has already been performed and whether we can disable this if the performance is better. I don't want to duplicate efforts, so I just want, at least, a stamp of approval from one of the committers. This is also partly because I believe there should already be a benchmark; AFAIK, this is a pretty old issue.
@HyukjinKwon would you like to file a separate ticket for benchmarking? It's pretty orthogonal to this PR; see rdblue's comment above.
@andreweduffy Yup, filed here: https://issues.apache.org/jira/browse/SPARK-17310.
Cool. Ping to @davies @cloud-fan, would either of you be able to look at this?
Before disabling the record-level filter in the Parquet reader, I think pushing more inefficient predicates into the Parquet reader will make things even worse, right?
@davies Row-level filtering doesn't occur with the vectorized reader, which is now enabled by default.
@andreweduffy Good point, but we still use parquet-mr when there is any complex type in the schema.
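For anyone following along, here is a rough, illustrative check of what "complex type" means for that fallback (this is not Spark's exact code; the real decision lives in ParquetFileFormat and also considers other settings):

```scala
import org.apache.spark.sql.types.{ArrayType, MapType, StructType}

// Illustrative only: the vectorized Parquet reader handles flat schemas of simple types,
// so any nested field forces the fallback to parquet-mr, where record-level filtering applies.
def hasComplexType(schema: StructType): Boolean =
  schema.exists { field =>
    field.dataType match {
      case _: StructType | _: ArrayType | _: MapType => true
      case _ => false
    }
  }
```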
@davies Do you mind if I ask whether it is sensible to perform a benchmark and try to submit a PR to disable this (maybe adding an extra option to enable/disable it, false by default)? If agreed, I would like to do this quickly (as I said, if it looks like the reason to block this PR).
@HyukjinKwon That sounds good, thanks!
Thanks for confirming this. I will work on this.
In light of @HyukjinKwon's benchmark, it seems Spark-side filtering is the right thing to do here, so I think this should be good?
…rquet-side

## What changes were proposed in this pull request?

There is a concern that Spark-side codegen row-by-row filtering might be faster than Parquet's in general due to type-boxing and additional function calls which Spark's one tries to avoid. So, this PR adds an option to disable/enable record-by-record filtering in Parquet side. It sets the default to `false` to take advantage of the improvement. This was also discussed in #14671.

## How was this patch tested?

Benchmarks were performed manually. I generated a billion (1,000,000,000) records and tested equality comparisons concatenated with `OR`. The filter combinations ranged from 5 to 30. It seems Spark-side filtering is indeed faster in the test case, and the gap increases as the filter tree becomes larger. The details are as below:

**Code**

```scala
test("Parquet-side filter vs Spark-side filter - record by record") {
  withTempPath { path =>
    val N = 1000 * 1000 * 1000
    val df = spark.range(N).toDF("a")
    df.write.parquet(path.getAbsolutePath)
    val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
    Seq(5, 10, 20, 30).foreach { num =>
      val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")

      benchmark.addCase(s"Parquet-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {
          // We should strip Spark-side filter to compare correctly.
          stripSparkFilter(
            spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
        }
      }

      benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {
          spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
        }
      }
    }
    benchmark.run()
  }
}
```

**Result**

```
Parquet-side vs Spark-side:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5]         4268 / 4367        234.3           4.3       0.8X
Spark-side filter - number of filters [5]           3709 / 3741        269.6           3.7       0.9X
Parquet-side filter - number of filters [10]        5673 / 5727        176.3           5.7       0.6X
Spark-side filter - number of filters [10]          3588 / 3632        278.7           3.6       0.9X
Parquet-side filter - number of filters [20]        8024 / 8440        124.6           8.0       0.4X
Spark-side filter - number of filters [20]          3912 / 3946        255.6           3.9       0.8X
Parquet-side filter - number of filters [30]       11936 / 12041        83.8          11.9       0.3X
Spark-side filter - number of filters [30]          3929 / 3978        254.5           3.9       0.8X
```

Author: hyukjinkwon <[email protected]>

Closes #15049 from HyukjinKwon/SPARK-17310.
What changes were proposed in this pull request?
Allow for pushdown of `IN` clauses. Previous implementations relied upon custom user-defined predicates in Parquet; instead, here we just convert an `IN` over a set to an `OR` over a set of equality expressions, which can be pushed down properly to Parquet.
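For illustration, here is a minimal sketch of the rewrite idea (not the PR's actual diff); `makeEq` is a stand-in for the per-type equality builder that ParquetFilters already uses for EqualTo, so its name and shape are assumptions:

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources

// Sketch: IN (v1, ..., vn) on attribute a becomes a = v1 OR ... OR a = vn.
def rewriteInAsOrOfEq(
    makeEq: (String, Any) => Option[FilterPredicate])(
    in: sources.In): Option[FilterPredicate] = {
  val eqs = in.values.toSeq.map(value => makeEq(in.attribute, value))
  if (eqs.isEmpty || eqs.exists(_.isEmpty)) {
    None // skip pushdown if the IN list is empty or any value cannot be expressed in Parquet
  } else {
    Some(eqs.flatten.reduceLeft((left, right) => FilterApi.or(left, right)))
  }
}
```

Because every branch is a plain equality, Parquet can evaluate the disjunction against row-group statistics, which is what enables row-group-level pruning.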
How was this patch tested?

Unit tests from previous PRs, specifically #10278. They pass with the change and fail when the case block is commented out, indicating the pushdown is successfully being applied in Parquet. Because it is a disjunction of equality checks, it should be applied at the row-group level.