@@ -17,10 +17,13 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.LegacyConfig;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

public class JdbcMetadataConfig
{
static final int MAX_ALLOWED_INSERT_BATCH_SIZE = 1_000_000;

private boolean allowDropTable;
/*
* Join pushdown is disabled by default as this is the safer option.
@@ -40,6 +43,8 @@ public class JdbcMetadataConfig
// between performance and pushdown capabilities
private int domainCompactionThreshold = 32;

private int insertBatchSize = 1000;
Member

This doesn't belong in JdbcMetadataConfig, since it isn't consumed in the JdbcMetadata layer.

Member

I think this is mostly because there were no session properties contributed by BaseJdbcConfig.
Maybe this should change? WDYT?

Hi @hashhar, @findepi,

Thank you for the discussion. I'm currently working with Trino 465 (OSS) and using the JDBC connector to insert data into SQL Server.

I've observed that even after setting write_batch_size = 5000, the inserts are still performed row-by-row in SQL Server Profiler (sp_execute per row), which impacts performance significantly for large datasets.

From my analysis:

  • The property insert_batch_size was introduced in PR #8434 ("Add configurable insert_batch_size JDBC session property") to control batch size.
  • However, in Open Source Trino, this property is not exposed as a session property (SET SESSION insert_batch_size = ...).
  • Instead, write_batch_size is used, but it seems to be ignored or not applied effectively in some cases.

My question:
Is there a known reason why write_batch_size doesn't lead to true batched inserts (e.g., multi-row VALUES or bulk operations) when using the JDBC connector?

I'd appreciate any insights or guidance on how to achieve better batching behavior.

Thanks for your time and contributions to the project!

Member

Hi! The property was renamed to write_batch_size, so it can be configured with SET SESSION <catalog_name>.write_batch_size = ...
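For reference, a minimal sketch of issuing that statement over JDBC. It assumes the connection comes from the Trino JDBC driver and that a SET SESSION issued on it applies to later statements on the same connection. The class and method names (WriteBatchSizeSessionSketch, setSessionSql, apply) are hypothetical. The catalog is emitted as a double-quoted identifier because catalog names containing hyphens are not valid as bare identifiers.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, not part of Trino.
final class WriteBatchSizeSessionSketch
{
    private WriteBatchSizeSessionSketch() {}

    // Builds: SET SESSION "<catalog>".write_batch_size = <batchSize>
    static String setSessionSql(String catalog, int batchSize)
    {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1: " + batchSize);
        }
        // Quote the catalog as a delimited identifier and escape embedded double quotes
        String quotedCatalog = "\"" + catalog.replace("\"", "\"\"") + "\"";
        return "SET SESSION " + quotedCatalog + ".write_batch_size = " + batchSize;
    }

    // Runs the statement on an already opened connection (assumed to be a Trino connection)
    static void apply(Connection connection, String catalog, int batchSize)
            throws SQLException
    {
        try (Statement statement = connection.createStatement()) {
            statement.execute(setSessionSql(catalog, batchSize));
        }
    }
}
```

An INSERT or CREATE TABLE AS executed afterwards on the same connection would then pick up the session value, provided the connector exposes the property.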

Member

write_batch_size is a generic JDBC property, but it doesn't guarantee that write operations are performed in bulk; that depends on how the JDBC driver implements batching. In the case of SQL Server, sp_execute could be invoked per row. Have you tried setting the bulkInsertCopy property in SQL Server? https://learn.microsoft.com/en-us/sql/connect/jdbc/using-bulk-copy-with-the-jdbc-driver?view=sql-server-ver17#sqlserverbulkcopyoptions
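As an illustration of a driver-level switch of this kind: the Microsoft JDBC Driver for SQL Server documents a connection property named useBulkCopyForBatchInsert, which is presumably what "bulkInsertCopy" refers to (that mapping is an assumption). The sketch below only shows how such a property is appended to a connection URL. The helper name, host and database are hypothetical, and whether a given Trino version forwards the property or sets it itself is not established in this thread.

```java
// Hypothetical helper for illustration; not part of Trino or the SQL Server driver.
final class SqlServerBulkCopyUrlSketch
{
    // Connection property documented for the Microsoft JDBC Driver for SQL Server
    private static final String BULK_COPY_PROPERTY = "useBulkCopyForBatchInsert";

    private SqlServerBulkCopyUrlSketch() {}

    // Appends the property unless the URL already sets it explicitly
    static String withBulkCopyForBatchInsert(String jdbcUrl)
    {
        if (jdbcUrl.contains(BULK_COPY_PROPERTY + "=")) {
            return jdbcUrl;
        }
        String separator = jdbcUrl.endsWith(";") ? "" : ";";
        return jdbcUrl + separator + BULK_COPY_PROPERTY + "=true";
    }

    public static void main(String[] args)
    {
        // Hypothetical host and database names
        System.out.println(withBulkCopyForBatchInsert(
                "jdbc:sqlserver://localhost:1433;databaseName=trino_test"));
    }
}
```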

Hi @Praveen2112,

Thank you for the clarification!

I understand that write_batch_size is a JDBC-specific property and doesn't guarantee true bulk inserts; it only controls the batch size for parameterized statements.

However, I'm trying to achieve real bulk insert performance (like BULK INSERT or bcp) when inserting 1.5M rows from Trino to MSSQL.

My goal is to see in SQL Server Profiler:

BULK INSERT [trino_test].[aaaaaa].[orders] 
FROM 'virtual_stream' 
WITH (TABLOCK, BATCHSIZE = 1000)

Could you please advise what configuration changes are required in Trino 465 (Open Source) to enable this behavior?

I've already confirmed that:

  • dbs-ai-sqldev_trino-test.bulk_copy_for_write = true is set in the catalog
  • retry_policy = 'NONE' is set in the session

Despite this, I still observe row-by-row inserts via sp_execute in SQL Profiler.

Is there any additional configuration or known limitation in Open Source Trino that prevents the use of SQLServerBulkCopy API?


public boolean isAllowDropTable()
{
return allowDropTable;
@@ -107,4 +112,19 @@ public JdbcMetadataConfig setDomainCompactionThreshold(int domainCompactionThres
this.domainCompactionThreshold = domainCompactionThreshold;
return this;
}

@Min(1)
@Max(MAX_ALLOWED_INSERT_BATCH_SIZE)
public int getInsertBatchSize()
{
return insertBatchSize;
}

@Config("insert.batch-size")
Member

is it applicable to CREATE TABLE AS as well?

Member

Yes. The PageSink gets used there too. How about naming it write.batch-size?

Member

sounds good!

@ConfigDescription("Maximum number of rows to insert in a single batch")
public JdbcMetadataConfig setInsertBatchSize(int insertBatchSize)
{
this.insertBatchSize = insertBatchSize;
return this;
}
}
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Optional;

import static io.trino.plugin.jdbc.JdbcMetadataConfig.MAX_ALLOWED_INSERT_BATCH_SIZE;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
@@ -36,6 +37,7 @@ public class JdbcMetadataSessionProperties
public static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
public static final String TOPN_PUSHDOWN_ENABLED = "topn_pushdown_enabled";
public static final String DOMAIN_COMPACTION_THRESHOLD = "domain_compaction_threshold";
public static final String INSERT_BATCH_SIZE = "insert_batch_size";
Member

Same here: it shouldn't be in this class, since it isn't consumed by the metadata layer.


private final List<PropertyMetadata<?>> properties;

@@ -65,6 +67,12 @@ public JdbcMetadataSessionProperties(JdbcMetadataConfig jdbcMetadataConfig, @Max
"Enable TopN pushdown",
jdbcMetadataConfig.isTopNPushdownEnabled(),
false))
.add(integerProperty(
INSERT_BATCH_SIZE,
"Insert batch size",
jdbcMetadataConfig.getInsertBatchSize(),
value -> validateInsertBatchSize(value, MAX_ALLOWED_INSERT_BATCH_SIZE),
false))
.build();
}

@@ -94,6 +102,11 @@ public static int getDomainCompactionThreshold(ConnectorSession session)
return session.getProperty(DOMAIN_COMPACTION_THRESHOLD, Integer.class);
}

public static int getInsertBatchSize(ConnectorSession session)
{
return session.getProperty(INSERT_BATCH_SIZE, Integer.class);
}

private static void validateDomainCompactionThreshold(int domainCompactionThreshold, Optional<Integer> maxDomainCompactionThreshold)
{
if (domainCompactionThreshold < 1) {
@@ -106,4 +119,14 @@ private static void validateDomainCompactionThreshold(int domainCompactionThresh
}
});
}

private static void validateInsertBatchSize(int maxBatchSize, int maxAllowedBatchSize)
{
if (maxBatchSize < 1) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be greater than 0: %s", INSERT_BATCH_SIZE, maxBatchSize));
}
if (maxBatchSize > maxAllowedBatchSize) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s cannot exceed %s: %s", INSERT_BATCH_SIZE, maxAllowedBatchSize, maxBatchSize));
}
}
}
@@ -35,6 +35,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.getInsertBatchSize;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.concurrent.CompletableFuture.completedFuture;

@@ -46,6 +47,7 @@ public class JdbcPageSink

private final List<Type> columnTypes;
private final List<WriteFunction> columnWriters;
private final int maxBatchSize;
private int batchSize;

public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient)
@@ -92,6 +94,8 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
closeWithSuppression(connection, e);
throw new TrinoException(JDBC_ERROR, e);
}

this.maxBatchSize = getInsertBatchSize(session);
}

@Override
@@ -106,7 +110,7 @@ public CompletableFuture<?> appendPage(Page page)
statement.addBatch();
batchSize++;

- if (batchSize >= 1000) {
+ if (batchSize >= maxBatchSize) {
statement.executeBatch();
batchSize = 0;
}
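The flush logic in this hunk can be sketched in isolation as follows. This is a simplified stand-in, not the JdbcPageSink code: BatchingSketch is a hypothetical class, incrementing the counter stands in for statement.addBatch(), recording a batch size stands in for statement.executeBatch(), and the final flush of a partial batch is assumed to happen in a finish step that this hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batching loop; no JDBC involved.
final class BatchingSketch
{
    private final int maxBatchSize;
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private int batchSize;

    BatchingSketch(int maxBatchSize)
    {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1: " + maxBatchSize);
        }
        this.maxBatchSize = maxBatchSize;
    }

    // Adds rows one by one and flushes whenever the configured limit is reached
    void appendRows(int rowCount)
    {
        for (int i = 0; i < rowCount; i++) {
            batchSize++; // stands in for statement.addBatch()
            if (batchSize >= maxBatchSize) {
                flush();
            }
        }
    }

    // Flushes whatever is left over (assumed behavior of the sink's finish step)
    void finish()
    {
        if (batchSize > 0) {
            flush();
        }
    }

    private void flush()
    {
        flushedBatchSizes.add(batchSize); // stands in for statement.executeBatch()
        batchSize = 0;
    }

    List<Integer> flushedBatchSizes()
    {
        return List.copyOf(flushedBatchSizes);
    }
}
```

With a limit of 100 and 512 appended rows, the sketch records five full batches and one partial batch of 12, which is the kind of mix the parameterized test cases in this PR are meant to exercise.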
@@ -39,11 +39,15 @@
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
@@ -1278,4 +1282,50 @@ public void testDeleteWithVarcharPredicate()
{
throw new SkipException("This is implemented by testDeleteWithVarcharEqualityPredicate");
}

@Test(dataProvider = "testInsertBatchSizeSessionProperty")
public void testInsertBatchSizeSessionProperty(Integer batchSize, Integer numberOfRows)
{
if (!hasBehavior(SUPPORTS_CREATE_TABLE)) {
throw new SkipException("CREATE TABLE is required for insert_batch_size test but is not supported");
}
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "insert_batch_size", batchSize.toString())
.build();

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_insert_batch_size",
"(a varchar(36), b bigint)")) {
String values = String.join(",", makeValuesForInsertBatchSizeSessionPropertyTest(numberOfRows));
assertUpdate(session, "INSERT INTO " + table.getName() + " (a, b) VALUES " + values, numberOfRows);
assertQuery("SELECT COUNT(*) FROM " + table.getName(), format("VALUES %d", numberOfRows));
}
}

private static List<String> makeValuesForInsertBatchSizeSessionPropertyTest(int numberOfRows)
{
List<String> result = new ArrayList<>(numberOfRows);
for (int i = 0; i < numberOfRows; i++) {
result.add(format("('%s', %d)", UUID.randomUUID(), ThreadLocalRandom.current().nextLong()));
}
return result;
}

@DataProvider(name = "testInsertBatchSizeSessionProperty")
public static Object[][] batchSizeAndNumberOfRowsForInsertBatchSizePropertyTest()
{
return new Object[][] {
{100, 64},
{100, 100},
{100, 512},
{100, 1000},
{1000, 100},
{1000, 1000},
{1000, 5000},
{10000, 1000},
{10000, 5000},
{10000, 15000},
};
}
}
@@ -14,13 +14,16 @@
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.ConfigurationFactory;
import org.testng.annotations.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestJdbcMetadataConfig
{
@@ -32,7 +35,8 @@ public void testDefaults()
.setJoinPushdownEnabled(false)
.setAggregationPushdownEnabled(true)
.setTopNPushdownEnabled(true)
- .setDomainCompactionThreshold(32));
+ .setDomainCompactionThreshold(32)
+ .setInsertBatchSize(1000));
}

@Test
@@ -44,15 +48,38 @@ public void testExplicitPropertyMappings()
.put("aggregation-pushdown.enabled", "false")
.put("domain-compaction-threshold", "42")
.put("topn-pushdown.enabled", "false")
.put("insert.batch-size", "24")
.build();

JdbcMetadataConfig expected = new JdbcMetadataConfig()
.setAllowDropTable(true)
.setJoinPushdownEnabled(true)
.setAggregationPushdownEnabled(false)
.setTopNPushdownEnabled(false)
- .setDomainCompactionThreshold(42);
+ .setDomainCompactionThreshold(42)
+ .setInsertBatchSize(24);

assertFullMapping(properties, expected);
}

@Test
public void testInsertBatchSizeValidation()
{
assertThatThrownBy(() -> makeConfig(ImmutableMap.of("insert.batch-size", "-42")))
.hasMessageContaining("insert.batch-size: must be greater than or equal to 1");

assertThatThrownBy(() -> makeConfig(ImmutableMap.of("insert.batch-size", "0")))
.hasMessageContaining("insert.batch-size: must be greater than or equal to 1");

assertThatCode(() -> makeConfig(ImmutableMap.of("insert.batch-size", "1")))
.doesNotThrowAnyException();

assertThatCode(() -> makeConfig(ImmutableMap.of("insert.batch-size", "42")))
.doesNotThrowAnyException();
}

private static JdbcMetadataConfig makeConfig(Map<String, String> props)
{
return new ConfigurationFactory(props).build(JdbcMetadataConfig.class);
}
}
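Note that testInsertBatchSizeValidation covers only the lower bound (-42, 0, 1, 42); the @Max(MAX_ALLOWED_INSERT_BATCH_SIZE) limit is not exercised. As a rough illustration of the boundary values on both sides, here is a plain-Java sketch. BatchSizeBoundsSketch and isValid are hypothetical stand-ins for the Bean Validation annotations, not the airlift configuration machinery; the constant value is copied from JdbcMetadataConfig in this diff.

```java
// Hypothetical stand-in for the @Min(1) / @Max(MAX_ALLOWED_INSERT_BATCH_SIZE) constraints.
final class BatchSizeBoundsSketch
{
    static final int MAX_ALLOWED_INSERT_BATCH_SIZE = 1_000_000;

    private BatchSizeBoundsSketch() {}

    // Both bounds are inclusive, as with @Min and @Max
    static boolean isValid(int batchSize)
    {
        return batchSize >= 1 && batchSize <= MAX_ALLOWED_INSERT_BATCH_SIZE;
    }

    public static void main(String[] args)
    {
        int[] candidates = {-42, 0, 1, 42, 1_000_000, 1_000_001};
        for (int candidate : candidates) {
            String verdict = isValid(candidate) ? "accepted" : "rejected";
            System.out.println(candidate + " -> " + verdict);
        }
    }
}
```

An extra assertion with a value just above the maximum in the real test would cover the upper bound in the same style as the existing lower-bound checks.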