diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 10815d251d79..62696048fc2d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -171,8 +171,8 @@ public final class IcebergUtil public static final String METADATA_FOLDER_NAME = "metadata"; public static final String METADATA_FILE_EXTENSION = ".metadata.json"; + public static final String TRINO_QUERY_ID_NAME = "trino_query_id"; private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*"); - static final String TRINO_QUERY_ID_NAME = "trino_query_id"; // Metadata file name examples // - 00001-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json // - 00001-409702ba-4735-4645-8f14-09537cc0b2c8.gz.metadata.json (https://github.com/apache/iceberg/blob/ab398a0d5ff195f763f8c7a4358ac98fa38a8de7/core/src/main/java/org/apache/iceberg/TableMetadataParser.java#L141) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java index 9769819b3d3b..c9458be31721 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -40,6 +40,7 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; +import static io.trino.plugin.iceberg.IcebergUtil.TRINO_QUERY_ID_NAME; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -124,12 +125,32 @@ protected final void commitNewTable(TableMetadata metadata) try { metastore.createTable(table, privileges); } - catch (SchemaNotFoundException - | TableAlreadyExistsException e) { + catch (SchemaNotFoundException e) { // clean up metadata files corresponding to the current transaction fileIo.deleteFile(newMetadataLocation); throw e; } + catch (TableAlreadyExistsException e) { + // Ignore TableAlreadyExistsException when table looks like created by us. + // This may happen when an actually successful metastore create call is retried + // e.g. because of a timeout on our side. + refreshFromMetadataLocation(getRefreshedLocation(true)); + if (!isCreatedBy(this.currentMetadata, session.getQueryId())) { + fileIo.deleteFile(newMetadataLocation); + throw e; + } + } + } + + public static boolean isCreatedBy(TableMetadata existingTableMetadata, String queryId) + { + Optional tableQueryId = getQueryId(existingTableMetadata); + return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); + } + + private static Optional getQueryId(TableMetadata tableMetadata) + { + return Optional.ofNullable(tableMetadata.currentSnapshot().summary().get(TRINO_QUERY_ID_NAME)); } protected Table.Builder updateMetastoreTable(Table.Builder builder, TableMetadata metadata, String metadataLocation, Optional previousMetadataLocation) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 394568b4e67f..c3bcde1a1b73 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -22,6 +22,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HiveSchemaProperties; +import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; @@ -331,11 +332,34 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN .withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT)) // This is a must-have property for the EXTERNAL_TABLE table type .setParameter("EXTERNAL", "TRUE") + .setParameter(PRESTO_QUERY_ID_NAME, session.getQueryId()) .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH)) .setParameter(METADATA_LOCATION_PROP, tableMetadata.metadataFileLocation()); PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); - metastore.createTable(builder.build(), privileges); + try { + metastore.createTable(builder.build(), privileges); + } + catch (TableAlreadyExistsException e) { + // Ignore TableAlreadyExistsException when table looks like created by us. + // This may happen when an actually successful metastore create call is retried + // e.g. because of a timeout on our side. + Optional existingTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), session.getQueryId())) { + throw e; + } + } + } + + public static boolean isCreatedBy(io.trino.plugin.hive.metastore.Table table, String queryId) + { + Optional tableQueryId = getQueryId(table); + return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); + } + + private static Optional getQueryId(io.trino.plugin.hive.metastore.Table table) + { + return Optional.ofNullable(table.getParameters().get(PRESTO_QUERY_ID_NAME)); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java new file mode 100644 index 000000000000..fce50616513a --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java @@ -0,0 +1,155 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.TableAlreadyExistsException; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.io.File; +import java.io.IOException; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Verify.verify; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; +import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestIcebergCreateTableInternalRetry + extends AbstractTestQueryFramework +{ + private static final String SCHEMA_NAME = "iceberg_internal_retry_schema"; + private File metastoreDir; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session session = testSessionBuilder() + .setCatalog(ICEBERG_CATALOG) + .setSchema(SCHEMA_NAME) + .build(); + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); + metastoreDir = queryRunner.getCoordinator().getBaseDataDir().resolve("test_iceberg_table_smoke_test").toFile(); + this.metastoreDir.deleteOnExit(); + HiveMetastore metastore = new FileHiveMetastore( + new NodeVersion("testversion"), + HDFS_FILE_SYSTEM_FACTORY, + new HiveMetastoreConfig().isHideDeltaLakeTables(), + new FileHiveMetastoreConfig() + .setCatalogDirectory(metastoreDir.toURI().toString()) + .setMetastoreUser("test")) + { + @Override + public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges) + { + if (table.getTableName().startsWith("test_different_session")) { + // By modifying query id test simulates that table was created from different session. + table = Table.builder(table) + .setParameters(ImmutableMap.of(PRESTO_QUERY_ID_NAME, "new_query_id")) + .build(); + } + // Simulate retry mechanism with timeout failure of ThriftHiveMetastore. + // 1. createTable correctly create table but timeout is triggered + // 2. Retry to createTable throws TableAlreadyExistsException + super.createTable(table, principalPrivileges); + throw new TableAlreadyExistsException(table.getSchemaTableName()); + } + }; + + queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE)); + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.of("iceberg.register-table-procedure.enabled", "true")); + queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME); + return queryRunner; + } + + @AfterAll + public void tearDown() + throws IOException + { + deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE); + } + + @Test + public void testCreateTableInternalRetry() + { + assertQuerySucceeds("CREATE TABLE test_ct_internal_retry(a int)"); + assertQuery("SHOW TABLES LIKE 'test_ct_internal_retry'", "VALUES 'test_ct_internal_retry'"); + } + + @Test + public void testCreateTableAsSelectInternalRetry() + { + assertQuerySucceeds("CREATE TABLE test_ctas_internal_retry AS SELECT 1 a"); + assertQuery("SHOW TABLES LIKE 'test_ctas_internal_retry'", "VALUES 'test_ctas_internal_retry'"); + } + + @Test + public void testRegisterTableInternalRetry() + { + assertQuerySucceeds("CREATE TABLE test_register_table_internal_retry AS SELECT 1 a"); + String tableLocation = getTableLocation("test_register_table_internal_retry"); + assertUpdate("CALL system.unregister_table(current_schema, 'test_register_table_internal_retry')"); + + assertQuerySucceeds("CALL system.register_table(current_schema, 'test_register_table_internal_retry', '" + tableLocation + "')"); + assertQuery("SHOW TABLES LIKE 'test_register_table_internal_retry'", "VALUES 'test_register_table_internal_retry'"); + } + + @Test + public void testRegisterTableFailureWithDifferentSession() + { + assertQuerySucceeds("CREATE TABLE test_register_table_failure AS SELECT 1 a"); + String tableLocation = getTableLocation("test_register_table_failure"); + assertUpdate("CALL system.unregister_table(current_schema, 'test_register_table_failure')"); + + assertQueryFails( + "CALL system.register_table(current_schema, 'test_different_session_register_table_failure', '" + tableLocation + "')", + "Table already exists: .*"); + assertQuery("SHOW TABLES LIKE 'test_different_session_register_table_failure'", "VALUES 'test_different_session_register_table_failure'"); + } + + private String getTableLocation(String tableName) + { + Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL); + Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()); + if (m.find()) { + String location = m.group(1); + verify(!m.find(), "Unexpected second match"); + return location; + } + throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); + } +}