Skip to content

Commit ae2448a

Browse files
chenjian2664electrum
authored and committed
Refactor merge to support partial update in engine
1 parent d39f747 commit ae2448a

File tree

24 files changed

+146
-44
lines changed

24 files changed

+146
-44
lines changed

core/trino-main/src/main/java/io/trino/metadata/Metadata.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.metadata;
1515

16+
import com.google.common.collect.Multimap;
1617
import com.google.common.util.concurrent.ListenableFuture;
1718
import io.airlift.slice.Slice;
1819
import io.trino.Session;
@@ -452,8 +453,10 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
452453

453454
/**
454455
* Begin merge query
456+
*
457+
* @param updateCaseColumnHandles mapping from each merge update case number to the target columns assigned by that case
455458
*/
456-
MergeHandle beginMerge(Session session, TableHandle tableHandle);
459+
MergeHandle beginMerge(Session session, TableHandle tableHandle, Multimap<Integer, ColumnHandle> updateCaseColumnHandles);
457460

458461
/**
459462
* Finish merge query

core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.ImmutableList;
1818
import com.google.common.collect.ImmutableMap;
1919
import com.google.common.collect.ImmutableSet;
20+
import com.google.common.collect.Multimap;
2021
import com.google.common.collect.Streams;
2122
import com.google.common.util.concurrent.Futures;
2223
import com.google.common.util.concurrent.ListenableFuture;
@@ -1352,11 +1353,15 @@ public RowChangeParadigm getRowChangeParadigm(Session session, TableHandle table
13521353
}
13531354

13541355
@Override
1355-
public MergeHandle beginMerge(Session session, TableHandle tableHandle)
1356+
public MergeHandle beginMerge(Session session, TableHandle tableHandle, Multimap<Integer, ColumnHandle> updateCaseColumns)
13561357
{
13571358
CatalogHandle catalogHandle = tableHandle.catalogHandle();
13581359
ConnectorMetadata metadata = getMetadataForWrite(session, catalogHandle);
1359-
ConnectorMergeTableHandle newHandle = metadata.beginMerge(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), getRetryPolicy(session).getRetryMode());
1360+
ConnectorMergeTableHandle newHandle = metadata.beginMerge(
1361+
session.toConnectorSession(catalogHandle),
1362+
tableHandle.connectorHandle(),
1363+
updateCaseColumns.asMap(),
1364+
getRetryPolicy(session).getRetryMode());
13601365
return new MergeHandle(tableHandle.withConnectorHandle(newHandle.getTableHandle()), newHandle);
13611366
}
13621367

core/trino-main/src/main/java/io/trino/operator/ChangeOnlyUpdatedColumnsMergeProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@ public Page transformPage(Page inputPage)
6767

6868
Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
6969
List<Block> fields = getRowFieldsFromBlock(mergeRow);
70-
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);
70+
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
7171
for (int channel : dataColumnChannels) {
7272
builder.add(fields.get(channel));
7373
}
7474
Block operationChannelBlock = fields.get(fields.size() - 2);
7575
builder.add(operationChannelBlock);
76+
Block caseNumberChannelBlock = fields.get(fields.size() - 1);
77+
builder.add(caseNumberChannelBlock);
7678
builder.add(inputPage.getBlock(rowIdChannel));
7779
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));
7880

core/trino-main/src/main/java/io/trino/operator/DeleteAndInsertMergeProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_DELETE_OPERATION_NUMBER;
3232
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_INSERT_OPERATION_NUMBER;
3333
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
34+
import static io.trino.spi.type.IntegerType.INTEGER;
3435
import static io.trino.spi.type.TinyintType.TINYINT;
3536
import static java.util.Objects.requireNonNull;
3637

@@ -124,6 +125,7 @@ public Page transformPage(Page inputPage)
124125
List<Type> pageTypes = ImmutableList.<Type>builder()
125126
.addAll(dataColumnTypes)
126127
.add(TINYINT)
128+
.add(INTEGER)
127129
.add(rowIdType)
128130
.add(TINYINT)
129131
.build();
@@ -171,11 +173,14 @@ private void addDeleteRow(PageBuilder pageBuilder, Page originalPage, int positi
171173
// Add the operation column == deleted
172174
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_DELETE_OPERATION_NUMBER : DELETE_OPERATION_NUMBER);
173175

176+
// Add a dummy case number; delete and insert rows do not use it, so -1 marks it as a value that must not be used
177+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), -1);
178+
174179
// Copy row ID column
175-
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1));
180+
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2));
176181

177182
// Write 0, meaning this row is not an insert derived from an update
178-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), 0);
183+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), 0);
179184

180185
pageBuilder.declarePosition();
181186
}
@@ -193,11 +198,14 @@ private void addInsertRow(PageBuilder pageBuilder, List<Block> fields, int posit
193198
// Add the operation column == insert
194199
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_INSERT_OPERATION_NUMBER : INSERT_OPERATION_NUMBER);
195200

201+
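// Add a dummy case number; delete and insert rows do not use it (NOTE(review): addDeleteRow writes -1 as its dummy value while this writes 0 - confirm which is intended)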
202+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), 0);
203+
196204
// Add null row ID column
197-
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1).appendNull();
205+
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2).appendNull();
198206

199207
// Write 1 if this row is an insert derived from an update, 0 otherwise
200-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), causedByUpdate ? 1 : 0);
208+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), causedByUpdate ? 1 : 0);
201209

202210
pageBuilder.declarePosition();
203211
}

core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1842,6 +1842,8 @@ public static class MergeAnalysis
18421842
private final List<ColumnHandle> dataColumnHandles;
18431843
private final List<ColumnHandle> redistributionColumnHandles;
18441844
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
1845+
// Maps each update case number to the columns that case assigns
1846+
private final Multimap<Integer, ColumnHandle> updateCaseColumnHandles;
18451847
private final Set<ColumnHandle> nonNullableColumnHandles;
18461848
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
18471849
private final RowType mergeRowType;
@@ -1857,6 +1859,7 @@ public MergeAnalysis(
18571859
List<ColumnHandle> dataColumnHandles,
18581860
List<ColumnHandle> redistributionColumnHandles,
18591861
List<List<ColumnHandle>> mergeCaseColumnHandles,
1862+
Multimap<Integer, ColumnHandle> updateCaseColumnHandles,
18601863
Set<ColumnHandle> nonNullableColumnHandles,
18611864
Map<ColumnHandle, Integer> columnHandleFieldNumbers,
18621865
RowType mergeRowType,
@@ -1871,6 +1874,7 @@ public MergeAnalysis(
18711874
this.dataColumnHandles = requireNonNull(dataColumnHandles, "dataColumnHandles is null");
18721875
this.redistributionColumnHandles = requireNonNull(redistributionColumnHandles, "redistributionColumnHandles is null");
18731876
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
1877+
this.updateCaseColumnHandles = requireNonNull(updateCaseColumnHandles, "updateCaseColumnHandles is null");
18741878
this.nonNullableColumnHandles = requireNonNull(nonNullableColumnHandles, "nonNullableColumnHandles is null");
18751879
this.columnHandleFieldNumbers = requireNonNull(columnHandleFieldNumbers, "columnHandleFieldNumbers is null");
18761880
this.mergeRowType = requireNonNull(mergeRowType, "mergeRowType is null");
@@ -1906,6 +1910,11 @@ public List<List<ColumnHandle>> getMergeCaseColumnHandles()
19061910
return mergeCaseColumnHandles;
19071911
}
19081912

1913+
public Multimap<Integer, ColumnHandle> getUpdateCaseColumnHandles()
1914+
{
1915+
return updateCaseColumnHandles;
1916+
}
1917+
19091918
public Set<ColumnHandle> getNonNullableColumnHandles()
19101919
{
19111920
return nonNullableColumnHandles;

core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
851851
analyzeCheckConstraints(table, tableName, accessControlScope, tableSchema.tableSchema().getCheckConstraints());
852852
analysis.registerTable(table, Optional.of(handle), tableName, session.getIdentity().getUser(), accessControlScope, Optional.empty());
853853

854-
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of());
854+
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(), ImmutableMultimap.of());
855855

856856
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
857857
}
@@ -3482,7 +3482,10 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
34823482
sourceColumnsByColumnName.getOrDefault(column.getName(), ImmutableSet.of())))
34833483
.collect(toImmutableList())));
34843484

3485-
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(updatedColumnHandles));
3485+
ImmutableMultimap.Builder<Integer, ColumnHandle> updateCaseColumnsBuilder = ImmutableMultimap.builder();
3486+
// An UPDATE statement has exactly one update case, and its case number defaults to 0
3487+
updatedColumnHandles.forEach(columnHandle -> updateCaseColumnsBuilder.put(0, columnHandle));
3488+
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(updatedColumnHandles), updateCaseColumnsBuilder.build());
34863489

34873490
return createAndAssignScope(update, scope, Field.newUnqualified("rows", BIGINT));
34883491
}
@@ -3645,12 +3648,32 @@ else if (operation instanceof MergeInsert && caseColumnNames.isEmpty()) {
36453648
analysis.setUpdateTarget(targetTableHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.of(updatedColumns));
36463649
List<List<ColumnHandle>> mergeCaseColumnHandles = buildCaseColumnLists(merge, dataColumnSchemas, allColumnHandles);
36473650

3648-
createMergeAnalysis(table, targetTableHandle, tableSchema, targetTableScope, joinScope, mergeCaseColumnHandles);
3651+
checkArgument(
3652+
mergeCaseColumnHandles.size() == merge.getMergeCases().size(),
3653+
"Unexpected mergeCaseColumnHandles size: %s with merge cases size: %s", mergeCaseColumnHandles.size(), merge.getMergeCases().size());
3654+
ImmutableMultimap.Builder<Integer, ColumnHandle> updateCaseColumnHandles = ImmutableMultimap.builder();
3655+
for (int caseCounter = 0; caseCounter < merge.getMergeCases().size(); caseCounter++) {
3656+
MergeCase mergeCase = merge.getMergeCases().get(caseCounter);
3657+
if (mergeCase instanceof MergeUpdate) {
3658+
for (ColumnHandle columnHandle : mergeCaseColumnHandles.get(caseCounter)) {
3659+
updateCaseColumnHandles.put(caseCounter, columnHandle);
3660+
}
3661+
}
3662+
}
3663+
3664+
createMergeAnalysis(table, targetTableHandle, tableSchema, targetTableScope, joinScope, mergeCaseColumnHandles, updateCaseColumnHandles.build());
36493665

36503666
return createAndAssignScope(merge, Optional.empty(), Field.newUnqualified("rows", BIGINT));
36513667
}
36523668

3653-
private void createMergeAnalysis(Table table, TableHandle handle, TableSchema tableSchema, Scope tableScope, Scope joinScope, List<List<ColumnHandle>> updatedColumns)
3669+
private void createMergeAnalysis(
3670+
Table table,
3671+
TableHandle handle,
3672+
TableSchema tableSchema,
3673+
Scope tableScope,
3674+
Scope joinScope,
3675+
List<List<ColumnHandle>> mergeCaseColumns,
3676+
Multimap<Integer, ColumnHandle> updateCaseColumns)
36543677
{
36553678
Optional<PartitioningHandle> updateLayout = metadata.getUpdateLayout(session, handle);
36563679
Map<String, ColumnHandle> allColumnHandles = metadata.getColumnHandles(session, handle);
@@ -3713,7 +3736,8 @@ private void createMergeAnalysis(Table table, TableHandle handle, TableSchema ta
37133736
dataColumnSchemas,
37143737
dataColumnHandles,
37153738
redistributionColumnHandles,
3716-
updatedColumns,
3739+
mergeCaseColumns,
3740+
updateCaseColumns,
37173741
nonNullableColumnHandles,
37183742
columnHandleFieldNumbers,
37193743
mergeRowType,

core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,8 @@ public PlanNode plan(Delete node)
553553
List<Symbol> columnSymbols = columnSymbolsBuilder.build();
554554
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
555555
assignmentsBuilder.put(operationSymbol, new Constant(TINYINT, (long) DELETE_OPERATION_NUMBER));
556+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
557+
assignmentsBuilder.put(caseNumberSymbol, new Constant(INTEGER, 0L));
556558
Symbol projectedRowIdSymbol = symbolAllocator.newSymbol(rowIdSymbol.name(), rowIdType);
557559
assignmentsBuilder.put(projectedRowIdSymbol, rowIdSymbol.toSymbolReference());
558560
assignmentsBuilder.put(symbolAllocator.newSymbol("insert_from_update", TINYINT), new Constant(TINYINT, 0L));
@@ -575,7 +577,8 @@ public PlanNode plan(Delete node)
575577
Optional.empty(),
576578
tableMetadata.table(),
577579
paradigmAndTypes,
578-
findSourceTableHandles(projectNode)),
580+
findSourceTableHandles(projectNode),
581+
ImmutableListMultimap.of()),
579582
projectNode.getOutputSymbols(),
580583
partitioningScheme,
581584
outputs);
@@ -943,7 +946,8 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
943946
Optional.empty(),
944947
metadata.getTableName(session, handle).getSchemaTableName(),
945948
mergeParadigmAndTypes,
946-
findSourceTableHandles(planNode));
949+
findSourceTableHandles(planNode),
950+
mergeAnalysis.getUpdateCaseColumnHandles());
947951

948952
ImmutableList.Builder<Symbol> columnSymbolsBuilder = ImmutableList.builder();
949953
for (ColumnHandle columnHandle : mergeAnalysis.getDataColumnHandles()) {
@@ -958,11 +962,13 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
958962
}
959963

960964
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
965+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
961966
Symbol insertFromUpdateSymbol = symbolAllocator.newSymbol("insert_from_update", TINYINT);
962967

963968
List<Symbol> projectedSymbols = ImmutableList.<Symbol>builder()
964969
.addAll(columnSymbols)
965970
.add(operationSymbol)
971+
.add(caseNumberSymbol)
966972
.add(rowIdSymbol)
967973
.add(insertFromUpdateSymbol)
968974
.build();

core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ public WriterTarget getWriterTarget(PlanNode node)
212212
mergeTarget.getMergeHandle(),
213213
mergeTarget.getSchemaTableName(),
214214
mergeTarget.getMergeParadigmAndTypes(),
215-
findSourceTableHandles(node));
215+
findSourceTableHandles(node),
216+
mergeTarget.getUpdateCaseColumnHandles());
216217
}
217218

218219
if (node instanceof ExchangeNode || node instanceof UnionNode) {
@@ -247,13 +248,14 @@ private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
247248
findSourceTableHandles(planNode));
248249
}
249250
if (target instanceof MergeTarget merge) {
250-
MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle());
251+
MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle(), merge.getUpdateCaseColumnHandles());
251252
return new MergeTarget(
252253
mergeHandle.tableHandle(),
253254
Optional.of(mergeHandle),
254255
merge.getSchemaTableName(),
255256
merge.getMergeParadigmAndTypes(),
256-
findSourceTableHandles(planNode));
257+
findSourceTableHandles(planNode),
258+
merge.getUpdateCaseColumnHandles());
257259
}
258260
if (target instanceof TableWriterNode.RefreshMaterializedViewReference refreshMV) {
259261
return new TableWriterNode.RefreshMaterializedViewTarget(

core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.annotation.JsonTypeInfo;
2020
import com.google.common.collect.ImmutableList;
2121
import com.google.common.collect.Iterables;
22+
import com.google.common.collect.Multimap;
2223
import com.google.errorprone.annotations.Immutable;
2324
import io.trino.Session;
2425
import io.trino.metadata.InsertTableHandle;
@@ -731,20 +732,23 @@ public static class MergeTarget
731732
private final SchemaTableName schemaTableName;
732733
private final MergeParadigmAndTypes mergeParadigmAndTypes;
733734
private final List<TableHandle> sourceTableHandles;
735+
private final Multimap<Integer, ColumnHandle> updateCaseColumnHandles;
734736

735737
@JsonCreator
736738
public MergeTarget(
737739
@JsonProperty("handle") TableHandle handle,
738740
@JsonProperty("mergeHandle") Optional<MergeHandle> mergeHandle,
739741
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
740742
@JsonProperty("mergeParadigmAndTypes") MergeParadigmAndTypes mergeParadigmAndTypes,
741-
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles)
743+
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles,
744+
@JsonProperty("updateCaseColumnHandles") Multimap<Integer, ColumnHandle> updateCaseColumnHandles)
742745
{
743746
this.handle = requireNonNull(handle, "handle is null");
744747
this.mergeHandle = requireNonNull(mergeHandle, "mergeHandle is null");
745748
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
746749
this.mergeParadigmAndTypes = requireNonNull(mergeParadigmAndTypes, "mergeElements is null");
747750
this.sourceTableHandles = ImmutableList.copyOf(requireNonNull(sourceTableHandles, "sourceTableHandles is null"));
751+
this.updateCaseColumnHandles = requireNonNull(updateCaseColumnHandles, "updateCaseColumnHandles is null");
748752
}
749753

750754
@JsonProperty
@@ -800,6 +804,12 @@ public List<TableHandle> getSourceTableHandles()
800804
{
801805
return sourceTableHandles;
802806
}
807+
808+
@JsonProperty
809+
public Multimap<Integer, ColumnHandle> getUpdateCaseColumnHandles()
810+
{
811+
return updateCaseColumnHandles;
812+
}
803813
}
804814

805815
public static class MergeParadigmAndTypes

core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
770770
}
771771
}
772772

773+
@Override
774+
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
775+
{
776+
Span span = startSpan("beginMerge", tableHandle);
777+
try (var _ = scopedSpan(span)) {
778+
return delegate.beginMerge(session, tableHandle, updateCaseColumns, retryMode);
779+
}
780+
}
781+
773782
@Override
774783
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
775784
{

0 commit comments

Comments
 (0)