(#2317) Stop removal of files when catalog state is uncertain - HiveCatalog #2328
Conversation
```java
      taskOps.commit(base, updated.withUUID());
    });
...
} catch (CommitFailedException commitFailedException) {
```
@rymurr This is something I want to make sure you are ok with. We also have to avoid cleaning up data files, so I changed the rules specified in TableOperations: now, if you throw CFE we clean up; otherwise the client is on their own.
```java
  Exceptions.suppressAndThrow(commitFailedException, this::cleanAll);
} catch (RuntimeException e) {
  Exceptions.suppressAndThrow(e, this::cleanAll);
  LOG.error("Cannot determine whether the commit was successful or not, the underlying data files may or " +
```
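To make the proposed contract concrete, here is a minimal self-contained sketch of the split; persistTable() and cleanAll() are stand-ins for the real HiveTableOperations internals, not the PR's exact code:

```java
import org.apache.iceberg.exceptions.CommitFailedException;

// Illustrative sketch of the rule described above: clean up only on a
// definite CommitFailedException; keep files on an indeterminate failure.
class CommitErrorHandlingSketch {
  void commit() {
    boolean threw = true;
    try {
      persistTable();   // push the new metadata location to the metastore
      threw = false;
    } catch (CommitFailedException e) {
      // Definite failure: the catalog rejected the commit, so it is safe
      // to delete the metadata file written for this attempt.
      cleanAll();
      throw e;
    } catch (RuntimeException e) {
      // Indeterminate failure: the metastore update may have succeeded even
      // though the client saw an error, so keep every file and rethrow.
      throw e;
    } finally {
      if (threw) {
        // release the metastore lock, etc.
      }
    }
  }

  private void persistTable() { /* metastore alter_table call */ }
  private void cleanAll() { /* delete this attempt's metadata file */ }
}
```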
I am afraid this will be dangerous. This means we have to update all places that throw a specific exception like AlreadyExistsException to now throw CommitFailedException. This is error-prone and we potentially lose helpful details. We could throw CommitFailedException and add the specific exception as a cause but the first point still holds.
Also, an error log message will most likely be ignored by the user. This is a case where we really want to propagate as much info as possible.
Would it make sense to introduce a new exception type? E.g., UnknownCommitStateException? That way, we can keep the existing logic for handling exceptions, except in cases where we really don't know what happened.
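For reference, a minimal sketch of what such a type could look like (the PR ultimately named it CommitStateUnknownException, and the real class appends a longer explanatory message):

```java
package org.apache.iceberg.exceptions;

/**
 * Sketch of the proposed exception type. Carrying the original failure as
 * the cause preserves the specific details that would otherwise be lost
 * when wrapping exceptions like AlreadyExistsException.
 */
public class CommitStateUnknownException extends RuntimeException {
  public CommitStateUnknownException(Throwable cause) {
    super(cause.getMessage(), cause);
  }
}
```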
Thoughts, @RussellSpitzer @pvary @rymurr @danielcweeks @Parth-Brahmbhatt?
cc @omalley @rdblue @shardulm94 @rdsr too
I agree with @aokolnychyi here; I think changing the contract for the table ops is dangerous, especially as the change reduces the information available to the user.
I think Anton is right: rather than saying "CommitFailedException is the only valid known failure mode", we should probably say "NewExceptionTypeX means the commit failed in an unknown way and the user may be required to do some immediate intervention".
I think that's fine; I just want to make sure every catalog implements it then. If we always treat all non-CommitFailedException exceptions as unknown commit state, we know we are never deleting state we need. If we instead do it in the opposite direction, we need to be extra sure that all exceptions that may leave an unknown state are marked that way. In my mind that leaves more chance of corruption when someone forgets to mark a particular edge case.
I don't think the property should be persisted at the table level, as a single table could be written from both idempotent and non-idempotent clients (a Flink writer, which is idempotent, plus a background compaction process, which is not). We can either expose the API at the Table or the PendingUpdate level.
In the absence of more external use cases it is hard to say which is worse: corrupting a table so nothing can read or write until manual intervention, or having duplicate data that can go undetected. I would pick duplicate data based on the general use cases I have seen, so if we are voting, I would vote to default the behavior to assuming an idempotent client.
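A hypothetical shape for that operation-level flag; neither this interface nor the idempotent(...) method exists in Iceberg's API, it only illustrates the suggestion above:

```java
// Hypothetical only: sketches an operation-level idempotency hint, as
// opposed to a table-level property that all writers would share.
interface IdempotentUpdateSketch {
  // Declares that replaying this exact commit is safe, so a retry after an
  // unknown-state failure cannot duplicate data.
  IdempotentUpdateSketch idempotent(boolean isIdempotent);

  void commit();
}
```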
An API on PendingUpdate sounds good to me.
One more reason why I'd vote for not deleting by default is this situation:
Also, getting a reply from the metastore or checking the commit status may take a substantial amount of time, allowing a concurrent operation to succeed (if the lock expires). That's the worst that can happen, as we will silently corrupt the table and will detect it only while querying a specific portion of the table. I had to fix such a case, and we were simply lucky that we found it out after only a couple of days, while rewriting manifests.
@Parth-Brahmbhatt one thing to remember is that in our current code we actually get both data duplication and corrupt tables; it just depends on when the failure occurs. I.e., if you fail to check the metastore and lose connection to the file system, you won't clean up and will still throw an error. Or if, say, the commit is successful but the job fails for some other reason (OOM, interrupt, power loss, I spill my coke on the server), we still end up with a failed job which, if we retry, will duplicate data/work.
So you can still get data duplication if a container loses connectivity to the network after committing.
I think there are a couple of issues here that we should address separately.
First, I agree that it is a good idea to add a way to signal that a commit is idempotent. Some operations are already idempotent, like file rewrites because we validate that all of the rewritten files are still part of the table. If we update paths that guarantee idempotent commits to signal this and handle UnknownCommitStateException, then we really reduce the incidence of the problem.
Second, I think we still need to agree on the default behavior. While I really don't like the idea of allowing a retry that will write duplicate data, this discussion has convinced me that silently duplicating data is a better outcome than breaking the table.
In our environment, we use versioned buckets so we can always un-delete metadata files that are missing, but that's not always the case. If those files are actually gone, then it is much worse because you have missing data and don't know which data files were in the missing commit without a lot of work. I think this problem is worse than the duplicate data.
A second compelling argument for changing the default is that deleting the metadata files leaks the problem to other writers. All concurrent processes are blocked if a table is broken, rather than blocking just a single writer.
In the end, I think that the right thing is to not delete the files and to throw UnknownCommitStateException as proposed. That handles interactive cases and also makes it so schedulers can handle a job failure by blocking just a single workflow and not all workflows operating on a table. And idempotent jobs should not be affected.
I would go for an API-level flag that the change is idempotent, since this is more dependent on the client/action than on the actual table.
Also, I would go for a few connection retries and then throwing the UnknownCommitStateException if we are not able to determine the status of the commit. We should fail as fast as possible, so the user is able to mitigate the issue. If they ignore the exception, it is better to have a state we can recover from, so I would keep the files in case we are not able to handle the error cleanly.
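A rough sketch of that retry-then-fail-fast flow; checkCommitSucceeded() and cleanAll() are hypothetical stand-ins, and the retry budget is illustrative:

```java
import org.apache.iceberg.exceptions.CommitStateUnknownException;

// Sketch only: retry the status probe a few times, then fail fast without
// deleting anything whose fate is unknown.
class RetryThenThrowSketch {
  private static final int MAX_STATUS_CHECKS = 3;  // illustrative budget

  void resolveUnknownCommit(RuntimeException original) {
    Boolean committed = null;
    for (int attempt = 0; attempt < MAX_STATUS_CHECKS && committed == null; attempt++) {
      try {
        committed = checkCommitSucceeded();  // probe the catalog for our metadata location
      } catch (RuntimeException e) {
        // catalog still unreachable; loop and try again
      }
    }
    if (committed == null) {
      // Status never determined: keep all files and surface the uncertainty.
      throw new CommitStateUnknownException(original);
    }
    if (!committed) {
      cleanAll();      // provably failed, so cleanup is safe
      throw original;  // propagate the specific failure
    }
    // committed: the commit actually went through; nothing to rethrow
  }

  private boolean checkCommitSucceeded() { return false; }  // hypothetical probe
  private void cleanAll() { }                               // hypothetical cleanup hook
}
```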
```java
public class CommitStateUnknownException extends RuntimeException {

  private static final String COMMON_INFO =
      "Cannot determine whether the commit was successful or not, the underlying data files may or " +
```
Looks descriptive enough!
```java
persistTable(tbl, updateHiveTable);
threw = false;
try {
```
Looks much cleaner now!
```java
}

/**
 * Attempt to load the table and see if any current or past metadata location matches the one we were attempting
```
Thanks for the javadoc! It is going to be helpful for folks who touch this code next time.
* set read timeout on client. This is to address the issue found in apache/iceberg#2328
* fix checkstyle
* code review
* code review pt 2
Handles the case where the Nessie endpoint did not respond, or some other network error made it impossible to detect whether the Nessie server got the request and, more importantly, to get the response. This PR adds a `catch (org.projectnessie.client.http.HttpClientException)` and re-throws it as the new `CommitStateUnknownException`. Also includes related refactoring of `NessieCatalog.dropTable`/`dropTableInner`. Related to apache#2328
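A minimal sketch of that Nessie-side mapping; sendCommitRequest is a stand-in for the real client call, not the verbatim NessieTableOperations code:

```java
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.projectnessie.client.http.HttpClientException;

// Sketch of the mapping described above: a transport-level failure means we
// cannot know whether the Nessie server applied the commit, so surface the
// uncertainty instead of treating it as a definite failure.
class NessieCommitSketch {
  void commit(Runnable sendCommitRequest) {
    try {
      sendCommitRequest.run();
    } catch (HttpClientException e) {
      // the request may or may not have reached the server
      throw new CommitStateUnknownException(e);
    }
  }
}
```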
```java
 * Attempt to load the table and see if any current or past metadata location matches the one we were attempting
 * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has
 * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second
 * committer was able to successfully commit on top of our commit.
```
Hello, I have a question about the past-locations check here: when checkCommitStatus() is called, we still hold the metastore lock, so is it possible for another committer to commit on top of our commit?
We request an EXCLUSIVE lock before committing. While we hold the lock, no other committer can get another lock on the same table, so no other committer can commit.
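A condensed sketch of the last-resort check the javadoc above describes, simplified from the PR's checkCommitStatus rather than quoted verbatim:

```java
import org.apache.iceberg.TableMetadata;

// Condensed sketch: after an ambiguous failure, reload the table and look
// for our new metadata location among the current and previous locations.
class CommitStatusCheckSketch {
  boolean commitWentThrough(String newMetadataLocation, TableMetadata refreshed) {
    if (newMetadataLocation.equals(refreshed.metadataFileLocation())) {
      return true;  // our commit is the table's current state
    }
    // A later committer may already have committed on top of ours, so our
    // location could have moved into the metadata log history.
    return refreshed.previousFiles().stream()
        .anyMatch(entry -> newMetadataLocation.equals(entry.file()));
  }
}
```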
Iceberg table properties are the canonical source of truth. HMS table properties should be kept in sync with the Iceberg table as much as possible, but that can only happen on a best-effort basis. This PR makes the following changes: ensures that all Iceberg table properties are propagated to the HMS table during HiveTableOperations commit; pushes all HMS table properties down to Iceberg as well during table creation (except for the metadata location and spec props); refactors the various property-check assertions scattered throughout various test cases into a single property-focused unit test case. What is left out and should be done in the future: push property changes occurring via Hive DDL (ALTER TABLE SET TBLPROPERTIES) down to Iceberg as well. Currently this can't be done reliably because the HiveMetaHook interface only contains a preAlterTable method, but no commitAlterTable method. We'll need to extend this interface and include the change in an upcoming Hive upstream release. Author: Marton Bod <[email protected]> PR: apache/iceberg#2123 Backport Reason: Accommodate (I) for the fix apache/iceberg#2328
Raw commit message: Addressing apache/iceberg#2249. Backport Reason: Accommodate (II) for the fix apache/iceberg#2328. Author: Marton Bod <[email protected]>
Raw commit message: Currently, there is no way to call unlock if HiveTableOperations.acquireLock fails while waiting for a lock on a Hive table. This PR aims to invoke unlock in the finally block. Backport Reason: Accommodate (III) for the fix apache/iceberg#2328. Author: ZorTsou <[email protected]>
Raw commit message: This patch introduces a new snapshot summary metric for total-files-size. It was somehow missing up till now, even though it has its companion metrics added-files-size and removed-files-size; introducing this total metric makes it consistent with the other 'metric groups'. On HiveTableOperations commit, we populate the HMS statistics using these snapshot metrics. Having these stats populated makes Hive read query planning significantly faster. In some cases, @pvary's research showed that it led to a 10x+ improvement in query compilation times, since in the absence of HMS stats the Hive query planner will recursively list the data files to gather their sizes before execution. Backport Reason: Accommodate (IV) for the fix apache/iceberg#2328. Author: Marton Bod <[email protected]>
Raw commit message: #2317 - We discovered that Iceberg currently treats all failures during commit as full commit failures. This can lead to an unstable/corrupt table if the catalog was successfully updated and only a network or other error prevented the client from learning of this. In this state, the client will attempt to clean up files related to the commit while other clients, and the table itself, believe those files were successfully added. To fix this, we change SnapshotProducer to only do a cleanup when a true CommitFailedException is thrown, and stop HiveTableOperations from removing metadata.json files when an uncertain exception is thrown. Backport Reason: Bug fix. Author: Russell Spitzer <[email protected]>
#2317 - We discovered that Iceberg currently treats all failures during commit as full commit failures. This can lead to an unstable/corrupt table if the catalog was successfully updated and only a network or other error prevented the client from learning of this. In this state, the client will attempt to clean up files related to the commit while other clients, and the table itself, believe those files were successfully added.
To fix this, we change SnapshotProducer to only do a cleanup when a true CommitFailedException is thrown, and stop HiveTableOperations from removing metadata.json files when an uncertain exception is thrown.
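A minimal sketch of the SnapshotProducer-side rule this commit message describes; cleanUncommitted() is a stand-in for the real cleanup hook, not the PR's exact code:

```java
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;

// Sketch: only a definite CommitFailedException triggers cleanup of
// uncommitted manifests; an unknown commit state leaves every file in
// place for manual recovery.
class SnapshotProducerCleanupSketch {
  void commitWithCleanup(Runnable doCommit) {
    try {
      doCommit.run();
    } catch (CommitStateUnknownException e) {
      throw e;  // state unknown: keep all files, the caller must intervene
    } catch (CommitFailedException e) {
      cleanUncommitted();  // definite failure: safe to remove this attempt's manifests
      throw e;
    }
  }

  private void cleanUncommitted() { /* delete manifests written for this attempt */ }
}
```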