-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Make JDBC write parallelism configurable #16280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ | |
| public class JdbcWriteConfig | ||
| { | ||
| public static final int MAX_ALLOWED_WRITE_BATCH_SIZE = 10_000_000; | ||
| static final int DEFAULT_WRITE_PARALELLISM = 8; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
by default
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
100 means unbounded. For JDBC, the higher the number of writers, the bigger the number of connections opened/acquired/held per write, which can result in high lock contention on the database side. |
||
|
|
||
| private int writeBatchSize = 1000; | ||
| private int writeParallelism = DEFAULT_WRITE_PARALELLISM; | ||
|
wendigo marked this conversation as resolved.
Outdated
|
||
|
|
||
| // Do not create temporary table during insert. | ||
| // This means that the write operation can fail and leave the table in an inconsistent state. | ||
|
|
@@ -57,4 +59,19 @@ public JdbcWriteConfig setNonTransactionalInsert(boolean nonTransactionalInsert) | |
| this.nonTransactionalInsert = nonTransactionalInsert; | ||
| return this; | ||
| } | ||
|
|
||
| @Min(1) | ||
| @Max(128) | ||
|
wendigo marked this conversation as resolved.
Outdated
|
||
| public int getWriteParallelism() | ||
| { | ||
| return writeParallelism; | ||
| } | ||
|
|
||
| @Config("write.parallelism") | ||
| @ConfigDescription("Maximum number of parallel write tasks") | ||
| public JdbcWriteConfig setWriteParallelism(int writeParallelism) | ||
| { | ||
| this.writeParallelism = writeParallelism; | ||
| return this; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |
| import io.trino.sql.planner.plan.OutputNode; | ||
| import io.trino.sql.planner.plan.ProjectNode; | ||
| import io.trino.sql.planner.plan.TableScanNode; | ||
| import io.trino.sql.planner.plan.TableWriterNode; | ||
| import io.trino.sql.planner.plan.TopNNode; | ||
| import io.trino.sql.planner.plan.ValuesNode; | ||
| import io.trino.sql.query.QueryAssertions.QueryAssert; | ||
|
|
@@ -73,6 +74,7 @@ | |
| import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED; | ||
| import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree; | ||
| import static io.trino.sql.planner.assertions.PlanMatchPattern.node; | ||
| import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom; | ||
| import static io.trino.testing.DataProviders.toDataProvider; | ||
| import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; | ||
| import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN; | ||
|
|
@@ -1683,6 +1685,46 @@ public void testWriteBatchSizeSessionProperty(Integer batchSize, Integer numberO | |
| } | ||
| } | ||
|
|
||
| @Test(dataProvider = "writeTaskParallelismDataProvider") | ||
| public void testWriteTaskParallelismSessionProperty(int parallelism, int numberOfRows) | ||
| { | ||
| if (!hasBehavior(SUPPORTS_CREATE_TABLE)) { | ||
| throw new SkipException("CREATE TABLE is required for write_parallelism test but is not supported"); | ||
| } | ||
|
|
||
| Session session = Session.builder(getSession()) | ||
| .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "write_parallelism", String.valueOf(parallelism)) | ||
| .build(); | ||
|
|
||
| try (TestTable table = new TestTable( | ||
| getQueryRunner()::execute, | ||
| "write_parallelism", | ||
| "(a varchar(128), b bigint)")) { | ||
| assertUpdate(session, "INSERT INTO " + table.getName() + " (a, b) SELECT clerk, orderkey FROM tpch.sf100.orders LIMIT " + numberOfRows, numberOfRows, plan -> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
very generous
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| TableWriterNode.WriterTarget target = searchFrom(plan.getRoot()) | ||
| .where(node -> node instanceof TableWriterNode) | ||
| .findFirst() | ||
| .map(TableWriterNode.class::cast) | ||
| .map(TableWriterNode::getTarget) | ||
| .orElseThrow(); | ||
|
|
||
| assertThat(target.getMaxWriterTasks(getQueryRunner().getMetadata(), getSession())) | ||
| .hasValue(parallelism); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| @DataProvider | ||
| public static Object[][] writeTaskParallelismDataProvider() | ||
| { | ||
| return new Object[][]{ | ||
| {1, 10_000}, | ||
| {2, 10_000}, | ||
| {4, 10_000}, | ||
| {16, 10_000}, | ||
| {32, 10_000}}; | ||
| } | ||
|
|
||
| private static List<String> buildRowsForInsert(int numberOfRows) | ||
| { | ||
| List<String> result = new ArrayList<>(numberOfRows); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.