diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
index 7872c4329dc4..8a37f1a6d9b9 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@@ -100,6 +100,7 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
 import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
@@ -582,7 +583,7 @@ public synchronized void replaceTable(String databaseName, String tableName, Tab
         }
 
         if (isIcebergTable(table) && !Objects.equals(table.getParameters().get("metadata_location"), newTable.getParameters().get("previous_metadata_location"))) {
-            throw new TrinoException(HIVE_METASTORE_ERROR, "Cannot update Iceberg table: supplied previous location does not match current location");
+            throw new TrinoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Cannot update Iceberg table: supplied previous location does not match current location");
         }
 
         Path tableMetadataDirectory = getTableMetadataDirectory(table);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
index fc7a47eb328b..e935bf962c39 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
@@ -18,9 +18,11 @@
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
 import io.trino.plugin.iceberg.catalog.hms.AbstractMetastoreTableOperations;
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.FileIO;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -28,6 +30,7 @@
 import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkState;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
 import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
 import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
@@ -76,7 +79,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
             metastore.replaceTable(database, tableName, table, privileges);
         }
         catch (RuntimeException e) {
-            throw new CommitFailedException(e, "Failed to commit transaction to FileHiveMetastore");
+            if (e instanceof TrinoException trinoException &&
+                    trinoException.getErrorCode() == HIVE_CONCURRENT_MODIFICATION_DETECTED.toErrorCode()) {
+                // CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
+                throw new CommitFailedException(e, "Failed to replace table due to concurrent updates: %s.%s", database, tableName);
+            }
+            throw new CommitStateUnknownException(e);
         }
     }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
index c5ba96df15af..d3068d6f0387 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
@@ -23,6 +23,7 @@
 import io.trino.spi.connector.TableNotFoundException;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.FileIO;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -90,8 +91,9 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
                 metastore.replaceTable(database, tableName, table, privileges);
             }
             catch (RuntimeException e) {
-                // CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
-                throw new CommitFailedException(e, "Failed to commit to table %s.%s", database, tableName);
+                // Cannot determine whether the `replaceTable` operation was successful,
+                // regardless of the exception thrown (e.g.: timeout exception) or it actually failed
+                throw new CommitStateUnknownException(e);
             }
         }
         finally {
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreTableOperationsInsertFailure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreTableOperationsInsertFailure.java
new file mode 100644
index 000000000000..e7cc6c5c1f76
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreTableOperationsInsertFailure.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.file;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.Session;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.metastore.Database;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
+import io.trino.plugin.hive.metastore.PrincipalPrivileges;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.plugin.iceberg.TestingIcebergConnectorFactory;
+import io.trino.spi.security.PrincipalType;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.LocalQueryRunner;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Optional;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static com.google.inject.util.Modules.EMPTY_MODULE;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+@Test(singleThreaded = true)
+public class TestIcebergFileMetastoreTableOperationsInsertFailure
+        extends AbstractTestQueryFramework
+{
+    private static final String ICEBERG_CATALOG = "iceberg";
+    private static final String SCHEMA_NAME = "test_schema";
+    private File baseDir;
+
+    @Override
+    protected LocalQueryRunner createQueryRunner()
+    {
+        Session session = testSessionBuilder()
+                .setCatalog(ICEBERG_CATALOG)
+                .setSchema(SCHEMA_NAME)
+                .build();
+
+        try {
+            baseDir = Files.createTempDirectory(null).toFile();
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        HiveMetastore metastore = new FileHiveMetastore(
+                new NodeVersion("testversion"),
+                HDFS_ENVIRONMENT,
+                new HiveMetastoreConfig().isHideDeltaLakeTables(),
+                new FileHiveMetastoreConfig()
+                        .setCatalogDirectory(baseDir.toURI().toString())
+                        .setMetastoreUser("test"))
+        {
+            @Override
+            public synchronized void replaceTable(String databaseName, String tableName, Table newTable, PrincipalPrivileges principalPrivileges)
+            {
+                super.replaceTable(databaseName, tableName, newTable, principalPrivileges);
+                throw new RuntimeException("Test-simulated metastore timeout exception");
+            }
+        };
+        LocalQueryRunner queryRunner = LocalQueryRunner.create(session);
+
+        queryRunner.createCatalog(
+                ICEBERG_CATALOG,
+                new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
+                ImmutableMap.of());
+
+        Database database = Database.builder()
+                .setDatabaseName(SCHEMA_NAME)
+                .setOwnerName(Optional.of("public"))
+                .setOwnerType(Optional.of(PrincipalType.ROLE))
+                .build();
+        metastore.createDatabase(database);
+
+        return queryRunner;
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup()
+            throws Exception
+    {
+        if (baseDir != null) {
+            deleteRecursively(baseDir.toPath(), ALLOW_INSECURE);
+        }
+    }
+
+    @Test
+    public void testInsertFailureDoesNotCorruptTheTableMetadata()
+    {
+        String tableName = "test_insert_failure";
+
+        getQueryRunner().execute(format("CREATE TABLE %s (a_varchar) AS VALUES ('Trino')", tableName));
+        assertThatThrownBy(() -> getQueryRunner().execute("INSERT INTO " + tableName + " VALUES 'rocks'"))
+                .isInstanceOf(CommitStateUnknownException.class)
+                .hasMessageContaining("Test-simulated metastore timeout exception");
+        assertQuery("SELECT * FROM " + tableName, "VALUES 'Trino', 'rocks'");
+    }
+}