diff --git a/client/trino-client/pom.xml b/client/trino-client/pom.xml index b14dcc92265a..b6f28333f33c 100644 --- a/client/trino-client/pom.xml +++ b/client/trino-client/pom.xml @@ -65,7 +65,7 @@ - net.jodah + dev.failsafe failsafe diff --git a/client/trino-client/src/main/java/io/trino/client/auth/external/HttpTokenPoller.java b/client/trino-client/src/main/java/io/trino/client/auth/external/HttpTokenPoller.java index e35b971ebb76..85a53364c605 100644 --- a/client/trino-client/src/main/java/io/trino/client/auth/external/HttpTokenPoller.java +++ b/client/trino-client/src/main/java/io/trino/client/auth/external/HttpTokenPoller.java @@ -15,11 +15,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; import io.trino.client.JsonCodec; import io.trino.client.JsonResponse; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeException; -import net.jodah.failsafe.RetryPolicy; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -75,11 +75,12 @@ public HttpTokenPoller(OkHttpClient client, Consumer refre public TokenPollResult pollForToken(URI tokenUri, Duration timeout) { try { - return Failsafe.with(new RetryPolicy() + return Failsafe.with(RetryPolicy.builder() .withMaxAttempts(-1) .withMaxDuration(timeout) .withBackoff(100, 500, MILLIS) - .handle(IOException.class)) + .handle(IOException.class) + .build()) .get(() -> executePoll(prepareRequestBuilder(tokenUri).build())); } catch (FailsafeException e) { @@ -94,11 +95,12 @@ public TokenPollResult pollForToken(URI tokenUri, Duration timeout) public void tokenReceived(URI tokenUri) { try { - Failsafe.with(new RetryPolicy() + Failsafe.with(RetryPolicy.builder() .withMaxAttempts(-1) .withMaxDuration(Duration.ofSeconds(4)) .withBackoff(100, 500, MILLIS) - .handleResultIf(code -> code >= HTTP_INTERNAL_ERROR)) + .handleResultIf(code -> code >= HTTP_INTERNAL_ERROR) + .build()) .get(() -> { Request request = prepareRequestBuilder(tokenUri) .delete() diff --git a/client/trino-jdbc/pom.xml b/client/trino-jdbc/pom.xml index 08647ebe6bc7..5f61dc599d06 100644 --- a/client/trino-jdbc/pom.xml +++ b/client/trino-jdbc/pom.xml @@ -356,8 +356,8 @@ ${shadeBase}.okio - net.jodah.failsafe - ${shadeBase}.net.jodah.failsafe + dev.failsafe + ${shadeBase}.dev.failsafe diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml index 03619df45be0..3e895a6b2138 100644 --- a/core/trino-main/pom.xml +++ b/core/trino-main/pom.xml @@ -257,6 +257,11 @@ 1.15 + + dev.failsafe + failsafe + + io.jsonwebtoken jjwt-api @@ -302,11 +307,6 @@ joda-time - - net.jodah - failsafe - - org.apache.commons commons-math3 diff --git a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OidcDiscovery.java b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OidcDiscovery.java index 4867f0765d15..200def4a3791 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OidcDiscovery.java +++ b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OidcDiscovery.java @@ -21,10 +21,10 @@ import com.nimbusds.oauth2.sdk.id.Issuer; import com.nimbusds.openid.connect.sdk.op.OIDCProviderConfigurationRequest; import com.nimbusds.openid.connect.sdk.op.OIDCProviderMetadata; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.json.ObjectMapperProvider; import io.airlift.log.Logger; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import javax.inject.Inject; @@ -77,12 +77,13 @@ public OidcDiscovery(OAuth2Config oauthConfig, OidcDiscoveryConfig oidcConfig, N @Override public OAuth2ServerConfig get() { - return Failsafe.with(new RetryPolicy<>() + return Failsafe.with(RetryPolicy.builder() .withMaxAttempts(-1) .withMaxDuration(discoveryTimeout) .withDelay(Duration.ofSeconds(1)) .abortOn(IllegalStateException.class) - .onFailedAttempt(attempt -> LOG.debug("OpenID Connect Metadata read failed: %s", attempt.getLastFailure()))) + .onFailedAttempt(attempt -> LOG.debug("OpenID Connect Metadata read failed: %s", attempt.getLastException())) + .build()) .get(() -> httpClient.execute(new OIDCProviderConfigurationRequest(issuer), this::parseConfigurationResponse)); } diff --git a/plugin/trino-base-jdbc/pom.xml b/plugin/trino-base-jdbc/pom.xml index 362c60ff16b9..50b72d3ac0d7 100644 --- a/plugin/trino-base-jdbc/pom.xml +++ b/plugin/trino-base-jdbc/pom.xml @@ -94,6 +94,11 @@ guice + + dev.failsafe + failsafe + + javax.annotation javax.annotation-api @@ -114,11 +119,6 @@ joda-time - - net.jodah - failsafe - - org.antlr antlr4-runtime diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index 476972c2b362..6063feaa0ecf 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -14,11 +14,11 @@ package io.trino.plugin.jdbc; import com.google.common.base.Throwables; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeException; -import net.jodah.failsafe.RetryPolicy; import java.sql.Connection; import java.sql.SQLException; @@ -31,12 +31,13 @@ public class RetryingConnectionFactory implements ConnectionFactory { - private static final RetryPolicy RETRY_POLICY = new RetryPolicy<>() + private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder() .withMaxDuration(java.time.Duration.of(30, SECONDS)) .withMaxAttempts(5) .withBackoff(50, 5_000, MILLIS, 4) .handleIf(RetryingConnectionFactory::isSqlRecoverableException) - .abortOn(TrinoException.class); + .abortOn(TrinoException.class) + .build(); private final ConnectionFactory delegate; diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcClient.java index a59ef9cdd395..ffde680385e2 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestingH2JdbcClient.java @@ -14,6 +14,8 @@ package io.trino.plugin.jdbc; import com.google.common.collect.ImmutableSet; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.plugin.base.aggregation.AggregateFunctionRewriter; import io.trino.plugin.jdbc.aggregation.ImplementCountAll; @@ -31,8 +33,6 @@ import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import java.sql.Connection; import java.sql.ResultSet; @@ -97,9 +97,10 @@ public TestingH2JdbcClient(BaseJdbcConfig config, ConnectionFactory connectionFa public Collection listSchemas(Connection connection) { // listing schemas in H2 may fail with NullPointerException when a schema is concurrently dropped - return Failsafe.with(new RetryPolicy>() + return Failsafe.with(RetryPolicy.>builder() .withMaxAttempts(100) - .onRetry(event -> log.warn(event.getLastFailure(), "Failed to list schemas, retrying"))) + .onRetry(event -> log.warn(event.getLastException(), "Failed to list schemas, retrying")) + .build()) .get(() -> super.listSchemas(connection)); } diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml index 5b923291b20f..03e1ea61bce6 100644 --- a/plugin/trino-bigquery/pom.xml +++ b/plugin/trino-bigquery/pom.xml @@ -211,6 +211,11 @@ protobuf-java + + dev.failsafe + failsafe + + io.grpc grpc-api @@ -231,11 +236,6 @@ validation-api - - net.jodah - failsafe - - org.apache.arrow arrow-compression diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index 0f8610852223..efa9d73a9430 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -22,14 +22,14 @@ import com.google.cloud.bigquery.storage.v1.DataFormat; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.protobuf.ByteString; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; @@ -108,11 +108,12 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List() + return Failsafe.with(RetryPolicy.builder() .withMaxRetries(maxCreateReadSessionRetries) .withBackoff(10, 500, MILLIS) - .onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastFailure())) - .abortOn(failure -> !BigQueryUtil.isRetryable(failure))) + .onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastException())) + .abortOn(failure -> !BigQueryUtil.isRetryable(failure)) + .build()) .get(() -> bigQueryReadClient.createReadSession(createReadSessionRequest)); } } diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 492c73f3ee54..533075347f8f 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -154,6 +154,11 @@ guice + + dev.failsafe + failsafe + + javax.inject javax.inject @@ -169,11 +174,6 @@ joda-time - - net.jodah - failsafe - - org.apache.iceberg iceberg-api diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java index 6521ef68e796..5c4a2026ec6e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.json.ObjectMapperProvider; import io.airlift.log.Logger; import io.trino.filesystem.TrinoFileSystem; @@ -29,8 +31,6 @@ import io.trino.spi.type.Decimals; import io.trino.spi.type.StandardTypes; import io.trino.spi.type.Type; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.Path; import javax.annotation.Nullable; @@ -219,14 +219,15 @@ public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String static Optional readLastCheckpoint(TrinoFileSystem fileSystem, Path tableLocation) { - return Failsafe.with(new RetryPolicy<>() + return Failsafe.with(RetryPolicy.builder() .withMaxRetries(5) .withDelay(Duration.ofSeconds(1)) .onRetry(event -> { // The _last_checkpoint file is malformed, it's probably in the middle of a rewrite (file rewrites on Azure are NOT atomic) // Retry several times with a short delay, and if that fails, fall back to manually finding latest checkpoint. - log.debug(event.getLastFailure(), "Failure when accessing last checkpoint information, will be retried"); - })) + log.debug(event.getLastException(), "Failure when accessing last checkpoint information, will be retried"); + }) + .build()) .get(() -> tryReadLastCheckpoint(fileSystem, tableLocation)); } diff --git a/plugin/trino-elasticsearch/pom.xml b/plugin/trino-elasticsearch/pom.xml index 15ac5d51b983..5dcfcdaf3e90 100644 --- a/plugin/trino-elasticsearch/pom.xml +++ b/plugin/trino-elasticsearch/pom.xml @@ -97,6 +97,11 @@ guice + + dev.failsafe + failsafe + + javax.annotation javax.annotation-api @@ -112,11 +117,6 @@ validation-api - - net.jodah - failsafe - - org.apache.httpcomponents httpasyncclient diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestClient.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestClient.java index f5872d4cbb49..a0ed13363878 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestClient.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestClient.java @@ -14,15 +14,15 @@ package io.trino.plugin.elasticsearch.client; import com.google.common.base.Stopwatch; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; +import dev.failsafe.event.ExecutionAttemptedEvent; +import dev.failsafe.event.ExecutionCompletedEvent; +import dev.failsafe.function.CheckedSupplier; import io.airlift.log.Logger; import io.airlift.stats.TimeStat; import io.trino.plugin.elasticsearch.ElasticsearchConfig; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeException; -import net.jodah.failsafe.RetryPolicy; -import net.jodah.failsafe.event.ExecutionAttemptedEvent; -import net.jodah.failsafe.event.ExecutionCompletedEvent; -import net.jodah.failsafe.function.CheckedSupplier; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -53,7 +53,7 @@ public BackpressureRestClient(RestClient delegate, ElasticsearchConfig config, T { this.delegate = requireNonNull(delegate, "restClient is null"); this.backpressureStats = requireNonNull(backpressureStats, "backpressureStats is null"); - retryPolicy = new RetryPolicy() + retryPolicy = RetryPolicy.builder() .withMaxAttempts(-1) .withMaxDuration(java.time.Duration.ofMillis(config.getMaxRetryTime().toMillis())) .withBackoff(config.getBackoffInitDelay().toMillis(), config.getBackoffMaxDelay().toMillis(), MILLIS) @@ -61,7 +61,8 @@ public BackpressureRestClient(RestClient delegate, ElasticsearchConfig config, T .handleIf(BackpressureRestClient::isBackpressure) .onFailedAttempt(this::onFailedAttempt) .onSuccess(this::onComplete) - .onFailure(this::onComplete); + .onFailure(this::onComplete) + .build(); } public void setHosts(HttpHost... hosts) @@ -119,7 +120,7 @@ private Response executeWithRetries(CheckedSupplier supplier) private void onFailedAttempt(ExecutionAttemptedEvent executionAttemptedEvent) { - log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastFailure()); + log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastException()); if (!stopwatch.get().isRunning()) { stopwatch.get().start(); } diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestHighLevelClient.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestHighLevelClient.java index 0986fab1a09a..2d6dc0ec3f6a 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestHighLevelClient.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/BackpressureRestHighLevelClient.java @@ -14,15 +14,15 @@ package io.trino.plugin.elasticsearch.client; import com.google.common.base.Stopwatch; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; +import dev.failsafe.event.ExecutionAttemptedEvent; +import dev.failsafe.event.ExecutionCompletedEvent; +import dev.failsafe.function.CheckedSupplier; import io.airlift.log.Logger; import io.airlift.stats.TimeStat; import io.trino.plugin.elasticsearch.ElasticsearchConfig; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeException; -import net.jodah.failsafe.RetryPolicy; -import net.jodah.failsafe.event.ExecutionAttemptedEvent; -import net.jodah.failsafe.event.ExecutionCompletedEvent; -import net.jodah.failsafe.function.CheckedSupplier; import org.apache.http.Header; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionResponse; @@ -60,7 +60,7 @@ public BackpressureRestHighLevelClient(RestClientBuilder restClientBuilder, Elas this.backpressureStats = requireNonNull(backpressureStats, "backpressureStats is null"); delegate = new RestHighLevelClient(requireNonNull(restClientBuilder, "restClientBuilder is null")); backpressureRestClient = new BackpressureRestClient(delegate.getLowLevelClient(), config, backpressureStats); - retryPolicy = new RetryPolicy() + retryPolicy = RetryPolicy.builder() .withMaxAttempts(-1) .withMaxDuration(java.time.Duration.ofMillis(config.getMaxRetryTime().toMillis())) .withBackoff(config.getBackoffInitDelay().toMillis(), config.getBackoffMaxDelay().toMillis(), MILLIS) @@ -68,7 +68,8 @@ public BackpressureRestHighLevelClient(RestClientBuilder restClientBuilder, Elas .handleIf(BackpressureRestHighLevelClient::isBackpressure) .onFailedAttempt(this::onFailedAttempt) .onSuccess(this::onComplete) - .onFailure(this::onComplete); + .onFailure(this::onComplete) + .build(); } public BackpressureRestClient getLowLevelClient() @@ -133,7 +134,7 @@ private T executeWithRetries(CheckedSupplier suppl private void onFailedAttempt(ExecutionAttemptedEvent executionAttemptedEvent) { - log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastFailure()); + log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastException()); if (!stopwatch.get().isRunning()) { stopwatch.get().start(); } diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index 668e2e9c3893..d6193e480920 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -415,6 +415,12 @@ test + + dev.failsafe + failsafe + test + + io.minio minio @@ -431,12 +437,6 @@ - - net.jodah - failsafe - test - - org.assertj assertj-core diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 39963ce142b3..7bd2a7228e0d 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -149,6 +149,11 @@ guice + + dev.failsafe + failsafe + + io.jsonwebtoken jjwt-api @@ -179,11 +184,6 @@ joda-time - - net.jodah - failsafe - - org.apache.datasketches datasketches-java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 81577ba63a0d..0bfe63e59754 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -13,12 +13,12 @@ */ package io.trino.plugin.iceberg.catalog; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.CommitFailedException; @@ -215,11 +215,12 @@ protected void refreshFromMetadataLocation(String newLocation) return; } - TableMetadata newMetadata = Failsafe.with(new RetryPolicy<>() + TableMetadata newMetadata = Failsafe.with(RetryPolicy.builder() .withMaxRetries(20) .withBackoff(100, 5000, MILLIS, 4.0) .withMaxDuration(Duration.ofMinutes(10)) - .abortOn(org.apache.iceberg.exceptions.NotFoundException.class)) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException + .abortOn(org.apache.iceberg.exceptions.NotFoundException.class) + .build()) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation))); String newUUID = newMetadata.uuid(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index b1c5152da03a..7dbb20a4cfe5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -14,6 +14,8 @@ package io.trino.plugin.iceberg.catalog; import com.google.common.collect.ImmutableMap; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HiveMetadata; @@ -29,8 +31,6 @@ import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.TypeManager; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -135,11 +135,12 @@ public Map getViews(ConnectorSession s public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) { try { - return Failsafe.with(new RetryPolicy<>() + return Failsafe.with(RetryPolicy.builder() .withMaxAttempts(10) .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) .withMaxDuration(Duration.ofSeconds(30)) - .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException))) + .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException)) + .build()) .get(() -> doGetMaterializedView(session, schemaViewName)); } catch (MaterializedViewMayBeBeingRemovedException e) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 8bbc78b936c4..f3fb37479035 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -34,6 +34,8 @@ import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; @@ -56,8 +58,6 @@ import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TypeManager; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; @@ -547,10 +547,11 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, encodeViewData(definition), session.getUser(), createViewProperties(session, trinoVersion, TRINO_CREATED_BY_VALUE)); - Failsafe.with(new RetryPolicy<>() + Failsafe.with(RetryPolicy.builder() .withMaxRetries(3) .withDelay(Duration.ofMillis(100)) - .abortIf(throwable -> !replace || throwable instanceof ViewAlreadyExistsException)) + .abortIf(throwable -> !replace || throwable instanceof ViewAlreadyExistsException) + .build()) .run(() -> doCreateView(session, schemaViewName, viewTableInput, replace)); } diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml index 2a27780b4da9..2d0749d21a5f 100644 --- a/plugin/trino-kafka/pom.xml +++ b/plugin/trino-kafka/pom.xml @@ -266,6 +266,12 @@ test + + dev.failsafe + failsafe + test + + io.confluent kafka-avro-serializer @@ -298,12 +304,6 @@ test - - net.jodah - failsafe - test - - org.assertj assertj-core diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index 9b444098e29e..02d6815122ca 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -18,6 +18,8 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; @@ -25,8 +27,6 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import io.trino.testing.kafka.TestingKafka; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.kafka.clients.producer.ProducerRecord; import org.testng.annotations.Test; @@ -221,14 +221,16 @@ private void assertNotExists(String tableName) private void waitUntilTableExists(String tableName) { Failsafe.with( - new RetryPolicy<>() + RetryPolicy.builder() .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100))) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertTrue(schemaExists())); Failsafe.with( - new RetryPolicy<>() + RetryPolicy.builder() .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100))) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertTrue(tableExists(tableName))); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java index 1f443f2ea0c0..9178a72ab1df 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; @@ -25,8 +27,6 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import io.trino.testing.kafka.TestingKafka; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; @@ -285,14 +285,16 @@ private void assertNotExists(String tableName) private void waitUntilTableExists(String tableName) { Failsafe.with( - new RetryPolicy<>() + RetryPolicy.builder() .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100))) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertTrue(schemaExists())); Failsafe.with( - new RetryPolicy<>() + RetryPolicy.builder() .withMaxAttempts(10) - .withDelay(Duration.ofMillis(100))) + .withDelay(Duration.ofMillis(100)) + .build()) .run(() -> assertTrue(tableExists(tableName))); } diff --git a/plugin/trino-oracle/pom.xml b/plugin/trino-oracle/pom.xml index b68244ebd20e..5bf85eaafc5b 100644 --- a/plugin/trino-oracle/pom.xml +++ b/plugin/trino-oracle/pom.xml @@ -91,7 +91,7 @@ - net.jodah + dev.failsafe failsafe runtime diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java index 0e412d7b152b..afbcae416c90 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java @@ -15,6 +15,8 @@ import com.google.common.base.Joiner; import com.google.common.io.Files; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.plugin.jdbc.BaseJdbcConfig; import io.trino.plugin.jdbc.ConnectionFactory; @@ -22,8 +24,6 @@ import io.trino.plugin.jdbc.RetryingConnectionFactory; import io.trino.plugin.jdbc.credential.StaticCredentialProvider; import io.trino.testing.ResourcePresence; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import oracle.jdbc.OracleDriver; import org.testcontainers.containers.OracleContainer; import org.testcontainers.utility.MountableFile; @@ -47,13 +47,14 @@ public class TestingOracleServer { private static final Logger log = Logger.get(TestingOracleServer.class); - private static final RetryPolicy CONTAINER_RETRY_POLICY = new RetryPolicy<>() + private static final RetryPolicy CONTAINER_RETRY_POLICY = RetryPolicy.builder() .withBackoff(1, 5, ChronoUnit.SECONDS) .withMaxAttempts(5) .onRetry(event -> log.warn( "Container initialization failed on attempt %s, will retry. Exception: %s", event.getAttemptCount(), - event.getLastFailure().getMessage())); + event.getLastException().getMessage())) + .build(); private static final String TEST_TABLESPACE = "trino_test"; diff --git a/plugin/trino-redshift/pom.xml b/plugin/trino-redshift/pom.xml index 11b943fed412..010d68be3b1b 100644 --- a/plugin/trino-redshift/pom.xml +++ b/plugin/trino-redshift/pom.xml @@ -78,7 +78,7 @@ - net.jodah + dev.failsafe failsafe runtime diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java index 3e96738e7ba1..8b627642dd8e 100644 --- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java +++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/RedshiftQueryRunner.java @@ -16,6 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.airlift.log.Logging; import io.trino.Session; @@ -26,8 +28,6 @@ import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.jdbi.v3.core.HandleCallback; import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; @@ -153,10 +153,11 @@ private static void createUserIfNotExists(String user, String password) private static void executeInRedshiftWithRetry(String sql) { - Failsafe.with(new RetryPolicy<>() + Failsafe.with(RetryPolicy.builder() .handleIf(e -> e.getMessage().matches(".* concurrent transaction .*")) .withDelay(Duration.ofSeconds(10)) - .withMaxRetries(3)) + .withMaxRetries(3) + .build()) .run(() -> executeInRedshift(sql)); } diff --git a/plugin/trino-sqlserver/pom.xml b/plugin/trino-sqlserver/pom.xml index 85d934f575f0..a09e820d2034 100644 --- a/plugin/trino-sqlserver/pom.xml +++ b/plugin/trino-sqlserver/pom.xml @@ -73,13 +73,13 @@ - javax.inject - javax.inject + dev.failsafe + failsafe - net.jodah - failsafe + javax.inject + javax.inject diff --git a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java index bead85ffc6f8..819c3e2e104c 100644 --- a/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java +++ b/plugin/trino-sqlserver/src/main/java/io/trino/plugin/sqlserver/SqlServerClient.java @@ -20,6 +20,9 @@ import com.google.common.collect.ImmutableSet; import com.microsoft.sqlserver.jdbc.SQLServerConnection; import com.microsoft.sqlserver.jdbc.SQLServerException; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedSupplier; import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.trino.plugin.base.aggregation.AggregateFunctionRewriter; @@ -78,9 +81,6 @@ import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; -import net.jodah.failsafe.function.CheckedSupplier; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @@ -1086,7 +1086,7 @@ public static T retryOnDeadlock(CheckedSupplier supplier, String attemptL { // DDL operations can take out locks against system tables causing queries against them to deadlock int maxAttemptCount = 3; - RetryPolicy retryPolicy = new RetryPolicy() + RetryPolicy retryPolicy = RetryPolicy.builder() .withMaxAttempts(maxAttemptCount) .handleIf(throwable -> { @@ -1094,7 +1094,8 @@ public static T retryOnDeadlock(CheckedSupplier supplier, String attemptL return rootCause instanceof SQLServerException && ((SQLServerException) (rootCause)).getSQLServerError().getErrorNumber() == SQL_SERVER_DEADLOCK_ERROR_CODE; }) - .onFailedAttempt(event -> log.warn(event.getLastFailure(), "Attempt %d of %d: %s", event.getAttemptCount(), maxAttemptCount, attemptLogMessage)); + .onFailedAttempt(event -> log.warn(event.getLastException(), "Attempt %d of %d: %s", event.getAttemptCount(), maxAttemptCount, attemptLogMessage)) + .build(); return Failsafe .with(retryPolicy) diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java index 8eacba7ccaf2..287f0595b6cc 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java @@ -13,12 +13,12 @@ */ package io.trino.plugin.sqlserver; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.Timeout; import io.airlift.log.Logger; import io.trino.testing.ResourcePresence; import io.trino.testing.sql.SqlExecutor; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; -import net.jodah.failsafe.Timeout; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.utility.DockerImageName; @@ -52,15 +52,16 @@ public final class TestingSqlServer executor.execute(format("ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON", databaseName)); }; - private static final RetryPolicy CONTAINER_RETRY_POLICY = new RetryPolicy() + private static final RetryPolicy CONTAINER_RETRY_POLICY = RetryPolicy.builder() .withBackoff(1, 5, ChronoUnit.SECONDS) .withMaxRetries(5) .handleIf(throwable -> getCausalChain(throwable).stream() - .anyMatch(SQLException.class::isInstance)) + .anyMatch(SQLException.class::isInstance) || throwable.getMessage().contains("Container exited with code")) .onRetry(event -> log.warn( "Query failed on attempt %s, will retry. Exception: %s", event.getAttemptCount(), - event.getLastFailure().getMessage())); + event.getLastException().getMessage())) + .build(); private static final DockerImageName IMAGE_NAME = DockerImageName.parse("mcr.microsoft.com/mssql/server"); public static final String DEFAULT_VERSION = "2017-CU13"; diff --git a/pom.xml b/pom.xml index e9c4ac26c3e4..f3b19c246339 100644 --- a/pom.xml +++ b/pom.xml @@ -1362,6 +1362,12 @@ 1.4 + + dev.failsafe + failsafe + 3.3.0 + + info.picocli picocli @@ -1565,12 +1571,6 @@ 5.12.1 - - net.jodah - failsafe - 2.4.0 - - net.sf.opencsv opencsv diff --git a/testing/trino-product-tests-launcher/pom.xml b/testing/trino-product-tests-launcher/pom.xml index daceb708c823..65793d5db589 100644 --- a/testing/trino-product-tests-launcher/pom.xml +++ b/testing/trino-product-tests-launcher/pom.xml @@ -102,6 +102,11 @@ guice + + dev.failsafe + failsafe + + info.picocli picocli @@ -122,11 +127,6 @@ junit - - net.jodah - failsafe - - org.testcontainers testcontainers diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java index eb64174ebcaf..5a0cdd4b8ebf 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java @@ -18,6 +18,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import com.google.inject.Module; +import dev.failsafe.Failsafe; +import dev.failsafe.Timeout; +import dev.failsafe.TimeoutExceededException; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.tests.product.launcher.Extensions; @@ -30,9 +33,6 @@ import io.trino.tests.product.launcher.env.EnvironmentOptions; import io.trino.tests.product.launcher.env.SupportedTrinoJdk; import io.trino.tests.product.launcher.testcontainers.ExistingNetwork; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.Timeout; -import net.jodah.failsafe.TimeoutExceededException; import picocli.CommandLine.ExitCode; import picocli.CommandLine.Mixin; import picocli.CommandLine.Parameters; @@ -227,8 +227,9 @@ public Integer call() try { int exitCode = Failsafe - .with(Timeout.of(java.time.Duration.ofMillis(timeoutMillis)) - .withCancel(true)) + .with(Timeout.builder(java.time.Duration.ofMillis(timeoutMillis)) + .withInterrupt() + .build()) .get(this::tryExecuteTests); log.info("Tests execution completed with code %d", exitCode); diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/docker/DockerFiles.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/docker/DockerFiles.java index b35619cedd96..429fbb20026a 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/docker/DockerFiles.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/docker/DockerFiles.java @@ -14,9 +14,9 @@ package io.trino.tests.product.launcher.docker; import com.google.common.reflect.ClassPath; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import javax.annotation.PreDestroy; import javax.annotation.concurrent.GuardedBy; @@ -57,7 +57,7 @@ public synchronized void close() return; } if (dockerFilesHostPath != null) { - Failsafe.with(new RetryPolicy<>().withMaxAttempts(5)) + Failsafe.with(RetryPolicy.builder().withMaxAttempts(5).build()) .run(() -> deleteRecursively(dockerFilesHostPath, ALLOW_INSECURE)); dockerFilesHostPath = null; } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/DockerContainer.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/DockerContainer.java index 66037ee831de..a9b4e2d7f549 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/DockerContainer.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/DockerContainer.java @@ -20,12 +20,12 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.io.RecursiveDeleteOption; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.Timeout; +import dev.failsafe.function.CheckedRunnable; import io.airlift.log.Logger; import io.airlift.units.Duration; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeExecutor; -import net.jodah.failsafe.Timeout; -import net.jodah.failsafe.function.CheckedRunnable; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.SelinuxContext; @@ -69,8 +69,9 @@ public class DockerContainer private static final Logger log = Logger.get(DockerContainer.class); private static final long NANOSECONDS_PER_SECOND = 1_000 * 1_000 * 1_000L; - private static final Timeout asyncTimeout = Timeout.of(ofSeconds(30)) - .withCancel(true); + private static final Timeout asyncTimeout = Timeout.builder(ofSeconds(30)) + .withInterrupt() + .build(); private static final FailsafeExecutor executor = Failsafe .with(asyncTimeout) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/Environment.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/Environment.java index 1626e31db6bb..ed5167692487 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/Environment.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/Environment.java @@ -25,13 +25,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.RetryPolicy; +import dev.failsafe.Timeout; import io.airlift.log.Logger; import io.trino.tests.product.launcher.testcontainers.PrintingLogConsumer; import io.trino.tests.product.launcher.util.ConsoleTable; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeExecutor; -import net.jodah.failsafe.RetryPolicy; -import net.jodah.failsafe.Timeout; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.lifecycle.Startables; @@ -123,12 +123,13 @@ private Environment( public Environment start() { - RetryPolicy retryPolicy = new RetryPolicy<>() + RetryPolicy retryPolicy = RetryPolicy.builder() .withMaxRetries(startupRetries) - .onFailedAttempt(event -> log.warn(event.getLastFailure(), "Could not start environment '%s'", this)) + .onFailedAttempt(event -> log.warn(event.getLastException(), "Could not start environment '%s'", this)) .onRetry(event -> log.info("Trying to start environment '%s', %d failed attempt(s)", this, event.getAttemptCount() + 1)) .onSuccess(event -> log.info("Environment '%s' started in %s, %d attempt(s)", this, event.getElapsedTime(), event.getAttemptCount())) - .onFailure(event -> log.info("Environment '%s' failed to start in attempt(s): %d: %s", this, event.getAttemptCount(), event.getFailure())); + .onFailure(event -> log.info("Environment '%s' failed to start in attempt(s): %d: %s", this, event.getAttemptCount(), event.getException())) + .build(); return Failsafe .with(retryPolicy) @@ -190,11 +191,13 @@ public void stop() this.listener.ifPresent(listener -> listener.environmentStopping(this)); // Allow containers to take up to 5 minutes to stop - Timeout timeout = Timeout.of(ofMinutes(5)) - .withCancel(true); + Timeout timeout = Timeout.builder(ofMinutes(5)) + .withInterrupt() + .build(); - RetryPolicy retry = new RetryPolicy<>() - .withMaxAttempts(3); + RetryPolicy retry = RetryPolicy.builder() + .withMaxAttempts(3) + .build(); FailsafeExecutor executor = Failsafe .with(timeout, retry) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentListener.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentListener.java index e79bda9f945e..fd122d7989ac 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentListener.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/EnvironmentListener.java @@ -14,11 +14,11 @@ package io.trino.tests.product.launcher.env; import com.github.dockerjava.api.command.InspectContainerResponse; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.Timeout; import io.airlift.log.Logger; import io.trino.tests.product.launcher.util.ConsoleTable; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.FailsafeExecutor; -import net.jodah.failsafe.Timeout; import java.nio.file.Path; import java.util.Arrays; @@ -94,7 +94,7 @@ static EnvironmentListener compose(EnvironmentListener... listeners) return new EnvironmentListener() { private FailsafeExecutor executor = Failsafe - .with(Timeout.of(ofMinutes(5)).withCancel(true)) + .with(Timeout.builder(ofMinutes(5)).withInterrupt().build()) .with(newCachedThreadPool(daemonThreadsNamed("environment-listener-%d"))); @Override diff --git a/testing/trino-product-tests/pom.xml b/testing/trino-product-tests/pom.xml index 7d8051ab86b3..6ea3ff735fa2 100644 --- a/testing/trino-product-tests/pom.xml +++ b/testing/trino-product-tests/pom.xml @@ -171,6 +171,11 @@ okhttp-urlconnection + + dev.failsafe + failsafe + + io.confluent kafka-protobuf-provider @@ -199,11 +204,6 @@ javax.inject - - net.jodah - failsafe - - org.apache.thrift libthrift diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java index 8c91d8f9a0d4..157daf3368d3 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveTransactionalTable.java @@ -16,6 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreClient; @@ -25,8 +27,6 @@ import io.trino.tempto.query.QueryResult; import io.trino.testng.services.Flaky; import io.trino.tests.product.hive.util.TemporaryHiveTable; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.assertj.core.api.Assertions; import org.testng.SkipException; import org.testng.annotations.DataProvider; @@ -2146,11 +2146,12 @@ private static void compactTableAndWait(CompactionMode compactMode, String table log.info("Running %s compaction on %s", compactMode, tableName); Failsafe.with( - new RetryPolicy<>() + RetryPolicy.builder() .withMaxDuration(java.time.Duration.ofMillis(timeout.toMillis())) - .withMaxAttempts(Integer.MAX_VALUE)) // limited by MaxDuration + .withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration + .build()) .onFailure(event -> { - throw new IllegalStateException(format("Could not compact table %s in %d retries", tableName, event.getAttemptCount()), event.getFailure()); + throw new IllegalStateException(format("Could not compact table %s in %d retries", tableName, event.getAttemptCount()), event.getException()); }) .onSuccess(event -> log.info("Finished %s compaction on %s in %s (%d tries)", compactMode, tableName, event.getElapsedTime(), event.getAttemptCount())) .run(() -> tryCompactingTable(compactMode, tableName, partitionString, new Duration(2, MINUTES))); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/HadoopTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/HadoopTestUtils.java index ef2bf097fbd2..6554d65b3d28 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/HadoopTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/HadoopTestUtils.java @@ -14,10 +14,10 @@ package io.trino.tests.product.utils; import com.google.common.base.Throwables; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.tempto.query.QueryResult; import io.trino.tests.product.hive.HiveProductTest; -import net.jodah.failsafe.RetryPolicy; import org.intellij.lang.annotations.Language; import java.time.temporal.ChronoUnit; @@ -42,11 +42,12 @@ private HadoopTestUtils() {} // "could only be written to 0 of the 1 minReplication" is the error wording used by e.g. HDP 3 "(could only be replicated to 0 nodes instead of minReplication|could only be written to 0 of the 1 minReplication)"; - public static final RetryPolicy ERROR_COMMITTING_WRITE_TO_HIVE_RETRY_POLICY = new RetryPolicy() + public static final RetryPolicy ERROR_COMMITTING_WRITE_TO_HIVE_RETRY_POLICY = RetryPolicy.builder() .handleIf(HadoopTestUtils::isErrorCommittingToHive) .withBackoff(1, 10, ChronoUnit.SECONDS) .withMaxRetries(30) - .onRetry(event -> log.warn(event.getLastFailure(), "Query failed on attempt %d, will retry.", event.getAttemptCount())); + .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) + .build(); private static boolean isErrorCommittingToHive(Throwable throwable) { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java index 2af3cc6588dd..a90f8f20e6b5 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java @@ -13,12 +13,12 @@ */ package io.trino.tests.product.utils; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.tempto.query.QueryExecutionException; import io.trino.tempto.query.QueryExecutor; import io.trino.tempto.query.QueryResult; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import java.sql.Connection; import java.time.temporal.ChronoUnit; @@ -123,11 +123,12 @@ public static QueryExecutor onDelta() // return 502 then as well. Handling this with a query retry allows us to use the cluster's autostart feature safely, // while keeping costs to a minimum. - RetryPolicy databricksRetryPolicy = new RetryPolicy() + RetryPolicy databricksRetryPolicy = RetryPolicy.builder() .handleIf(throwable -> throwable.getMessage().contains("HTTP Response code: 502")) .withBackoff(1, 10, ChronoUnit.SECONDS) .withMaxRetries(60) - .onRetry(event -> log.warn(event.getLastFailure(), "Query failed on attempt %d, will retry.", event.getAttemptCount())); + .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) + .build(); return new QueryExecutor() { diff --git a/testing/trino-testing-containers/pom.xml b/testing/trino-testing-containers/pom.xml index a5a308303d64..d335621c5f3c 100644 --- a/testing/trino-testing-containers/pom.xml +++ b/testing/trino-testing-containers/pom.xml @@ -37,6 +37,11 @@ guava + + dev.failsafe + failsafe + + io.minio minio @@ -52,11 +57,6 @@ - - net.jodah - failsafe - - org.rnorth.duct-tape duct-tape diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/BaseTestContainer.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/BaseTestContainer.java index 56723143b87b..49a42fff6557 100644 --- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/BaseTestContainer.java +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/BaseTestContainer.java @@ -16,10 +16,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.net.HostAndPort; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.testing.ResourcePresence; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -120,13 +120,14 @@ protected HostAndPort getMappedHostAndPortForExposedPort(int exposedPort) public void start() { - Failsafe.with(new RetryPolicy<>() + Failsafe.with(RetryPolicy.builder() .withMaxRetries(startupRetryLimit) .onRetry(event -> log.warn( "%s initialization failed (attempt %s), will retry. Exception: %s", this.getClass().getSimpleName(), event.getAttemptCount(), - event.getLastFailure().getMessage()))) + event.getLastException().getMessage())) + .build()) .get(() -> TestContainers.startOrReuse(this.container)); } diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/Minio.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/Minio.java index fcb3b7a7cd7d..6948b149517e 100644 --- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/Minio.java +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/Minio.java @@ -18,10 +18,10 @@ import com.google.common.collect.ImmutableSet; import com.google.common.net.HostAndPort; import com.google.common.reflect.ClassPath; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.testing.minio.MinioClient; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.testcontainers.containers.Network; import java.io.IOException; @@ -114,10 +114,11 @@ public void createBucket(String bucketName) try (MinioClient minioClient = createMinioClient()) { // use retry loop for minioClient.makeBucket as minio container tends to return "Server not initialized, please try again" error // for some time after starting up - RetryPolicy retryPolicy = new RetryPolicy<>() + RetryPolicy retryPolicy = RetryPolicy.builder() .withMaxDuration(Duration.of(2, MINUTES)) .withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration - .withDelay(Duration.of(10, SECONDS)); + .withDelay(Duration.of(10, SECONDS)) + .build(); Failsafe.with(retryPolicy).run(() -> minioClient.makeBucket(bucketName)); } } diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/wait/strategy/SelectedPortWaitStrategy.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/wait/strategy/SelectedPortWaitStrategy.java index 63da3e61f5f9..4e93cf69e1df 100644 --- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/wait/strategy/SelectedPortWaitStrategy.java +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/wait/strategy/SelectedPortWaitStrategy.java @@ -16,8 +16,8 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import org.testcontainers.containers.ContainerLaunchException; import org.testcontainers.containers.wait.internal.ExternalPortListeningCheck; import org.testcontainers.containers.wait.internal.InternalCommandPortListeningCheck; @@ -63,10 +63,11 @@ protected void waitUntilReady() .collect(toImmutableSet()); Callable externalCheck = new ExternalPortListeningCheck(waitStrategyTarget, externalPorts); - Failsafe.with(new RetryPolicy<>() + Failsafe.with(RetryPolicy.builder() .withMaxDuration(startupTimeout) .withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration - .abortOn(e -> getExitCode().isPresent())) + .abortOn(e -> getExitCode().isPresent()) + .build()) .run(() -> { // Note: This condition requires a dependency on org.rnorth.duct-tape:duct-tape if (!getRateLimiter().getWhenReady(() -> internalCheck.call() && externalCheck.call())) { diff --git a/testing/trino-testing-kafka/pom.xml b/testing/trino-testing-kafka/pom.xml index b0f083c17535..ab5420e15eaa 100644 --- a/testing/trino-testing-kafka/pom.xml +++ b/testing/trino-testing-kafka/pom.xml @@ -43,7 +43,7 @@ - net.jodah + dev.failsafe failsafe diff --git a/testing/trino-testing-kafka/src/main/java/io/trino/testing/kafka/TestingKafka.java b/testing/trino-testing-kafka/src/main/java/io/trino/testing/kafka/TestingKafka.java index 276cd1f8090e..5df2d4242daf 100644 --- a/testing/trino-testing-kafka/src/main/java/io/trino/testing/kafka/TestingKafka.java +++ b/testing/trino-testing-kafka/src/main/java/io/trino/testing/kafka/TestingKafka.java @@ -16,10 +16,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; import com.google.common.util.concurrent.Futures; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.testing.ResourcePresence; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -230,10 +230,11 @@ public RecordMetadata sendMessages(Stream> recordStr private Future send(KafkaProducer producer, ProducerRecord record) { return Failsafe.with( - new RetryPolicy<>() - .onRetry(event -> log.warn(event.getLastFailure(), "Retrying message send")) + RetryPolicy.builder() + .onRetry(event -> log.warn(event.getLastException(), "Retrying message send")) .withMaxAttempts(10) - .withBackoff(1, 10_000, MILLIS)) + .withBackoff(1, 10_000, MILLIS) + .build()) .get(() -> producer.send(record)); }