-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25561][SQL] Implement a new config to control partition pruning fallback (if partition push-down to Hive fails) #22614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok to test |
|
Test build #96868 has finished for PR 22614 at commit
|
|
Looks ok to me based on discussion in the bug. Will leave here to see if others have any comments. |
| } catch { | ||
| case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && | ||
| !tryDirectSql => | ||
| case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not blindly call getAllPartitions. This will be super slow. We should do some retries. It depends on the errors we got.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping @srinathshankar @ericl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile From HMS side, the error is always the same "MetaException" and there is no way to tell apart a direct SQL error from an error of "not supported" (unfortunately!). How do you propose we address this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it's not blindly calling that API right? It was already being called before if direct sql was disabled. In the other case, it was just throwing an exception. So now instead of erroring out it will work, just more slowly than expected.
Unless there's some retry at a higher layer that I'm not aware of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @sameeragarwal @tejasapatil Could you share what FB does for the retry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile : Sorry for late reply. We had seen issues with this in past and resorted to do exponential backoff with retries. Fetching all the partitions is going to be bad in a prod setting.... even if it makes it through, the underlying problem if left un-noticed is bad for the system health.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you review the newer changes I have done? Basically, yes, I agree that fetching all partitions is going to be bad and hence we'll leave it up to the user. They can disable fetching all the partitions by setting "spark.sql.hive.metastorePartitionPruning.fallback.enabled" to false. In that case, we'll never retry. If it is set to "true", then we'll retry. As simple as that.
I don't completely understand "exponential backoff with retries". Do you do this at the HMS level? or at the query level? If HMS filter pushdown fails once, does it mean it will succeed in the future? Maybe this is a future improvement to this where instead of a boolean "fallback-enabled" or "fallback-disabled", we can have multiple levels for trying the fallback with timing etc. Thoughts @tejasapatil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kmanamcheri : Lets do this:
- We should prefer doing
getPartitionsByFilterMethod(). If it fails, we retry with increasing delay across retries. - If retries are exhausted, we could fetch all the partitions of the table. Some people might not want this so lets control this using a conf flag. For those who don't want it, the query could fail at this point.
What do you think ?
| // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: | ||
| // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean | ||
| val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, | ||
| tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping @wangyum, too.
| s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) | ||
| "degrade performance. Enable direct SQL mode in hive metastore to attempt " + | ||
| "to improve performance. However, Hive's direct SQL mode is an optimistic " + | ||
| "optimization and does not guarantee improved performance.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kmanamcheri . Could you show different and more correct warning messages based on tryDirectSql value here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Yes that would be better. I'll add that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest, I do not think we should issue a warning message and call getAllPartitions. When the number of partitions is huge, getAllPartitions will be super super slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sometimes failing is better than spending several hours to get all partitions. Shall we add a config to switch the behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the original warning message is more accurate. Direct sql mode isn't just about performance. It's also about enhanced capability, e.g. supporting filtering on non-string type columns. As the original comment states, setting the direct sql config value to true may resolve a problem around metastore-side partition filtering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hive has a config "hive.metastore.limit.partition.request" that can limit number of partitions that can be requested from HMS. So I think there is no need for a new config on the Spark side.
Also since direct sql is a best effort approach just failing when direct sql is enabled is not good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mallman I haven't tried using that config option. If I am understanding the documentation for HIVE_MANAGE_FILESOURCE_PARTITIONS correctly, if I set that value to false, partitions will not be stored in HMS. That sounds like it is addressing a different issue, no?
If that's the suggested way to deal with non-supported partition filters, then this code should always fail if getPartitionsByFilter fails, no? Why even have a fallback (as we do currently)? SPARK-17992 seems to say that Spark should handle certain cases of partition pushdown failures (such as HMS ORM mode). My argument is that the case should be expanded to include even if hive.metastore.try.direct.sql is enabled to be true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongjoon-hyun @mallman I have updated the log messages to be more descriptive and helpful for the user to indicate what they should try doing. Does that help?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One option if we want to get all fancy is to add a configurable timeout in the fallback case - assuming it's possible to cancel an ongoing call (run in a separate thread + interrupt maybe?).
My main concern with the fallback, really, isn't the slowness, but that in the case where it would be slow (= too many partitions), the HMS might just run itself out of memory trying to serve the request.
Reza mentions the Hive config which I think is the right thing to do by the HMS admin, since it avoids apps DoS'ing the server. Not sure what's the behavior there, but I hope if fails the call if there are too many partitions (instead of returning a subset). IMO that config seems to cover all the concerns here assuming the call will fail when you have too many partitions, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if the query tries to get more partitions than "hive.metastore.limit.partition.request", the query will fail. Using the hive config the user can judge if he wants to get all the partitions or not. I also think that config covers all the concerns
|
Let us add a conf to control it? Failing fast is better than hanging. If users want to get all partitions, they can change the conf by themselves. |
@gatorsmile We already have a config option "spark.sql.hive.metastorePartitionPruning". If that is set to false, we will never push down the partitions to HMS. I will add "spark.sql.hive.metastorePartitionPruningFallback" which in addition to the previous one controls the fallback behavior. Irrespective of the value of Hive direct SQL, if we enable the pruning fallback, we will catch the exception and fallback to fetch all partitions. Does this sound like a reasonable compromise @mallman ? |
|
Yes. Let us add a conf for controlling the fallback. Please also add the test cases for verifying it. Thanks! |
|
Test build #96899 has finished for PR 22614 at commit
|
|
@gatorsmile I have added the config option and an additional test. Here's the new behavior
@dongjoon-hyun @mallman @vanzin If these look good, can we move on this to merge? Thanks a lot for all the comments and discussions. |
| .createWithDefault(true) | ||
|
|
||
| val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK = | ||
| buildConf("spark.sql.hive.metastorePartitionPruningFallback") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use spark.sql.legacy prefix like SPARK-19724, @rxin and @cloud-fan ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reasoning for marking this as legacy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark.sql.hive.metastorePartitionPruning.fallback.enabled
|
Test build #96999 has finished for PR 22614 at commit
|
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
Outdated
Show resolved
Hide resolved
|
Test build #97014 has finished for PR 22614 at commit
|
|
The PR description and title may need to change accordingly. Can you update it? |
|
@viirya I have updated the title and description. |
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
Outdated
Show resolved
Hide resolved
| s"configuration setting ${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to " + | ||
| "false as a work around, however this will result in degraded performance. " + | ||
| "Please report a bug to Hive stating that direct SQL is failing consistently " + | ||
| "for the specified query: https://issues.apache.org/jira/browse/HIVE") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should remove the suggestion to file a Hive project bug. Even with the direct SQL configuration setting enabled, there are valid metastore deployments for which it will be ignored. For example, my understanding is that if the metastore uses MongoDB for its underlying storage, the direct SQL configuration setting will be ignored. That means a failure here is not a Hive bug with direct SQL.
| val client = init(false) | ||
| val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), | ||
| Seq(attr("ds") === 20170101)) | ||
| test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change test name to
s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false and $partPruningFallbackKey=true"
?
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
Outdated
Show resolved
Hide resolved
| .doc("When true, enable fallback to fetch all partitions if Hive metastore partition " + | ||
| "push down fails. This is applicable only if partition pruning is enabled (see " + | ||
| s" ${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may degrade performance " + | ||
| "if there are a large number of partitions." ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"if there is"
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
Outdated
Show resolved
Hide resolved
|
Test build #97119 has finished for PR 22614 at commit
|
|
As @tejasapatil suggested above, this fallback is not suggested to be on in a prod setting. It could also impact the system health (e.g., the other concurrent queries that are querying the same Hive metastore could be blocked). Thus, I would suggest to make the new conf as internal and make the configuration description more clear regarding the impact of this fallback. |
|
Also, #22614 (comment) proposes a better and safer solution by introducing |
I'm confused about that suggestion. What is being retried? The user query? The HMS call? What is changing in between the retries that gives any hope that a future call will succeed? And if all the retries fail, do you fallback like this change is doing, or what? |
|
@gatorsmile, @tejasapatil was reviewing the code before I added the new config option. I have asked him to review the new code. Lets see what his thoughts are on that. I have also asked him clarification on what he means by exponential backoff with retries. I want to take a step back and revisit SPARK-17992 and in particular one of the comments from @ericl
It looks like a compromise was reached where we don't support fetching all the time (and only for a subset of cases). My suggested fix is a cleaner way of approaching it through a SQLConf instead of looking at the Hive config. |
|
Test build #97126 has finished for PR 22614 at commit
|
|
Based on my understanding, the solution of FB team is to retry the following commands multiple times: This really depends on what is the actual errors that fail If it still fails, I would suggest to fail fast or depends on the conf value of |
@gatorsmile hmm my understanding was different. I thought they were retrying the fetchAllpartitions method. Maybe @tejasapatil can clarify here?
Doesn't this apply with every other HMS API as well? If so, shouldn't we be building a complete solution in HiveShim around this to do an
Ok I agree. I think we need clarification from @tejasapatil on which call they retry. |
|
@tejasapatil @kmanamcheri any update? thank you very much in advance. |
|
Can one of the admins verify this patch? |
|
Closing this due to author's inactivity. |
What changes were proposed in this pull request?
When using partition filter pushdown to Hive Metastore, the pushdown might fail (depending on the type of partition and if direct SQL mode is enabled in HMS). This change implements a new config option in Spark SQL to decide what to do when a partition filter pushdown fails. The fallback behavior would be to fetch all partitions instead. This might degrade performance depending on the total number of partitions and warnings are added as appropriate.
How was this patch tested?
New unit tests were added to confirm behavior of the new config. All Unit tests on the Spark SQL component were run successfully.