Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorIndexHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -75,6 +76,12 @@ public static com.fasterxml.jackson.databind.Module tableExecuteHandleModule(Han
return new AbstractTypedJacksonModule<>(ConnectorTableExecuteHandle.class, resolver::getId, resolver::getHandleClass) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module mergeTableHandleModule(HandleResolver resolver)
{
return new AbstractTypedJacksonModule<>(ConnectorMergeTableHandle.class, resolver::getId, resolver::getHandleClass) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module indexHandleModule(HandleResolver resolver)
{
Expand Down
47 changes: 47 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/MergeHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ConnectorMergeTableHandle;

import static java.util.Objects.requireNonNull;

public final class MergeHandle
{
private final TableHandle tableHandle;
private final ConnectorMergeTableHandle connectorMergeHandle;

@JsonCreator
public MergeHandle(
@JsonProperty("tableHandle") TableHandle tableHandle,
@JsonProperty("connectorMergeHandle") ConnectorMergeTableHandle connectorMergeHandle)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.connectorMergeHandle = requireNonNull(connectorMergeHandle, "connectorMergeHandle is null");
}

@JsonProperty
public TableHandle getTableHandle()
{
return tableHandle;
}

@JsonProperty
public ConnectorMergeTableHandle getConnectorMergeHandle()
{
return connectorMergeHandle;
}
}
29 changes: 29 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SortItem;
Expand Down Expand Up @@ -375,6 +376,34 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
*/
void finishUpdate(Session session, TableHandle tableHandle, Collection<Slice> fragments);

/**
* Return the row update paradigm supported by the connector on the table or throw
* an exception if row change is not supported.
*/
RowChangeParadigm getRowChangeParadigm(Session session, TableHandle tableHandle);

/**
* Get the column handle that will generate row IDs for the merge operation.
* These IDs will be passed to the {@code storeMergedRows()} method of the
* {@link io.trino.spi.connector.ConnectorMergeSink} that created them.
*/
ColumnHandle getMergeRowIdColumnHandle(Session session, TableHandle tableHandle);

/**
* Get the physical layout for updated or deleted rows of a MERGE operation.
*/
Optional<PartitioningHandle> getUpdateLayout(Session session, TableHandle tableHandle);

/**
* Begin merge query
*/
MergeHandle beginMerge(Session session, TableHandle tableHandle);

/**
* Finish merge query
*/
void finishMerge(Session session, MergeHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Returns a catalog handle for the specified catalog name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.trino.spi.connector.ConnectorCapabilities;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
Expand All @@ -65,6 +66,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -959,6 +961,26 @@ public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tabl
return metadata.getUpdateRowIdColumnHandle(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle(), updatedColumns);
}

@Override
public ColumnHandle getMergeRowIdColumnHandle(Session session, TableHandle tableHandle)
{
CatalogHandle catalogHandle = tableHandle.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
return metadata.getMergeRowIdColumnHandle(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle());
}

@Override
public Optional<PartitioningHandle> getUpdateLayout(Session session, TableHandle tableHandle)
{
CatalogHandle catalogHandle = tableHandle.getCatalogHandle();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogHandle);
ConnectorMetadata metadata = catalogMetadata.getMetadata(session);
ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(catalogHandle);

return metadata.getUpdateLayout(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle())
.map(partitioning -> new PartitioningHandle(Optional.of(catalogHandle), Optional.of(transactionHandle), partitioning));
}

@Override
public Optional<TableHandle> applyDelete(Session session, TableHandle table)
{
Expand Down Expand Up @@ -1014,6 +1036,31 @@ public void finishUpdate(Session session, TableHandle tableHandle, Collection<Sl
metadata.finishUpdate(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle(), fragments);
}

@Override
public RowChangeParadigm getRowChangeParadigm(Session session, TableHandle tableHandle)
{
CatalogHandle catalogHandle = tableHandle.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
return metadata.getRowChangeParadigm(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle());
}

@Override
public MergeHandle beginMerge(Session session, TableHandle tableHandle)
{
CatalogHandle catalogHandle = tableHandle.getCatalogHandle();
ConnectorMetadata metadata = getMetadataForWrite(session, catalogHandle);
ConnectorMergeTableHandle newHandle = metadata.beginMerge(session.toConnectorSession(catalogHandle), tableHandle.getConnectorHandle(), getRetryPolicy(session).getRetryMode());
return new MergeHandle(tableHandle.withConnectorHandle(newHandle.getTableHandle()), newHandle);
}

@Override
public void finishMerge(Session session, MergeHandle mergeHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
CatalogHandle catalogHandle = mergeHandle.getTableHandle().getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
metadata.finishMerge(session.toConnectorSession(catalogHandle), mergeHandle.getConnectorMergeHandle(), fragments, computedStatistics);
}

@Override
public Optional<CatalogHandle> getCatalogHandle(Session session, String catalogName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected enum State
protected State state = State.RUNNING;
protected long rowCount;
private boolean closed;
private ListenableFuture<Collection<Slice>> finishFuture;
protected ListenableFuture<Collection<Slice>> finishFuture;
private ListenableFuture<Void> blockedFutureView;
private Supplier<Optional<UpdatablePageSource>> pageSource = Optional::empty;

Expand Down Expand Up @@ -146,6 +146,7 @@ public void close()
}
else {
pageSource.get().ifPresent(UpdatablePageSource::abort);
abort();
}
}
}
Expand All @@ -158,7 +159,9 @@ public void setPageSource(Supplier<Optional<UpdatablePageSource>> pageSource)
protected UpdatablePageSource pageSource()
{
Optional<UpdatablePageSource> source = pageSource.get();
checkState(source.isPresent(), "UpdatablePageSource not set");
checkState(source.isPresent(), "pageSource not set");
return source.get();
}

