Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/trino-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import io.trino.client.JsonCodec;
import io.trino.client.JsonResponse;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.RetryPolicy;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand Down Expand Up @@ -75,11 +75,12 @@ public HttpTokenPoller(OkHttpClient client, Consumer<OkHttpClient.Builder> refre
public TokenPollResult pollForToken(URI tokenUri, Duration timeout)
{
try {
return Failsafe.with(new RetryPolicy<TokenPollResult>()
return Failsafe.with(RetryPolicy.builder()
.withMaxAttempts(-1)
.withMaxDuration(timeout)
.withBackoff(100, 500, MILLIS)
.handle(IOException.class))
.handle(IOException.class)
.build())
.get(() -> executePoll(prepareRequestBuilder(tokenUri).build()));
}
catch (FailsafeException e) {
Expand All @@ -94,11 +95,12 @@ public TokenPollResult pollForToken(URI tokenUri, Duration timeout)
public void tokenReceived(URI tokenUri)
{
try {
Failsafe.with(new RetryPolicy<Integer>()
Failsafe.with(RetryPolicy.<Integer>builder()
.withMaxAttempts(-1)
.withMaxDuration(Duration.ofSeconds(4))
.withBackoff(100, 500, MILLIS)
.handleResultIf(code -> code >= HTTP_INTERNAL_ERROR))
.handleResultIf(code -> code >= HTTP_INTERNAL_ERROR)
.build())
.get(() -> {
Request request = prepareRequestBuilder(tokenUri)
.delete()
Expand Down
4 changes: 2 additions & 2 deletions client/trino-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@
<shadedPattern>${shadeBase}.okio</shadedPattern>
</relocation>
<relocation>
<pattern>net.jodah.failsafe</pattern>
<shadedPattern>${shadeBase}.net.jodah.failsafe</shadedPattern>
<pattern>dev.failsafe</pattern>
<shadedPattern>${shadeBase}.dev.failsafe</shadedPattern>
</relocation>
</relocations>
<filters>
Expand Down
10 changes: 5 additions & 5 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@
<version>1.15</version>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
Expand Down Expand Up @@ -302,11 +307,6 @@
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.nimbusds.oauth2.sdk.id.Issuer;
import com.nimbusds.openid.connect.sdk.op.OIDCProviderConfigurationRequest;
import com.nimbusds.openid.connect.sdk.op.OIDCProviderMetadata;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

import javax.inject.Inject;

Expand Down Expand Up @@ -77,12 +77,13 @@ public OidcDiscovery(OAuth2Config oauthConfig, OidcDiscoveryConfig oidcConfig, N
@Override
public OAuth2ServerConfig get()
{
return Failsafe.with(new RetryPolicy<>()
return Failsafe.with(RetryPolicy.builder()
.withMaxAttempts(-1)
.withMaxDuration(discoveryTimeout)
.withDelay(Duration.ofSeconds(1))
.abortOn(IllegalStateException.class)
.onFailedAttempt(attempt -> LOG.debug("OpenID Connect Metadata read failed: %s", attempt.getLastFailure())))
.onFailedAttempt(attempt -> LOG.debug("OpenID Connect Metadata read failed: %s", attempt.getLastException()))
.build())
.get(() -> httpClient.execute(new OIDCProviderConfigurationRequest(issuer), this::parseConfigurationResponse));
}

Expand Down
10 changes: 5 additions & 5 deletions plugin/trino-base-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
Expand All @@ -114,11 +119,6 @@
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package io.trino.plugin.jdbc;

import com.google.common.base.Throwables;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.RetryPolicy;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -31,12 +31,13 @@
public class RetryingConnectionFactory
implements ConnectionFactory
{
private static final RetryPolicy<Object> RETRY_POLICY = new RetryPolicy<>()
private static final RetryPolicy<Object> RETRY_POLICY = RetryPolicy.builder()
.withMaxDuration(java.time.Duration.of(30, SECONDS))
.withMaxAttempts(5)
.withBackoff(50, 5_000, MILLIS, 4)
.handleIf(RetryingConnectionFactory::isSqlRecoverableException)
.abortOn(TrinoException.class);
.abortOn(TrinoException.class)
.build();

private final ConnectionFactory delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableSet;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airlift.log.Logger;
import io.trino.plugin.base.aggregation.AggregateFunctionRewriter;
import io.trino.plugin.jdbc.aggregation.ImplementCountAll;
Expand All @@ -31,8 +33,6 @@
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

import java.sql.Connection;
import java.sql.ResultSet;
Expand Down Expand Up @@ -97,9 +97,10 @@ public TestingH2JdbcClient(BaseJdbcConfig config, ConnectionFactory connectionFa
public Collection<String> listSchemas(Connection connection)
{
// listing schemas in H2 may fail with NullPointerException when a schema is concurrently dropped
return Failsafe.with(new RetryPolicy<Collection<String>>()
return Failsafe.with(RetryPolicy.<Collection<String>>builder()
.withMaxAttempts(100)
.onRetry(event -> log.warn(event.getLastFailure(), "Failed to list schemas, retrying")))
.onRetry(event -> log.warn(event.getLastException(), "Failed to list schemas, retrying"))
.build())
.get(() -> super.listSchemas(connection));
}

Expand Down
10 changes: 5 additions & 5 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand All @@ -231,11 +236,6 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.protobuf.ByteString;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

Expand Down Expand Up @@ -108,11 +108,12 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
.setMaxStreamCount(parallelism)
.build();

return Failsafe.with(new RetryPolicy<>()
return Failsafe.with(RetryPolicy.builder()
.withMaxRetries(maxCreateReadSessionRetries)
.withBackoff(10, 500, MILLIS)
.onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastFailure()))
.abortOn(failure -> !BigQueryUtil.isRetryable(failure)))
.onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastException()))
.abortOn(failure -> !BigQueryUtil.isRetryable(failure))
.build())
.get(() -> bigQueryReadClient.createReadSession(createReadSessionRequest));
}
}
Expand Down
10 changes: 5 additions & 5 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand All @@ -169,11 +174,6 @@
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import io.trino.filesystem.TrinoFileSystem;
Expand All @@ -29,8 +31,6 @@
import io.trino.spi.type.Decimals;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -219,14 +219,15 @@ public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String

static Optional<LastCheckpoint> readLastCheckpoint(TrinoFileSystem fileSystem, Path tableLocation)
{
return Failsafe.with(new RetryPolicy<>()
return Failsafe.with(RetryPolicy.builder()
.withMaxRetries(5)
.withDelay(Duration.ofSeconds(1))
.onRetry(event -> {
// The _last_checkpoint file is malformed, it's probably in the middle of a rewrite (file rewrites on Azure are NOT atomic)
// Retry several times with a short delay, and if that fails, fall back to manually finding latest checkpoint.
log.debug(event.getLastFailure(), "Failure when accessing last checkpoint information, will be retried");
}))
log.debug(event.getLastException(), "Failure when accessing last checkpoint information, will be retried");
})
.build())
.get(() -> tryReadLastCheckpoint(fileSystem, tableLocation));
}

Expand Down
10 changes: 5 additions & 5 deletions plugin/trino-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
Expand All @@ -112,11 +117,6 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package io.trino.plugin.elasticsearch.client;

import com.google.common.base.Stopwatch;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.event.ExecutionCompletedEvent;
import dev.failsafe.function.CheckedSupplier;
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.trino.plugin.elasticsearch.ElasticsearchConfig;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.event.ExecutionAttemptedEvent;
import net.jodah.failsafe.event.ExecutionCompletedEvent;
import net.jodah.failsafe.function.CheckedSupplier;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -53,15 +53,16 @@ public BackpressureRestClient(RestClient delegate, ElasticsearchConfig config, T
{
this.delegate = requireNonNull(delegate, "restClient is null");
this.backpressureStats = requireNonNull(backpressureStats, "backpressureStats is null");
retryPolicy = new RetryPolicy<Response>()
retryPolicy = RetryPolicy.<Response>builder()
.withMaxAttempts(-1)
.withMaxDuration(java.time.Duration.ofMillis(config.getMaxRetryTime().toMillis()))
.withBackoff(config.getBackoffInitDelay().toMillis(), config.getBackoffMaxDelay().toMillis(), MILLIS)
.withJitter(0.125)
.handleIf(BackpressureRestClient::isBackpressure)
.onFailedAttempt(this::onFailedAttempt)
.onSuccess(this::onComplete)
.onFailure(this::onComplete);
.onFailure(this::onComplete)
.build();
}

public void setHosts(HttpHost... hosts)
Expand Down Expand Up @@ -119,7 +120,7 @@ private Response executeWithRetries(CheckedSupplier<Response> supplier)

private void onFailedAttempt(ExecutionAttemptedEvent<Response> executionAttemptedEvent)
{
log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastFailure());
log.debug("REST attempt failed: %s", executionAttemptedEvent.getLastException());
if (!stopwatch.get().isRunning()) {
stopwatch.get().start();
}
Expand Down
Loading