diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java index 205c1fe02557..7f7d333072db 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java @@ -37,6 +37,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; @@ -165,7 +166,7 @@ private void deleteGen2Directory(AzureLocation location) DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, location.path()); if (directoryClient.exists()) { if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } directoryClient.deleteIfExistsWithResponse(deleteRecursiveOptions, null, null); } @@ -191,10 +192,10 @@ public void renameFile(Location source, Location target) AzureLocation sourceLocation = new AzureLocation(source); AzureLocation targetLocation = new AzureLocation(target); if (!sourceLocation.account().equals(targetLocation.account())) { - throw new IOException("Cannot rename across storage accounts"); + throw new TrinoFileSystemException("Cannot rename across storage accounts"); } if (!Objects.equals(sourceLocation.container(), targetLocation.container())) { - throw new IOException("Cannot rename across storage account containers"); + throw new TrinoFileSystemException("Cannot rename across storage account containers"); } // DFS rename file works with all storage types @@ -208,7 +209,7 @@ private void renameGen2File(AzureLocation source, AzureLocation target) 
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(source); DataLakeFileClient dataLakeFileClient = createFileClient(fileSystemClient, source.path()); if (dataLakeFileClient.getProperties().isDirectory()) { - throw new IOException("Rename file from %s to %s, source is a directory".formatted(source, target)); + throw new TrinoFileSystemException("Rename file from %s to %s, source is a directory".formatted(source, target)); } createDirectoryIfNotExists(fileSystemClient, target.location().parentDirectory().path()); @@ -255,7 +256,7 @@ private FileIterator listGen2Files(AzureLocation location) return FileIterator.empty(); } if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } pathItems = directoryClient.listPaths(true, false, null, null); } @@ -315,7 +316,7 @@ public void createDirectory(Location location) DataLakeFileSystemClient fileSystemClient = createFileSystemClient(azureLocation); DataLakeDirectoryClient directoryClient = createDirectoryIfNotExists(fileSystemClient, azureLocation.path()); if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + azureLocation); + throw new TrinoFileSystemException("Location is not a directory: " + azureLocation); } } catch (RuntimeException e) { @@ -330,26 +331,26 @@ public void renameDirectory(Location source, Location target) AzureLocation sourceLocation = new AzureLocation(source); AzureLocation targetLocation = new AzureLocation(target); if (!sourceLocation.account().equals(targetLocation.account())) { - throw new IOException("Cannot rename across storage accounts"); + throw new TrinoFileSystemException("Cannot rename across storage accounts"); } if (!Objects.equals(sourceLocation.container(), targetLocation.container())) { - throw new IOException("Cannot rename across storage account containers"); + 
throw new TrinoFileSystemException("Cannot rename across storage account containers"); } if (!isHierarchicalNamespaceEnabled(sourceLocation)) { - throw new IOException("Azure non-hierarchical does not support directory renames"); + throw new TrinoFileSystemException("Azure non-hierarchical does not support directory renames"); } if (sourceLocation.path().isEmpty() || targetLocation.path().isEmpty()) { - throw new IOException("Cannot rename %s to %s".formatted(source, target)); + throw new TrinoFileSystemException("Cannot rename %s to %s".formatted(source, target)); } try { DataLakeFileSystemClient fileSystemClient = createFileSystemClient(sourceLocation); DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, sourceLocation.path()); if (!directoryClient.exists()) { - throw new IOException("Source directory does not exist: " + source); + throw new TrinoFileSystemException("Source directory does not exist: " + source); } if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Source is not a directory: " + source); + throw new TrinoFileSystemException("Source is not a directory: " + source); } directoryClient.rename(null, targetLocation.path()); } @@ -416,7 +417,7 @@ private Set listGen2Directories(AzureLocation location) return ImmutableSet.of(); } if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } pathItems = directoryClient.listPaths(false, false, null, null); } diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java index 2117af82671f..5de1aa7660b5 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java @@ -17,6 
+17,7 @@ import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.file.datalake.models.DataLakeStorageException; +import io.trino.filesystem.TrinoFileSystemException; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,7 +33,7 @@ public static IOException handleAzureException(RuntimeException exception, Strin throw withCause(new FileNotFoundException(location.toString()), exception); } if (exception instanceof AzureException) { - throw new IOException("Azure service error %s file: %s".formatted(action, location), exception); + throw new TrinoFileSystemException("Azure service error %s file: %s".formatted(action, location), exception); } throw new IOException("Error %s file: %s".formatted(action, location), exception); } diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index d51839e8068e..48dba70d775a 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -27,6 +27,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; @@ -150,7 +151,7 @@ public void deleteDirectory(Location location) public void renameFile(Location source, Location target) throws IOException { - throw new IOException("GCS does not support renames"); + throw new TrinoFileSystemException("GCS does not support renames"); } @Override @@ -241,7 +242,7 @@ public void createDirectory(Location location) public void renameDirectory(Location source, Location target) throws IOException { - throw new IOException("GCS does not support directory 
renames"); + throw new TrinoFileSystemException("GCS does not support directory renames"); } @Override diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java index fe6d28c632dd..cc9f85cb61c5 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java @@ -13,11 +13,13 @@ */ package io.trino.filesystem.gcs; +import com.google.cloud.BaseServiceException; import com.google.cloud.ReadChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.Storage; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import java.io.FileNotFoundException; import java.io.IOException; @@ -36,12 +38,18 @@ private GcsUtils() {} public static IOException handleGcsException(RuntimeException exception, String action, GcsLocation location) throws IOException { + if (exception instanceof BaseServiceException) { + throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, location), exception); + } throw new IOException("Error %s: %s".formatted(action, location), exception); } public static IOException handleGcsException(RuntimeException exception, String action, Collection locations) throws IOException { + if (exception instanceof BaseServiceException) { + throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, locations), exception); + } throw new IOException("Error %s: %s".formatted(action, locations), exception); } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index d3130af4fe8e..193e30b2a0e0 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ 
b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -18,6 +18,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import software.amazon.awssdk.core.exception.SdkException; @@ -100,7 +101,7 @@ public void deleteFile(Location location) client.deleteObject(request); } catch (SdkException e) { - throw new IOException("Failed to delete file: " + location, e); + throw new TrinoFileSystemException("Failed to delete file: " + location, e); } } @@ -158,7 +159,7 @@ private void deleteObjects(Collection locations) } } catch (SdkException e) { - throw new IOException("Error while batch deleting files", e); + throw new TrinoFileSystemException("Error while batch deleting files", e); } } } @@ -206,7 +207,7 @@ private FileIterator listObjects(Location location, boolean includeDirectoryObje return new S3FileIterator(s3Location, s3ObjectStream.iterator()); } catch (SdkException e) { - throw new IOException("Failed to list location: " + location, e); + throw new TrinoFileSystemException("Failed to list location: " + location, e); } } @@ -262,7 +263,7 @@ public Set listDirectories(Location location) .collect(toImmutableSet()); } catch (SdkException e) { - throw new IOException("Failed to list location: " + location, e); + throw new TrinoFileSystemException("Failed to list location: " + location, e); } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java index 9fa15c6003bd..fe8ac5a271d5 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java @@ -14,6 +14,7 @@ package io.trino.filesystem.s3; import 
io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoInputStream; @@ -130,7 +131,7 @@ private boolean headObject() return false; } catch (SdkException e) { - throw new IOException("S3 HEAD request failed for file: " + location, e); + throw new TrinoFileSystemException("S3 HEAD request failed for file: " + location, e); } } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java index 5166710d2daf..288a574f81eb 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java @@ -14,6 +14,7 @@ package io.trino.filesystem.s3; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputStream; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.AbortedException; @@ -208,7 +209,7 @@ private void seekStream() throw ex; } catch (SdkException e) { - throw new IOException("Failed to open S3 file: " + location, e); + throw new TrinoFileSystemException("Failed to open S3 file: " + location, e); } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 21c4b99423b0..39ecc9ba644d 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -13,6 +13,9 @@ */ package io.trino.filesystem; +import com.google.common.base.Throwables; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; import java.util.Optional; @@ -223,4 
+226,20 @@ Set listDirectories(Location location) */ Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) throws IOException; + + /** + * Checks whether given exception is unrecoverable, so that further retries won't help + *

+ * By default, all third-party (AWS, Azure, GCP) SDKs will retry appropriate exceptions + * (either client-side IO errors, or 500/503), so there is no need to retry those additionally. + *

+ * If any custom retry behavior is needed, it is advised to change SDK's retry handlers, + * rather than introducing outer retry loop, which combined with SDKs default retries, + * could lead to prolonged, unnecessary retries + */ + static boolean isUnrecoverableException(Throwable throwable) + { + return Throwables.getCausalChain(throwable).stream() + .anyMatch(t -> t instanceof TrinoFileSystemException || t instanceof FileNotFoundException); + } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java new file mode 100644 index 000000000000..30d09b429d98 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem; + +import java.io.IOException; + +/** + * Unrecoverable file system exception. + * This exception is thrown for fatal errors, or after retries have already been performed, + * so additional retries must not be performed when this is caught. 
+ */ +public class TrinoFileSystemException + extends IOException +{ + public TrinoFileSystemException(String message, Throwable cause) + { + super(message, cause); + } + + public TrinoFileSystemException(String message) + { + super(message); + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index 97b94e556bae..b0c8e9711e47 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -113,7 +113,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List log.debug("Request failed, retrying: %s", event.getLastException())) - .abortOn(failure -> !BigQueryUtil.isRetryable(failure)) + .handleIf(BigQueryUtil::isRetryable) .build()) .get(() -> { try { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java deleted file mode 100644 index c44b3865ff07..000000000000 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.deltalake.metastore.glue; - -import com.google.common.collect.ImmutableSet; -import io.opentelemetry.api.OpenTelemetry; -import io.trino.Session; -import io.trino.plugin.deltalake.TestingDeltaLakePlugin; -import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; -import io.trino.plugin.hive.metastore.glue.GlueCache; -import io.trino.plugin.hive.metastore.glue.GlueContext; -import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; -import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; -import io.trino.spi.TrinoException; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import software.amazon.awssdk.services.glue.GlueClient; -import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Path; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.common.reflect.Reflection.newProxy; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.TableKind.DELTA; -import static io.trino.plugin.hive.metastore.glue.GlueMetastoreModule.createGlueClient; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static 
org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -public class TestDeltaLakeConcurrentModificationGlueMetastore - extends AbstractTestQueryFramework -{ - private static final String CATALOG_NAME = "test_delta_lake_concurrent"; - private static final String SCHEMA = "test_delta_lake_glue_concurrent_" + randomNameSuffix(); - private Path dataDirectory; - private GlueHiveMetastore metastore; - private final AtomicBoolean failNextGlueDeleteTableCall = new AtomicBoolean(false); - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session deltaLakeSession = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .setSchema(SCHEMA) - .build(); - - QueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); - - dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_delta_concurrent"); - GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(dataDirectory.toUri().toString()); - - GlueClient glueClient = closeAfterClass(createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())); - GlueClient proxiedGlueClient = newProxy(GlueClient.class, (proxy, method, args) -> { - Object result; - try { - if (method.getName().equals("deleteTable") && failNextGlueDeleteTableCall.get()) { - // Simulate concurrent modifications on the table that is about to be dropped - failNextGlueDeleteTableCall.set(false); - throw new TrinoException(HIVE_METASTORE_ERROR, ConcurrentModificationException.builder() - .message("Test-simulated metastore concurrent modification exception") - .build()); - } - result = method.invoke(glueClient, args); - } - catch (InvocationTargetException e) { - throw e.getCause(); - } - return result; - }); - - metastore = new GlueHiveMetastore( - proxiedGlueClient, - new GlueContext(glueConfig), - GlueCache.NOOP, - HDFS_FILE_SYSTEM_FACTORY, - glueConfig, - 
ImmutableSet.of(DELTA)); - - queryRunner.installPlugin(new TestingDeltaLakePlugin(dataDirectory, Optional.of(new TestingDeltaLakeMetastoreModule(metastore)))); - queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); - queryRunner.execute("CREATE SCHEMA " + SCHEMA); - return queryRunner; - } - - @Test - public void testDropTableWithConcurrentModifications() - { - String tableName = "test_glue_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); - - failNextGlueDeleteTableCall.set(true); - assertUpdate("DROP TABLE " + tableName); - assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); - } - - @AfterAll - public void cleanup() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(SCHEMA, false); - metastore.shutdown(); - deleteRecursively(dataDirectory, ALLOW_INSECURE); - } - } -} diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index 2463ffdf8070..dbf21135b2d9 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -276,6 +276,16 @@ regions + + software.amazon.awssdk + retries + + + + software.amazon.awssdk + retries-spi + + software.amazon.awssdk sdk-core @@ -609,6 +619,18 @@ + + + org.apache.maven.plugins + maven-dependency-plugin + + + + software.amazon.awssdk:retries + + + diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 124525254ec1..274641269dcc 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -164,6 +164,7 @@ public class SemiTransactionalHiveMetastore .withDelay(java.time.Duration.ofSeconds(1)) .withMaxDuration(java.time.Duration.ofSeconds(30)) .withMaxAttempts(3) + 
.abortOn(TrinoFileSystem::isUnrecoverableException) .build(); private static final Map ACID_OPERATION_ACTION_TYPES = ImmutableMap.of( diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index 65ba98816c06..95ea0c950764 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -14,15 +14,11 @@ package io.trino.plugin.hive.metastore.glue; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.inject.Inject; -import dev.failsafe.Failsafe; -import dev.failsafe.FailsafeException; -import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; @@ -65,7 +61,6 @@ import software.amazon.awssdk.services.glue.model.BatchGetPartitionResponse; import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry; import software.amazon.awssdk.services.glue.model.ColumnStatistics; -import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; import software.amazon.awssdk.services.glue.model.CreateTableRequest; import software.amazon.awssdk.services.glue.model.DatabaseInput; import software.amazon.awssdk.services.glue.model.DeleteTableRequest; @@ -84,7 +79,6 @@ import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -101,6 +95,7 @@ import java.util.concurrent.atomic.AtomicInteger; import 
java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -168,12 +163,6 @@ public class GlueHiveMetastore private static final int BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE = 100; private static final int AWS_GLUE_GET_FUNCTIONS_MAX_RESULTS = 100; - private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() - .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) - .withDelay(Duration.ofMillis(100)) - .withMaxRetries(3) - .build(); - private static final AtomicInteger poolCounter = new AtomicInteger(); private final GlueClient glueClient; @@ -525,8 +514,7 @@ public void dropTable(String databaseName, String tableName, boolean deleteData) .name(tableName) .build(); try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> stats.getDeleteTable().call(() -> glueClient.deleteTable(deleteTableRequest))); + stats.getDeleteTable().call(() -> glueClient.deleteTable(deleteTableRequest)); } catch (EntityNotFoundException e) { throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); @@ -563,34 +551,29 @@ public void replaceTable(String databaseName, String tableName, Table newTable, updateTable(databaseName, tableName, _ -> newTable); } - private void updateTable(String databaseName, String tableName, TableModifier modifier) + private void updateTable(String databaseName, String tableName, UnaryOperator modifier) { + Table existingTable = getTable(databaseName, tableName) + .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName))); + Table newTable = modifier.apply(existingTable); + if (!existingTable.getDatabaseName().equals(newTable.getDatabaseName()) || !existingTable.getTableName().equals(newTable.getTableName())) { + throw new 
TrinoException(NOT_SUPPORTED, "Update cannot be used to change rename a table"); + } + if (existingTable.getParameters().getOrDefault("table_type", "").equalsIgnoreCase("iceberg") && !Objects.equals( + existingTable.getParameters().get("metadata_location"), + newTable.getParameters().get("previous_metadata_location"))) { + throw new TrinoException(NOT_SUPPORTED, "Cannot update Iceberg table: supplied previous location does not match current location"); + } try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY).run(() -> { - Table existingTable = getTable(databaseName, tableName) - .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName))); - Table newTable = modifier.modify(existingTable); - if (!existingTable.getDatabaseName().equals(newTable.getDatabaseName()) || !existingTable.getTableName().equals(newTable.getTableName())) { - throw new TrinoException(NOT_SUPPORTED, "Update cannot be used to change rename a table"); - } - if (existingTable.getParameters().getOrDefault("table_type", "").equalsIgnoreCase("iceberg") && !Objects.equals( - existingTable.getParameters().get("metadata_location"), - newTable.getParameters().get("previous_metadata_location"))) { - throw new TrinoException(NOT_SUPPORTED, "Cannot update Iceberg table: supplied previous location does not match current location"); - } - try { - stats.getUpdateTable().call(() -> glueClient.updateTable(builder -> builder - .applyMutation(glueContext::configureClient) - .databaseName(databaseName) - .tableInput(toGlueTableInput(newTable)))); - } - catch (EntityNotFoundException e) { - throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); - } - }); + stats.getUpdateTable().call(() -> glueClient.updateTable(builder -> builder + .applyMutation(glueContext::configureClient) + .databaseName(databaseName) + .tableInput(toGlueTableInput(newTable)))); } - catch (FailsafeException e) { - throwIfUnchecked(e.getCause()); + catch 
(EntityNotFoundException e) { + throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); + } + catch (SdkException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); } finally { @@ -598,12 +581,6 @@ private void updateTable(String databaseName, String tableName, TableModifier mo } } - private interface TableModifier - { - Table modify(Table table) - throws Exception; - } - @Override public void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) { @@ -752,11 +729,9 @@ public void updateTableStatistics(String databaseName, String tableName, Optiona // first update the basic statistics on the table, which requires a read-modify-write cycle BasicTableStatisticsResult result; try { - result = Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .get(context -> updateBasicTableStatistics(databaseName, tableName, mode, statisticsUpdate, existingColumnStatistics)); + result = updateBasicTableStatistics(databaseName, tableName, mode, statisticsUpdate, existingColumnStatistics); } - catch (FailsafeException e) { - throwIfUnchecked(e.getCause()); + catch (SdkException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); } finally { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java index 771eea4a6313..c8b439385d28 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.GlueClientBuilder; +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; import 
software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.StsClientBuilder; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -98,7 +99,7 @@ public static GlueCache createGlueCache(CachingHiveMetastoreConfig config, Catal // Note: while we could skip CachingHiveMetastoreModule altogether on workers, we retain it so that catalog // configuration can remain identical for all nodes, making cluster configuration easier. boolean enabled = nodeManager.getCurrentNode().isCoordinator() && - (metadataCacheTtl.toMillis() > 0 || statsCacheTtl.toMillis() > 0); + (metadataCacheTtl.toMillis() > 0 || statsCacheTtl.toMillis() > 0); checkState(config.isPartitionCacheEnabled(), "Disabling partitions cache is not supported with Glue v2"); checkState(config.isCacheMissing(), "Disabling cache missing is not supported with Glue v2"); @@ -128,8 +129,9 @@ public static GlueClient createGlueClient(GlueHiveMetastoreConfig config, OpenTe .setCaptureExperimentalSpanAttributes(true) .setRecordIndividualHttpError(true) .build().newExecutionInterceptor()) - .retryPolicy(retry -> retry - .numRetries(config.getMaxGlueErrorRetries()))); + .retryStrategy(retryBuilder -> retryBuilder + .retryOnException(throwable -> throwable instanceof ConcurrentModificationException) + .maxAttempts(config.getMaxGlueErrorRetries()))); Optional staticCredentialsProvider = getStaticCredentialsProvider(config); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java index 9b3cb7e1092f..9a795f8cb765 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java @@ -18,8 +18,12 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.handlers.RequestHandler2; 
import com.amazonaws.metrics.RequestMetricCollector; +import com.amazonaws.retry.PredefinedRetryPolicies; +import com.amazonaws.retry.RetryPolicy; +import com.amazonaws.retry.RetryPolicy.RetryCondition; import com.amazonaws.services.glue.AWSGlueAsync; import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.amazonaws.services.glue.model.ConcurrentModificationException; import java.util.Set; @@ -36,9 +40,23 @@ public static AWSGlueAsync createAsyncGlueClient( Set requestHandlers, RequestMetricCollector metricsCollector) { + RetryPolicy defaultRetryPolicy = PredefinedRetryPolicies.getDefaultRetryPolicy(); + + RetryCondition customRetryCondition = (requestContext, exception, retriesAttempted) -> + defaultRetryPolicy.getRetryCondition().shouldRetry(requestContext, exception, retriesAttempted) + || exception instanceof ConcurrentModificationException; + + RetryPolicy glueRetryPolicy = RetryPolicy.builder() + .withRetryMode(defaultRetryPolicy.getRetryMode()) + .withRetryCondition(customRetryCondition) + .withBackoffStrategy(defaultRetryPolicy.getBackoffStrategy()) + .withFastFailRateLimiting(defaultRetryPolicy.isFastFailRateLimiting()) + .withMaxErrorRetry(config.getMaxGlueErrorRetries()) + .build(); + ClientConfiguration clientConfig = new ClientConfiguration() .withMaxConnections(config.getMaxGlueConnections()) - .withMaxErrorRetry(config.getMaxGlueErrorRetries()); + .withRetryPolicy(glueRetryPolicy); AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() .withMetricsCollector(metricsCollector) .withClientConfiguration(clientConfig); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java index eb7de260bbbe..62d56342329d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java @@ -26,7 +26,6 @@ import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; -import com.amazonaws.services.glue.model.ConcurrentModificationException; import com.amazonaws.services.glue.model.CreateDatabaseRequest; import com.amazonaws.services.glue.model.CreateTableRequest; import com.amazonaws.services.glue.model.CreateUserDefinedFunctionRequest; @@ -63,15 +62,12 @@ import com.amazonaws.services.glue.model.UpdateUserDefinedFunctionRequest; import com.amazonaws.services.glue.model.UserDefinedFunctionInput; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.Inject; -import dev.failsafe.Failsafe; -import dev.failsafe.RetryPolicy; import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; import io.trino.filesystem.Location; @@ -116,7 +112,6 @@ import org.weakref.jmx.Managed; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -187,11 +182,6 @@ public class GlueHiveMetastore private static final int AWS_GLUE_GET_FUNCTIONS_MAX_RESULTS = 100; private static final int AWS_GLUE_GET_TABLES_MAX_RESULTS = 100; private static final Comparator> PARTITION_VALUE_COMPARATOR = lexicographical(String.CASE_INSENSITIVE_ORDER); - private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() - .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) - 
.withDelay(Duration.ofMillis(100)) - .withMaxRetries(3) - .build(); private final TrinoFileSystem fileSystem; private final AWSGlueAsync glueClient; @@ -450,9 +440,7 @@ public void dropTable(String databaseName, String tableName, boolean deleteData) .withDatabaseName(databaseName) .withName(tableName); try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> stats.getDeleteTable().call(() -> - glueClient.deleteTable(deleteTableRequest))); + stats.getDeleteTable().call(() -> glueClient.deleteTable(deleteTableRequest)); } catch (AmazonServiceException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); @@ -596,12 +584,6 @@ public Map getTableColumnStatistics(String databas @Override public void updateTableStatistics(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) - { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> updateTableStatisticsInternal(databaseName, tableName, acidWriteId, mode, statisticsUpdate)); - } - - private void updateTableStatisticsInternal(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) { Table table = getExistingTable(databaseName, tableName); if (acidWriteId.isPresent()) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java index c5c7d58574b5..82a915e109fc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java @@ -14,133 +14,53 @@ package io.trino.plugin.hive.metastore.glue; import io.opentelemetry.api.OpenTelemetry; -import io.trino.Session; 
-import io.trino.plugin.hive.TestingHivePlugin; -import io.trino.spi.TrinoException; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.retries.api.RefreshRetryTokenRequest; +import software.amazon.awssdk.retries.api.RefreshRetryTokenResponse; +import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.retries.internal.DefaultRetryToken; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Path; -import java.util.EnumSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.common.reflect.Reflection.newProxy; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.metastore.glue.GlueMetastoreModule.createGlueClient; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static io.trino.testing.TestingSession.testSessionBuilder; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @TestInstance(PER_CLASS) public class TestHiveConcurrentModificationGlueMetastore - extends 
AbstractTestQueryFramework { - private static final String CATALOG_NAME = "test_hive_concurrent"; - private static final String SCHEMA = "test_hive_glue_concurrent_" + randomNameSuffix(); - private Path dataDirectory; - private GlueHiveMetastore metastore; - private final AtomicBoolean failNextGlueUpdateTableCall = new AtomicBoolean(false); - private final AtomicInteger updateTableCallsCounter = new AtomicInteger(); - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session deltaLakeSession = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .setSchema(SCHEMA) - .build(); - - QueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); - - dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_hive_concurrent"); - GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(dataDirectory.toUri().toString()); - - GlueClient glueClient = closeAfterClass(createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())); - GlueClient proxiedGlueClient = newProxy(GlueClient.class, (proxy, method, args) -> { - try { - if (method.getName().equals("updateTable")) { - updateTableCallsCounter.incrementAndGet(); - if (failNextGlueUpdateTableCall.get()) { - // Simulate concurrent modifications on the table that is about to be dropped - failNextGlueUpdateTableCall.set(false); - throw new TrinoException(HIVE_METASTORE_ERROR, ConcurrentModificationException.builder() - .message("Test-simulated metastore concurrent modification exception") - .build()); - } - } - return method.invoke(glueClient, args); - } - catch (InvocationTargetException e) { - throw e.getCause(); - } - }); - - metastore = new GlueHiveMetastore( - proxiedGlueClient, - new GlueContext(glueConfig), - GlueCache.NOOP, - HDFS_FILE_SYSTEM_FACTORY, - glueConfig, - EnumSet.allOf(GlueHiveMetastore.TableKind.class)); - - queryRunner.installPlugin(new 
TestingHivePlugin(queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data"), metastore)); - queryRunner.createCatalog(CATALOG_NAME, "hive"); - queryRunner.execute("CREATE SCHEMA " + SCHEMA); - return queryRunner; - } - @Test - public void testUpdateTableStatsWithConcurrentModifications() + public void testGlueClientShouldRetryConcurrentModificationException() { - String tableName = "test_glue_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); - - assertQuery( - "SHOW STATS FOR " + tableName, - "VALUES " + - " ('data', null, 1.0, 0.0, null, 1, 1), " + - " (null, null, null, null, 1.0, null, null)"); - - failNextGlueUpdateTableCall.set(true); - resetCounters(); - assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1); - assertThat(updateTableCallsCounter.get()).isEqualTo(2); - assertQuery("SELECT * FROM " + tableName, "VALUES 1, 2"); - assertQuery( - "SHOW STATS FOR " + tableName, - "VALUES " + - " ('data', null, 1.0, 0.0, null, 1, 2), " + - " (null, null, null, null, 2.0, null, null)"); - } + try (GlueClient glueClient = createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())) { + ClientOverrideConfiguration clientOverrideConfiguration = glueClient.serviceClientConfiguration().overrideConfiguration(); + RetryStrategy retryStrategy = clientOverrideConfiguration.retryStrategy().orElseThrow(); - private void resetCounters() - { - updateTableCallsCounter.set(0); - } + assertThatThrownBy(() -> retryStrategy.refreshRetryToken( + RefreshRetryTokenRequest.builder() + .token(DefaultRetryToken.builder().scope("test").build()) + .failure(new RuntimeException("This is not retryable exception so it should fail")) + .build())) + .hasMessage("Request attempt 1 encountered non-retryable failure"); - @AfterAll - public void cleanup() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(SCHEMA, false); - metastore.shutdown(); - deleteRecursively(dataDirectory, ALLOW_INSECURE); + 
RefreshRetryTokenResponse refreshRetryTokenResponse = retryStrategy.refreshRetryToken( + RefreshRetryTokenRequest.builder() + .token(DefaultRetryToken.builder().scope("test").build()) + .failure( + ConcurrentModificationException.builder() + .awsErrorDetails(AwsErrorDetails.builder() + // taken from software.amazon.awssdk.services.glue.DefaultGlueClient and + // software.amazon.awssdk.services.glue.DefaultGlueAsyncClient + .errorCode("ConcurrentModificationException") + .build()) + .message("Test-simulated metastore concurrent modification exception that should be allowed to retry") + .build()) + .build()); + assertThat(refreshRetryTokenResponse).isNotNull(); } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 91b2b6c1bd95..0722b5441ac0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -17,6 +17,7 @@ import dev.failsafe.RetryPolicy; import io.trino.annotation.NotThreadSafe; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.metastore.Column; import io.trino.metastore.HiveType; import io.trino.metastore.StorageFormat; @@ -257,10 +258,11 @@ protected void refreshFromMetadataLocation(String newLocation, Function failure instanceof ValidationException || isNotFoundException(failure)) + .withMaxDuration(Duration.ofMinutes(3)) + .handleIf(failure -> !(failure instanceof ValidationException) && !isNotFoundException(failure)) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) .get(() -> metadataLoader.apply(newLocation)); } @@ -290,8 +292,8 @@ private static boolean isNotFoundException(Throwable failure) { // qualified name, as this is NOT the 
io.trino.spi.connector.NotFoundException return failure instanceof org.apache.iceberg.exceptions.NotFoundException || - // This is used in context where the code cannot throw a checked exception, so FileNotFoundException would need to be wrapped - failure.getCause() instanceof FileNotFoundException; + // This is used in context where the code cannot throw a checked exception, so FileNotFoundException would need to be wrapped + failure.getCause() instanceof FileNotFoundException; } protected static String newTableMetadataFilePath(TableMetadata meta, int newVersion) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index edc660efbb61..2b257e085a8e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -14,8 +14,6 @@ package io.trino.plugin.iceberg.catalog; import com.google.common.collect.ImmutableMap; -import dev.failsafe.Failsafe; -import dev.failsafe.RetryPolicy; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -61,15 +59,12 @@ import org.apache.iceberg.types.Types; import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; -import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.metastore.Table.TABLE_COMMENT; @@ -193,19 +188,7 @@ public Map getViews(ConnectorSession s @Override public Optional getMaterializedView(ConnectorSession session, 
SchemaTableName schemaViewName) { - try { - return Failsafe.with(RetryPolicy.builder() - .withMaxAttempts(10) - .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) - .withMaxDuration(Duration.ofSeconds(30)) - .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException)) - .build()) - .get(() -> doGetMaterializedView(session, schemaViewName)); - } - catch (MaterializedViewMayBeBeingRemovedException e) { - throwIfUnchecked(e.getCause()); - throw new RuntimeException(e.getCause()); - } + return doGetMaterializedView(session, schemaViewName); } protected abstract Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName); @@ -516,13 +499,4 @@ protected Map createMaterializedViewProperties(ConnectorSession } protected abstract void invalidateTableCache(SchemaTableName schemaTableName); - - protected static class MaterializedViewMayBeBeingRemovedException - extends RuntimeException - { - public MaterializedViewMayBeBeingRemovedException(Throwable cause) - { - super(requireNonNull(cause, "cause is null")); - } - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 79ec1a1d2258..65f762309ef7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -939,7 +939,8 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, Failsafe.with(RetryPolicy.builder() .withMaxRetries(3) .withDelay(Duration.ofMillis(100)) - .abortIf(throwable -> !replace || throwable instanceof ViewAlreadyExistsException) + .handleIf(throwable -> replace && !(throwable instanceof ViewAlreadyExistsException)) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) .run(() -> doCreateView(session, schemaViewName, 
viewTableInput, replace)); }