diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 00059215dbea..48f897d73f75 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -112,6 +112,11 @@ units + + com.amazonaws + aws-java-sdk-core + + com.amazonaws aws-java-sdk-glue @@ -471,6 +476,7 @@ **/TestDeltaLakeRenameToWithGlueMetastore.java **/TestDeltaLakeRegisterTableProcedureWithGlue.java **/TestDeltaLakeViewsGlueMetastore.java + **/TestDeltaLakeConcurrentModificationGlueMetastore.java **/TestDeltaLakeGcsConnectorSmokeTest.java **/Test*FailureRecoveryTest.java @@ -500,6 +506,7 @@ **/TestDeltaLakeRenameToWithGlueMetastore.java **/TestDeltaLakeRegisterTableProcedureWithGlue.java **/TestDeltaLakeViewsGlueMetastore.java + **/TestDeltaLakeConcurrentModificationGlueMetastore.java diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java new file mode 100644 index 000000000000..794070f2de66 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.metastore.glue; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.ConcurrentModificationException; +import io.trino.Session; +import io.trino.plugin.deltalake.TestingDeltaLakePlugin; +import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; +import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.spi.TrinoException; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.common.reflect.Reflection.newProxy; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.testng.Assert.assertFalse; + +@Test(singleThreaded = true) +public class TestDeltaLakeConcurrentModificationGlueMetastore + extends AbstractTestQueryFramework +{ + private static final String CATALOG_NAME = "test_delta_lake_concurrent"; + private static final String SCHEMA = "test_delta_lake_glue_concurrent_" + randomNameSuffix(); + private String dataDirectory; + private GlueHiveMetastore metastore; + private final AtomicBoolean failNextGlueDeleteTableCall = new AtomicBoolean(false); + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session deltaLakeSession = testSessionBuilder() + .setCatalog(CATALOG_NAME) + .setSchema(SCHEMA) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); + + dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_delta_concurrent").toString(); + GlueMetastoreStats stats = new GlueMetastoreStats(); + GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() + .setDefaultWarehouseDir(dataDirectory); + + AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()); + AWSGlueAsync proxiedGlueClient = newProxy(AWSGlueAsync.class, (proxy, method, args) -> { + Object result; + try { + if (method.getName().equals("deleteTable") && failNextGlueDeleteTableCall.get()) { + // Simulate concurrent modifications on the table that is about to be dropped + failNextGlueDeleteTableCall.set(false); + throw new TrinoException(HIVE_METASTORE_ERROR, new ConcurrentModificationException("Test-simulated metastore concurrent modification exception")); + } + result = method.invoke(glueClient, args); + } + catch (InvocationTargetException e) { + throw e.getCause(); + } + return result; + }); + + metastore = new GlueHiveMetastore( + HDFS_ENVIRONMENT, + glueConfig, + directExecutor(), + new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()), + proxiedGlueClient, + stats, + table -> true); + + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); + queryRunner.execute("CREATE SCHEMA " + SCHEMA); + return queryRunner; + } + + @Test + public void testDropTableWithConcurrentModifications() + { + String tableName = "test_glue_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); + + failNextGlueDeleteTableCall.set(true); + assertUpdate("DROP TABLE " + tableName); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws IOException + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA, false); + deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE); + } + } +} diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index c32c87ee5f47..a3921a98aeb5 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -230,6 +230,11 @@ rubix-presto-shaded + + dev.failsafe + failsafe + + it.unimi.dsi fastutil @@ -425,12 +430,6 @@ test - - dev.failsafe - failsafe - test - - io.minio minio diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java new file mode 100644 index 000000000000..4d010d88b6e0 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.glue; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.metrics.RequestMetricCollector; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hive.aws.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata; + +public final class GlueClientUtil +{ + private GlueClientUtil() {} + + public static AWSGlueAsync createAsyncGlueClient( + GlueHiveMetastoreConfig config, + AWSCredentialsProvider credentialsProvider, + Optional requestHandler, + RequestMetricCollector metricsCollector) + { + ClientConfiguration clientConfig = new ClientConfiguration() + .withMaxConnections(config.getMaxGlueConnections()) + .withMaxErrorRetry(config.getMaxGlueErrorRetries()); + AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() + .withMetricsCollector(metricsCollector) + .withClientConfiguration(clientConfig); + + ImmutableList.Builder requestHandlers = ImmutableList.builder(); + requestHandler.ifPresent(requestHandlers::add); + config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId))); + asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new)); + + if (config.getGlueEndpointUrl().isPresent()) { + checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set"); + asyncGlueClientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + config.getGlueEndpointUrl().get(), + config.getGlueRegion().get())); + } + else if (config.getGlueRegion().isPresent()) { + asyncGlueClientBuilder.setRegion(config.getGlueRegion().get()); + } + else if (config.getPinGlueClientToCurrentRegion()) { + asyncGlueClientBuilder.setRegion(getCurrentRegionFromEC2Metadata().getName()); + } + + asyncGlueClientBuilder.setCredentials(credentialsProvider); + + return asyncGlueClientBuilder.build(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index 3200d3dd50e5..4164368fbca8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -15,15 +15,9 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonWebServiceRequest; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.handlers.AsyncHandler; -import com.amazonaws.handlers.RequestHandler2; -import com.amazonaws.metrics.RequestMetricCollector; import com.amazonaws.services.glue.AWSGlueAsync; -import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; import com.amazonaws.services.glue.model.AccessDeniedException; import com.amazonaws.services.glue.model.AlreadyExistsException; import com.amazonaws.services.glue.model.BatchCreatePartitionRequest; @@ -33,6 +27,7 @@ import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; +import com.amazonaws.services.glue.model.ConcurrentModificationException; import com.amazonaws.services.glue.model.CreateDatabaseRequest; import com.amazonaws.services.glue.model.CreateTableRequest; import com.amazonaws.services.glue.model.DatabaseInput; @@ -63,11 +58,14 @@ import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; import io.trino.hdfs.DynamicHdfsConfiguration; @@ -117,6 +115,7 @@ import javax.annotation.Nullable; import javax.inject.Inject; +import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -134,7 +133,6 @@ import java.util.function.Predicate; import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.Comparators.lexicographical; @@ -145,11 +143,11 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; -import static io.trino.plugin.hive.aws.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata; import static io.trino.plugin.hive.metastore.MetastoreUtil.makePartitionName; import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName; import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static io.trino.plugin.hive.metastore.glue.converter.GlueInputConverter.convertPartition; import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.getTableTypeNullable; import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.mappedCopy; @@ -179,6 +177,11 @@ public class GlueHiveMetastore private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000; private static final Comparator> PARTITION_VALUE_COMPARATOR = lexicographical(String.CASE_INSENSITIVE_ORDER); private static final Predicate VIEWS_FILTER = table -> VIRTUAL_VIEW.name().equals(getTableTypeNullable(table)); + private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() + .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) + .withDelay(Duration.ofMillis(100)) + .withMaxRetries(3) + .build(); private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; @@ -186,7 +189,7 @@ public class GlueHiveMetastore private final Optional defaultDir; private final int partitionSegments; private final Executor partitionsReadExecutor; - private final GlueMetastoreStats stats = new GlueMetastoreStats(); + private final GlueMetastoreStats stats; private final GlueColumnStatisticsProvider columnStatisticsProvider; private final boolean assumeCanonicalPartitionKeys; private final Predicate tableFilter; @@ -195,70 +198,40 @@ public class GlueHiveMetastore public GlueHiveMetastore( HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig, - AWSCredentialsProvider credentialsProvider, @ForGlueHiveMetastore Executor partitionsReadExecutor, GlueColumnStatisticsProviderFactory columnStatisticsProviderFactory, - @ForGlueHiveMetastore Optional requestHandler, + AWSGlueAsync glueClient, + @ForGlueHiveMetastore GlueMetastoreStats stats, @ForGlueHiveMetastore Predicate tableFilter) { - requireNonNull(credentialsProvider, "credentialsProvider is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hdfsContext = new HdfsContext(ConnectorIdentity.ofUser(DEFAULT_METASTORE_USER)); - this.glueClient = createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector()); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.defaultDir = glueConfig.getDefaultWarehouseDir(); this.partitionSegments = glueConfig.getPartitionSegments(); this.partitionsReadExecutor = requireNonNull(partitionsReadExecutor, "partitionsReadExecutor is null"); this.assumeCanonicalPartitionKeys = glueConfig.isAssumeCanonicalPartitionKeys(); this.tableFilter = requireNonNull(tableFilter, "tableFilter is null"); + this.stats = requireNonNull(stats, "stats is null"); this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats); } - public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, AWSCredentialsProvider credentialsProvider, Optional requestHandler, RequestMetricCollector metricsCollector) - { - ClientConfiguration clientConfig = new ClientConfiguration() - .withMaxConnections(config.getMaxGlueConnections()) - .withMaxErrorRetry(config.getMaxGlueErrorRetries()); - AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() - .withMetricsCollector(metricsCollector) - .withClientConfiguration(clientConfig); - - ImmutableList.Builder requestHandlers = ImmutableList.builder(); - requestHandler.ifPresent(requestHandlers::add); - config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId))); - asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new)); - - if (config.getGlueEndpointUrl().isPresent()) { - checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set"); - asyncGlueClientBuilder.setEndpointConfiguration(new EndpointConfiguration( - config.getGlueEndpointUrl().get(), - config.getGlueRegion().get())); - } - else if (config.getGlueRegion().isPresent()) { - asyncGlueClientBuilder.setRegion(config.getGlueRegion().get()); - } - else if (config.getPinGlueClientToCurrentRegion()) { - asyncGlueClientBuilder.setRegion(getCurrentRegionFromEC2Metadata().getName()); - } - - asyncGlueClientBuilder.setCredentials(credentialsProvider); - - return asyncGlueClientBuilder.build(); - } - @VisibleForTesting public static GlueHiveMetastore createTestingGlueHiveMetastore(String defaultWarehouseDir) { HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + GlueMetastoreStats stats = new GlueMetastoreStats(); + GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() + .setDefaultWarehouseDir(defaultWarehouseDir); return new GlueHiveMetastore( hdfsEnvironment, - new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(defaultWarehouseDir), - DefaultAWSCredentialsProviderChain.getInstance(), + glueConfig, directExecutor(), new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()), - Optional.empty(), + createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()), + stats, table -> true); } @@ -605,12 +578,13 @@ public void createTable(Table table, PrincipalPrivileges principalPrivileges) public void dropTable(String databaseName, String tableName, boolean deleteData) { Table table = getExistingTable(databaseName, tableName); - + DeleteTableRequest deleteTableRequest = new DeleteTableRequest() + .withDatabaseName(databaseName) + .withName(tableName); try { - stats.getDeleteTable().call(() -> - glueClient.deleteTable(new DeleteTableRequest() - .withDatabaseName(databaseName) - .withName(tableName))); + Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) + .run(() -> stats.getDeleteTable().call(() -> + glueClient.deleteTable(deleteTableRequest))); } catch (AmazonServiceException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java index 3388f80ce814..1c855ca4f4fd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java @@ -15,6 +15,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.glue.AWSGlueAsync; import com.amazonaws.services.glue.model.Table; import com.google.inject.Binder; import com.google.inject.Key; @@ -66,6 +67,8 @@ protected void setup(Binder binder) // export under the old name, for backwards compatibility binder.bind(GlueHiveMetastoreFactory.class).in(Scopes.SINGLETON); + binder.bind(Key.get(GlueMetastoreStats.class, ForGlueHiveMetastore.class)).toInstance(new GlueMetastoreStats()); + binder.bind(AWSGlueAsync.class).toProvider(HiveGlueClientProvider.class).in(Scopes.SINGLETON); newExporter(binder).export(GlueHiveMetastoreFactory.class).as(generator -> generator.generatedNameOf(GlueHiveMetastore.class)); binder.bind(Key.get(boolean.class, AllowHiveTableRename.class)).toInstance(false); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java new file mode 100644 index 000000000000..696f3bf28b12 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.glue; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.glue.AWSGlueAsync; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class HiveGlueClientProvider + implements Provider +{ + private final GlueMetastoreStats stats; + private final AWSCredentialsProvider credentialsProvider; + private final GlueHiveMetastoreConfig glueConfig; // TODO do not keep mutable config instance on a field + private final Optional requestHandler; + + @Inject + public HiveGlueClientProvider( + @ForGlueHiveMetastore GlueMetastoreStats stats, + AWSCredentialsProvider credentialsProvider, + @ForGlueHiveMetastore Optional requestHandler, + GlueHiveMetastoreConfig glueConfig) + { + this.stats = requireNonNull(stats, "stats is null"); + this.credentialsProvider = requireNonNull(credentialsProvider, "credentialsProvider is null"); + this.requestHandler = requireNonNull(requestHandler, "requestHandler is null"); + this.glueConfig = glueConfig; + } + + @Override + public AWSGlueAsync get() + { + return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector()); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java index d4a2621111bc..9a05b1db007f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java @@ -95,6 +95,7 @@ import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static io.trino.plugin.hive.metastore.glue.PartitionFilterBuilder.DECIMAL_TYPE; import static io.trino.plugin.hive.metastore.glue.PartitionFilterBuilder.decimalOf; import static io.trino.plugin.hive.util.HiveUtil.DELTA_LAKE_PROVIDER; @@ -223,13 +224,14 @@ protected HiveMetastore createMetastore(File tempDir) glueConfig.setAssumeCanonicalPartitionKeys(true); Executor executor = new BoundedExecutor(this.executor, 10); + GlueMetastoreStats stats = new GlueMetastoreStats(); return new GlueHiveMetastore( HDFS_ENVIRONMENT, glueConfig, - DefaultAWSCredentialsProviderChain.getInstance(), executor, new DefaultGlueColumnStatisticsProviderFactory(executor, executor), - Optional.empty(), + createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()), + stats, new DefaultGlueMetastoreTableFilterProvider(true).get()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java index 7235352b6baa..5e4da1d8d2d7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java @@ -23,7 +23,7 @@ import java.util.Optional; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static java.util.Objects.requireNonNull; public class GlueClientProvider diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java index f8fc97dc223f..5c68ee95bee7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java @@ -27,7 +27,7 @@ import java.util.Optional; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static java.util.Objects.requireNonNull; public class TestingGlueIcebergTableOperationsProvider diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index 28f1a78ef177..6eb8f415c12e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -32,6 +32,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -87,7 +88,7 @@ public void testAddColumnUnsupportedWriterVersion() .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -120,7 +121,7 @@ public void testRenameColumn() .containsOnly(row(1), row(2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -154,7 +155,7 @@ public void testRenamePartitionedColumn() .containsOnly(row(1, "part1"), row(2, "part2")); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -369,7 +370,7 @@ public void testTrinoAlterTablePreservesGeneratedColumn() .containsOnly(row(1, 2, 3)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java index 7295c3d05a8b..3786adf4fe23 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java @@ -27,6 +27,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -62,7 +63,7 @@ public void testCheckConstraintCompatibility(String columnDefinition, String che .containsOnly(row(insertedValue), row(insertedValue)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -112,7 +113,7 @@ public void testCheckConstraintAcrossColumns() .containsOnly(row(1, 1)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -136,7 +137,7 @@ public void testWritesToTableWithCheckConstraintFails() .hasMessageContaining("Cannot merge into a table with check constraints"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -155,7 +156,7 @@ public void testMetadataOperationsRetainCheckConstraint() onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'example table comment'"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 0efc3968b513..df3da7041265 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -32,6 +32,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -63,7 +64,7 @@ public void testColumnMappingModeNone() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -122,7 +123,7 @@ public void testColumnMappingMode(String mode) .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -158,7 +159,7 @@ public void testColumnMappingModeNameWithNonLowerCaseColumn(String mode) row(null, null, null, null, 2.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -217,7 +218,7 @@ public void testColumnMappingModeAddColumn(String mode) .containsOnly(row(1, null), row(2, null)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -250,7 +251,7 @@ public void testShowStatsFromJsonForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -286,7 +287,7 @@ public void testShowStatsFromParquetForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -319,7 +320,7 @@ public void testShowStatsOnPartitionedForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -356,7 +357,7 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode) .hasMessageContaining("This connector does not support dropping columns"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -387,7 +388,7 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java index af598315a917..9efca06aefa1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java @@ -28,6 +28,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -109,7 +110,7 @@ public void testUpdatePartitionedTableWithCdf() row("testValue5", 3, "partition3", "update_postimage", 4L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -136,7 +137,7 @@ public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() row("testValue3", 5, "update_postimage", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -168,7 +169,7 @@ public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdf row("testValue5", 3, "partition3", "update_postimage", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -205,7 +206,7 @@ public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() row("testValue4", 4, "partition2", "update_postimage", 3L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -256,7 +257,7 @@ public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() row("testValue6", 6, "insert", 7L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -284,7 +285,7 @@ public void testDeleteFromTableWithCdf() row("testValue3", 3, "delete", 4L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -339,8 +340,8 @@ public void testMergeUpdateIntoTableWithCdfEnabled() row(20004, "nation2", 200, "update_postimage", 4)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName1); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName2); + dropDeltaTableWithRetry("default." + tableName1); + dropDeltaTableWithRetry("default." + tableName2); } } @@ -393,8 +394,8 @@ public void testMergeDeleteIntoTableWithCdfEnabled() row(2, "nation2", 200, "delete", 4)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName1); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName2); + dropDeltaTableWithRetry("default." + tableName1); + dropDeltaTableWithRetry("default." + tableName2); } } @@ -454,8 +455,8 @@ public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() row(4, "pageUrl4", 400, "update_preimage", 5)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + targetTableName); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + sourceTableName); + dropDeltaTableWithRetry("default." + targetTableName); + dropDeltaTableWithRetry("default." + sourceTableName); } } @@ -493,7 +494,7 @@ public void testDeleteFromNullPartitionWithCdfEnabled() row("testValue3", 3, null, "delete", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index ea0f8351d93c..30e8d719a978 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -45,6 +45,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -127,7 +128,7 @@ public void testDatabricksCanReadTrinoCheckpoint() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -187,7 +188,7 @@ public void testTrinoUsesCheckpointInterval() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -255,7 +256,7 @@ public void testDatabricksUsesCheckpointInterval() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -340,7 +341,7 @@ private void testCheckpointMinMaxStatisticsForRowType(Consumer sqlExecut } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -405,7 +406,7 @@ private void testCheckpointNullStatisticsForRowType(Consumer sqlExecutor } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -448,7 +449,7 @@ private void testWriteStatsAsJsonDisabled(Consumer sqlExecutor, String t row(null, null, null, null, 1.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -492,7 +493,7 @@ private void testWriteStatsAsStructDisabled(Consumer sqlExecutor, String row(null, null, null, null, null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -550,7 +551,7 @@ private void testWriteStatsAsJsonEnabled(Consumer sqlExecutor, String ta row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -640,7 +641,7 @@ private void testWriteStatsAsStructEnabled(Consumer sqlExecutor, String row(null, null, null, null, 1.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java index bcc8ad4bc14f..af763f339fc1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java @@ -43,6 +43,7 @@ import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -91,7 +92,7 @@ public void testPrestoTypesWithDatabricks() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -118,7 +119,7 @@ public void testPrestoTimestampsWithDatabricks() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -146,7 +147,7 @@ public void testPrestoCacheInvalidatedOnCreateTable() .map(QueryAssert.Row::new) .collect(toImmutableList())); - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); removeS3Directory(bucketName, "databricks-compatibility-test-" + tableName); assertThat(onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " + @@ -168,7 +169,7 @@ public void testPrestoCacheInvalidatedOnCreateTable() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -223,7 +224,7 @@ public void testReplaceTableWithSchemaChange() assertThat(onTrino().executeQuery("SELECT to_iso8601(ts) FROM delta.default." + tableName)).containsOnly(expected.build()); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -252,7 +253,7 @@ public void testReplaceTableWithSchemaChangeOnCheckpoint() assertThat(onTrino().executeQuery("SELECT to_iso8601(ts) FROM delta.default." + tableName)).containsOnly(expected.build()); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java index e37e9fe32778..cfd1b3356739 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java @@ -31,6 +31,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -88,7 +89,7 @@ public void testDatabricksCanReadInitialCreateTable() testInsert(tableName, ImmutableList.of()); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -131,7 +132,7 @@ public void testDatabricksCanReadInitialCreatePartitionedTable() testInsert(tableName, ImmutableList.of()); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -176,7 +177,7 @@ public void testDatabricksCanReadInitialCreateTableAs() row(null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -223,7 +224,7 @@ public void testDatabricksCanReadInitialCreatePartitionedTableAs() row(null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -308,7 +309,7 @@ public void testCreateTableWithColumnCommentOnDelta() assertEquals(getColumnCommentOnTrino("default", tableName, "col"), "test comment"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index e9fcb82212d5..e50f21096e05 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -38,6 +38,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -87,7 +88,7 @@ public void testInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -125,7 +126,7 @@ public void testPartitionedInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -179,7 +180,7 @@ public void testDeltaPartitionedDifferentOrderInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -217,7 +218,7 @@ public void testInsertNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -257,7 +258,7 @@ public void testInsertNestedNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -296,7 +297,7 @@ public void testPartitionedInsertNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -324,7 +325,7 @@ public void testDeleteCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -429,7 +430,7 @@ public void testWritesToTableWithGeneratedColumnFails() .hasMessageContaining("Writing to tables with generated columns is not supported"); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java index 6462d7cba82e..0d5260898a8e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java @@ -25,6 +25,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -79,7 +80,7 @@ private void testDatabricksCanReadFromCtasTableCreatedByTrinoWithSpecialCharacte assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -133,7 +134,7 @@ private void testTrinoCanReadFromCtasTableCreatedByDatabricksWithSpecialCharacte assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -186,7 +187,7 @@ private void testDatabricksCanReadTableCreatedByTrinoWithSpecialCharactersInPart assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -242,7 +243,7 @@ private void testTrinoCanReadTableCreatedByDatabricksWithSpecialCharactersInPart assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -295,7 +296,7 @@ private void testDatabricksCanReadFromTableUpdatedByTrinoWithCpIntervalSet(int i assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -351,7 +352,7 @@ private void testTrinoCanReadFromTableUpdatedByDatabricksWithCpIntervalSet(int i assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -381,7 +382,7 @@ public void testTrinoCanReadFromTablePartitionChangedByDatabricks() assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java index 3d726e3e9ed3..a7b3024fb335 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java @@ -30,6 +30,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -95,7 +96,7 @@ public void testUpdatesFromDatabricks() assertThat(databricksResult).containsExactly(toRows(prestoResult)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java index 3f090492c0db..badec3dd8a79 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java @@ -31,6 +31,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.hive.Engine.DELTA; import static io.trino.tests.product.hive.Engine.TRINO; import static io.trino.tests.product.utils.QueryExecutors.onDelta; @@ -129,7 +130,7 @@ private void testDropTableAccuracy(Engine creator, Engine dropper, boolean expli } } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS " + schemaName + "." + tableName); + dropDeltaTableWithRetry(schemaName + "." + tableName); onDelta().executeQuery("DROP SCHEMA " + schemaName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java index aa27d9902598..19f7070ed50d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java @@ -29,6 +29,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -92,7 +93,7 @@ public void testPartitionedSelectSpecialCharacters() .containsOnly(row(1, "spark=equal")); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java index f5be11a84ce4..d4017e295705 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java @@ -37,6 +37,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -89,7 +90,7 @@ public void testUpdateCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -119,7 +120,7 @@ public void testDeleteCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -149,7 +150,7 @@ public void testDeleteOnPartitionedTableCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -176,7 +177,7 @@ public void testDeleteOnPartitionKeyCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -265,7 +266,7 @@ public void testTrinoRespectsDatabricksSettingNonNullableColumn() .containsOnly(row(1, 2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -320,7 +321,7 @@ public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint() .containsOnly(row(1, 2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -369,7 +370,7 @@ private void testVacuumRemoveChangeDataFeedFiles(Consumer vacuumExecutor Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(0); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -474,7 +475,7 @@ Stream rows() @Override public void close() { - onDelta().executeQuery("DROP TABLE default." + name); + dropDeltaTableWithRetry("default." + name); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java index 8985c3fe4be4..83284c5b37e2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java @@ -41,6 +41,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -65,7 +66,7 @@ public void testHiveToDeltaRedirect() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -107,7 +108,7 @@ public void testHiveToNonexistentDeltaCatalogRedirectFailure() .hasMessageMatching(".*Table 'hive.default.test_redirect_to_nonexistent_delta_.*' redirected to 'epsilon.default.test_redirect_to_nonexistent_delta_.*', but the target catalog 'epsilon' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -130,7 +131,7 @@ public void testHiveToDeltaRedirectWithDefaultSchemaInSession() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -148,7 +149,7 @@ public void testHiveToUnpartitionedDeltaPartitionsRedirectFailure() "but the target table 'delta.default.test_delta_lake_unpartitioned_table_.*\\$partitions' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -166,7 +167,7 @@ public void testHiveToPartitionedDeltaPartitionsRedirectFailure() "but the target table 'delta.default.test_delta_lake_partitioned_table_.*\\$partitions' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -359,7 +360,7 @@ public void testHiveToDeltaInsert() assertThat(onTrino().executeQuery(format("SELECT count(*) FROM hive.default.\"%s\"", tableName))).containsOnly(row(5)); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -402,7 +403,7 @@ public void testHiveToDeltaDescribe() .containsOnly(expectedResults); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -436,7 +437,7 @@ public void testHiveToDeltaShowCreateTable() .hasRowsCount(1); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -477,10 +478,10 @@ public void testHiveToDeltaAlterTable() try { onTrino().executeQuery("ALTER TABLE hive.default.\"" + tableName + "\" RENAME TO \"" + newTableName + "\""); - onDelta().executeQuery("DROP TABLE " + newTableName); + dropDeltaTableWithRetry(newTableName); } catch (QueryExecutionException e) { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); throw e; } } @@ -524,7 +525,7 @@ public void testHiveToDeltaCommentTable() assertTableComment("delta", "default", tableName).isEqualTo(tableComment); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -571,7 +572,7 @@ public void testHiveToDeltaCommentColumn() assertColumnComment("delta", "default", tableName, columnName).isEqualTo(columnComment); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -870,8 +871,8 @@ public void testViewReferencingHiveAndDeltaTable(boolean legacyHiveViewTranslati } finally { onDelta().executeQuery("DROP VIEW IF EXISTS " + viewName); - onDelta().executeQuery("DROP TABLE IF EXISTS " + deltaTableName); - onDelta().executeQuery("DROP TABLE IF EXISTS " + deltaRegionTableName); + dropDeltaTableWithRetry(deltaTableName); + dropDeltaTableWithRetry(deltaRegionTableName); onTrino().executeQuery("DROP TABLE IF EXISTS hive.default." + hiveTableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index bb72c050e0fb..595f455c6664 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -13,9 +13,15 @@ */ package io.trino.tests.product.deltalake.util; +import com.amazonaws.services.glue.model.ConcurrentModificationException; +import com.google.common.base.Throwables; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import io.airlift.log.Logger; import io.trino.tempto.query.QueryResult; import org.intellij.lang.annotations.Language; +import java.time.temporal.ChronoUnit; import java.util.Optional; import static com.google.common.collect.MoreCollectors.onlyElement; @@ -25,10 +31,18 @@ public final class DeltaLakeTestUtils { + private static final Logger log = Logger.get(DeltaLakeTestUtils.class); + public static final String DATABRICKS_COMMUNICATION_FAILURE_ISSUE = "https://github.com/trinodb/trino/issues/14391"; @Language("RegExp") public static final String DATABRICKS_COMMUNICATION_FAILURE_MATCH = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown."; + private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() + .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) + .withBackoff(1, 10, ChronoUnit.SECONDS) + .withMaxRetries(3) + .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) + .build(); private DeltaLakeTestUtils() {} @@ -63,4 +77,13 @@ public static String getTableCommentOnDelta(String schemaName, String tableName) .map(row -> row.get(1)) .collect(onlyElement()); } + + /** + * Workaround method to avoid Table being modified concurrently error in Glue. + */ + public static QueryResult dropDeltaTableWithRetry(String tableName) + { + return Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) + .get(() -> onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName)); + } }