Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.metadata;

import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.trino.Session;
Expand Down Expand Up @@ -452,8 +453,10 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(

/**
* Begin merge query
*
* @param updateCaseColumnHandles The merge update case number to the assignment target columns mapping
*/
MergeHandle beginMerge(Session session, TableHandle tableHandle);
MergeHandle beginMerge(Session session, TableHandle tableHandle, Multimap<Integer, ColumnHandle> updateCaseColumnHandles);

/**
* Finish merge query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -1352,11 +1353,15 @@ public RowChangeParadigm getRowChangeParadigm(Session session, TableHandle table
}

@Override
public MergeHandle beginMerge(Session session, TableHandle tableHandle)
public MergeHandle beginMerge(Session session, TableHandle tableHandle, Multimap<Integer, ColumnHandle> updateCaseColumns)
{
CatalogHandle catalogHandle = tableHandle.catalogHandle();
ConnectorMetadata metadata = getMetadataForWrite(session, catalogHandle);
ConnectorMergeTableHandle newHandle = metadata.beginMerge(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), getRetryPolicy(session).getRetryMode());
ConnectorMergeTableHandle newHandle = metadata.beginMerge(
session.toConnectorSession(catalogHandle),
tableHandle.connectorHandle(),
updateCaseColumns.asMap(),
getRetryPolicy(session).getRetryMode());
return new MergeHandle(tableHandle.withConnectorHandle(newHandle.getTableHandle()), newHandle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ public Page transformPage(Page inputPage)

Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
List<Block> fields = getRowFieldsFromBlock(mergeRow);
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
for (int channel : dataColumnChannels) {
builder.add(fields.get(channel));
}
Block operationChannelBlock = fields.get(fields.size() - 2);
builder.add(operationChannelBlock);
Block caseNumberChannelBlock = fields.get(fields.size() - 1);
builder.add(caseNumberChannelBlock);
builder.add(inputPage.getBlock(rowIdChannel));
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_DELETE_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_INSERT_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -124,6 +125,7 @@ public Page transformPage(Page inputPage)
List<Type> pageTypes = ImmutableList.<Type>builder()
.addAll(dataColumnTypes)
.add(TINYINT)
.add(INTEGER)
.add(rowIdType)
.add(TINYINT)
.build();
Expand Down Expand Up @@ -171,11 +173,14 @@ private void addDeleteRow(PageBuilder pageBuilder, Page originalPage, int positi
// Add the operation column == deleted
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_DELETE_OPERATION_NUMBER : DELETE_OPERATION_NUMBER);

// Add the dummy case number, delete and insert won't use it, use -1 to mark it shouldn't be used
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), -1);

// Copy row ID column
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1));
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2));

// Write 0, meaning this row is not an insert derived from an update
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), 0);
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), 0);

pageBuilder.declarePosition();
}
Expand All @@ -193,11 +198,14 @@ private void addInsertRow(PageBuilder pageBuilder, List<Block> fields, int posit
// Add the operation column == insert
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_INSERT_OPERATION_NUMBER : INSERT_OPERATION_NUMBER);

// Add the dummy case number, delete and insert won't use it
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), 0);

// Add null row ID column
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1).appendNull();
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2).appendNull();

// Write 1 if this row is an insert derived from an update, 0 otherwise
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), causedByUpdate ? 1 : 0);
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), causedByUpdate ? 1 : 0);

pageBuilder.declarePosition();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,8 @@ public static class MergeAnalysis
private final List<ColumnHandle> dataColumnHandles;
private final List<ColumnHandle> redistributionColumnHandles;
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
// Case number map to columns
private final Multimap<Integer, ColumnHandle> updateCaseColumnHandles;
private final Set<ColumnHandle> nonNullableColumnHandles;
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
private final RowType mergeRowType;
Expand All @@ -1857,6 +1859,7 @@ public MergeAnalysis(
List<ColumnHandle> dataColumnHandles,
List<ColumnHandle> redistributionColumnHandles,
List<List<ColumnHandle>> mergeCaseColumnHandles,
Multimap<Integer, ColumnHandle> updateCaseColumnHandles,
Set<ColumnHandle> nonNullableColumnHandles,
Map<ColumnHandle, Integer> columnHandleFieldNumbers,
RowType mergeRowType,
Expand All @@ -1871,6 +1874,7 @@ public MergeAnalysis(
this.dataColumnHandles = requireNonNull(dataColumnHandles, "dataColumnHandles is null");
this.redistributionColumnHandles = requireNonNull(redistributionColumnHandles, "redistributionColumnHandles is null");
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
this.updateCaseColumnHandles = requireNonNull(updateCaseColumnHandles, "updateCaseColumnHandles is null");
this.nonNullableColumnHandles = requireNonNull(nonNullableColumnHandles, "nonNullableColumnHandles is null");
this.columnHandleFieldNumbers = requireNonNull(columnHandleFieldNumbers, "columnHandleFieldNumbers is null");
this.mergeRowType = requireNonNull(mergeRowType, "mergeRowType is null");
Expand Down Expand Up @@ -1906,6 +1910,11 @@ public List<List<ColumnHandle>> getMergeCaseColumnHandles()
return mergeCaseColumnHandles;
}

public Multimap<Integer, ColumnHandle> getUpdateCaseColumnHandles()
{
return updateCaseColumnHandles;
}

public Set<ColumnHandle> getNonNullableColumnHandles()
{
return nonNullableColumnHandles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
analyzeCheckConstraints(table, tableName, accessControlScope, tableSchema.tableSchema().getCheckConstraints());
analysis.registerTable(table, Optional.of(handle), tableName, session.getIdentity().getUser(), accessControlScope, Optional.empty());

createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of());
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(), ImmutableMultimap.of());

return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
Expand Down Expand Up @@ -3482,7 +3482,10 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
sourceColumnsByColumnName.getOrDefault(column.getName(), ImmutableSet.of())))
.collect(toImmutableList())));

createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(updatedColumnHandles));
ImmutableMultimap.Builder<Integer, ColumnHandle> updateCaseColumnsBuilder = ImmutableMultimap.builder();
// Update only have one update case number which default is 0
updatedColumnHandles.forEach(columnHandle -> updateCaseColumnsBuilder.put(0, columnHandle));
createMergeAnalysis(table, handle, tableSchema, tableScope, tableScope, ImmutableList.of(updatedColumnHandles), updateCaseColumnsBuilder.build());

return createAndAssignScope(update, scope, Field.newUnqualified("rows", BIGINT));
}
Expand Down Expand Up @@ -3645,12 +3648,32 @@ else if (operation instanceof MergeInsert && caseColumnNames.isEmpty()) {
analysis.setUpdateTarget(targetTableHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.of(updatedColumns));
List<List<ColumnHandle>> mergeCaseColumnHandles = buildCaseColumnLists(merge, dataColumnSchemas, allColumnHandles);

createMergeAnalysis(table, targetTableHandle, tableSchema, targetTableScope, joinScope, mergeCaseColumnHandles);
checkArgument(
mergeCaseColumnHandles.size() == merge.getMergeCases().size(),
"Unexpected mergeCaseColumnHandles size: %s with merge cases size: %s", mergeCaseColumnHandles.size(), merge.getMergeCases().size());
ImmutableMultimap.Builder<Integer, ColumnHandle> updateCaseColumnHandles = ImmutableMultimap.builder();
for (int caseCounter = 0; caseCounter < merge.getMergeCases().size(); caseCounter++) {
MergeCase mergeCase = merge.getMergeCases().get(caseCounter);
if (mergeCase instanceof MergeUpdate) {
for (ColumnHandle columnHandle : mergeCaseColumnHandles.get(caseCounter)) {
updateCaseColumnHandles.put(caseCounter, columnHandle);
}
}
}

createMergeAnalysis(table, targetTableHandle, tableSchema, targetTableScope, joinScope, mergeCaseColumnHandles, updateCaseColumnHandles.build());

return createAndAssignScope(merge, Optional.empty(), Field.newUnqualified("rows", BIGINT));
}

private void createMergeAnalysis(Table table, TableHandle handle, TableSchema tableSchema, Scope tableScope, Scope joinScope, List<List<ColumnHandle>> updatedColumns)
private void createMergeAnalysis(
Table table,
TableHandle handle,
TableSchema tableSchema,
Scope tableScope,
Scope joinScope,
List<List<ColumnHandle>> mergeCaseColumns,
Multimap<Integer, ColumnHandle> updateCaseColumns)
{
Optional<PartitioningHandle> updateLayout = metadata.getUpdateLayout(session, handle);
Map<String, ColumnHandle> allColumnHandles = metadata.getColumnHandles(session, handle);
Expand Down Expand Up @@ -3713,7 +3736,8 @@ private void createMergeAnalysis(Table table, TableHandle handle, TableSchema ta
dataColumnSchemas,
dataColumnHandles,
redistributionColumnHandles,
updatedColumns,
mergeCaseColumns,
updateCaseColumns,
nonNullableColumnHandles,
columnHandleFieldNumbers,
mergeRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ public PlanNode plan(Delete node)
List<Symbol> columnSymbols = columnSymbolsBuilder.build();
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
assignmentsBuilder.put(operationSymbol, new Constant(TINYINT, (long) DELETE_OPERATION_NUMBER));
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
assignmentsBuilder.put(caseNumberSymbol, new Constant(INTEGER, 0L));
Symbol projectedRowIdSymbol = symbolAllocator.newSymbol(rowIdSymbol.name(), rowIdType);
assignmentsBuilder.put(projectedRowIdSymbol, rowIdSymbol.toSymbolReference());
assignmentsBuilder.put(symbolAllocator.newSymbol("insert_from_update", TINYINT), new Constant(TINYINT, 0L));
Expand All @@ -575,7 +577,8 @@ public PlanNode plan(Delete node)
Optional.empty(),
tableMetadata.table(),
paradigmAndTypes,
findSourceTableHandles(projectNode)),
findSourceTableHandles(projectNode),
ImmutableListMultimap.of()),
projectNode.getOutputSymbols(),
partitioningScheme,
outputs);
Expand Down Expand Up @@ -943,7 +946,8 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
Optional.empty(),
metadata.getTableName(session, handle).getSchemaTableName(),
mergeParadigmAndTypes,
findSourceTableHandles(planNode));
findSourceTableHandles(planNode),
mergeAnalysis.getUpdateCaseColumnHandles());

ImmutableList.Builder<Symbol> columnSymbolsBuilder = ImmutableList.builder();
for (ColumnHandle columnHandle : mergeAnalysis.getDataColumnHandles()) {
Expand All @@ -958,11 +962,13 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
}

Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
Symbol insertFromUpdateSymbol = symbolAllocator.newSymbol("insert_from_update", TINYINT);

List<Symbol> projectedSymbols = ImmutableList.<Symbol>builder()
.addAll(columnSymbols)
.add(operationSymbol)
.add(caseNumberSymbol)
.add(rowIdSymbol)
.add(insertFromUpdateSymbol)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public WriterTarget getWriterTarget(PlanNode node)
mergeTarget.getMergeHandle(),
mergeTarget.getSchemaTableName(),
mergeTarget.getMergeParadigmAndTypes(),
findSourceTableHandles(node));
findSourceTableHandles(node),
mergeTarget.getUpdateCaseColumnHandles());
}

if (node instanceof ExchangeNode || node instanceof UnionNode) {
Expand Down Expand Up @@ -247,13 +248,14 @@ private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
findSourceTableHandles(planNode));
}
if (target instanceof MergeTarget merge) {
MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle());
MergeHandle mergeHandle = metadata.beginMerge(session, merge.getHandle(), merge.getUpdateCaseColumnHandles());
return new MergeTarget(
mergeHandle.tableHandle(),
Optional.of(mergeHandle),
merge.getSchemaTableName(),
merge.getMergeParadigmAndTypes(),
findSourceTableHandles(planNode));
findSourceTableHandles(planNode),
merge.getUpdateCaseColumnHandles());
}
if (target instanceof TableWriterNode.RefreshMaterializedViewReference refreshMV) {
return new TableWriterNode.RefreshMaterializedViewTarget(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.errorprone.annotations.Immutable;
import io.trino.Session;
import io.trino.metadata.InsertTableHandle;
Expand Down Expand Up @@ -731,20 +732,23 @@ public static class MergeTarget
private final SchemaTableName schemaTableName;
private final MergeParadigmAndTypes mergeParadigmAndTypes;
private final List<TableHandle> sourceTableHandles;
private final Multimap<Integer, ColumnHandle> updateCaseColumnHandles;

@JsonCreator
public MergeTarget(
@JsonProperty("handle") TableHandle handle,
@JsonProperty("mergeHandle") Optional<MergeHandle> mergeHandle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("mergeParadigmAndTypes") MergeParadigmAndTypes mergeParadigmAndTypes,
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles)
@JsonProperty("sourceTableHandles") List<TableHandle> sourceTableHandles,
@JsonProperty("updateCaseColumnHandles") Multimap<Integer, ColumnHandle> updateCaseColumnHandles)
{
this.handle = requireNonNull(handle, "handle is null");
this.mergeHandle = requireNonNull(mergeHandle, "mergeHandle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.mergeParadigmAndTypes = requireNonNull(mergeParadigmAndTypes, "mergeElements is null");
this.sourceTableHandles = ImmutableList.copyOf(requireNonNull(sourceTableHandles, "sourceTableHandles is null"));
this.updateCaseColumnHandles = requireNonNull(updateCaseColumnHandles, "updateCaseColumnHandles is null");
}

@JsonProperty
Expand Down Expand Up @@ -800,6 +804,12 @@ public List<TableHandle> getSourceTableHandles()
{
return sourceTableHandles;
}

@JsonProperty
public Multimap<Integer, ColumnHandle> getUpdateCaseColumnHandles()
{
return updateCaseColumnHandles;
}
}

public static class MergeParadigmAndTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
Span span = startSpan("beginMerge", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.beginMerge(session, tableHandle, updateCaseColumns, retryMode);
}
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Expand Down
Loading
Loading