protected void abort() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator;

import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.block.RunLengthEncodedBlock;

import java.util.ArrayList;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.util.Objects.requireNonNull;

/**
* The transformPage() method in this class does two things:
* <ul>
* <li>Transform the input page into an "update" page format</li>
* <li>Removes all rows whose operation number is DEFAULT_CASE_OPERATION_NUMBER</li>
* </ul>
*/
public class ChangeOnlyUpdatedColumnsMergeProcessor
implements MergeRowChangeProcessor
{
private static final Block INSERT_FROM_UPDATE_BLOCK = nativeValueToBlock(TINYINT, 0L);

private final int rowIdChannel;
private final int mergeRowChannel;
private final List<Integer> dataColumnChannels;
private final int writeRedistributionColumnCount;

public ChangeOnlyUpdatedColumnsMergeProcessor(
Comment thread
djsagain marked this conversation as resolved.
Outdated
int rowIdChannel,
int mergeRowChannel,
List<Integer> dataColumnChannels,
List<Integer> redistributionColumnChannels)
{
this.rowIdChannel = rowIdChannel;
this.mergeRowChannel = mergeRowChannel;
this.dataColumnChannels = requireNonNull(dataColumnChannels, "dataColumnChannels is null");
this.writeRedistributionColumnCount = redistributionColumnChannels.size();
}

@Override
public Page transformPage(Page inputPage)
{
requireNonNull(inputPage, "inputPage is null");
int inputChannelCount = inputPage.getChannelCount();
checkArgument(inputChannelCount >= 2 + writeRedistributionColumnCount, "inputPage channelCount (%s) should be >= 2 + %s", inputChannelCount, writeRedistributionColumnCount);
int positionCount = inputPage.getPositionCount();
// TODO: Check with Karol to see if we can get empty pages
checkArgument(positionCount > 0, "positionCount should be > 0, but is %s", positionCount);

ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
Comment thread
djsagain marked this conversation as resolved.
Outdated
checkArgument(!mergeRow.mayHaveNull(), "The mergeRow may not have null rows");

// We've verified that the mergeRow block has no null rows, so it's okay to get the field blocks

List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);

for (int channel : dataColumnChannels) {
builder.add(mergeRow.getField(channel));
}
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
builder.add(operationChannelBlock);
Comment thread
djsagain marked this conversation as resolved.
Outdated
builder.add(inputPage.getBlock(rowIdChannel));
builder.add(new RunLengthEncodedBlock(INSERT_FROM_UPDATE_BLOCK, positionCount));

Page result = new Page(builder.toArray(Block[]::new));

int defaultCaseCount = 0;
for (int position = 0; position < positionCount; position++) {
if (TINYINT.getLong(operationChannelBlock, position) == DEFAULT_CASE_OPERATION_NUMBER) {
defaultCaseCount++;
}
}
if (defaultCaseCount == 0) {
return result;
}

int usedCases = 0;
int[] positions = new int[positionCount - defaultCaseCount];
for (int position = 0; position < positionCount; position++) {
if (TINYINT.getLong(operationChannelBlock, position) != DEFAULT_CASE_OPERATION_NUMBER) {
positions[usedCases] = position;
usedCases++;
}
}

checkArgument(usedCases + defaultCaseCount == positionCount, "usedCases (%s) + defaultCaseCount (%s) != positionCount (%s)", usedCases, defaultCaseCount, positionCount);

return result.getPositions(positions, 0, usedCases);
}
}
Loading