feat: Add SQL Support for MERGE INTO in Presto (efficient workload distribution) #1
Conversation
@sourcery-ai review
Reviewer's Guide

This PR implements SQL MERGE support with efficient workload distribution by introducing a dedicated MergePartitioningHandle to unify insert/update partition schemes, adapting the planner to generate and propagate partitioning schemes for MERGE, updating the engine's NodePartitioningManager and local exchange rules to handle merge-specific partition logic, extending the Metadata SPI for merge update layouts, and providing Iceberg-specific bucket functions and node partitioning provider stubs.

Class diagram for new and updated partitioning handle classes:

classDiagram
class PartitioningHandle {
+Optional<ConnectorId> connectorId
+Optional<ConnectorTransactionHandle> transactionHandle
+ConnectorPartitioningHandle connectorHandle
}
class MergePartitioningHandle {
+Optional<PartitioningScheme> insertPartitioning
+Optional<PartitioningScheme> updatePartitioning
+NodePartitionMap getNodePartitioningMap(Function)
+PartitionFunction getPartitionFunction(PartitionFunctionLookup, List<Type>, int[])
}
PartitioningHandle --> MergePartitioningHandle : uses
MergePartitioningHandle --|> ConnectorPartitioningHandle
class IcebergPartitioningHandle {
+List<IcebergColumnHandle> partitioningColumns
+List<PartitionField> partitioning
}
IcebergPartitioningHandle --|> ConnectorPartitioningHandle
class IcebergUpdateHandle {
}
IcebergUpdateHandle --|> ConnectorPartitioningHandle
Class diagram for MergeWriterNode and MergeProcessorNode changes:

classDiagram
class MergeWriterNode {
+PlanNode source
+MergeTarget target
+List<VariableReferenceExpression> mergeProcessorProjectedVariables
+Optional<PartitioningScheme> partitioningScheme
+List<VariableReferenceExpression> outputs
}
class MergeProcessorNode {
+PlanNode source
+MergeTarget target
+VariableReferenceExpression targetTableRowIdColumnVariable
+VariableReferenceExpression mergeRowVariable
+List<VariableReferenceExpression> targetColumnVariables
+List<VariableReferenceExpression> targetRedistributionColumnVariables
+List<VariableReferenceExpression> outputs
}
MergeWriterNode --> MergeProcessorNode : uses
Class diagram for new Iceberg partitioning provider and bucket functions:

classDiagram
class IcebergNodePartitioningProvider {
+Optional<ConnectorBucketNodeMap> getBucketNodeMap(...)
+BucketFunction getBucketFunction(...)
+int getBucketCount(...)
+ToIntFunction<ConnectorSplit> getSplitBucketFunction(...)
}
IcebergNodePartitioningProvider --|> ConnectorNodePartitioningProvider
class IcebergBucketFunction {
+int getBucket(Page, int)
}
IcebergBucketFunction --|> BucketFunction
class IcebergUpdateBucketFunction {
+int getBucket(Page, int)
}
IcebergUpdateBucketFunction --|> BucketFunction
Class diagram for updated ConnectorNodePartitioningProvider SPI:

classDiagram
class ConnectorNodePartitioningProvider {
+Optional<ConnectorBucketNodeMap> getBucketNodeMap(...)
+BucketFunction getBucketFunction(...)
+int getBucketCount(...)
+ToIntFunction<ConnectorSplit> getSplitBucketFunction(...)
}
Class diagram for updated Metadata SPI for merge update layout:

classDiagram
class Metadata {
+Optional<PartitioningHandle> getMergeUpdateLayout(Session, TableHandle)
}
class ConnectorMetadata {
+Optional<ConnectorPartitioningHandle> getMergeUpdateLayout(ConnectorSession, ConnectorTableHandle)
}
Metadata --> ConnectorMetadata : delegates
Class diagram for updated MergeAnalysis structure:

classDiagram
class MergeAnalysis {
+Table targetTable
+List<ColumnMetadata> targetColumnsMetadata
+List<ColumnHandle> targetColumnHandles
+List<ColumnHandle> targetRedistributionColumnHandles
+List<List<ColumnHandle>> mergeCaseColumnHandles
+Set<ColumnHandle> nonNullableColumnHandles
+Map<ColumnHandle, Integer> columnHandleFieldNumbers
+List<Integer> insertPartitioningArgumentIndexes
+Optional<NewTableLayout> insertLayout
+Optional<PartitioningHandle> updateLayout
+Scope targetTableScope
+Scope joinScope
}
File-Level Changes
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java:217` </location>
<code_context>
+ int bucketCount = getBucketCount(session, partitioningHandle, connectorBucketNodeMap, preferDynamic);
+
+ // TODO #20578: WIP - This method is under development. Unsafe ".get()" method call.
+ NodeSelectionStrategy nodeSelectionStrategy = connectorBucketNodeMap.get().getNodeSelectionStrategy();
switch (nodeSelectionStrategy) {
case HARD_AFFINITY:
</code_context>
<issue_to_address>
**issue (bug_risk):** Unsafe use of Optional.get() in getBucketNodeMap could lead to runtime exceptions.
If getConnectorBucketNodeMap returns Optional.empty(), calling get() will throw an exception. Please handle the empty case explicitly or use orElseThrow with a clear error message.
</issue_to_address>
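A sketch of the fix suggested here, under the assumption that the empty case is a programming error worth failing fast on (class and method names below are simplified stand-ins, not Presto's actual SPI types):

```java
import java.util.Optional;

public class BucketNodeMapExample {
    // Simplified stand-in for Presto's ConnectorBucketNodeMap.
    record BucketNodeMap(String nodeSelectionStrategy) {}

    // Replace connectorBucketNodeMap.get() with orElseThrow so an absent map
    // fails with a descriptive message instead of a bare NoSuchElementException.
    static String nodeSelectionStrategy(Optional<BucketNodeMap> connectorBucketNodeMap) {
        return connectorBucketNodeMap
                .orElseThrow(() -> new IllegalStateException("Connector did not return a bucket-to-node map"))
                .nodeSelectionStrategy();
    }

    public static void main(String[] args) {
        System.out.println(nodeSelectionStrategy(Optional.of(new BucketNodeMap("HARD_AFFINITY"))));
    }
}
```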
### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java:746` </location>
<code_context>
+ // connector provided hash function
+ verify(!(partitioningScheme.getPartitioning().getHandle().getConnectorHandle() instanceof SystemPartitioningHandle));
+ // TODO #20578: Check if the following verification is correct.
+ verify(partitioningScheme.getPartitioning().getArguments().stream()
+ .noneMatch(argument -> argument instanceof ConstantExpression),
+ "Table writer partitioning has constant arguments");
</code_context>
<issue_to_address>
**question:** The verification for constant arguments may be too strict.
This restriction may prevent legitimate scenarios, such as partitioning by a constant. Please review if this check is essential or if it can be made less strict.
</issue_to_address>
### Comment 3
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNodePartitioningProvider.java:77` </location>
<code_context>
+ @Override
+ public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
+ {
+ return 0;
+ }
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Returning 0 for getBucketCount may cause division by zero errors.
Consider returning a positive default value or throwing an exception to prevent downstream errors.
</issue_to_address>
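One way to address this, sketched as a standalone helper rather than the actual connector code: validate the count before handing it to callers that use it as a modulo divisor (the class and method names here are illustrative):

```java
public class BucketCountGuard {
    // Fail fast on a non-positive bucket count instead of returning 0,
    // which would later surface as a division-by-zero in hash % bucketCount.
    static int checkedBucketCount(int bucketCount) {
        if (bucketCount <= 0) {
            throw new IllegalStateException("Expected a positive bucket count, got: " + bucketCount);
        }
        return bucketCount;
    }

    public static void main(String[] args) {
        System.out.println(checkedBucketCount(8));
    }
}
```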
### Comment 4
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergBucketFunction.java:85` </location>
<code_context>
+ @Override
+ public int getBucket(Page page, int position)
+ {
+ return HiveBucketing.getBucket(bucketCount, types, page, position);
+
+ // TODO #20578: Trino.
</code_context>
<issue_to_address>
**question:** Using HiveBucketing for Iceberg may not be semantically correct.
Verify that HiveBucketing.getBucket aligns with Iceberg's partitioning logic, and clearly document any differences or constraints.
</issue_to_address>
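For context on this concern: Iceberg's table spec defines its bucket transform as a 32-bit Murmur3 hash of the value's single-value binary serialization, reduced modulo the bucket count, which is a different scheme from Hive's bucketing hash. A minimal sketch of the spec's reduction step (the hash is assumed precomputed; a real implementation would use a Murmur3 library such as Guava's):

```java
public class IcebergBucketSketch {
    // Iceberg spec: bucket(N, v) = (murmur3_x86_32(serialize(v)) & Integer.MAX_VALUE) % N.
    // Masking with Integer.MAX_VALUE keeps the hash non-negative before the modulo.
    static int bucketFromHash(int bucketCount, int murmur3Hash) {
        return (murmur3Hash & Integer.MAX_VALUE) % bucketCount;
    }

    public static void main(String[] args) {
        // Negative hashes still map into [0, bucketCount).
        System.out.println(bucketFromHash(4, -7));
    }
}
```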
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.
Commits:
Cherry-pick of trinodb/trino@cee96c3. Co-authored-by: David Stryker <david.stryker@starburstdata.com>
Automated tests. Cherry-pick of trinodb/trino@cee96c3. Co-authored-by: David Stryker <david.stryker@starburstdata.com>
Added MERGE INTO statement documentation.
Support SQL MERGE in the Iceberg connector. Cherry-pick of trinodb/trino@6cb188b. Co-authored-by: David Phillips <david@acz.org>
SQL MERGE automated tests for Iceberg connector. Cherry-pick of trinodb/trino@6cb188b. Co-authored-by: David Phillips <david@acz.org>
Add MERGE efficient workload partitioning support
Description
Engine support for SQL MERGE INTO. The MERGE INTO command inserts or updates rows in a table based on specified conditions.
Syntax:
Example: MERGE INTO usage to update the sales information for existing products and insert the sales information for the new products in the market.
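The concrete statement from the PR description was not captured in this page; a hedged sketch of such a statement, with invented table and column names, could look like:

```sql
MERGE INTO product_sales t
USING new_monthly_sales s
    ON t.product_id = s.product_id
WHEN MATCHED THEN
    UPDATE SET units_sold = t.units_sold + s.units_sold
WHEN NOT MATCHED THEN
    INSERT (product_id, units_sold)
    VALUES (s.product_id, s.units_sold)
```

Rows in product_sales that match a source row by product_id are updated in place; source rows with no match are inserted as new rows.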
The Presto engine commit introduces an enum called RowChangeParadigm, which describes how a connector modifies rows. The Iceberg connector will use the DELETE_ROW_AND_INSERT_ROW paradigm, as it represents an updated row as a combination of a deleted row followed by an inserted row. The CHANGE_ONLY_UPDATED_COLUMNS paradigm is meant for connectors that support updating individual columns of rows.

Note: Changes were made after reviewing the following Trino PR: trinodb/trino#7126. This commit is deeply inspired by Trino's implementation.
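The two paradigms described above can be sketched as a plain enum (illustrative only; the names are taken from the PR description, not copied from the actual Presto source):

```java
public class RowChangeParadigmSketch {
    // How a connector materializes a row update (names from the PR description).
    enum RowChangeParadigm {
        CHANGE_ONLY_UPDATED_COLUMNS, // connector can update individual columns in place
        DELETE_ROW_AND_INSERT_ROW    // update modeled as delete + insert (used by the Iceberg connector)
    }

    public static void main(String[] args) {
        for (RowChangeParadigm paradigm : RowChangeParadigm.values()) {
            System.out.println(paradigm);
        }
    }
}
```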
Motivation and Context
The MERGE INTO statement is commonly used to integrate data from two tables with different contents but similar structures. For example, the source table could be part of a production transactional system, while the target table might be located in a data warehouse for analytics. Regularly, MERGE operations are performed to update the analytics warehouse with the latest production data.
You can also use MERGE with tables that have different structures, as long as you can define a condition to match the rows between them.
Test Plan
Automated tests developed in TestSqlParser, TestSqlParserErrorHandling, TestStatementBuilder, AbstractAnalyzerTest, TestAnalyzer, and TestClassLoaderSafeWrappers classes.
Contributor checklist
Release Notes