[SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark #32473
Conversation
dongjoon-hyun left a comment:
Thank you, @huaxingao .
dongjoon-hyun left a comment:
Also @huaxingao, it's recommended to generate benchmark results through GitHub Actions: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks

Kubernetes integration test unable to build dist. Exiting with code: 1

Test build #138272 has finished for PR 32473 at commit

The benchmark test result for the Parquet bloom filter is quite depressing. Did I do anything wrong?

Test build #138296 has finished for PR 32473 at commit

Kubernetes integration test starting

Kubernetes integration test status failure
```
Read a row from 100M rows:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Without bloom filter                  1201           1230          41       83.3          12.0       1.0X
With bloom filter                     1262           1301          54       79.2          12.6       1.0X
```
> The benchmark test result for the Parquet bloom filter is quite depressing. Did I do anything wrong?

It looks strange to me, too.
```scala
}
benchmark.addCase("With bloom filter") { _ =>
  df.write.mode("overwrite")
    .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
```
Please debug with this code path. This might be insufficient to enable bloom filters in the Parquet library for some reason. The usual suspects: 1) the parameter is not handed over correctly to Parquet, 2) the parameter requires other parameters, 3) this specific data type is not yet well supported in Parquet.
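As a first step for suspect 1), here is a minimal sketch (assuming parquet-hadoop 1.12+ on the classpath; the part-file name is a placeholder, and `path` is the benchmark's output directory) of how one might check whether the written files actually carry a bloom filter for the column:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Open one written file (placeholder name) and inspect its footer metadata.
val inputFile = HadoopInputFile.fromPath(
  new Path(path + "/withBF/part-00000.parquet"), new Configuration())
val reader = ParquetFileReader.open(inputFile)
try {
  reader.getFooter.getBlocks.asScala.foreach { block =>
    block.getColumns.asScala.foreach { col =>
      // getBloomFilterOffset returns -1 when no bloom filter was written.
      println(s"${col.getPath}: bloomFilterOffset=${col.getBloomFilterOffset}")
    }
  }
} finally {
  reader.close()
}
```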
cc @ggershinsky since he is a Parquet committer.
BTW, thank you for leading this effort in the Apache Spark community, @huaxingao. Since this is the first try, you will remove the roadblocks in advance for the users. If there is a drawback, you can file a JIRA with the Apache Parquet community and document it in the Apache Spark 3.2.0 documentation until it's resolved in the Apache Parquet community.
cc @wangyum too
```scala
df.write.parquet(path + "/withoutBF")
df.write.option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
  .parquet(path + "/withBF")
```
You need to set the row group size, e.g.

```scala
df.write.option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
  .option("parquet.block.size", 1024 * 1024)
  .parquet(path + "/withBF")
```

Then you will see the benchmark difference. (Bloom filters are stored per row group, so a smaller `parquet.block.size` produces more row groups and gives the filter more chances to skip data.)
```
[info] Running benchmark: Read a row from 100M rows
[info]   Running case: Without bloom filter
[info]   Stopped after 3 iterations, 2674 ms
[info]   Running case: With bloom filter
[info]   Stopped after 5 iterations, 2383 ms
[info] OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Mac OS X 10.16
[info] Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
[info] Read a row from 100M rows:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] Without bloom filter                   872            892          19      114.7           8.7       1.0X
[info] With bloom filter                      473            477           3      211.4           4.7       1.8X
```
Wow, that's a great improvement! I wonder if we should make the row group size a parameter for the benchmark too. There seem to be other related parameters as well, such as DEFAULT_MAX_BLOOM_FILTER_BYTES.
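For reference, a minimal sketch of the related knobs (option keys as defined by parquet-hadoop 1.12; the values are illustrative assumptions, not tuned recommendations):

```scala
import org.apache.parquet.hadoop.ParquetOutputFormat

df.write
  .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
  // Expected number of distinct values for the column; sizes the filter.
  .option("parquet.bloom.filter.expected.ndv#value", 100L * 1000 * 1000)
  // Upper bound on the serialized filter size per column chunk
  // (DEFAULT_MAX_BLOOM_FILTER_BYTES is the library default for this cap).
  .option("parquet.bloom.filter.max.bytes", 1024 * 1024)
  .parquet(path + "/withBF")
```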
The result looks really good now. Thanks, everyone! You guys are extremely helpful!
```scala
withTempPath { dir =>
  val path = dir.getCanonicalPath

  df.write.parquet(path + "/withoutBF")
```
When comparing withoutBF and withBF, I think we should use the same parquet.block.size for both cases. It might also affect the numbers for withoutBF.
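A minimal sketch of that suggestion, pieced together from the snippets above (the block-size value is an assumption for illustration):

```scala
// Use one block size for both paths so the bloom filter option is the
// only difference between the two benchmark cases.
val blockSize = 2 * 1024 * 1024  // illustrative value
df.write
  .option("parquet.block.size", blockSize)
  .parquet(path + "/withoutBF")
df.write
  .option("parquet.block.size", blockSize)
  .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
  .parquet(path + "/withBF")
```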
Sorry, I forgot to change that. Fixed. Will update the results.
Kubernetes integration test starting

Kubernetes integration test status failure

Kubernetes integration test starting

Kubernetes integration test status failure
Shall we change the grouping in order to see the trend according to the block size? For example,
```
...
Without bloom filter, blocksize: 8388608      1005           1011           8       99.5          10.0       1.0X
Without bloom filter, blocksize: 9437184       992           1002          14      100.8           9.9       1.0X
With bloom filter, blocksize: 8388608          385            404          20      259.6           3.9       2.6X
With bloom filter, blocksize: 9437184          521            538          16      191.9           5.2       1.9X
...
```
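A sketch of how the cases could be grouped that way (the shape is inferred from the discussion; `benchmark`, `spark`, and per-size output paths like `path + s"/withBF-$blockSize"` are assumptions about the surrounding benchmark code):

```scala
// 8388608 = 8 MiB and 9437184 = 9 MiB, matching the sizes in the sample output.
val blockSizes = (2 to 9).map(_ * 1024 * 1024)

// One case per block size within each group, so the report shows the trend.
blockSizes.foreach { blockSize =>
  benchmark.addCase(s"Without bloom filter, blocksize: $blockSize") { _ =>
    spark.read.parquet(path + s"/withoutBF-$blockSize").where("value = 0")
      .write.format("noop").mode("overwrite").save()
  }
}
blockSizes.foreach { blockSize =>
  benchmark.addCase(s"With bloom filter, blocksize: $blockSize") { _ =>
    spark.read.parquet(path + s"/withBF-$blockSize").where("value = 0")
      .write.format("noop").mode("overwrite").save()
  }
}
```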
Kubernetes integration test starting

Kubernetes integration test status failure

Kubernetes integration test unable to build dist. Exiting with code: 1

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting
viirya left a comment:
Is the benchmark run with the built-in In predicate in Parquet, or is it still using the current one in Spark?
Kubernetes integration test status success

Test build #139523 has finished for PR 32473 at commit

Test build #139527 has finished for PR 32473 at commit

Test build #139533 has finished for PR 32473 at commit

Test build #139517 has finished for PR 32473 at commit
It's still using the current one in Spark. That's why I kept decreasing the number of predicates; otherwise, I got an OOM.
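For context, a sketch of the kind of IN-set read under discussion (the value list and path are illustrative assumptions; a long enough IN list is what stressed Spark's own In predicate handling here):

```scala
import org.apache.spark.sql.functions.col

// Read back with a large IN list; Spark, not Parquet, evaluates the In
// predicate in this setup, so very long lists can blow up memory.
spark.read.parquet(path + "/withBF")
  .where(col("value").isin((1 to 300).map(_ * 1000): _*))
  .write.format("noop").mode("overwrite").save()
```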
```
Without bloom filter                    70             76           6       14.2          70.2       1.0X
With bloom filter                       73            103          22       13.8          72.6       1.0X
```
The bloom filter is slower. Is it due to the IN predicate problem?
For JDK 8, the bloom filter seems a bit faster.
It's not due to the IN predicate problem, because ORC also seems a bit slower with a bloom filter. I think the data is too small. Let me increase the data size and try again.
```scala
df2.repartition(col("value")).sort(col("value"))
  .write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")

runBenchmark(s"ORC Read for IN set") {
```
nit: s"" is not needed.
```scala
  .write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")

runBenchmark(s"ORC Read for IN set") {
  val benchmark = new Benchmark(s"Read a row from 1M rows", 1000 * 1000, output = output)
```
ditto.
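Applied, the two nits amount to dropping the interpolator from both literals:

```scala
runBenchmark("ORC Read for IN set") {
  val benchmark = new Benchmark("Read a row from 1M rows", 1000 * 1000, output = output)
```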
viirya left a comment:
Some unnecessary s"" usages, and a question about the IN predicate with the bloom filter case in Parquet. Otherwise looks okay.
Test build #139689 has finished for PR 32473 at commit

Kubernetes integration test unable to build dist. Exiting with code: 1

Test build #142484 has finished for PR 32473 at commit

Test build #142490 has finished for PR 32473 at commit
This PR is kind of messy, so I will close it. The new PR is here: #34594
What changes were proposed in this pull request?
Add a BloomFilter benchmark test for Parquet.
Why are the changes needed?
Currently, we only have a BloomFilter benchmark test for ORC. This adds one for Parquet too.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Ran the newly added benchmark test.
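For reference, a rough sketch of the shape of the added Parquet benchmark, pieced together from the snippets in this conversation (`df`, `path`, and the sizes are assumptions; this is not the exact code merged in #34594):

```scala
import org.apache.parquet.hadoop.ParquetOutputFormat

runBenchmark("Parquet Write") {
  val benchmark = new Benchmark("Write 100M rows", 100 * 1000 * 1000, output = output)
  benchmark.addCase("Without bloom filter") { _ =>
    df.write.mode("overwrite").parquet(path + "/withoutBF")
  }
  benchmark.addCase("With bloom filter") { _ =>
    df.write.mode("overwrite")
      .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#value", true)
      // Smaller row groups, per the review discussion above.
      .option("parquet.block.size", 2 * 1024 * 1024)
      .parquet(path + "/withBF")
  }
  benchmark.run()
}
```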