diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 731b1282172c..267149d3f000 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -546,6 +546,7 @@ **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java **/TestIcebergGlueCatalogMaterializedView.java + **/TestIcebergGlueCreateTableFailure.java **/TestIcebergGlueTableOperationsInsertFailure.java **/TestIcebergGlueCatalogSkipArchive.java **/TestIcebergGcsConnectorSmokeTest.java @@ -609,6 +610,7 @@ **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java **/TestIcebergGlueCatalogMaterializedView.java + **/TestIcebergGlueCreateTableFailure.java **/TestIcebergGlueTableOperationsInsertFailure.java **/TestIcebergGlueCatalogSkipArchive.java **/TestIcebergGcsConnectorSmokeTest.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java index e9a70be488d2..3201a9963c87 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.glue; import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.AlreadyExistsException; import com.amazonaws.services.glue.model.ConcurrentModificationException; import com.amazonaws.services.glue.model.CreateTableRequest; import com.amazonaws.services.glue.model.EntityNotFoundException; @@ -109,7 +110,17 @@ protected void commitNewTable(TableMetadata metadata) CreateTableRequest createTableRequest = new CreateTableRequest() .withDatabaseName(database) .withTableInput(tableInput); - stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest)); + try { + 
stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest)); + } + catch (AlreadyExistsException + | EntityNotFoundException + | InvalidInputException + | ResourceNumberLimitExceededException e) { + // clean up metadata files corresponding to the current transaction + fileIo.deleteFile(newMetadataLocation); + throw e; + } shouldRefresh = true; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java index 03cd0e721744..beb99d74a821 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.hms; +import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.Table; @@ -21,6 +22,7 @@ import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.TableNotFoundException; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.io.FileIO; @@ -113,7 +115,15 @@ protected final void commitNewTable(TableMetadata metadata) Table table = builder.build(); PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); - metastore.createTable(table, privileges); + try { + metastore.createTable(table, privileges); + } + catch (SchemaNotFoundException + | TableAlreadyExistsException e) { + // clean up metadata files corresponding to the current transaction + 
fileIo.deleteFile(newMetadataLocation); + throw e; + } } protected Table getTable() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java new file mode 100644 index 000000000000..a4e0927cebc3 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.file; + +import io.trino.Session; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import io.trino.plugin.iceberg.TestingIcebergPlugin; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Test(singleThreaded = true) // testException is a shared mutable state +public class TestIcebergFileMetastoreCreateTableFailure + extends AbstractTestQueryFramework +{ + private static final String ICEBERG_CATALOG = "iceberg"; + private static final String SCHEMA_NAME = "test_schema"; + + private Path dataDirectory; + private HiveMetastore metastore; + private final AtomicReference<RuntimeException> testException = new AtomicReference<>(); + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + this.dataDirectory = 
Files.createTempDirectory("test_iceberg_create_table_failure"); + // Using FileHiveMetastore as approximation of HMS + this.metastore = new FileHiveMetastore( + new NodeVersion("testversion"), + HDFS_ENVIRONMENT, + new HiveMetastoreConfig().isHideDeltaLakeTables(), + new FileHiveMetastoreConfig() + .setCatalogDirectory(dataDirectory.toString())) + { + @Override + public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges) + { + throw testException.get(); + } + }; + + Session session = testSessionBuilder() + .setCatalog(ICEBERG_CATALOG) + .setSchema(SCHEMA_NAME) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); + queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE)); + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg"); + queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME); + + return queryRunner; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws Exception + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA_NAME, true); + } + if (dataDirectory != null) { + deleteRecursively(dataDirectory, ALLOW_INSECURE); + } + } + + @Test + public void testCreateTableFailureMetadataCleanedUp() + { + String exceptionMessage = "Test-simulated metastore schema not found exception"; + testException.set(new SchemaNotFoundException("simulated_test_schema", exceptionMessage)); + testCreateTableFailure(exceptionMessage, false); + } + + @Test + public void testCreateTableFailureMetadataNotCleanedUp() + { + String exceptionMessage = "Test-simulated metastore runtime exception"; + testException.set(new RuntimeException(exceptionMessage)); + testCreateTableFailure(exceptionMessage, true); + } + + protected void testCreateTableFailure(String expectedExceptionMessage, boolean shouldMetadataFileExist) + { + String tableName = "test_create_failure_" + randomNameSuffix(); + String 
tableLocation = Path.of(dataDirectory.toString(), tableName).toString(); + assertThatThrownBy(() -> getQueryRunner().execute("CREATE TABLE " + tableName + " (a varchar) WITH (location = '" + tableLocation + "')")) + .hasMessageContaining(expectedExceptionMessage); + + Path metadataDirectory = Path.of(tableLocation, "metadata"); + if (shouldMetadataFileExist) { + assertThat(metadataDirectory).as("Metadata file should exist").isDirectoryContaining("glob:**.metadata.json"); + } + else { + assertThat(metadataDirectory).as("Metadata file should not exist").isEmptyDirectory(); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java new file mode 100644 index 000000000000..6855bce4d8f5 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java @@ -0,0 +1,186 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.InvalidInputException; +import com.amazonaws.services.glue.model.OperationTimeoutException; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.Session; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.plugin.hive.metastore.Database; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; +import io.trino.plugin.iceberg.TestingIcebergConnectorFactory; +import io.trino.spi.security.PrincipalType; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.LocalQueryRunner; +import io.trino.testing.TestingConnectorSession; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.common.reflect.Reflection.newProxy; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createTestingGlueHiveMetastore; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/* + * The test currently uses AWS Default Credential Provider Chain, + * See 
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + * on ways to set your AWS credentials which will be needed to run this test. + */ +@Test(singleThreaded = true) // testException is a shared mutable state +public class TestIcebergGlueCreateTableFailure + extends AbstractTestQueryFramework +{ + private static final Logger LOG = Logger.get(TestIcebergGlueCreateTableFailure.class); + + private static final String ICEBERG_CATALOG = "iceberg"; + + private final String schemaName = "test_iceberg_glue_" + randomNameSuffix(); + + private Path dataDirectory; + private TrinoFileSystem fileSystem; + private GlueHiveMetastore glueHiveMetastore; + private final AtomicReference<RuntimeException> testException = new AtomicReference<>(); + + @Override + protected LocalQueryRunner createQueryRunner() + throws Exception + { + Session session = testSessionBuilder() + .setCatalog(ICEBERG_CATALOG) + .setSchema(schemaName) + .build(); + LocalQueryRunner queryRunner = LocalQueryRunner.create(session); + + AWSGlueAsyncAdapterProvider awsGlueAsyncAdapterProvider = delegate -> newProxy(AWSGlueAsync.class, (proxy, method, methodArgs) -> { + Object result; + if (method.getName().equals("createTable")) { + throw testException.get(); + } + try { + result = method.invoke(delegate, methodArgs); + } + catch (InvocationTargetException e) { + throw e.getCause(); + } + return result; + }); + + queryRunner.createCatalog( + ICEBERG_CATALOG, + new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergGlueCatalogModule(awsGlueAsyncAdapterProvider)), Optional.empty(), EMPTY_MODULE), + ImmutableMap.of()); + + dataDirectory = Files.createTempDirectory("test_iceberg_create_table_failure"); + dataDirectory.toFile().deleteOnExit(); + + glueHiveMetastore = createTestingGlueHiveMetastore(dataDirectory.toString()); + fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT).create(TestingConnectorSession.SESSION); + + Database database = Database.builder() + 
.setDatabaseName(schemaName) + .setOwnerName(Optional.of("public")) + .setOwnerType(Optional.of(PrincipalType.ROLE)) + .setLocation(Optional.of(dataDirectory.toString())) + .build(); + glueHiveMetastore.createDatabase(database); + + return queryRunner; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws IOException + { + try { + if (glueHiveMetastore != null) { + glueHiveMetastore.dropDatabase(schemaName, false); + } + if (dataDirectory != null) { + deleteRecursively(dataDirectory, ALLOW_INSECURE); + } + } + catch (Exception e) { + LOG.error(e, "Failed to clean up Glue database: %s", schemaName); + } + } + + @Test + public void testCreateTableFailureMetadataCleanedUp() + throws Exception + { + final String exceptionMessage = "Test-simulated metastore invalid input exception"; + testException.set(new InvalidInputException(exceptionMessage)); + testCreateTableFailure(exceptionMessage, false); + } + + @Test + public void testCreateTableFailureMetadataNotCleanedUp() + throws Exception + { + final String exceptionMessage = "Test-simulated metastore operation timeout exception"; + testException.set(new OperationTimeoutException(exceptionMessage)); + testCreateTableFailure(exceptionMessage, true); + } + + private void testCreateTableFailure(String expectedExceptionMessage, boolean shouldMetadataFileExist) + throws Exception + { + String tableName = "test_create_failure_" + randomNameSuffix(); + assertThatThrownBy(() -> getQueryRunner().execute("CREATE TABLE " + tableName + " (a_varchar) AS VALUES ('Trino')")) + .hasMessageContaining(expectedExceptionMessage); + + assertMetadataLocation(tableName, shouldMetadataFileExist); + } + + protected void assertMetadataLocation(String tableName, boolean shouldMetadataFileExist) + throws Exception + { + FileIterator fileIterator = fileSystem.listFiles(dataDirectory.toString()); + String tableLocationPrefix = Path.of(dataDirectory.toString(), tableName).toString(); + boolean metadataFileFound = false; + while 
(fileIterator.hasNext()) { + FileEntry fileEntry = fileIterator.next(); + String path = fileEntry.path(); + if (path.startsWith(tableLocationPrefix) && path.endsWith(".metadata.json")) { + metadataFileFound = true; + break; + } + } + if (shouldMetadataFileExist) { + assertThat(metadataFileFound).as("Metadata file should exist").isTrue(); + } + else { + assertThat(metadataFileFound).as("Metadata file should not exist").isFalse(); + } + } +}