@@ -152,7 +152,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       }
     } finally {
       try {
-        if (commitStatus == CommitStatus.FAILURE) {
+        if (commitStatus == CommitStatus.FAILURE && !reuseMetadataLocation(newTable, metadata)) {
           // if anything went wrong, clean up the uncommitted metadata file
           io().deleteFile(newMetadataLocation);
         }
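The change in this catalog's doCommit is the core of the patch: a failed commit used to delete newMetadataLocation unconditionally, but when the commit comes from registerTable that location points at a metadata file the catalog never wrote and must not remove. A minimal standalone sketch of the guard, using plain String/boolean parameters instead of TableMetadata (the class below is illustrative, not Iceberg API):

// Illustrative truth table for the new guard; mirrors reuseMetadataLocation from the diff.
public class CleanupGuardSketch {

  // True when the commit is registering an existing metadata file rather than one it wrote itself.
  static boolean reuseMetadataLocation(boolean newTable, String metadataFileLocation) {
    return newTable && metadataFileLocation != null;
  }

  public static void main(String[] args) {
    // registerTable: the metadata file pre-exists, so a failed commit must not delete it
    System.out.println(reuseMetadataLocation(true, "s3://bucket/db/t/metadata/v3.metadata.json")); // true
    // createTable: the operation wrote the file itself, so cleanup on failure is safe
    System.out.println(reuseMetadataLocation(true, null)); // false
    // normal commit against an existing table: also safe to clean up
    System.out.println(reuseMetadataLocation(false, "s3://bucket/db/t/metadata/v7.metadata.json")); // false
  }
}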
@@ -148,10 +148,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
 
     String newMetadataLocation = null;
     boolean glueTempTableCreated = false;
+    boolean newTable = base == null;
     try {
       glueTempTableCreated = createGlueTempTableIfNecessary(base, metadata.location());
 
-      boolean newTable = base == null;
       newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
       lock(newMetadataLocation);
       Table glueTable = getGlueTable();
@@ -190,7 +190,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
         throw new CommitStateUnknownException(persistFailure);
       }
     } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation);
+      if (!reuseMetadataLocation(newTable, metadata)) {
+        cleanupMetadataAndUnlock(commitStatus, newMetadataLocation);
+      } else if (lockManager != null) {
+        lockManager.release(commitLockEntityId, newMetadataLocation);
+      }
       cleanupGlueTempTableIfNecessary(glueTempTableCreated, commitStatus);
     }
   }
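In the Glue operations, boolean newTable = base == null; moves above the try block because the finally block now consults it, and a local declared inside the try body is out of scope in finally. A minimal scoping sketch, simplified to a single flag (plain Java, not Glue code):

// Why the flag is declared before the try: locals from the try body are invisible to finally.
public class ScopeSketch {

  static void commit(boolean baseIsNull) {
    boolean newTable = baseIsNull; // declared up front so the finally block can read it
    boolean committed = false;
    try {
      // if newTable were declared here instead, the finally block below could not see it
      committed = true;
    } finally {
      if (!committed && !newTable) {
        System.out.println("clean up metadata this commit wrote");
      }
    }
  }

  public static void main(String[] args) {
    commit(true);
    commit(false);
  }
}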
@@ -77,10 +77,8 @@ public void doRefresh() {
   // atomically
   @Override
   public void doCommit(TableMetadata base, TableMetadata metadata) {
-    String newMetadataLocation =
-        base == null && metadata.metadataFileLocation() != null
-            ? metadata.metadataFileLocation()
-            : writeNewMetadata(metadata, currentVersion() + 1);
+    boolean newTable = base == null;
+    String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
     BaseMetastoreOperations.CommitStatus commitStatus =
         BaseMetastoreOperations.CommitStatus.FAILURE;
     try {
@@ -110,7 +108,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
     try {
       if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
         LOG.warn("Failed to commit updates to table {}", tableName());
-        io().deleteFile(newMetadataLocation);
+        if (!reuseMetadataLocation(newTable, metadata)) {
+          io().deleteFile(newMetadataLocation);
+        }
       }
     } catch (RuntimeException e) {
       LOG.error(
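This operations class also drops its inline ternary in favor of the shared writeNewMetadataIfRequired helper, so the write-or-reuse decision lives in one place. A standalone sketch of that decision with plain types (the writeNewMetadata below only fabricates a versioned path; it is not the Iceberg method):

// Standalone sketch of the write-or-reuse decision made by writeNewMetadataIfRequired.
public class WriteIfRequiredSketch {

  static String metadataLocation(boolean newTable, String existingLocation, int currentVersion) {
    return (newTable && existingLocation != null)
        ? existingLocation                      // registerTable: reuse the file that is already there
        : writeNewMetadata(currentVersion + 1); // create/commit: write a brand-new metadata file
  }

  static String writeNewMetadata(int newVersion) {
    return "metadata/v" + newVersion + ".metadata.json"; // placeholder path, no file is written
  }

  public static void main(String[] args) {
    System.out.println(metadataLocation(true, "metadata/v3.metadata.json", 0)); // reused as-is
    System.out.println(metadataLocation(true, null, 0));                        // writes v1
    System.out.println(metadataLocation(false, null, 7));                       // writes v8
  }
}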
@@ -23,7 +23,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import org.apache.iceberg.BaseMetastoreOperations.CommitStatus;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -146,11 +145,15 @@ protected void disableRefresh() {
   }
 
   protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) {
-    return newTable && metadata.metadataFileLocation() != null
+    return reuseMetadataLocation(newTable, metadata)
         ? metadata.metadataFileLocation()
         : writeNewMetadata(metadata, currentVersion() + 1);
   }
 
+  protected boolean reuseMetadataLocation(boolean newTable, TableMetadata metadata) {
+    return newTable && metadata.metadataFileLocation() != null;
+  }
+
   protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
     String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
     OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
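The two helpers above live in the base metastore operations class, and reuseMetadataLocation is true exactly in the register path, where doCommit receives base == null together with metadata parsed from an existing file. A hedged sketch of that call path, heavily simplified from how a metastore catalog's registerTable typically works (table-ops creation and identifier handling are left to the caller):

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;

// Simplified register flow: parsing an existing file keeps that file's location attached to the
// metadata, and the commit is issued with base == null because the table is not in the catalog
// yet. That combination is what reuseMetadataLocation detects.
public class RegisterFlowSketch {

  static Table register(TableOperations ops, String tableName, String metadataFileLocation) {
    TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFileLocation);
    ops.commit(null, metadata); // base == null, metadata.metadataFileLocation() != null
    return new BaseTable(ops, tableName);
  }
}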
@@ -289,7 +289,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       throw new CommitFailedException(e);
 
     } finally {
-      HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
+      if (!reuseMetadataLocation(newTable, metadata)) {
+        HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
+      } else {
+        lock.unlock();
+      }
     }
 
     LOG.info(
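For Hive the finally block now separates two concerns: deleting the metadata file is conditional on whether this commit wrote it, while the metastore lock is released either way. A generic sketch of that split; the Lock interface and helper below are hypothetical stand-ins, not HiveOperationsBase API:

// Hypothetical types; only the shape of the conditional-cleanup-plus-unconditional-unlock
// pattern mirrors the diff.
public class CleanupVsUnlockSketch {

  interface Lock {
    void unlock();
  }

  static void finishCommit(boolean failed, boolean reusedMetadataLocation, String metadataLocation, Lock lock) {
    try {
      if (failed && !reusedMetadataLocation) {
        deleteFile(metadataLocation); // undo only files this commit created
      }
    } finally {
      lock.unlock(); // released regardless of how the commit ended
    }
  }

  static void deleteFile(String location) {
    System.out.println("deleting " + location);
  }

  public static void main(String[] args) {
    finishCommit(true, true, "metadata/v3.metadata.json", () -> System.out.println("unlocked"));
  }
}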
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -73,6 +74,7 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -283,6 +285,39 @@ public void testInitializeCatalogWithProperties() {
         .isEqualTo("/user/hive/testwarehouse");
   }
 
+  @Test
+  public void testRegisterTableFailsOnCommit() {
+    TableIdentifier identifier = TableIdentifier.of("a", "t1");
+    TableIdentifier identifier2 = TableIdentifier.of("b", "t2");
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(identifier.namespace());
+    }
+
+    catalog.createTable(identifier, SCHEMA);
+    Table sourceTable = catalog.loadTable(identifier);
+    TableOperations ops = ((BaseTable) sourceTable).operations();
+    String metadataLocation = ops.current().metadataFileLocation();
+
+    assertThatThrownBy(() -> catalog.registerTable(identifier2, metadataLocation))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Invalid Hive object for");
+
+    assertThatThrownBy(() -> catalog.loadTable(identifier2))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist:");
+
+    // The failed register table operation does not affect the source table
+    Table sourceTableAfter = catalog.loadTable(identifier);
+    assertThat(((BaseTable) sourceTableAfter).operations().current().metadataFileLocation())
+        .isEqualTo(metadataLocation);
+    assertThat(catalog.dropTable(identifier)).isTrue();
+  }
+
+  protected Table failWhenRegisterTable(TableIdentifier identifier, String metadataFileLocation) {
+    throw new UnsupportedOperationException("Registering tables is not supported");
+  }
+
   @Test
   public void testCreateTableTxnBuilder() throws Exception {
     Schema schema = getTestSchema();
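The new test pins down the invariant behind all of these guards: a registerTable call that fails must leave the source table's current metadata file in place. A small reusable check in the same spirit (the helper name is ours; it assumes AssertJ and the Iceberg catalog API already used in the test above):

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Reusable assertion mirroring the last step of testRegisterTableFailsOnCommit.
public class RegisterInvariants {

  static void assertSourceMetadataUntouched(Catalog catalog, TableIdentifier source, String expectedLocation) {
    Table reloaded = catalog.loadTable(source);
    assertThat(((BaseTable) reloaded).operations().current().metadataFileLocation())
        .isEqualTo(expectedLocation);
  }
}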
@@ -132,7 +132,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE)
           .orElse(ex);
     } finally {
-      if (failure) {
+      if (failure && !reuseMetadataLocation(newTable, metadata)) {
         io().deleteFile(newMetadataLocation);
       }
     }
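Finally, the caller-side view of the operation these guards protect: registering an already existing metadata file under a new identifier. The identifier and path below are made up for illustration; registerTable itself is the standard Iceberg Catalog method:

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// If registerTable fails partway through, the metadata file named here must survive,
// because the catalog did not create it.
public class RegisterUsage {

  static Table register(Catalog catalog) {
    TableIdentifier id = TableIdentifier.of("db", "restored_events");
    String metadataLocation = "s3://warehouse/db/events/metadata/00003-abc.metadata.json";
    return catalog.registerTable(id, metadataLocation);
  }
}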