Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.trino.spi.connector.ConnectorSession;

import java.sql.Connection;
import java.sql.SQLException;

import static java.util.Objects.requireNonNull;

public final class ConfiguringConnectionFactory
implements ConnectionFactory
{
private final ConnectionFactory delegate;
private final Configurator configurator;

public ConfiguringConnectionFactory(ConnectionFactory delegate, Configurator configurator)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.configurator = requireNonNull(configurator, "configurator is null");
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
Connection connection = delegate.openConnection(session);
try {
configurator.configure(connection);
}
catch (SQLException | RuntimeException e) {
try (connection) {
throw e;
}
}
return connection;
}

@Override
public void close()
throws SQLException
{
delegate.close();
}

@FunctionalInterface
public interface Configurator
{
void configure(Connection connection)
throws SQLException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
throw new TrinoException(JDBC_ERROR, e);
}

try {
connection.setAutoCommit(false);
}
catch (SQLException e) {
closeWithSuppression(connection, e);
throw new TrinoException(JDBC_ERROR, e);
}

columnTypes = handle.getColumnTypes();

if (handle.getJdbcColumnTypes().isEmpty()) {
Expand Down Expand Up @@ -92,6 +84,8 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
}

try {
// Per JDBC specification, auto-commit mode is the default. Verify that in case pooling or custom ConnectionFactory is used.
verify(connection.getAutoCommit(), "Connection not in auto-commit");
statement = connection.prepareStatement(jdbcClient.buildInsertSql(handle, columnWriters));
}
catch (SQLException e) {
Expand All @@ -114,8 +108,6 @@ public CompletableFuture<?> appendPage(Page page)

if (batchSize >= 1000) {
statement.executeBatch();
connection.commit();
connection.setAutoCommit(false);
batchSize = 0;
}
}
Expand Down Expand Up @@ -165,7 +157,6 @@ public CompletableFuture<Collection<Slice>> finish()
PreparedStatement statement = this.statement) {
if (batchSize > 0) {
statement.executeBatch();
connection.commit();
}
}
catch (SQLNonTransientException e) {
Expand All @@ -186,17 +177,12 @@ public CompletableFuture<Collection<Slice>> finish()
return completedFuture(ImmutableList.of());
}

@SuppressWarnings("unused")
@Override
public void abort()
{
// rollback and close
try (Connection connection = this.connection;
Comment thread
findepi marked this conversation as resolved.
Outdated
PreparedStatement statement = this.statement) {
// skip rollback if implicitly closed due to an error
if (!connection.isClosed()) {
connection.rollback();
}
// close statement and connection
try (connection) {
statement.close();
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public void configure(Binder binder)
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
{
return new DriverConnectionFactory(new ClickHouseDriver(), config, credentialProvider);
return new ClickHouseConnectionFactory(new DriverConnectionFactory(new ClickHouseDriver(), config, credentialProvider));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.clickhouse;

import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.ForwardingConnection;
import io.trino.spi.connector.ConnectorSession;

import javax.annotation.PreDestroy;

import java.sql.Connection;
import java.sql.SQLException;

import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

public class ClickHouseConnectionFactory
implements ConnectionFactory
{
private final ConnectionFactory delegate;

public ClickHouseConnectionFactory(ConnectionFactory delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
return new ForwardingConnection()
{
private final Connection delegate = ClickHouseConnectionFactory.this.delegate.openConnection(session);

@Override
protected Connection getDelegate()
{
return delegate;
}

@Override
public boolean getAutoCommit()
throws SQLException
{
// ClickHouse's Connection (ru.yandex.clickhouse.ClickHouseConnectionImpl) ignores setAutoCommit, commit and rollback,
// but still returns false from getAutoCommit().
// TODO once https://github.com/ClickHouse/clickhouse-jdbc/issues/657 is solved, remove the workaround.
verify(!delegate.getAutoCommit(), "ClickHouse connection declared auto-commit mode, the code needs update");
return true;
}
};
}

@Override
@PreDestroy
public void close()
throws SQLException
{
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
import io.trino.plugin.base.classloader.ForClassLoaderSafe;
import io.trino.plugin.jdbc.ConfiguringConnectionFactory;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
Expand Down Expand Up @@ -52,7 +53,6 @@

import javax.annotation.PreDestroy;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -125,11 +125,17 @@ private void checkConfiguration(String connectionUrl)
public ConnectionFactory getConnectionFactory(PhoenixConfig config)
throws SQLException
{
return new DriverConnectionFactory(
DriverManager.getDriver(config.getConnectionUrl()),
config.getConnectionUrl(),
getConnectionProperties(config),
new EmptyCredentialProvider());
return new ConfiguringConnectionFactory(
new DriverConnectionFactory(
PhoenixDriver.INSTANCE, // Note: for some reason new PhoenixDriver won't work.
config.getConnectionUrl(),
getConnectionProperties(config),
new EmptyCredentialProvider()),
connection -> {
// Per JDBC spec, a Driver is expected to have new connections in auto-commit mode.
// This seems not to be true for PhoenixDriver, so we need to be explicit here.
connection.setAutoCommit(true);
});
}

public static Properties getConnectionProperties(PhoenixConfig config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
import io.trino.plugin.base.classloader.ForClassLoaderSafe;
import io.trino.plugin.jdbc.ConfiguringConnectionFactory;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
Expand Down Expand Up @@ -52,7 +53,6 @@

import javax.annotation.PreDestroy;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -125,11 +125,17 @@ private void checkConfiguration(String connectionUrl)
public ConnectionFactory getConnectionFactory(PhoenixConfig config)
throws SQLException
{
return new DriverConnectionFactory(
DriverManager.getDriver(config.getConnectionUrl()),
config.getConnectionUrl(),
getConnectionProperties(config),
new EmptyCredentialProvider());
return new ConfiguringConnectionFactory(
new DriverConnectionFactory(
PhoenixDriver.INSTANCE, // Note: for some reason new PhoenixDriver won't work.
config.getConnectionUrl(),
getConnectionProperties(config),
new EmptyCredentialProvider()),
connection -> {
// Per JDBC spec, a Driver is expected to have new connections in auto-commit mode.
// This seems not to be true for PhoenixDriver, so we need to be explicit here.
connection.setAutoCommit(true);
});
}

public static Properties getConnectionProperties(PhoenixConfig config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ protected void renameTable(ConnectorSession session, String catalogName, String
public PreparedStatement getPreparedStatement(Connection connection, String sql)
throws SQLException
{
// fetch-size is ignored when connection is in auto-commit
connection.setAutoCommit(false);
PreparedStatement statement = connection.prepareStatement(sql);
statement.setFetchSize(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ protected void renameTable(ConnectorSession session, String catalogName, String
public PreparedStatement getPreparedStatement(Connection connection, String sql)
throws SQLException
{
// In PostgreSQL, fetch-size is ignored when connection is in auto-commit. Redshift JDBC documentation does not state this requirement
// but it still links to https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor for more information, which states
// that.
connection.setAutoCommit(false);
PreparedStatement statement = connection.prepareStatement(sql);
statement.setFetchSize(1000);
Expand Down