Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr
{
ImmutableList.Builder<JdbcColumnHandle> columns = ImmutableList.builder();
try (Connection connection = connectionFactory.openConnection(session);
PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) {
PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) {
ResultSetMetaData metadata = preparedStatement.getMetaData();
if (metadata == null) {
throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + preparedQuery.getQuery());
Expand Down Expand Up @@ -456,7 +456,7 @@ public PreparedStatement buildSql(ConnectorSession session, Connection connectio
throws SQLException
{
PreparedQuery preparedQuery = prepareQuery(session, connection, table, Optional.empty(), columns, ImmutableMap.of(), Optional.of(split));
return queryBuilder.prepareStatement(this, session, connection, preparedQuery);
return queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.of(columns.size()));
}

protected PreparedQuery prepareQuery(
Expand Down Expand Up @@ -1060,7 +1060,7 @@ public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
return connection.prepareStatement(sql);
Expand Down Expand Up @@ -1292,7 +1292,7 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
handle.getRequiredNamedRelation(),
handle.getConstraint(),
getAdditionalPredicate(handle.getConstraintExpressions(), Optional.empty()));
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) {
return OptionalLong.of(preparedStatement.executeUpdate());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,10 @@ public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
return delegate.getPreparedStatement(connection, sql);
return delegate.getPreparedStatement(connection, sql, columnCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ public PreparedStatement prepareStatement(
JdbcClient client,
ConnectorSession session,
Connection connection,
PreparedQuery preparedQuery)
PreparedQuery preparedQuery,
Optional<Integer> columnCount)
throws SQLException
{
String modifiedQuery = queryModifier.apply(session, preparedQuery.getQuery());
log.debug("Preparing query: %s", modifiedQuery);
PreparedStatement statement = client.getPreparedStatement(connection, modifiedQuery);
PreparedStatement statement = client.getPreparedStatement(connection, modifiedQuery, columnCount);

List<QueryParameter> parameters = preparedQuery.getParameters();
for (int i = 0; i < parameters.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
return delegate().getPreparedStatement(connection, sql);
return delegate().getPreparedStatement(connection, sql, columnCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ default void setTableProperties(ConnectorSession session, JdbcTableHandle handle
Connection getConnection(ConnectorSession session, JdbcOutputTableHandle handle)
throws SQLException;

PreparedStatement getPreparedStatement(Connection connection, String sql)
PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ PreparedStatement prepareStatement(
JdbcClient client,
ConnectorSession session,
Connection connection,
PreparedQuery preparedQuery)
PreparedQuery preparedQuery,
Optional<Integer> columnCount)
throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
return stats.getGetPreparedStatement().wrap(() -> delegate().getPreparedStatement(connection, sql));
return stats.getGetPreparedStatement().wrap(() -> delegate().getPreparedStatement(connection, sql, columnCount));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void testNormalBuildSql()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -284,7 +284,7 @@ public void testBuildSqlWithDomainComplement()
Map.of(),
tupleDomain,
Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(3))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_3\", \"col_9\" " +
"FROM \"test_table\" " +
Expand Down Expand Up @@ -316,7 +316,7 @@ public void testBuildSqlWithFloat()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -350,7 +350,7 @@ public void testBuildSqlWithVarchar()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -386,7 +386,7 @@ public void testBuildSqlWithChar()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -432,7 +432,7 @@ public void testBuildSqlWithDateTime()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -473,7 +473,7 @@ public void testBuildSqlWithTimestamp()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -513,7 +513,7 @@ public void testBuildJoinSql()
List.of(new JdbcJoinCondition(columns.get(7), JoinCondition.Operator.EQUAL, columns.get(8))),
Map.of(columns.get(2), "name1"),
Map.of(columns.get(3), "name2"));
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.empty())) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT l.\"col_2\" AS \"name1\", r.\"col_3\" AS \"name2\" FROM " +
"(SELECT * FROM \"test_table\") l " +
Expand All @@ -539,7 +539,7 @@ public void testBuildSqlWithLimit()

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), TupleDomain.all(), Optional.empty());
preparedQuery = preparedQuery.transformQuery(function);
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand All @@ -566,7 +566,7 @@ public void testEmptyBuildSql()
Connection connection = database.getConnection();

PreparedQuery preparedQuery = queryBuilder.prepareSelectQuery(jdbcClient, SESSION, connection, TEST_TABLE, Optional.empty(), columns, Map.of(), tupleDomain, Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.of(columns.size()))) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_0\", \"col_1\", \"col_2\", \"col_3\", \"col_4\", \"col_5\", " +
"\"col_6\", \"col_7\", \"col_8\", \"col_9\", \"col_10\", \"col_11\" " +
Expand Down Expand Up @@ -602,7 +602,7 @@ public void testAggregation()
Map.of("s", "sum(\"col_0\")"),
TupleDomain.all(),
Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.empty())) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_2\", sum(\"col_0\") AS \"s\" " +
"FROM \"test_table\" " +
Expand Down Expand Up @@ -645,7 +645,7 @@ public void testAggregationWithFilter()
Map.of("s", "sum(\"col_0\")"),
tupleDomain,
Optional.empty());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(jdbcClient, SESSION, connection, preparedQuery, Optional.empty())) {
assertThat(preparedQuery.getQuery()).isEqualTo("" +
"SELECT \"col_2\", sum(\"col_0\") AS \"s\" " +
"FROM \"test_table\" " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void abortReadConnection(Connection connection, ResultSet resultSet)
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
PreparedStatement statement = connection.prepareStatement(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,15 @@ protected boolean filterSchema(String schemaName)
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
PreparedStatement statement = connection.prepareStatement(sql);
statement.setFetchSize(1000);
// This is a heuristic, not exact science. A better formula can perhaps be found with measurements.
// Column count is not known for non-SELECT queries. Not setting fetch size for these.
if (columnCount.isPresent()) {
statement.setFetchSize(Math.min(100_000 / columnCount.get(), 1_000));
}
return statement;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public PreparedStatement prepareStatement(
columns,
ImmutableMap.of(),
split);
return queryBuilder.prepareStatement(this, session, connection, preparedQuery);
return queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.of(columns.size()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,17 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
}

@Override
public PreparedStatement getPreparedStatement(Connection connection, String sql)
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
// fetch-size is ignored when connection is in auto-commit
connection.setAutoCommit(false);
PreparedStatement statement = connection.prepareStatement(sql);
statement.setFetchSize(1000);
// This is a heuristic, not exact science. A better formula can perhaps be found with measurements.
// Column count is not known for non-SELECT queries. Not setting fetch size for these.
if (columnCount.isPresent()) {
statement.setFetchSize(Math.min(100_000 / columnCount.get(), 1_000));
}
return statement;
}

Expand Down Expand Up @@ -843,7 +847,7 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
handle.getRequiredNamedRelation(),
handle.getConstraint(),
getAdditionalPredicate(handle.getConstraintExpressions(), Optional.empty()));
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery)) {
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) {
int affectedRowsCount = preparedStatement.executeUpdate();
// In getPreparedStatement we set autocommit to false so here we need an explicit commit
connection.commit();
Expand Down
Loading