-
Notifications
You must be signed in to change notification settings - Fork 3k
Hive: Make lock check retries backoff exponentially #1873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| private static final String HIVE_LOCK_CHECK_BACKOFF_SCALE_FACTOR = "iceberg.hive.lock-check-backoff-scale-factor"; | ||
| private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes | ||
| private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds | ||
| private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might worth to mention in the documentation, or somewhere that this should be smaller than hive.txn.timeout or in newer versions metastore.txn.timeout otherwise the locks might be timed out without because of the lack of heartbeat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also add these configs to configuration.md to the rest of Hadoop conf, @raptond.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Glue catalog is introducing support for a lock using DynamoDB. It would be nice to standardize these options across catalogs so that we only need to document them once and they work the same way. FYI @jackye1995.
I think it would also make sense for these to be catalog options, rather than pulled from the Hive configuration. We used HiveConf originally because we didn't have catalog-specific configuration, but now I think it would make sense to move these into catalog properties. We don't want to increase the cases where we use a Hadoop Configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes agree. But Hive is currently built around reading from Hadoop configs. If we want to change it to use catalog properties, we need to also change all the places that loads HiveCatalog using the constructor public HiveCatalog(Configuration conf), such as https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java#L215
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is also used from Spark 2 where we don't have a way to specify catalog options. Plus, we already have a Hadoop conf for the lock timeout. How do we approach this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That being said, I am +1 for adding catalog options. I am just not sure we can get rid of Hadoop conf completely in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, we should default from Configuration, but prefer options passed to the initialize method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make sure I got @rdblue and @jackye1995. You are talking about generalizing catalog options, right?
| private TServer server; | ||
| private HiveMetaStore.HMSHandler baseHandler; | ||
| private HiveClientPool clientPool; | ||
| protected HiveClientPool clientPool; // Exposed for testing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could we use VisibleForTesting annotation here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this was a test class, I didn't add this annotation. I finally ended up not using it, so I reverted back the change. Thanks for the review.
|
Minor comments, looks good to me (non-binding) |
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
Outdated
Show resolved
Hide resolved
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
Outdated
Show resolved
Hide resolved
|
This change looks great to me, just minor comments in addition to what @pvary mentioned. We could add fewer configs but I'd be in favor of what this PR does. It was really painful when we hit this problem and couldn't do anything without changing the code so having props to configure every aspect sounds good to me. It is rather a sensitive area and more control here seems justified to me. |
| if (state.get().equals(LockState.WAITING)) { | ||
| try { | ||
| Tasks.foreach(lockId) | ||
| .retry(Integer.MAX_VALUE - 100) // Endless retries bound by timeouts. Tasks.retry adds 1 for "first try". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why - 100?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only wanted to keep a big number for retry. Eg Integer.MAX_VALUE. But, the setter adds 1 overflowing to MIN_VALUE.
Integer.MAX_VALUE - 1 would simply suffice, but I chose conservatively to set Integer.MAX_VALUE - 100.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be worth noting the rational for a choice like this in a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 +1 I have added the rationale in the comments.
| } | ||
|
|
||
| @Test | ||
| public void testLockAcquisitionAfterRetries() throws TException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is InterruptedException needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, here the HiveTableOperations.doUnlock method throws InterruptedException.
| conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT); | ||
| this.lockCheckBackoffScaleFactor = | ||
| conf.getDouble(HIVE_LOCK_CHECK_BACKOFF_SCALE_FACTOR, HIVE_LOCK_CHECK_BACKOFF_SCALE_FACTOR_DEFAULT); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra empty line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken care.
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
Outdated
Show resolved
Hide resolved
| if (newState.equals(LockState.WAITING)) { | ||
| throw WAITING_FOR_LOCK_EXCEPTION; | ||
| } | ||
| } catch (InterruptedException | TException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For InterruptedException, why not throw WaitingForLockException and signal that the thread was interrupted? Then this could use the checked exception call, run(id -> {...}, TException.class) and would not need to wrap the exceptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks better after this comment. However throwing WaitingForLockException ends up losing the source of the original InterruptedException because it would get handled here: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/Tasks.java#L452
So, I chose to throw RuntimeException which will stop the retry and preserve the source stack trace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd probably opt to suppress the interrupt and let the code carry on after setting that the thread was interrupted. That results in a CommitFailedException. I don't think that preserving the stack of the InterruptedException is really needed, but I'm fin with it this way if you prefer it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken care as per the other comment.
50 milliseconds (constant) sleep time between "checking lock status" thrashes hive metastore databases when multiple jobs try to commit to the same Iceberg table. This fix allows the frequency of "checking the WAITING lock status" configurable and makes use of Tasks to backoff exponentially.
b201863 to
c53108c
Compare
|
|
||
| protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO, | ||
| String catalogName, String database, String table) { | ||
| String catalogName, String database, String table) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unnecessary whitespace change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| } | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException("Interrupted while checking lock status.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is necessary to throw RuntimeException here. If this doesn't throw WaitingForLockException then it will exit and move on. Since timeout is not set, it would hit the check for whether the lock was acquired and fail, resulting in the CommitFailedException.
I think that's a fairly reasonable way to handle an interrupt without wrapping it in a RuntimeException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. I was fixated on failing the execution. This suggestion works nicely and I have a test case for this. Thank you.
rdblue
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me. @aokolnychyi can you take another look?
99f7ac1 to
e1f4d48
Compare
|
All comments taken care. |
|
I'll do a pass in 15 mins |
|
Thanks everyone! The change looks solid so I merged it! |
|
@raptond, do you want to work on catalog options for this next? |
|
@aokolnychyi - Sure, I will work on to submit a new one with catalog options (properties via initialize method) overriding the Configuration. |

50 milliseconds (constant) sleep time between "checking lock status" thrashes hive metastore databases when multiple jobs try to commit to the same Iceberg table. This fix allows the frequency of "checking the WAITING lock status" configurable and makes use of Tasks to backoff exponentially.
Every time a check on the lock is made, the HMS performs heartbeats on the lock record and the transaction record. It eventually ends up with the below errors if the number of jobs on the same table grew and commit at the same time. Ability to configure the delay between retries and slowing down retries further exponentially would help. Thanks.