[SPARK-32985][SQL] Decouple bucket scan and bucket filter pruning for data source v1 #31413

c21 wants to merge 10 commits into apache:master from
Conversation
withTable("bucketed_table") {
  val numBuckets = NumBucketsForPruningDF
  val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
  // json does not support predicate push-down, and thus json is used here
This is not true anymore, as json filter push-down was added in https://issues.apache.org/jira/browse/SPARK-30648.
  ("SELECT j FROM t1", 0, 0),
  // Filter on bucketed column
- ("SELECT * FROM t1 WHERE i = 1", 1, 1),
+ ("SELECT * FROM t1 WHERE i = 1", 0, 1),
This unit test change is expected, as we no longer need to do bucketed scan for this kind of query. See the related change in DisableUnnecessaryBucketedScan.scala.
cc @cloud-fan and @maropu, could you take a look when you have time? Thanks.

Test build #134712 has finished for PR 31413 at commit

Kubernetes integration test starting
partitionValues = partition.values
)

if (filePruning(filePath)) {

val filePruning: Path => Boolean = optionalBucketSet match {
  case Some(bucketSet) =>
    filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName)
      .getOrElse(sys.error(s"Invalid bucket file $filePath")))
Could you use IllegalStateException instead of sys.error?

I was following the code path for creating the bucketed RDD. Do we want to change this place as well?

Yea, since the fix is trivial, changing it in this PR looks fine to me.

Just wondering why we prefer IllegalStateException here? Is it better error classification?

This is not a strict rule, but I think we tend to use IllegalStateException for an unexpected code path. For example, see the related previous comment:
#28810 (comment)

+1, sys.error throws a bare RuntimeException, so IllegalStateException classifies the error better.
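For context, Spark encodes the bucket id in the data file name, and BucketingUtils.getBucketId extracts it. A rough Python sketch of the parsing and the fail-loud behavior being discussed (the file-name pattern is an illustrative assumption, not Spark's exact regex):

```python
import re

# Bucket files look roughly like part-00000-<uuid>_00003.c000.snappy.parquet,
# where _00003 encodes bucket id 3 (pattern assumed for illustration).
_BUCKET_ID_RE = re.compile(r"_(\d+)(?:\.\w+)*$")

def get_bucket_id(file_name):
    """Return the bucket id parsed from the file name, or None if absent."""
    m = _BUCKET_ID_RE.search(file_name)
    return int(m.group(1)) if m else None

def require_bucket_id(file_name):
    """Fail loudly (akin to throwing IllegalStateException) on an invalid bucket file."""
    bucket_id = get_bucket_id(file_name)
    if bucket_id is None:
        raise RuntimeError("Invalid bucket file %s" % file_name)
    return bucket_id
```

The review point is only about which exception type signals the unexpected state; the parsing itself is unchanged.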
  s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible
val filePruning: Path => Boolean = optionalBucketSet match {
nit: filePruning -> can[Bucket]Prune?
I left minor comments and it looks fine otherwise. cc: @viirya , too.
val filePruning: Path => Boolean = optionalBucketSet match {
  case Some(bucketSet) =>
    filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName)
      .getOrElse(sys.error(s"Invalid bucket file $filePath")))
It doesn't look good to fail the query here. The scan node actually reads buckets implicitly because bucketed scan is disabled for this path. Instead of failing the query, maybe log a warning and skip pruning?
The error here indicates there is data corruption (an invalid file name) in a Spark data source bucketed table. The benefit of logging a warning here is to unblock reading these kinds of corrupted bucketed tables by disabling bucketing. I feel this is dangerous: users should not rely on disabling bucketing to read potentially wrong data from a bucketed table; they should correct the table. I prefer to fail loudly here with an exception, as a warning log would be very hard to debug. But I am open to other opinions as well, cc @maropu and @cloud-fan .
Your reasoning sounds fair, but it still sounds like a potential breaking change. I'm not sure if some users read tables this way, but it is indeed possible. It will be very confusing to them, as they never asked to read the table in a bucketed way and it worked previously.

If we eventually still decide to fail the query, then an easy-to-understand error message is necessary to let users know why we use the bucket spec here and why Spark fails to read the table. Maybe we should also provide some hints for solving it, if possible.
We already have some options to control missing & corrupted files for data sources, so how about following those semantics?
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Lines 1282 to 1298 in 4e7e7ee
For the data corruption case above, how about throwing an exception by default and then stating in the exception message that you should use a specified option if you want to ignore it?
Yea, I think it's better to avoid a situation where there is no way to overcome the bucket id corruption. Using the existing option to ignore the failure sounds good.
Kubernetes integration test status success

Test build #134717 has finished for PR 31413 at commit
c21 left a comment

Addressed all comments, ready for review again, thanks. cc @maropu , @viirya and @cloud-fan .
Kubernetes integration test status failure
  s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible
val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
} else {
  throw new IllegalStateException(
    s"Invalid bucket file $filePath when doing bucket pruning. " +
    s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " +
    "and read the file.")
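The control flow in this intermediate version can be sketched as follows (a simplified Python model of the Scala hunk above; the config key spark.sql.files.ignoreCorruptFiles is the real one behind SQLConf.IGNORE_CORRUPT_FILES, the rest of the shape is an assumption):

```python
IGNORE_CORRUPT_FILES_KEY = "spark.sql.files.ignoreCorruptFiles"

def should_process(bucket_id, bucket_set, ignore_corrupt_files):
    """Decide whether to read a file during bucket pruning.

    bucket_id is None when the file name is not a valid bucket file name.
    """
    if bucket_id is not None:
        # Valid bucket file: keep it only if its bucket survives pruning.
        return bucket_id in bucket_set
    if ignore_corrupt_files:
        # Invalid name, but corrupt files are ignored: skip pruning, read it.
        return True
    # Default: fail loudly, pointing the user at the escape-hatch config.
    raise RuntimeError(
        "Invalid bucket file when doing bucket pruning. "
        "Enable %s to disable exception and read the file." % IGNORE_CORRUPT_FILES_KEY)
```

This was later simplified again when the behavior change was deferred to a followup.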
Test build #134746 has finished for PR 31413 at commit
// Filter files with bucket pruning if possible
lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
val canPrune: Path => Boolean = optionalBucketSet match {
nit: perhaps rename this to shouldNotPrune or shouldProcess? canPrune sounds like the path should be ignored.

I am very bad at naming :) This was suggested in #31413 (comment). Shall I change it again? cc @maropu .

Changed to shouldProcess, as I feel shouldNotPrune is hard to reason about.
case Some(id) => bucketSet.get(id)
case None =>
  if (ignoreCorruptFiles) {
    // If ignoring corrupt file, do not prune when bucket file name is invalid
Curious what the previous behavior of this was, or is this newly introduced? We may need to add the info to the PR description (user-facing change).

Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? It will likely lead to incorrect results. On the other hand we could choose to ignore the file, which seems more aligned with the name of the config, although the result could still be incorrect.
@sunchao - this is newly introduced. Updated the PR description.

> Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? It will likely lead to incorrect results. On the other hand we could choose to ignore the file, which seems more aligned with the name of the config, although the result could still be incorrect.

Note that by default the exception is thrown here and the query fails loudly. We allow a config here to help existing users work around it if they want. See the relevant discussion in #31413 (comment) .
Cool, thanks for pointing to the discussion. I'm just not sure whether the corrupted file should be ignored or processed if the flag is turned on. ignoreCorruptFiles seems to indicate that the problematic file should be ignored, so it is a bit confusing that we still process it here. Also IMO ignoring it seems slightly safer (think of someone dumping garbage files into the bucketed partition dir)?
I feel neither skipping nor processing the file is perfect. There can be other corruption cases, e.g. a table specified with 1024 buckets but only 500 files underneath. This could be due to other compute engines or users accidentally dumping data here without respecting Spark's bucketing metadata. We have no efficient way to handle the case when the number of files is fewer than the number of buckets.

The existing usage of ignoreCorruptFiles skips reading some of the content of a file, so it's also not completely ignoring it. But I am fine if we think we need another config name for this.
Given that users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of the input files, no? cc @viirya what's the use case you are thinking of here? Thanks.
> I feel neither skipping nor processing the file is perfect.

Yes, agreed.

> Given users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of the input files

Good point. Although it seems a bit weird that someone would do this.
This seems unrelated to "decouple bucket scan and bucket filter pruning". Can we do it in a followup PR and discuss it there? Let's not introduce an extra behavior change when it's not necessary.
@cloud-fan - sorry, which part are you suggesting to do in a followup PR? Here we anyway need to decide how to handle the case when the file name is not a valid bucket file name (process or not process the file) for pruning. Did I miss anything?
> Here we anyway need to decide how to handle the case when the file name is not a valid bucket file name (process or not process the file) for pruning

We should follow whatever the behavior was before this PR, since the correct behavior is not obvious and it triggered discussion here.
@cloud-fan - okay. The behavior before this PR is to process all files of a bucketed table if bucketing is disabled. I changed the code to not prune the file if the bucket file name is invalid, so this follows the previous behavior, and we can discuss whether to throw an exception / ignore the file / process the file in a followup PR. cc @maropu and @viirya for the code change here.
Test build #134757 has finished for PR 31413 at commit

Test build #134765 has finished for PR 31413 at commit
c21 left a comment

@cloud-fan - addressed your comment, and it's ready for review again, thanks.
}

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
  // Bucket pruning should still work when bucketing is disabled
Then do we have a config to turn on/off bucket file pruning, or is it always applied?
@cloud-fan - always applied. I can add one if this surprises existing users/queries, so they can turn it off if needed, e.g. spark.sql.sources.bucketing.pruning.enabled?
Or we can keep the previous behavior: when SQLConf.BUCKETING_ENABLED is off, don't do any bucket optimization, including bucket scan and bucket pruning.
@cloud-fan - makes sense to me. Updated the change.
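The agreed semantics - spark.sql.sources.bucketing.enabled acts as the master switch for both optimizations - can be modeled as below (a hypothetical helper, not Spark code; only the config name comes from the thread):

```python
def bucket_optimizations(bucketing_enabled, can_bucketed_scan, has_bucket_filter):
    """Return which bucket optimizations apply, per the semantics agreed above:
    when bucketing is disabled, neither bucketed scan nor bucket pruning runs."""
    if not bucketing_enabled:
        return {"bucketed_scan": False, "bucket_pruning": False}
    return {"bucketed_scan": can_bucketed_scan,
            "bucket_pruning": has_bucket_filter}
```

This preserves the pre-PR behavior of spark.sql.sources.bucketing.enabled=false while still letting pruning apply independently of bucketed scan when bucketing is on.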
Test build #134848 has finished for PR 31413 at commit
  s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible
lazy val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
There is already a lazy val bucketedScan: Boolean in L261
@cloud-fan - that's bucketedScan, but we need ...conf.bucketingEnabled; here we are already in the code path where bucketedScan is false - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L413 .
Ah sorry, I missed that. Probably we can just make it a val, since reading conf is very cheap, while lazy val has overhead.
> since reading conf is very cheap

That's what I feel too, but I got feedback earlier here - #31413 (comment). @maropu - could you help provide more context here? Thanks.
Yea, reverting it back looks okay.
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
Test build #134857 has finished for PR 31413 at commit
c21 left a comment

@cloud-fan - addressed all comments, and this is ready for review again, thanks.
Test build #134910 has finished for PR 31413 at commit
jaceklaskowski left a comment

LGTM (non-binding)
thanks, merging to master!

Thank you all for review!
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
  s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible
It's a bit odd that we call the method createNonBucketedReadRDD but do something with buckets. I guess we could rename createNonBucketedReadRDD to just createReadRDD or createStandardReadRDD.
case Some(id) => bucketSet.get(id)
case None =>
  // Do not prune the file if bucket file name is invalid
  true
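The merged behavior - prune by bucket id when the file name parses, otherwise keep the file - amounts to Scala's Option.forall. A Python sketch of the whole predicate (the file-name regex is an illustrative assumption, not Spark's exact parser):

```python
import re

# Assumed shape of Spark bucket file names, e.g. part-00000-<uuid>_00003.c000.parquet.
_BUCKET_ID_RE = re.compile(r"_(\d+)(?:\.\w+)*$")

def get_bucket_id(file_name):
    m = _BUCKET_ID_RE.search(file_name)
    return int(m.group(1)) if m else None

def make_should_process(optional_bucket_set):
    """Build the pruning predicate applied while splitting files into read tasks."""
    if optional_bucket_set is None:
        return lambda file_name: True  # no bucket filter: keep every file
    def should_process(file_name):
        bucket_id = get_bucket_id(file_name)
        # Option.forall semantics: an unparsable name is never pruned.
        return bucket_id is None or bucket_id in optional_bucket_set
    return should_process
```

Keeping unparsable names matches the pre-PR behavior of processing all files when bucketing is disabled.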
Hm, it could be a one-liner:

filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)

If it looks less readable we could:

filePath => BucketingUtils.getBucketId(filePath.getName).map(bucketSet.get).getOrElse(true)

If we worry about perf penalty from pattern matching, etc. we could do:

filePath => {
  val bucketId = BucketingUtils.getBucketId(filePath.getName)
  if (bucketId.isEmpty) true else bucketSet.get(bucketId.get)
}

…r change in FileSourceScanExec

### What changes were proposed in this pull request?
This PR is a followup change to address comments in #31413 (comment) and #31413 (comment) . Minor change in FileSourceScanExec. No actual logic change here.

### Why are the changes needed?
Better readability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing unit tests.

Closes #32000 from c21/bucket-scan.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?

As a followup from the discussion in #29804 (comment) : currently in the data source v1 file scan FileSourceScanExec, bucket filter pruning only takes effect with bucketed table scan. However this is unnecessary, as bucket filter pruning can also happen if we disable bucketed table scan. Reading files with bucket hash partitioning and bucket filter pruning are two orthogonal features and do not need to be coupled together.

Why are the changes needed?

This helps queries leverage bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, without being bound to bucketed table scan when the parallelism of tasks is a concern.

In addition, this also resolves the issue of reducing the number of tasks launched for a simple query with a bucket column filter - SPARK-33207 - because with bucketed scan, we launch a number of tasks equal to the number of buckets, and this is unnecessary.
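To make the SPARK-33207 point concrete: with bucketed scan the task count is tied to the number of buckets, while a non-bucketed scan bin-packs files by size. A much-simplified model (ignoring openCostInBytes and partition coalescing; 128 MB mirrors the default of spark.sql.files.maxPartitionBytes):

```python
import math

def num_scan_tasks(file_sizes, num_buckets, bucketed_scan,
                   max_split_bytes=128 * 1024 * 1024):
    """Very rough model of how many tasks a file scan launches."""
    if bucketed_scan:
        # One task per bucket, even if only a few buckets survive pruning.
        return num_buckets
    # Non-bucketed scan: size-based bin packing (simplified to one pass).
    return max(1, math.ceil(sum(file_sizes) / max_split_bytes))
```

So a small table bucketed into 1024 buckets launches 1024 tasks under bucketed scan, but only a handful under the decoupled non-bucketed scan with pruning.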
Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test in BucketedReadSuite.scala to make all existing unit tests for bucket filter pruning work with this PR.