From fddd4bd0f830de93874940c2505d067b0391bd00 Mon Sep 17 00:00:00 2001 From: dongwang Date: Wed, 17 Sep 2025 14:25:57 +0800 Subject: [PATCH] Core: Avoid removing reused metadata file when registerTable fails --- .../aws/dynamodb/DynamoDbTableOperations.java | 2 +- .../iceberg/aws/glue/GlueTableOperations.java | 8 +++-- .../gcp/bigquery/BigQueryTableOperations.java | 10 +++--- .../iceberg/BaseMetastoreTableOperations.java | 7 ++-- .../iceberg/hive/HiveTableOperations.java | 6 +++- .../apache/iceberg/hive/TestHiveCatalog.java | 35 +++++++++++++++++++ .../iceberg/nessie/NessieTableOperations.java | 2 +- 7 files changed, 58 insertions(+), 12 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java index a1a330b11889..764ca4315562 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java @@ -152,7 +152,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } } finally { try { - if (commitStatus == CommitStatus.FAILURE) { + if (commitStatus == CommitStatus.FAILURE && !reuseMetadataLocation(newTable, metadata)) { // if anything went wrong, clean up the uncommitted metadata file io().deleteFile(newMetadataLocation); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java index 4c63dfdb2a70..d159c53f5206 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -148,10 +148,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = null; boolean glueTempTableCreated = false; + boolean newTable = base == null; try { glueTempTableCreated = 
createGlueTempTableIfNecessary(base, metadata.location()); - boolean newTable = base == null; newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); lock(newMetadataLocation); Table glueTable = getGlueTable(); @@ -190,7 +190,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new CommitStateUnknownException(persistFailure); } } finally { - cleanupMetadataAndUnlock(commitStatus, newMetadataLocation); + if (!reuseMetadataLocation(newTable, metadata)) { + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation); + } else if (lockManager != null) { + lockManager.release(commitLockEntityId, newMetadataLocation); + } cleanupGlueTempTableIfNecessary(glueTempTableCreated, commitStatus); } } diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java index d57aab50530a..317bd7b78289 100644 --- a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java @@ -77,10 +77,8 @@ public void doRefresh() { // atomically @Override public void doCommit(TableMetadata base, TableMetadata metadata) { - String newMetadataLocation = - base == null && metadata.metadataFileLocation() != null - ? 
metadata.metadataFileLocation() - : writeNewMetadata(metadata, currentVersion() + 1); + boolean newTable = base == null; + String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); BaseMetastoreOperations.CommitStatus commitStatus = BaseMetastoreOperations.CommitStatus.FAILURE; try { @@ -110,7 +108,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { try { if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) { LOG.warn("Failed to commit updates to table {}", tableName()); - io().deleteFile(newMetadataLocation); + if (!reuseMetadataLocation(newTable, metadata)) { + io().deleteFile(newMetadataLocation); + } } } catch (RuntimeException e) { LOG.error( diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 9fa52d52ea5d..42a6bc7dfa5d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; -import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -146,11 +145,15 @@ protected void disableRefresh() { } protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) { - return newTable && metadata.metadataFileLocation() != null + return reuseMetadataLocation(newTable, metadata) ? 
metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1); } + protected boolean reuseMetadataLocation(boolean newTable, TableMetadata metadata) { + return newTable && metadata.metadataFileLocation() != null; + } + protected String writeNewMetadata(TableMetadata metadata, int newVersion) { String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion); OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index f0183fc5c578..80187f5fff2d 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -289,7 +289,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new CommitFailedException(e); } finally { - HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock); + if (!reuseMetadataLocation(newTable, metadata)) { + HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock); + } else { + lock.unlock(); + } } LOG.info( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 5c3907670c52..8dca4db62ec4 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ 
-73,6 +74,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -283,6 +285,39 @@ public void testInitializeCatalogWithProperties() { .isEqualTo("/user/hive/testwarehouse"); } + @Test + public void testRegisterTableFailsOnCommit() { + TableIdentifier identifier = TableIdentifier.of("a", "t1"); + TableIdentifier identifier2 = TableIdentifier.of("b", "t2"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + catalog.createTable(identifier, SCHEMA); + Table sourceTable = catalog.loadTable(identifier); + TableOperations ops = ((BaseTable) sourceTable).operations(); + String metadataLocation = ops.current().metadataFileLocation(); + + assertThatThrownBy(() -> catalog.registerTable(identifier2, metadataLocation)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Invalid Hive object for"); + + assertThatThrownBy(() -> catalog.loadTable(identifier2)) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("Table does not exist:"); + + // The failed register table operation does not affect the source table + Table sourceTableAfter = catalog.loadTable(identifier); + assertThat(((BaseTable) sourceTableAfter).operations().current().metadataFileLocation()) + .isEqualTo(metadataLocation); + assertThat(catalog.dropTable(identifier)).isTrue(); + } + + protected Table failWhenRegisterTable(TableIdentifier identifier, String metadataFileLocation) { + throw new UnsupportedOperationException("Registering tables is not supported"); + } + @Test public void testCreateTableTxnBuilder() throws Exception { 
Schema schema = getTestSchema(); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index 6ce3e1b763ca..0721a20f6778 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -132,7 +132,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE) .orElse(ex); } finally { - if (failure) { + if (failure && !reuseMetadataLocation(newTable, metadata)) { io().deleteFile(newMetadataLocation); } }