From 4f9c3ec363fce2957982b34b444249c5f452b36c Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 15 Feb 2023 09:50:09 +0100 Subject: [PATCH 1/3] Extract Glue client provider in Hive --- plugin/trino-delta-lake/pom.xml | 5 ++ .../hive/metastore/glue/GlueClientUtil.java | 69 +++++++++++++++++++ .../metastore/glue/GlueHiveMetastore.java | 61 ++++------------ .../metastore/glue/GlueMetastoreModule.java | 3 + .../glue/HiveGlueClientProvider.java | 54 +++++++++++++++ .../metastore/glue/TestHiveGlueMetastore.java | 6 +- .../catalog/glue/GlueClientProvider.java | 2 +- ...ingGlueIcebergTableOperationsProvider.java | 2 +- 8 files changed, 149 insertions(+), 53 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 00059215dbea..d662a876524a 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -112,6 +112,11 @@ units + + com.amazonaws + aws-java-sdk-core + + com.amazonaws aws-java-sdk-glue diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java new file mode 100644 index 000000000000..4d010d88b6e0 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueClientUtil.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.glue; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.metrics.RequestMetricCollector; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hive.aws.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata; + +public final class GlueClientUtil +{ + private GlueClientUtil() {} + + public static AWSGlueAsync createAsyncGlueClient( + GlueHiveMetastoreConfig config, + AWSCredentialsProvider credentialsProvider, + Optional requestHandler, + RequestMetricCollector metricsCollector) + { + ClientConfiguration clientConfig = new ClientConfiguration() + .withMaxConnections(config.getMaxGlueConnections()) + .withMaxErrorRetry(config.getMaxGlueErrorRetries()); + AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() + .withMetricsCollector(metricsCollector) + .withClientConfiguration(clientConfig); + + ImmutableList.Builder requestHandlers = ImmutableList.builder(); + requestHandler.ifPresent(requestHandlers::add); + config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId))); + asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new)); + + if (config.getGlueEndpointUrl().isPresent()) { + checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set"); + asyncGlueClientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + config.getGlueEndpointUrl().get(), + config.getGlueRegion().get())); + } + else if (config.getGlueRegion().isPresent()) { + asyncGlueClientBuilder.setRegion(config.getGlueRegion().get()); + } + else if (config.getPinGlueClientToCurrentRegion()) { + asyncGlueClientBuilder.setRegion(getCurrentRegionFromEC2Metadata().getName()); + } + + asyncGlueClientBuilder.setCredentials(credentialsProvider); + + return asyncGlueClientBuilder.build(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index 3200d3dd50e5..f1f75a2d7b48 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -15,15 +15,9 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonWebServiceRequest; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.handlers.AsyncHandler; -import com.amazonaws.handlers.RequestHandler2; -import com.amazonaws.metrics.RequestMetricCollector; import com.amazonaws.services.glue.AWSGlueAsync; -import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; import com.amazonaws.services.glue.model.AccessDeniedException; import com.amazonaws.services.glue.model.AlreadyExistsException; import com.amazonaws.services.glue.model.BatchCreatePartitionRequest; @@ -134,7 +128,6 @@ import java.util.function.Predicate; import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.Comparators.lexicographical; @@ -145,11 +138,11 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; -import static io.trino.plugin.hive.aws.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata; import static io.trino.plugin.hive.metastore.MetastoreUtil.makePartitionName; import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName; import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static io.trino.plugin.hive.metastore.glue.converter.GlueInputConverter.convertPartition; import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.getTableTypeNullable; import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.mappedCopy; @@ -186,7 +179,7 @@ public class GlueHiveMetastore private final Optional defaultDir; private final int partitionSegments; private final Executor partitionsReadExecutor; - private final GlueMetastoreStats stats = new GlueMetastoreStats(); + private final GlueMetastoreStats stats; private final GlueColumnStatisticsProvider columnStatisticsProvider; private final boolean assumeCanonicalPartitionKeys; private final Predicate tableFilter; @@ -195,70 +188,40 @@ public class GlueHiveMetastore public GlueHiveMetastore( HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig, - AWSCredentialsProvider credentialsProvider, @ForGlueHiveMetastore Executor partitionsReadExecutor, GlueColumnStatisticsProviderFactory columnStatisticsProviderFactory, - @ForGlueHiveMetastore Optional requestHandler, + AWSGlueAsync glueClient, + @ForGlueHiveMetastore GlueMetastoreStats stats, @ForGlueHiveMetastore Predicate tableFilter) { - requireNonNull(credentialsProvider, "credentialsProvider is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hdfsContext = new HdfsContext(ConnectorIdentity.ofUser(DEFAULT_METASTORE_USER)); - this.glueClient = createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector()); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.defaultDir = glueConfig.getDefaultWarehouseDir(); this.partitionSegments = glueConfig.getPartitionSegments(); this.partitionsReadExecutor = requireNonNull(partitionsReadExecutor, "partitionsReadExecutor is null"); this.assumeCanonicalPartitionKeys = glueConfig.isAssumeCanonicalPartitionKeys(); this.tableFilter = requireNonNull(tableFilter, "tableFilter is null"); + this.stats = requireNonNull(stats, "stats is null"); this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats); } - public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, AWSCredentialsProvider credentialsProvider, Optional requestHandler, RequestMetricCollector metricsCollector) - { - ClientConfiguration clientConfig = new ClientConfiguration() - .withMaxConnections(config.getMaxGlueConnections()) - .withMaxErrorRetry(config.getMaxGlueErrorRetries()); - AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() - .withMetricsCollector(metricsCollector) - .withClientConfiguration(clientConfig); - - ImmutableList.Builder requestHandlers = ImmutableList.builder(); - requestHandler.ifPresent(requestHandlers::add); - config.getCatalogId().ifPresent(catalogId -> requestHandlers.add(new GlueCatalogIdRequestHandler(catalogId))); - asyncGlueClientBuilder.setRequestHandlers(requestHandlers.build().toArray(RequestHandler2[]::new)); - - if (config.getGlueEndpointUrl().isPresent()) { - checkArgument(config.getGlueRegion().isPresent(), "Glue region must be set when Glue endpoint URL is set"); - asyncGlueClientBuilder.setEndpointConfiguration(new EndpointConfiguration( - config.getGlueEndpointUrl().get(), - config.getGlueRegion().get())); - } - else if (config.getGlueRegion().isPresent()) { - asyncGlueClientBuilder.setRegion(config.getGlueRegion().get()); - } - else if (config.getPinGlueClientToCurrentRegion()) { - asyncGlueClientBuilder.setRegion(getCurrentRegionFromEC2Metadata().getName()); - } - - asyncGlueClientBuilder.setCredentials(credentialsProvider); - - return asyncGlueClientBuilder.build(); - } - @VisibleForTesting public static GlueHiveMetastore createTestingGlueHiveMetastore(String defaultWarehouseDir) { HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + GlueMetastoreStats stats = new GlueMetastoreStats(); + GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() + .setDefaultWarehouseDir(defaultWarehouseDir); return new GlueHiveMetastore( hdfsEnvironment, - new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(defaultWarehouseDir), - DefaultAWSCredentialsProviderChain.getInstance(), + glueConfig, directExecutor(), new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()), - Optional.empty(), + createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()), + stats, table -> true); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java index 3388f80ce814..1c855ca4f4fd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java @@ -15,6 +15,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.glue.AWSGlueAsync; import com.amazonaws.services.glue.model.Table; import com.google.inject.Binder; import com.google.inject.Key; @@ -66,6 +67,8 @@ protected void setup(Binder binder) // export under the old name, for backwards compatibility binder.bind(GlueHiveMetastoreFactory.class).in(Scopes.SINGLETON); + binder.bind(Key.get(GlueMetastoreStats.class, ForGlueHiveMetastore.class)).toInstance(new GlueMetastoreStats()); + binder.bind(AWSGlueAsync.class).toProvider(HiveGlueClientProvider.class).in(Scopes.SINGLETON); newExporter(binder).export(GlueHiveMetastoreFactory.class).as(generator -> generator.generatedNameOf(GlueHiveMetastore.class)); binder.bind(Key.get(boolean.class, AllowHiveTableRename.class)).toInstance(false); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java new file mode 100644 index 000000000000..696f3bf28b12 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/HiveGlueClientProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.glue; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.glue.AWSGlueAsync; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class HiveGlueClientProvider + implements Provider +{ + private final GlueMetastoreStats stats; + private final AWSCredentialsProvider credentialsProvider; + private final GlueHiveMetastoreConfig glueConfig; // TODO do not keep mutable config instance on a field + private final Optional requestHandler; + + @Inject + public HiveGlueClientProvider( + @ForGlueHiveMetastore GlueMetastoreStats stats, + AWSCredentialsProvider credentialsProvider, + @ForGlueHiveMetastore Optional requestHandler, + GlueHiveMetastoreConfig glueConfig) + { + this.stats = requireNonNull(stats, "stats is null"); + this.credentialsProvider = requireNonNull(credentialsProvider, "credentialsProvider is null"); + this.requestHandler = requireNonNull(requestHandler, "requestHandler is null"); + this.glueConfig = glueConfig; + } + + @Override + public AWSGlueAsync get() + { + return createAsyncGlueClient(glueConfig, credentialsProvider, requestHandler, stats.newRequestMetricsCollector()); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java index d4a2621111bc..9a05b1db007f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastore.java @@ -95,6 +95,7 @@ import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static io.trino.plugin.hive.metastore.glue.PartitionFilterBuilder.DECIMAL_TYPE; import static io.trino.plugin.hive.metastore.glue.PartitionFilterBuilder.decimalOf; import static io.trino.plugin.hive.util.HiveUtil.DELTA_LAKE_PROVIDER; @@ -223,13 +224,14 @@ protected HiveMetastore createMetastore(File tempDir) glueConfig.setAssumeCanonicalPartitionKeys(true); Executor executor = new BoundedExecutor(this.executor, 10); + GlueMetastoreStats stats = new GlueMetastoreStats(); return new GlueHiveMetastore( HDFS_ENVIRONMENT, glueConfig, - DefaultAWSCredentialsProviderChain.getInstance(), executor, new DefaultGlueColumnStatisticsProviderFactory(executor, executor), - Optional.empty(), + createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()), + stats, new DefaultGlueMetastoreTableFilterProvider(true).get()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java index 7235352b6baa..5e4da1d8d2d7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueClientProvider.java @@ -23,7 +23,7 @@ import java.util.Optional; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static java.util.Objects.requireNonNull; public class GlueClientProvider diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java index f8fc97dc223f..5c68ee95bee7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestingGlueIcebergTableOperationsProvider.java @@ -27,7 +27,7 @@ import java.util.Optional; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; import static java.util.Objects.requireNonNull; public class TestingGlueIcebergTableOperationsProvider From 2161ad8fc07628c07b6e8f0c56b6e592fe5eccbd Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 15 Feb 2023 12:00:36 +0100 Subject: [PATCH 2/3] Retry dropping Glue table in the context of concurrent modifications Retry the query dropping the AWS Glue table, in the unlikely situation that another query engine may be calling asynchronously in the background an update operation on the table. --- plugin/trino-delta-lake/pom.xml | 2 + ...keConcurrentModificationGlueMetastore.java | 129 ++++++++++++++++++ plugin/trino-hive/pom.xml | 11 +- .../metastore/glue/GlueHiveMetastore.java | 21 ++- 4 files changed, 152 insertions(+), 11 deletions(-) create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index d662a876524a..48f897d73f75 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -476,6 +476,7 @@ **/TestDeltaLakeRenameToWithGlueMetastore.java **/TestDeltaLakeRegisterTableProcedureWithGlue.java **/TestDeltaLakeViewsGlueMetastore.java + **/TestDeltaLakeConcurrentModificationGlueMetastore.java **/TestDeltaLakeGcsConnectorSmokeTest.java **/Test*FailureRecoveryTest.java @@ -505,6 +506,7 @@ **/TestDeltaLakeRenameToWithGlueMetastore.java **/TestDeltaLakeRegisterTableProcedureWithGlue.java **/TestDeltaLakeViewsGlueMetastore.java + **/TestDeltaLakeConcurrentModificationGlueMetastore.java diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java new file mode 100644 index 000000000000..794070f2de66 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.metastore.glue; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.ConcurrentModificationException; +import io.trino.Session; +import io.trino.plugin.deltalake.TestingDeltaLakePlugin; +import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; +import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.spi.TrinoException; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.common.reflect.Reflection.newProxy; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.glue.GlueClientUtil.createAsyncGlueClient; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.testng.Assert.assertFalse; + +@Test(singleThreaded = true) +public class TestDeltaLakeConcurrentModificationGlueMetastore + extends AbstractTestQueryFramework +{ + private static final String CATALOG_NAME = "test_delta_lake_concurrent"; + private static final String SCHEMA = "test_delta_lake_glue_concurrent_" + randomNameSuffix(); + private String dataDirectory; + private GlueHiveMetastore metastore; + private final AtomicBoolean failNextGlueDeleteTableCall = new AtomicBoolean(false); + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session deltaLakeSession = testSessionBuilder() + .setCatalog(CATALOG_NAME) + .setSchema(SCHEMA) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); + + dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_delta_concurrent").toString(); + GlueMetastoreStats stats = new GlueMetastoreStats(); + GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() + .setDefaultWarehouseDir(dataDirectory); + + AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector()); + AWSGlueAsync proxiedGlueClient = newProxy(AWSGlueAsync.class, (proxy, method, args) -> { + Object result; + try { + if (method.getName().equals("deleteTable") && failNextGlueDeleteTableCall.get()) { + // Simulate concurrent modifications on the table that is about to be dropped + failNextGlueDeleteTableCall.set(false); + throw new TrinoException(HIVE_METASTORE_ERROR, new ConcurrentModificationException("Test-simulated metastore concurrent modification exception")); + } + result = method.invoke(glueClient, args); + } + catch (InvocationTargetException e) { + throw e.getCause(); + } + return result; + }); + + metastore = new GlueHiveMetastore( + HDFS_ENVIRONMENT, + glueConfig, + directExecutor(), + new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()), + proxiedGlueClient, + stats, + table -> true); + + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); + queryRunner.execute("CREATE SCHEMA " + SCHEMA); + return queryRunner; + } + + @Test + public void testDropTableWithConcurrentModifications() + { + String tableName = "test_glue_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); + + failNextGlueDeleteTableCall.set(true); + assertUpdate("DROP TABLE " + tableName); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws IOException + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA, false); + deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE); + } + } +} diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index c32c87ee5f47..a3921a98aeb5 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -230,6 +230,11 @@ rubix-presto-shaded + + dev.failsafe + failsafe + + it.unimi.dsi fastutil @@ -425,12 +430,6 @@ test - - dev.failsafe - failsafe - test - - io.minio minio diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index f1f75a2d7b48..4164368fbca8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -27,6 +27,7 @@ import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; +import com.amazonaws.services.glue.model.ConcurrentModificationException; import com.amazonaws.services.glue.model.CreateDatabaseRequest; import com.amazonaws.services.glue.model.CreateTableRequest; import com.amazonaws.services.glue.model.DatabaseInput; @@ -57,11 +58,14 @@ import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; import io.trino.hdfs.DynamicHdfsConfiguration; @@ -111,6 +115,7 @@ import javax.annotation.Nullable; import javax.inject.Inject; +import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -172,6 +177,11 @@ public class GlueHiveMetastore private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000; private static final Comparator> PARTITION_VALUE_COMPARATOR = lexicographical(String.CASE_INSENSITIVE_ORDER); private static final Predicate VIEWS_FILTER = table -> VIRTUAL_VIEW.name().equals(getTableTypeNullable(table)); + private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() + .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) + .withDelay(Duration.ofMillis(100)) + .withMaxRetries(3) + .build(); private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; @@ -568,12 +578,13 @@ public void createTable(Table table, PrincipalPrivileges principalPrivileges) public void dropTable(String databaseName, String tableName, boolean deleteData) { Table table = getExistingTable(databaseName, tableName); - + DeleteTableRequest deleteTableRequest = new DeleteTableRequest() + .withDatabaseName(databaseName) + .withName(tableName); try { - stats.getDeleteTable().call(() -> - glueClient.deleteTable(new DeleteTableRequest() - .withDatabaseName(databaseName) - .withName(tableName))); + Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) + .run(() -> stats.getDeleteTable().call(() -> + glueClient.deleteTable(deleteTableRequest))); } catch (AmazonServiceException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); From 58ae39e511e35fa39dde6c78bea78b513d253f3b Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 15 Feb 2023 12:37:12 +0100 Subject: [PATCH 3/3] Retry dropping delta tables backed by AWS Glue on Databricks Retry the query dropping the delta table on the Databricks cluster, in the unlikely situation that the Databricks cluster may be calling asynchronously in the background an update operation on the table. --- .../TestDeltaLakeAlterTableCompatibility.java | 9 +++--- ...DeltaLakeCheckConstraintCompatibility.java | 9 +++--- .../TestDeltaLakeColumnMappingMode.java | 19 ++++++------ ...DatabricksChangeDataFeedCompatibility.java | 27 ++++++++--------- ...akeDatabricksCheckpointsCompatibility.java | 19 ++++++------ ...ricksCreateTableAsSelectCompatibility.java | 13 +++++---- ...akeDatabricksCreateTableCompatibility.java | 11 +++---- ...eltaLakeDatabricksInsertCompatibility.java | 17 ++++++----- ...keDatabricksPartitioningCompatibility.java | 15 +++++----- .../TestDeltaLakeDatabricksUpdates.java | 3 +- .../TestDeltaLakeDropTableCompatibility.java | 3 +- .../TestDeltaLakeSelectCompatibility.java | 3 +- ...DeltaLakeWriteDatabricksCompatibility.java | 17 ++++++----- .../TestHiveAndDeltaLakeRedirect.java | 29 ++++++++++--------- .../deltalake/util/DeltaLakeTestUtils.java | 23 +++++++++++++++ 15 files changed, 127 insertions(+), 90 deletions(-) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index 28f1a78ef177..6eb8f415c12e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -32,6 +32,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -87,7 +88,7 @@ public void testAddColumnUnsupportedWriterVersion() .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -120,7 +121,7 @@ public void testRenameColumn() .containsOnly(row(1), row(2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -154,7 +155,7 @@ public void testRenamePartitionedColumn() .containsOnly(row(1, "part1"), row(2, "part2")); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -369,7 +370,7 @@ public void testTrinoAlterTablePreservesGeneratedColumn() .containsOnly(row(1, 2, 3)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java index 7295c3d05a8b..3786adf4fe23 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java @@ -27,6 +27,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -62,7 +63,7 @@ public void testCheckConstraintCompatibility(String columnDefinition, String che .containsOnly(row(insertedValue), row(insertedValue)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -112,7 +113,7 @@ public void testCheckConstraintAcrossColumns() .containsOnly(row(1, 1)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -136,7 +137,7 @@ public void testWritesToTableWithCheckConstraintFails() .hasMessageContaining("Cannot merge into a table with check constraints"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -155,7 +156,7 @@ public void testMetadataOperationsRetainCheckConstraint() onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'example table comment'"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 0efc3968b513..df3da7041265 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -32,6 +32,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -63,7 +64,7 @@ public void testColumnMappingModeNone() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -122,7 +123,7 @@ public void testColumnMappingMode(String mode) .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -158,7 +159,7 @@ public void testColumnMappingModeNameWithNonLowerCaseColumn(String mode) row(null, null, null, null, 2.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -217,7 +218,7 @@ public void testColumnMappingModeAddColumn(String mode) .containsOnly(row(1, null), row(2, null)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -250,7 +251,7 @@ public void testShowStatsFromJsonForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -286,7 +287,7 @@ public void testShowStatsFromParquetForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -319,7 +320,7 @@ public void testShowStatsOnPartitionedForColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -356,7 +357,7 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode) .hasMessageContaining("This connector does not support dropping columns"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -387,7 +388,7 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode) row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java index af598315a917..9efca06aefa1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java @@ -28,6 +28,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -109,7 +110,7 @@ public void testUpdatePartitionedTableWithCdf() row("testValue5", 3, "partition3", "update_postimage", 4L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -136,7 +137,7 @@ public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() row("testValue3", 5, "update_postimage", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -168,7 +169,7 @@ public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdf row("testValue5", 3, "partition3", "update_postimage", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -205,7 +206,7 @@ public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() row("testValue4", 4, "partition2", "update_postimage", 3L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -256,7 +257,7 @@ public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() row("testValue6", 6, "insert", 7L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -284,7 +285,7 @@ public void testDeleteFromTableWithCdf() row("testValue3", 3, "delete", 4L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -339,8 +340,8 @@ public void testMergeUpdateIntoTableWithCdfEnabled() row(20004, "nation2", 200, "update_postimage", 4)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName1); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName2); + dropDeltaTableWithRetry("default." + tableName1); + dropDeltaTableWithRetry("default." + tableName2); } } @@ -393,8 +394,8 @@ public void testMergeDeleteIntoTableWithCdfEnabled() row(2, "nation2", 200, "delete", 4)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName1); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName2); + dropDeltaTableWithRetry("default." + tableName1); + dropDeltaTableWithRetry("default." + tableName2); } } @@ -454,8 +455,8 @@ public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() row(4, "pageUrl4", 400, "update_preimage", 5)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + targetTableName); - onDelta().executeQuery("DROP TABLE IF EXISTS default." + sourceTableName); + dropDeltaTableWithRetry("default." + targetTableName); + dropDeltaTableWithRetry("default." + sourceTableName); } } @@ -493,7 +494,7 @@ public void testDeleteFromNullPartitionWithCdfEnabled() row("testValue3", 3, null, "delete", 2L)); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index ea0f8351d93c..30e8d719a978 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -45,6 +45,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -127,7 +128,7 @@ public void testDatabricksCanReadTrinoCheckpoint() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -187,7 +188,7 @@ public void testTrinoUsesCheckpointInterval() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -255,7 +256,7 @@ public void testDatabricksUsesCheckpointInterval() } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -340,7 +341,7 @@ private void testCheckpointMinMaxStatisticsForRowType(Consumer sqlExecut } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -405,7 +406,7 @@ private void testCheckpointNullStatisticsForRowType(Consumer sqlExecutor } finally { // cleanup - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -448,7 +449,7 @@ private void testWriteStatsAsJsonDisabled(Consumer sqlExecutor, String t row(null, null, null, null, 1.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -492,7 +493,7 @@ private void testWriteStatsAsStructDisabled(Consumer sqlExecutor, String row(null, null, null, null, null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -550,7 +551,7 @@ private void testWriteStatsAsJsonEnabled(Consumer sqlExecutor, String ta row(null, null, null, null, 3.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -640,7 +641,7 @@ private void testWriteStatsAsStructEnabled(Consumer sqlExecutor, String row(null, null, null, null, 1.0, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java index bcc8ad4bc14f..af763f339fc1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java @@ -43,6 +43,7 @@ import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -91,7 +92,7 @@ public void testPrestoTypesWithDatabricks() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -118,7 +119,7 @@ public void testPrestoTimestampsWithDatabricks() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -146,7 +147,7 @@ public void testPrestoCacheInvalidatedOnCreateTable() .map(QueryAssert.Row::new) .collect(toImmutableList())); - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); removeS3Directory(bucketName, "databricks-compatibility-test-" + tableName); assertThat(onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " + @@ -168,7 +169,7 @@ public void testPrestoCacheInvalidatedOnCreateTable() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -223,7 +224,7 @@ public void testReplaceTableWithSchemaChange() assertThat(onTrino().executeQuery("SELECT to_iso8601(ts) FROM delta.default." + tableName)).containsOnly(expected.build()); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -252,7 +253,7 @@ public void testReplaceTableWithSchemaChangeOnCheckpoint() assertThat(onTrino().executeQuery("SELECT to_iso8601(ts) FROM delta.default." + tableName)).containsOnly(expected.build()); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java index e37e9fe32778..cfd1b3356739 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java @@ -31,6 +31,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -88,7 +89,7 @@ public void testDatabricksCanReadInitialCreateTable() testInsert(tableName, ImmutableList.of()); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -131,7 +132,7 @@ public void testDatabricksCanReadInitialCreatePartitionedTable() testInsert(tableName, ImmutableList.of()); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -176,7 +177,7 @@ public void testDatabricksCanReadInitialCreateTableAs() row(null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -223,7 +224,7 @@ public void testDatabricksCanReadInitialCreatePartitionedTableAs() row(null, null, null))); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -308,7 +309,7 @@ public void testCreateTableWithColumnCommentOnDelta() assertEquals(getColumnCommentOnTrino("default", tableName, "col"), "test comment"); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index e9fcb82212d5..e50f21096e05 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -38,6 +38,7 @@ import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -87,7 +88,7 @@ public void testInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -125,7 +126,7 @@ public void testPartitionedInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -179,7 +180,7 @@ public void testDeltaPartitionedDifferentOrderInsertCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -217,7 +218,7 @@ public void testInsertNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -257,7 +258,7 @@ public void testInsertNestedNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -296,7 +297,7 @@ public void testPartitionedInsertNonLowercaseColumnsCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -324,7 +325,7 @@ public void testDeleteCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -429,7 +430,7 @@ public void testWritesToTableWithGeneratedColumnFails() .hasMessageContaining("Writing to tables with generated columns is not supported"); } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java index 6462d7cba82e..0d5260898a8e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksPartitioningCompatibility.java @@ -25,6 +25,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -79,7 +80,7 @@ private void testDatabricksCanReadFromCtasTableCreatedByTrinoWithSpecialCharacte assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -133,7 +134,7 @@ private void testTrinoCanReadFromCtasTableCreatedByDatabricksWithSpecialCharacte assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -186,7 +187,7 @@ private void testDatabricksCanReadTableCreatedByTrinoWithSpecialCharactersInPart assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -242,7 +243,7 @@ private void testTrinoCanReadTableCreatedByDatabricksWithSpecialCharactersInPart assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -295,7 +296,7 @@ private void testDatabricksCanReadFromTableUpdatedByTrinoWithCpIntervalSet(int i assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -351,7 +352,7 @@ private void testTrinoCanReadFromTableUpdatedByDatabricksWithCpIntervalSet(int i assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -381,7 +382,7 @@ public void testTrinoCanReadFromTablePartitionChangedByDatabricks() assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java index 3d726e3e9ed3..a7b3024fb335 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksUpdates.java @@ -30,6 +30,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -95,7 +96,7 @@ public void testUpdatesFromDatabricks() assertThat(databricksResult).containsExactly(toRows(prestoResult)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java index 3f090492c0db..badec3dd8a79 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDropTableCompatibility.java @@ -31,6 +31,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.hive.Engine.DELTA; import static io.trino.tests.product.hive.Engine.TRINO; import static io.trino.tests.product.utils.QueryExecutors.onDelta; @@ -129,7 +130,7 @@ private void testDropTableAccuracy(Engine creator, Engine dropper, boolean expli } } finally { - onDelta().executeQuery("DROP TABLE IF EXISTS " + schemaName + "." + tableName); + dropDeltaTableWithRetry(schemaName + "." + tableName); onDelta().executeQuery("DROP SCHEMA " + schemaName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java index aa27d9902598..19f7070ed50d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSelectCompatibility.java @@ -29,6 +29,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -92,7 +93,7 @@ public void testPartitionedSelectSpecialCharacters() .containsOnly(row(1, "spark=equal")); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java index f5be11a84ce4..d4017e295705 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java @@ -37,6 +37,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -89,7 +90,7 @@ public void testUpdateCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -119,7 +120,7 @@ public void testDeleteCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -149,7 +150,7 @@ public void testDeleteOnPartitionedTableCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -176,7 +177,7 @@ public void testDeleteOnPartitionKeyCompatibility() .containsOnly(expectedRows); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -265,7 +266,7 @@ public void testTrinoRespectsDatabricksSettingNonNullableColumn() .containsOnly(row(1, 2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -320,7 +321,7 @@ public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint() .containsOnly(row(1, 2)); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -369,7 +370,7 @@ private void testVacuumRemoveChangeDataFeedFiles(Consumer vacuumExecutor Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(0); } finally { - onDelta().executeQuery("DROP TABLE default." + tableName); + dropDeltaTableWithRetry("default." + tableName); } } @@ -474,7 +475,7 @@ Stream rows() @Override public void close() { - onDelta().executeQuery("DROP TABLE default." + name); + dropDeltaTableWithRetry("default." + name); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java index 8985c3fe4be4..83284c5b37e2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java @@ -41,6 +41,7 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -65,7 +66,7 @@ public void testHiveToDeltaRedirect() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -107,7 +108,7 @@ public void testHiveToNonexistentDeltaCatalogRedirectFailure() .hasMessageMatching(".*Table 'hive.default.test_redirect_to_nonexistent_delta_.*' redirected to 'epsilon.default.test_redirect_to_nonexistent_delta_.*', but the target catalog 'epsilon' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -130,7 +131,7 @@ public void testHiveToDeltaRedirectWithDefaultSchemaInSession() .collect(toImmutableList())); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -148,7 +149,7 @@ public void testHiveToUnpartitionedDeltaPartitionsRedirectFailure() "but the target table 'delta.default.test_delta_lake_unpartitioned_table_.*\\$partitions' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -166,7 +167,7 @@ public void testHiveToPartitionedDeltaPartitionsRedirectFailure() "but the target table 'delta.default.test_delta_lake_partitioned_table_.*\\$partitions' does not exist"); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -359,7 +360,7 @@ public void testHiveToDeltaInsert() assertThat(onTrino().executeQuery(format("SELECT count(*) FROM hive.default.\"%s\"", tableName))).containsOnly(row(5)); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -402,7 +403,7 @@ public void testHiveToDeltaDescribe() .containsOnly(expectedResults); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -436,7 +437,7 @@ public void testHiveToDeltaShowCreateTable() .hasRowsCount(1); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -477,10 +478,10 @@ public void testHiveToDeltaAlterTable() try { onTrino().executeQuery("ALTER TABLE hive.default.\"" + tableName + "\" RENAME TO \"" + newTableName + "\""); - onDelta().executeQuery("DROP TABLE " + newTableName); + dropDeltaTableWithRetry(newTableName); } catch (QueryExecutionException e) { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); throw e; } } @@ -524,7 +525,7 @@ public void testHiveToDeltaCommentTable() assertTableComment("delta", "default", tableName).isEqualTo(tableComment); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -571,7 +572,7 @@ public void testHiveToDeltaCommentColumn() assertColumnComment("delta", "default", tableName, columnName).isEqualTo(columnComment); } finally { - onDelta().executeQuery("DROP TABLE " + tableName); + dropDeltaTableWithRetry(tableName); } } @@ -870,8 +871,8 @@ public void testViewReferencingHiveAndDeltaTable(boolean legacyHiveViewTranslati } finally { onDelta().executeQuery("DROP VIEW IF EXISTS " + viewName); - onDelta().executeQuery("DROP TABLE IF EXISTS " + deltaTableName); - onDelta().executeQuery("DROP TABLE IF EXISTS " + deltaRegionTableName); + dropDeltaTableWithRetry(deltaTableName); + dropDeltaTableWithRetry(deltaRegionTableName); onTrino().executeQuery("DROP TABLE IF EXISTS hive.default." + hiveTableName); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index bb72c050e0fb..595f455c6664 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -13,9 +13,15 @@ */ package io.trino.tests.product.deltalake.util; +import com.amazonaws.services.glue.model.ConcurrentModificationException; +import com.google.common.base.Throwables; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import io.airlift.log.Logger; import io.trino.tempto.query.QueryResult; import org.intellij.lang.annotations.Language; +import java.time.temporal.ChronoUnit; import java.util.Optional; import static com.google.common.collect.MoreCollectors.onlyElement; @@ -25,10 +31,18 @@ public final class DeltaLakeTestUtils { + private static final Logger log = Logger.get(DeltaLakeTestUtils.class); + public static final String DATABRICKS_COMMUNICATION_FAILURE_ISSUE = "https://github.com/trinodb/trino/issues/14391"; @Language("RegExp") public static final String DATABRICKS_COMMUNICATION_FAILURE_MATCH = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown."; + private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() + .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) + .withBackoff(1, 10, ChronoUnit.SECONDS) + .withMaxRetries(3) + .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) + .build(); private DeltaLakeTestUtils() {} @@ -63,4 +77,13 @@ public static String getTableCommentOnDelta(String schemaName, String tableName) .map(row -> row.get(1)) .collect(onlyElement()); } + + /** + * Workaround method to avoid Table being modified concurrently error in Glue. + */ + public static QueryResult dropDeltaTableWithRetry(String tableName) + { + return Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) + .get(() -> onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName)); + } }