From e18c07e36949a3c7628d16090c8fa3ebc01aa176 Mon Sep 17 00:00:00 2001 From: Oskar Szwajkowski Date: Thu, 25 Jul 2024 14:56:37 +0200 Subject: [PATCH 1/5] Introduce TrinoFileSystemException Utilize it in filesystems to mark operations that are terminal and shouldn't be retried --- .../filesystem/azure/AzureFileSystem.java | 27 +++++++------- .../io/trino/filesystem/azure/AzureUtils.java | 3 +- .../trino/filesystem/gcs/GcsFileSystem.java | 5 +-- .../io/trino/filesystem/gcs/GcsUtils.java | 8 +++++ .../io/trino/filesystem/s3/S3FileSystem.java | 9 ++--- .../io/trino/filesystem/s3/S3InputFile.java | 3 +- .../io/trino/filesystem/s3/S3InputStream.java | 3 +- .../io/trino/filesystem/TrinoFileSystem.java | 19 ++++++++++ .../filesystem/TrinoFileSystemException.java | 35 +++++++++++++++++++ .../SemiTransactionalHiveMetastore.java | 1 + 10 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java index 205c1fe02557..7f7d333072db 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java @@ -37,6 +37,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; @@ -165,7 +166,7 @@ private void deleteGen2Directory(AzureLocation location) DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, location.path()); if (directoryClient.exists()) { if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a 
directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } directoryClient.deleteIfExistsWithResponse(deleteRecursiveOptions, null, null); } @@ -191,10 +192,10 @@ public void renameFile(Location source, Location target) AzureLocation sourceLocation = new AzureLocation(source); AzureLocation targetLocation = new AzureLocation(target); if (!sourceLocation.account().equals(targetLocation.account())) { - throw new IOException("Cannot rename across storage accounts"); + throw new TrinoFileSystemException("Cannot rename across storage accounts"); } if (!Objects.equals(sourceLocation.container(), targetLocation.container())) { - throw new IOException("Cannot rename across storage account containers"); + throw new TrinoFileSystemException("Cannot rename across storage account containers"); } // DFS rename file works with all storage types @@ -208,7 +209,7 @@ private void renameGen2File(AzureLocation source, AzureLocation target) DataLakeFileSystemClient fileSystemClient = createFileSystemClient(source); DataLakeFileClient dataLakeFileClient = createFileClient(fileSystemClient, source.path()); if (dataLakeFileClient.getProperties().isDirectory()) { - throw new IOException("Rename file from %s to %s, source is a directory".formatted(source, target)); + throw new TrinoFileSystemException("Rename file from %s to %s, source is a directory".formatted(source, target)); } createDirectoryIfNotExists(fileSystemClient, target.location().parentDirectory().path()); @@ -255,7 +256,7 @@ private FileIterator listGen2Files(AzureLocation location) return FileIterator.empty(); } if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } pathItems = directoryClient.listPaths(true, false, null, null); } @@ -315,7 +316,7 @@ public void createDirectory(Location location) DataLakeFileSystemClient 
fileSystemClient = createFileSystemClient(azureLocation); DataLakeDirectoryClient directoryClient = createDirectoryIfNotExists(fileSystemClient, azureLocation.path()); if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + azureLocation); + throw new TrinoFileSystemException("Location is not a directory: " + azureLocation); } } catch (RuntimeException e) { @@ -330,26 +331,26 @@ public void renameDirectory(Location source, Location target) AzureLocation sourceLocation = new AzureLocation(source); AzureLocation targetLocation = new AzureLocation(target); if (!sourceLocation.account().equals(targetLocation.account())) { - throw new IOException("Cannot rename across storage accounts"); + throw new TrinoFileSystemException("Cannot rename across storage accounts"); } if (!Objects.equals(sourceLocation.container(), targetLocation.container())) { - throw new IOException("Cannot rename across storage account containers"); + throw new TrinoFileSystemException("Cannot rename across storage account containers"); } if (!isHierarchicalNamespaceEnabled(sourceLocation)) { - throw new IOException("Azure non-hierarchical does not support directory renames"); + throw new TrinoFileSystemException("Azure non-hierarchical does not support directory renames"); } if (sourceLocation.path().isEmpty() || targetLocation.path().isEmpty()) { - throw new IOException("Cannot rename %s to %s".formatted(source, target)); + throw new TrinoFileSystemException("Cannot rename %s to %s".formatted(source, target)); } try { DataLakeFileSystemClient fileSystemClient = createFileSystemClient(sourceLocation); DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, sourceLocation.path()); if (!directoryClient.exists()) { - throw new IOException("Source directory does not exist: " + source); + throw new TrinoFileSystemException("Source directory does not exist: " + source); } if (!directoryClient.getProperties().isDirectory()) { 
- throw new IOException("Source is not a directory: " + source); + throw new TrinoFileSystemException("Source is not a directory: " + source); } directoryClient.rename(null, targetLocation.path()); } @@ -416,7 +417,7 @@ private Set listGen2Directories(AzureLocation location) return ImmutableSet.of(); } if (!directoryClient.getProperties().isDirectory()) { - throw new IOException("Location is not a directory: " + location); + throw new TrinoFileSystemException("Location is not a directory: " + location); } pathItems = directoryClient.listPaths(false, false, null, null); } diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java index 2117af82671f..5de1aa7660b5 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java @@ -17,6 +17,7 @@ import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.file.datalake.models.DataLakeStorageException; +import io.trino.filesystem.TrinoFileSystemException; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,7 +33,7 @@ public static IOException handleAzureException(RuntimeException exception, Strin throw withCause(new FileNotFoundException(location.toString()), exception); } if (exception instanceof AzureException) { - throw new IOException("Azure service error %s file: %s".formatted(action, location), exception); + throw new TrinoFileSystemException("Azure service error %s file: %s".formatted(action, location), exception); } throw new IOException("Error %s file: %s".formatted(action, location), exception); } diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java 
index d51839e8068e..48dba70d775a 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -27,6 +27,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; @@ -150,7 +151,7 @@ public void deleteDirectory(Location location) public void renameFile(Location source, Location target) throws IOException { - throw new IOException("GCS does not support renames"); + throw new TrinoFileSystemException("GCS does not support renames"); } @Override @@ -241,7 +242,7 @@ public void createDirectory(Location location) public void renameDirectory(Location source, Location target) throws IOException { - throw new IOException("GCS does not support directory renames"); + throw new TrinoFileSystemException("GCS does not support directory renames"); } @Override diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java index fe6d28c632dd..cc9f85cb61c5 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java @@ -13,11 +13,13 @@ */ package io.trino.filesystem.gcs; +import com.google.cloud.BaseServiceException; import com.google.cloud.ReadChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.Storage; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import java.io.FileNotFoundException; import java.io.IOException; @@ -36,12 +38,18 @@ private GcsUtils() {} public static IOException handleGcsException(RuntimeException exception, 
String action, GcsLocation location) throws IOException { + if (exception instanceof BaseServiceException) { + throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, location), exception); + } throw new IOException("Error %s: %s".formatted(action, location), exception); } public static IOException handleGcsException(RuntimeException exception, String action, Collection locations) throws IOException { + if (exception instanceof BaseServiceException) { + throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, locations), exception); + } throw new IOException("Error %s: %s".formatted(action, locations), exception); } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index d3130af4fe8e..193e30b2a0e0 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -18,6 +18,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import software.amazon.awssdk.core.exception.SdkException; @@ -100,7 +101,7 @@ public void deleteFile(Location location) client.deleteObject(request); } catch (SdkException e) { - throw new IOException("Failed to delete file: " + location, e); + throw new TrinoFileSystemException("Failed to delete file: " + location, e); } } @@ -158,7 +159,7 @@ private void deleteObjects(Collection locations) } } catch (SdkException e) { - throw new IOException("Error while batch deleting files", e); + throw new TrinoFileSystemException("Error while batch deleting files", e); } } } @@ -206,7 +207,7 @@ private FileIterator listObjects(Location location, boolean 
includeDirectoryObje return new S3FileIterator(s3Location, s3ObjectStream.iterator()); } catch (SdkException e) { - throw new IOException("Failed to list location: " + location, e); + throw new TrinoFileSystemException("Failed to list location: " + location, e); } } @@ -262,7 +263,7 @@ public Set listDirectories(Location location) .collect(toImmutableSet()); } catch (SdkException e) { - throw new IOException("Failed to list location: " + location, e); + throw new TrinoFileSystemException("Failed to list location: " + location, e); } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java index 9fa15c6003bd..fe8ac5a271d5 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java @@ -14,6 +14,7 @@ package io.trino.filesystem.s3; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoInputStream; @@ -130,7 +131,7 @@ private boolean headObject() return false; } catch (SdkException e) { - throw new IOException("S3 HEAD request failed for file: " + location, e); + throw new TrinoFileSystemException("S3 HEAD request failed for file: " + location, e); } } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java index 5166710d2daf..288a574f81eb 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputStream.java @@ -14,6 +14,7 @@ package io.trino.filesystem.s3; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemException; import 
io.trino.filesystem.TrinoInputStream; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.AbortedException; @@ -208,7 +209,7 @@ private void seekStream() throw ex; } catch (SdkException e) { - throw new IOException("Failed to open S3 file: " + location, e); + throw new TrinoFileSystemException("Failed to open S3 file: " + location, e); } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 21c4b99423b0..39ecc9ba644d 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -13,6 +13,9 @@ */ package io.trino.filesystem; +import com.google.common.base.Throwables; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; import java.util.Optional; @@ -223,4 +226,20 @@ Set listDirectories(Location location) */ Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) throws IOException; + + /** + * Checks whether given exception is unrecoverable, so that further retries won't help + *

+ * By default, all third party (AWS, Azure, GCP) SDKs will retry appropriate exceptions + * (either client side IO errors, or 500/503), so there is no need to retry those additionally. + *

+ * If any custom retry behavior is needed, it is advised to change SDK's retry handlers, + * rather than introducing outer retry loop, which combined with SDKs default retries, + * could lead to prolonged, unnecessary retries + */ + static boolean isUnrecoverableException(Throwable throwable) + { + return Throwables.getCausalChain(throwable).stream() + .anyMatch(t -> t instanceof TrinoFileSystemException || t instanceof FileNotFoundException); + } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java new file mode 100644 index 000000000000..30d09b429d98 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystemException.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem; + +import java.io.IOException; + +/** + * Unrecoverable file system exception. + * This exception is thrown for fatal errors, or after retries have already been performed, + * so additional retries must not be performed when this is caught. 
+ */ +public class TrinoFileSystemException + extends IOException +{ + public TrinoFileSystemException(String message, Throwable cause) + { + super(message, cause); + } + + public TrinoFileSystemException(String message) + { + super(message); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 124525254ec1..274641269dcc 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -164,6 +164,7 @@ public class SemiTransactionalHiveMetastore .withDelay(java.time.Duration.ofSeconds(1)) .withMaxDuration(java.time.Duration.ofSeconds(30)) .withMaxAttempts(3) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build(); private static final Map ACID_OPERATION_ACTION_TYPES = ImmutableMap.of( From c9a9637c99afae7a73708d9b90a6c600a9998425 Mon Sep 17 00:00:00 2001 From: Oskar Szwajkowski Date: Thu, 25 Jul 2024 14:58:36 +0200 Subject: [PATCH 2/5] Do not rely on Failsafe in glue metastore Add retries to Glue v1 and v2 client for ConcurrentModificationException instead of relying on custom retries with Failsafe Change tests related to ConcurrentModificationException as those are now handled by glue client internally Instead of proxying client to throw this exception, check if glue client's retry policy is able to retry on this exception --- ...keConcurrentModificationGlueMetastore.java | 132 ---------------- plugin/trino-hive/pom.xml | 22 +++ .../metastore/glue/GlueHiveMetastore.java | 73 +++------ .../metastore/glue/GlueMetastoreModule.java | 8 +- .../metastore/glue/v1/GlueClientUtil.java | 20 ++- .../metastore/glue/v1/GlueHiveMetastore.java | 20 +-- ...veConcurrentModificationGlueMetastore.java | 142 ++++-------------- 7 files changed, 102 
insertions(+), 315 deletions(-) delete mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java deleted file mode 100644 index c44b3865ff07..000000000000 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.deltalake.metastore.glue; - -import com.google.common.collect.ImmutableSet; -import io.opentelemetry.api.OpenTelemetry; -import io.trino.Session; -import io.trino.plugin.deltalake.TestingDeltaLakePlugin; -import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; -import io.trino.plugin.hive.metastore.glue.GlueCache; -import io.trino.plugin.hive.metastore.glue.GlueContext; -import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; -import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; -import io.trino.spi.TrinoException; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import software.amazon.awssdk.services.glue.GlueClient; -import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Path; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.common.reflect.Reflection.newProxy; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.TableKind.DELTA; -import static io.trino.plugin.hive.metastore.glue.GlueMetastoreModule.createGlueClient; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) 
-public class TestDeltaLakeConcurrentModificationGlueMetastore - extends AbstractTestQueryFramework -{ - private static final String CATALOG_NAME = "test_delta_lake_concurrent"; - private static final String SCHEMA = "test_delta_lake_glue_concurrent_" + randomNameSuffix(); - private Path dataDirectory; - private GlueHiveMetastore metastore; - private final AtomicBoolean failNextGlueDeleteTableCall = new AtomicBoolean(false); - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session deltaLakeSession = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .setSchema(SCHEMA) - .build(); - - QueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); - - dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_delta_concurrent"); - GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(dataDirectory.toUri().toString()); - - GlueClient glueClient = closeAfterClass(createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())); - GlueClient proxiedGlueClient = newProxy(GlueClient.class, (proxy, method, args) -> { - Object result; - try { - if (method.getName().equals("deleteTable") && failNextGlueDeleteTableCall.get()) { - // Simulate concurrent modifications on the table that is about to be dropped - failNextGlueDeleteTableCall.set(false); - throw new TrinoException(HIVE_METASTORE_ERROR, ConcurrentModificationException.builder() - .message("Test-simulated metastore concurrent modification exception") - .build()); - } - result = method.invoke(glueClient, args); - } - catch (InvocationTargetException e) { - throw e.getCause(); - } - return result; - }); - - metastore = new GlueHiveMetastore( - proxiedGlueClient, - new GlueContext(glueConfig), - GlueCache.NOOP, - HDFS_FILE_SYSTEM_FACTORY, - glueConfig, - ImmutableSet.of(DELTA)); - - queryRunner.installPlugin(new TestingDeltaLakePlugin(dataDirectory, Optional.of(new 
TestingDeltaLakeMetastoreModule(metastore)))); - queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); - queryRunner.execute("CREATE SCHEMA " + SCHEMA); - return queryRunner; - } - - @Test - public void testDropTableWithConcurrentModifications() - { - String tableName = "test_glue_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); - - failNextGlueDeleteTableCall.set(true); - assertUpdate("DROP TABLE " + tableName); - assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); - } - - @AfterAll - public void cleanup() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(SCHEMA, false); - metastore.shutdown(); - deleteRecursively(dataDirectory, ALLOW_INSECURE); - } - } -} diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index 2463ffdf8070..dbf21135b2d9 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -276,6 +276,16 @@ regions + + software.amazon.awssdk + retries + + + + software.amazon.awssdk + retries-spi + + software.amazon.awssdk sdk-core @@ -609,6 +619,18 @@ + + + org.apache.maven.plugins + maven-dependency-plugin + + + + software.amazon.awssdk:retries + + + diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index 65ba98816c06..95ea0c950764 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -14,15 +14,11 @@ package io.trino.plugin.hive.metastore.glue; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import 
com.google.inject.Inject; -import dev.failsafe.Failsafe; -import dev.failsafe.FailsafeException; -import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; @@ -65,7 +61,6 @@ import software.amazon.awssdk.services.glue.model.BatchGetPartitionResponse; import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry; import software.amazon.awssdk.services.glue.model.ColumnStatistics; -import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; import software.amazon.awssdk.services.glue.model.CreateTableRequest; import software.amazon.awssdk.services.glue.model.DatabaseInput; import software.amazon.awssdk.services.glue.model.DeleteTableRequest; @@ -84,7 +79,6 @@ import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -101,6 +95,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -168,12 +163,6 @@ public class GlueHiveMetastore private static final int BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE = 100; private static final int AWS_GLUE_GET_FUNCTIONS_MAX_RESULTS = 100; - private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() - .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) - .withDelay(Duration.ofMillis(100)) - .withMaxRetries(3) - .build(); - private static final AtomicInteger poolCounter = new AtomicInteger(); private final GlueClient glueClient; @@ -525,8 +514,7 @@ public void dropTable(String databaseName, String tableName, boolean deleteData) .name(tableName) 
.build(); try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> stats.getDeleteTable().call(() -> glueClient.deleteTable(deleteTableRequest))); + stats.getDeleteTable().call(() -> glueClient.deleteTable(deleteTableRequest)); } catch (EntityNotFoundException e) { throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); @@ -563,34 +551,29 @@ public void replaceTable(String databaseName, String tableName, Table newTable, updateTable(databaseName, tableName, _ -> newTable); } - private void updateTable(String databaseName, String tableName, TableModifier modifier) + private void updateTable(String databaseName, String tableName, UnaryOperator modifier) { + Table existingTable = getTable(databaseName, tableName) + .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName))); + Table newTable = modifier.apply(existingTable); + if (!existingTable.getDatabaseName().equals(newTable.getDatabaseName()) || !existingTable.getTableName().equals(newTable.getTableName())) { + throw new TrinoException(NOT_SUPPORTED, "Update cannot be used to change rename a table"); + } + if (existingTable.getParameters().getOrDefault("table_type", "").equalsIgnoreCase("iceberg") && !Objects.equals( + existingTable.getParameters().get("metadata_location"), + newTable.getParameters().get("previous_metadata_location"))) { + throw new TrinoException(NOT_SUPPORTED, "Cannot update Iceberg table: supplied previous location does not match current location"); + } try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY).run(() -> { - Table existingTable = getTable(databaseName, tableName) - .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName))); - Table newTable = modifier.modify(existingTable); - if (!existingTable.getDatabaseName().equals(newTable.getDatabaseName()) || !existingTable.getTableName().equals(newTable.getTableName())) { - throw new 
TrinoException(NOT_SUPPORTED, "Update cannot be used to change rename a table"); - } - if (existingTable.getParameters().getOrDefault("table_type", "").equalsIgnoreCase("iceberg") && !Objects.equals( - existingTable.getParameters().get("metadata_location"), - newTable.getParameters().get("previous_metadata_location"))) { - throw new TrinoException(NOT_SUPPORTED, "Cannot update Iceberg table: supplied previous location does not match current location"); - } - try { - stats.getUpdateTable().call(() -> glueClient.updateTable(builder -> builder - .applyMutation(glueContext::configureClient) - .databaseName(databaseName) - .tableInput(toGlueTableInput(newTable)))); - } - catch (EntityNotFoundException e) { - throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); - } - }); + stats.getUpdateTable().call(() -> glueClient.updateTable(builder -> builder + .applyMutation(glueContext::configureClient) + .databaseName(databaseName) + .tableInput(toGlueTableInput(newTable)))); } - catch (FailsafeException e) { - throwIfUnchecked(e.getCause()); + catch (EntityNotFoundException e) { + throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); + } + catch (SdkException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); } finally { @@ -598,12 +581,6 @@ private void updateTable(String databaseName, String tableName, TableModifier mo } } - private interface TableModifier - { - Table modify(Table table) - throws Exception; - } - @Override public void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) { @@ -752,11 +729,9 @@ public void updateTableStatistics(String databaseName, String tableName, Optiona // first update the basic statistics on the table, which requires a read-modify-write cycle BasicTableStatisticsResult result; try { - result = Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .get(context -> updateBasicTableStatistics(databaseName, tableName, mode, 
statisticsUpdate, existingColumnStatistics)); + result = updateBasicTableStatistics(databaseName, tableName, mode, statisticsUpdate, existingColumnStatistics); } - catch (FailsafeException e) { - throwIfUnchecked(e.getCause()); + catch (SdkException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); } finally { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java index 771eea4a6313..c8b439385d28 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreModule.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.GlueClientBuilder; +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.StsClientBuilder; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -98,7 +99,7 @@ public static GlueCache createGlueCache(CachingHiveMetastoreConfig config, Catal // Note: while we could skip CachingHiveMetastoreModule altogether on workers, we retain it so that catalog // configuration can remain identical for all nodes, making cluster configuration easier. 
boolean enabled = nodeManager.getCurrentNode().isCoordinator() && - (metadataCacheTtl.toMillis() > 0 || statsCacheTtl.toMillis() > 0); + (metadataCacheTtl.toMillis() > 0 || statsCacheTtl.toMillis() > 0); checkState(config.isPartitionCacheEnabled(), "Disabling partitions cache is not supported with Glue v2"); checkState(config.isCacheMissing(), "Disabling cache missing is not supported with Glue v2"); @@ -128,8 +129,9 @@ public static GlueClient createGlueClient(GlueHiveMetastoreConfig config, OpenTe .setCaptureExperimentalSpanAttributes(true) .setRecordIndividualHttpError(true) .build().newExecutionInterceptor()) - .retryPolicy(retry -> retry - .numRetries(config.getMaxGlueErrorRetries()))); + .retryStrategy(retryBuilder -> retryBuilder + .retryOnException(throwable -> throwable instanceof ConcurrentModificationException) + .maxAttempts(config.getMaxGlueErrorRetries()))); Optional staticCredentialsProvider = getStaticCredentialsProvider(config); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java index 9b3cb7e1092f..9a795f8cb765 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueClientUtil.java @@ -18,8 +18,12 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.handlers.RequestHandler2; import com.amazonaws.metrics.RequestMetricCollector; +import com.amazonaws.retry.PredefinedRetryPolicies; +import com.amazonaws.retry.RetryPolicy; +import com.amazonaws.retry.RetryPolicy.RetryCondition; import com.amazonaws.services.glue.AWSGlueAsync; import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.amazonaws.services.glue.model.ConcurrentModificationException; import java.util.Set; @@ -36,9 +40,23 @@ public static AWSGlueAsync createAsyncGlueClient( Set 
requestHandlers, RequestMetricCollector metricsCollector) { + RetryPolicy defaultRetryPolicy = PredefinedRetryPolicies.getDefaultRetryPolicy(); + + RetryCondition customRetryCondition = (requestContext, exception, retriesAttempted) -> + defaultRetryPolicy.getRetryCondition().shouldRetry(requestContext, exception, retriesAttempted) + || exception instanceof ConcurrentModificationException; + + RetryPolicy glueRetryPolicy = RetryPolicy.builder() + .withRetryMode(defaultRetryPolicy.getRetryMode()) + .withRetryCondition(customRetryCondition) + .withBackoffStrategy(defaultRetryPolicy.getBackoffStrategy()) + .withFastFailRateLimiting(defaultRetryPolicy.isFastFailRateLimiting()) + .withMaxErrorRetry(config.getMaxGlueErrorRetries()) + .build(); + ClientConfiguration clientConfig = new ClientConfiguration() .withMaxConnections(config.getMaxGlueConnections()) - .withMaxErrorRetry(config.getMaxGlueErrorRetries()); + .withRetryPolicy(glueRetryPolicy); AWSGlueAsyncClientBuilder asyncGlueClientBuilder = AWSGlueAsyncClientBuilder.standard() .withMetricsCollector(metricsCollector) .withClientConfiguration(clientConfig); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java index eb7de260bbbe..62d56342329d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/GlueHiveMetastore.java @@ -26,7 +26,6 @@ import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; -import com.amazonaws.services.glue.model.ConcurrentModificationException; import com.amazonaws.services.glue.model.CreateDatabaseRequest; import com.amazonaws.services.glue.model.CreateTableRequest; 
import com.amazonaws.services.glue.model.CreateUserDefinedFunctionRequest; @@ -63,15 +62,12 @@ import com.amazonaws.services.glue.model.UpdateUserDefinedFunctionRequest; import com.amazonaws.services.glue.model.UserDefinedFunctionInput; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.Inject; -import dev.failsafe.Failsafe; -import dev.failsafe.RetryPolicy; import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; import io.trino.filesystem.Location; @@ -116,7 +112,6 @@ import org.weakref.jmx.Managed; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -187,11 +182,6 @@ public class GlueHiveMetastore private static final int AWS_GLUE_GET_FUNCTIONS_MAX_RESULTS = 100; private static final int AWS_GLUE_GET_TABLES_MAX_RESULTS = 100; private static final Comparator> PARTITION_VALUE_COMPARATOR = lexicographical(String.CASE_INSENSITIVE_ORDER); - private static final RetryPolicy CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.builder() - .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException) - .withDelay(Duration.ofMillis(100)) - .withMaxRetries(3) - .build(); private final TrinoFileSystem fileSystem; private final AWSGlueAsync glueClient; @@ -450,9 +440,7 @@ public void dropTable(String databaseName, String tableName, boolean deleteData) .withDatabaseName(databaseName) .withName(tableName); try { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> stats.getDeleteTable().call(() -> - glueClient.deleteTable(deleteTableRequest))); + stats.getDeleteTable().call(() -> 
glueClient.deleteTable(deleteTableRequest)); } catch (AmazonServiceException e) { throw new TrinoException(HIVE_METASTORE_ERROR, e); @@ -596,12 +584,6 @@ public Map getTableColumnStatistics(String databas @Override public void updateTableStatistics(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) - { - Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) - .run(() -> updateTableStatisticsInternal(databaseName, tableName, acidWriteId, mode, statisticsUpdate)); - } - - private void updateTableStatisticsInternal(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) { Table table = getExistingTable(databaseName, tableName); if (acidWriteId.isPresent()) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java index c5c7d58574b5..82a915e109fc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveConcurrentModificationGlueMetastore.java @@ -14,133 +14,53 @@ package io.trino.plugin.hive.metastore.glue; import io.opentelemetry.api.OpenTelemetry; -import io.trino.Session; -import io.trino.plugin.hive.TestingHivePlugin; -import io.trino.spi.TrinoException; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import 
software.amazon.awssdk.retries.api.RefreshRetryTokenRequest; +import software.amazon.awssdk.retries.api.RefreshRetryTokenResponse; +import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.retries.internal.DefaultRetryToken; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Path; -import java.util.EnumSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.common.reflect.Reflection.newProxy; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.metastore.glue.GlueMetastoreModule.createGlueClient; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static io.trino.testing.TestingSession.testSessionBuilder; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @TestInstance(PER_CLASS) public class TestHiveConcurrentModificationGlueMetastore - extends AbstractTestQueryFramework { - private static final String CATALOG_NAME = "test_hive_concurrent"; - private static final String SCHEMA = "test_hive_glue_concurrent_" + randomNameSuffix(); - private Path dataDirectory; - private GlueHiveMetastore metastore; - private final AtomicBoolean failNextGlueUpdateTableCall = new AtomicBoolean(false); - private final AtomicInteger updateTableCallsCounter = new AtomicInteger(); - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - 
Session deltaLakeSession = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .setSchema(SCHEMA) - .build(); - - QueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build(); - - dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_hive_concurrent"); - GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig() - .setDefaultWarehouseDir(dataDirectory.toUri().toString()); - - GlueClient glueClient = closeAfterClass(createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())); - GlueClient proxiedGlueClient = newProxy(GlueClient.class, (proxy, method, args) -> { - try { - if (method.getName().equals("updateTable")) { - updateTableCallsCounter.incrementAndGet(); - if (failNextGlueUpdateTableCall.get()) { - // Simulate concurrent modifications on the table that is about to be dropped - failNextGlueUpdateTableCall.set(false); - throw new TrinoException(HIVE_METASTORE_ERROR, ConcurrentModificationException.builder() - .message("Test-simulated metastore concurrent modification exception") - .build()); - } - } - return method.invoke(glueClient, args); - } - catch (InvocationTargetException e) { - throw e.getCause(); - } - }); - - metastore = new GlueHiveMetastore( - proxiedGlueClient, - new GlueContext(glueConfig), - GlueCache.NOOP, - HDFS_FILE_SYSTEM_FACTORY, - glueConfig, - EnumSet.allOf(GlueHiveMetastore.TableKind.class)); - - queryRunner.installPlugin(new TestingHivePlugin(queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data"), metastore)); - queryRunner.createCatalog(CATALOG_NAME, "hive"); - queryRunner.execute("CREATE SCHEMA " + SCHEMA); - return queryRunner; - } - @Test - public void testUpdateTableStatsWithConcurrentModifications() + public void testGlueClientShouldRetryConcurrentModificationException() { - String tableName = "test_glue_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS data", 1); - - assertQuery( - "SHOW STATS FOR " + tableName, - 
"VALUES " + - " ('data', null, 1.0, 0.0, null, 1, 1), " + - " (null, null, null, null, 1.0, null, null)"); - - failNextGlueUpdateTableCall.set(true); - resetCounters(); - assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1); - assertThat(updateTableCallsCounter.get()).isEqualTo(2); - assertQuery("SELECT * FROM " + tableName, "VALUES 1, 2"); - assertQuery( - "SHOW STATS FOR " + tableName, - "VALUES " + - " ('data', null, 1.0, 0.0, null, 1, 2), " + - " (null, null, null, null, 2.0, null, null)"); - } + try (GlueClient glueClient = createGlueClient(new GlueHiveMetastoreConfig(), OpenTelemetry.noop())) { + ClientOverrideConfiguration clientOverrideConfiguration = glueClient.serviceClientConfiguration().overrideConfiguration(); + RetryStrategy retryStrategy = clientOverrideConfiguration.retryStrategy().orElseThrow(); - private void resetCounters() - { - updateTableCallsCounter.set(0); - } + assertThatThrownBy(() -> retryStrategy.refreshRetryToken( + RefreshRetryTokenRequest.builder() + .token(DefaultRetryToken.builder().scope("test").build()) + .failure(new RuntimeException("This is not retryable exception so it should fail")) + .build())) + .hasMessage("Request attempt 1 encountered non-retryable failure"); - @AfterAll - public void cleanup() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(SCHEMA, false); - metastore.shutdown(); - deleteRecursively(dataDirectory, ALLOW_INSECURE); + RefreshRetryTokenResponse refreshRetryTokenResponse = retryStrategy.refreshRetryToken( + RefreshRetryTokenRequest.builder() + .token(DefaultRetryToken.builder().scope("test").build()) + .failure( + ConcurrentModificationException.builder() + .awsErrorDetails(AwsErrorDetails.builder() + // taken from software.amazon.awssdk.services.glue.DefaultGlueClient and + // software.amazon.awssdk.services.glue.DefaultGlueAsyncClient + .errorCode("ConcurrentModificationException") + .build()) + .message("Test-simulated metastore concurrent modification exception that 
should be allowed to retry") + .build()) + .build()); + assertThat(refreshRetryTokenResponse).isNotNull(); } } } From 9de74f18b75e9085339a25d9517011640cc4f625 Mon Sep 17 00:00:00 2001 From: Oskar Szwajkowski Date: Thu, 25 Jul 2024 15:02:36 +0200 Subject: [PATCH 3/5] Replace RetryPolicy abort with explicit handle Instead of aborting Failsafe's retries on certain conditions, make retries happen under specific circumstances This should yield to more predictable retries, that are explicitly set in code Abort retries on TrinoFileSystemException, as it's not meant to be retried --- .../iceberg/catalog/AbstractIcebergTableOperations.java | 8 +++++--- .../plugin/iceberg/catalog/AbstractTrinoCatalog.java | 3 ++- .../plugin/iceberg/catalog/glue/TrinoGlueCatalog.java | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 91b2b6c1bd95..f45f706a4e23 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -17,6 +17,7 @@ import dev.failsafe.RetryPolicy; import io.trino.annotation.NotThreadSafe; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.metastore.Column; import io.trino.metastore.HiveType; import io.trino.metastore.StorageFormat; @@ -260,7 +261,8 @@ protected void refreshFromMetadataLocation(String newLocation, Function failure instanceof ValidationException || isNotFoundException(failure)) + .handleIf(failure -> !(failure instanceof ValidationException) && !isNotFoundException(failure)) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) .get(() -> metadataLoader.apply(newLocation)); } @@ -290,8 +292,8 @@ private static 
boolean isNotFoundException(Throwable failure) { // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException return failure instanceof org.apache.iceberg.exceptions.NotFoundException || - // This is used in context where the code cannot throw a checked exception, so FileNotFoundException would need to be wrapped - failure.getCause() instanceof FileNotFoundException; + // This is used in context where the code cannot throw a checked exception, so FileNotFoundException would need to be wrapped + failure.getCause() instanceof FileNotFoundException; } protected static String newTableMetadataFilePath(TableMetadata meta, int newVersion) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index edc660efbb61..a6b7363896a6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -198,7 +198,8 @@ public Optional getMaterializedView(Connect .withMaxAttempts(10) .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) .withMaxDuration(Duration.ofSeconds(30)) - .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException)) + .handleIf(failure -> failure instanceof MaterializedViewMayBeBeingRemovedException) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) .get(() -> doGetMaterializedView(session, schemaViewName)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 79ec1a1d2258..65f762309ef7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -939,7 +939,8 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, Failsafe.with(RetryPolicy.builder() .withMaxRetries(3) .withDelay(Duration.ofMillis(100)) - .abortIf(throwable -> !replace || throwable instanceof ViewAlreadyExistsException) + .handleIf(throwable -> replace && !(throwable instanceof ViewAlreadyExistsException)) + .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) .run(() -> doCreateView(session, schemaViewName, viewTableInput, replace)); } From 2456811918f8b8465e77f076ba6e6582ad51bf19 Mon Sep 17 00:00:00 2001 From: Oskar Szwajkowski Date: Fri, 26 Jul 2024 00:20:47 +0200 Subject: [PATCH 4/5] Change iceberg related retries Remove retry in AbstractTrinoCatalog, as it never can catch exception on which retry was set up Reduce amount of retries in AbstractIcebergTableOperations, as 20 retries with max time of 10 minutes seems way too big --- .../AbstractIcebergTableOperations.java | 4 +-- .../iceberg/catalog/AbstractTrinoCatalog.java | 29 +------------------ 2 files changed, 3 insertions(+), 30 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index f45f706a4e23..0722b5441ac0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -258,9 +258,9 @@ protected void refreshFromMetadataLocation(String newLocation, Function !(failure instanceof ValidationException) && !isNotFoundException(failure)) .abortOn(TrinoFileSystem::isUnrecoverableException) .build()) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index a6b7363896a6..2b257e085a8e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -14,8 +14,6 @@ package io.trino.plugin.iceberg.catalog; import com.google.common.collect.ImmutableMap; -import dev.failsafe.Failsafe; -import dev.failsafe.RetryPolicy; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -61,15 +59,12 @@ import org.apache.iceberg.types.Types; import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; -import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.metastore.Table.TABLE_COMMENT; @@ -193,20 +188,7 @@ public Map getViews(ConnectorSession s @Override public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) { - try { - return Failsafe.with(RetryPolicy.builder() - .withMaxAttempts(10) - .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) - .withMaxDuration(Duration.ofSeconds(30)) - .handleIf(failure -> failure instanceof MaterializedViewMayBeBeingRemovedException) - .abortOn(TrinoFileSystem::isUnrecoverableException) - .build()) - .get(() -> doGetMaterializedView(session, schemaViewName)); - } - catch (MaterializedViewMayBeBeingRemovedException e) { - throwIfUnchecked(e.getCause()); - throw new RuntimeException(e.getCause()); - } + return doGetMaterializedView(session, schemaViewName); } protected abstract Optional doGetMaterializedView(ConnectorSession 
session, SchemaTableName schemaViewName); @@ -517,13 +499,4 @@ protected Map createMaterializedViewProperties(ConnectorSession } protected abstract void invalidateTableCache(SchemaTableName schemaTableName); - - protected static class MaterializedViewMayBeBeingRemovedException - extends RuntimeException - { - public MaterializedViewMayBeBeingRemovedException(Throwable cause) - { - super(requireNonNull(cause, "cause is null")); - } - } } From 7540ae985942dde64d00447d20ce151160d32b64 Mon Sep 17 00:00:00 2001 From: Oskar Szwajkowski Date: Tue, 30 Jul 2024 12:59:18 +0200 Subject: [PATCH 5/5] Change semantics when it comes to retries Retry only when certain conditions happen, instead of aborting when they do not happen --- .../main/java/io/trino/plugin/bigquery/ReadSessionCreator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index 97b94e556bae..b0c8e9711e47 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -113,7 +113,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List log.debug("Request failed, retrying: %s", event.getLastException())) - .abortOn(failure -> !BigQueryUtil.isRetryable(failure)) + .handleIf(BigQueryUtil::isRetryable) .build()) .get(() -> { try {