Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.log.Logger;
import io.trino.plugin.base.CatalogName;
Expand All @@ -39,11 +40,12 @@ public void configure(Binder binder)
{
binder.install(new MBeanServerModule());
binder.install(new MBeanModule());
binder.bind(StatisticsAwareConnectionFactory.class).in(Scopes.SINGLETON);

Provider<CatalogName> catalogName = binder.getProvider(CatalogName.class);
newExporter(binder).export(Key.get(JdbcClient.class, StatsCollecting.class))
.as(generator -> generator.generatedNameOf(JdbcClient.class, catalogName.get().toString()));
newExporter(binder).export(Key.get(ConnectionFactory.class, StatsCollecting.class))
newExporter(binder).export(StatisticsAwareConnectionFactory.class)
.as(generator -> generator.generatedNameOf(ConnectionFactory.class, catalogName.get().toString()));
newExporter(binder).export(JdbcClient.class)
.as(generator -> generator.generatedNameOf(CachingJdbcClient.class, catalogName.get().toString()));
Expand All @@ -65,12 +67,4 @@ public JdbcClient createJdbcClientWithStats(@ForBaseJdbc JdbcClient client, Cata
return client;
}));
}

@Provides
@Singleton
@StatsCollecting
public static ConnectionFactory createConnectionFactoryWithStats(@ForBaseJdbc ConnectionFactory connectionFactory)
{
return new StatisticsAwareConnectionFactory(connectionFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void setup(Binder binder)
install(new JdbcDiagnosticModule());
install(new IdentifierMappingModule());
install(new RemoteQueryModifierModule());
install(new RetryingConnectionFactoryModule());

newOptionalBinder(binder, ConnectorAccessControl.class);
newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -88,10 +89,6 @@ public void setup(Binder binder)

newSetBinder(binder, ConnectorTableFunction.class);

binder.bind(ConnectionFactory.class)
.annotatedWith(ForLazyConnectionFactory.class)
.to(Key.get(ConnectionFactory.class, StatsCollecting.class))
.in(Scopes.SINGLETON);
install(conditionalModule(
QueryConfig.class,
QueryConfig::isReuseConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class LazyConnectionFactory
private final ConnectionFactory delegate;

@Inject
public LazyConnectionFactory(@ForLazyConnectionFactory ConnectionFactory delegate)
public LazyConnectionFactory(RetryingConnectionFactory delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
package io.trino.plugin.jdbc;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;
Expand All @@ -31,27 +33,30 @@
public class RetryingConnectionFactory
implements ConnectionFactory
{
private static final RetryPolicy<Object> RETRY_POLICY = RetryPolicy.builder()
.withMaxDuration(java.time.Duration.of(30, SECONDS))
.withMaxAttempts(5)
.withBackoff(50, 5_000, MILLIS, 4)
.handleIf(RetryingConnectionFactory::isSqlRecoverableException)
.abortOn(TrinoException.class)
.build();
private final RetryPolicy<Object> retryPolicy;

private final ConnectionFactory delegate;

public RetryingConnectionFactory(ConnectionFactory delegate)
@Inject
public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, RetryStrategy retryStrategy)
{
requireNonNull(retryStrategy);
this.delegate = requireNonNull(delegate, "delegate is null");
this.retryPolicy = RetryPolicy.builder()
.withMaxDuration(java.time.Duration.of(30, SECONDS))
.withMaxAttempts(5)
.withBackoff(50, 5_000, MILLIS, 4)
.handleIf(retryStrategy::isExceptionRecoverable)
.abortOn(TrinoException.class)
.build();
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
try {
return Failsafe.with(RETRY_POLICY)
return Failsafe.with(retryPolicy)
.get(() -> delegate.openConnection(session));
}
catch (FailsafeException ex) {
Expand All @@ -69,9 +74,19 @@ public void close()
delegate.close();
}

private static boolean isSqlRecoverableException(Throwable exception)
public interface RetryStrategy
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLRecoverableException.class::isInstance);
boolean isExceptionRecoverable(Throwable exception);
}

public static class DefaultRetryStrategy
implements RetryStrategy
{
@Override
public boolean isExceptionRecoverable(Throwable exception)
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLTransientException.class::isInstance);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy;
import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;

public class RetryingConnectionFactoryModule
extends AbstractModule
{
@Override
public void configure()
{
bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder(), RetryStrategy.class)
.setDefault()
.to(DefaultRetryStrategy.class)
.in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.jdbc.jmx;

import com.google.inject.Inject;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.spi.connector.ConnectorSession;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
Expand All @@ -30,7 +32,8 @@ public class StatisticsAwareConnectionFactory
private final JdbcApiStats closeConnection = new JdbcApiStats();
private final ConnectionFactory delegate;

public StatisticsAwareConnectionFactory(ConnectionFactory delegate)
@Inject
public StatisticsAwareConnectionFactory(@ForBaseJdbc ConnectionFactory delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.plugin.jdbc;

import com.google.inject.Guice;
import com.google.inject.Injector;
import io.trino.plugin.jdbc.credential.EmptyCredentialProvider;
import org.h2.Driver;
import org.junit.jupiter.api.Test;
Expand All @@ -30,11 +32,15 @@ public class TestLazyConnectionFactory
public void testNoConnectionIsCreated()
throws Exception
{
ConnectionFactory failingConnectionFactory = session -> {
throw new AssertionError("Expected no connection creation");
};
Injector injector = Guice.createInjector(binder -> {
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
session -> {
throw new AssertionError("Expected no connection creation");
});
binder.install(new RetryingConnectionFactoryModule());
});

try (LazyConnectionFactory lazyConnectionFactory = new LazyConnectionFactory(failingConnectionFactory);
try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class);
Connection ignored = lazyConnectionFactory.openConnection(SESSION)) {
// no-op
}
Expand All @@ -47,8 +53,13 @@ public void testConnectionCannotBeReusedAfterClose()
BaseJdbcConfig config = new BaseJdbcConfig()
.setConnectionUrl(format("jdbc:h2:mem:test%s;DB_CLOSE_DELAY=-1", System.nanoTime() + ThreadLocalRandom.current().nextLong()));

try (DriverConnectionFactory h2ConnectionFactory = new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider());
LazyConnectionFactory lazyConnectionFactory = new LazyConnectionFactory(h2ConnectionFactory)) {
Injector injector = Guice.createInjector(binder -> {
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider()));
binder.install(new RetryingConnectionFactoryModule());
});

try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class)) {
Connection connection = lazyConnectionFactory.openConnection(SESSION);
connection.close();
assertThatThrownBy(() -> connection.createStatement())
Expand Down
Loading