Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorIndexHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -72,6 +73,12 @@ public static com.fasterxml.jackson.databind.Module insertTableHandleModule(Hand
return new AbstractTypedJacksonModule<>(ConnectorInsertTableHandle.class, resolver::getId, resolver::getInsertTableHandleClass) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module mergeTableHandleModule(HandleResolver resolver)
{
return new AbstractTypedJacksonModule<>(ConnectorMergeTableHandle.class, resolver::getId, resolver::getMergeTableHandleClass) {};
}

@ProvidesIntoSet
public static com.fasterxml.jackson.databind.Module indexHandleModule(HandleResolver resolver)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorHandleResolver;
import io.trino.spi.connector.ConnectorIndexHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -103,6 +104,11 @@ public String getId(ConnectorInsertTableHandle insertHandle)
return getId(insertHandle, MaterializedHandleResolver::getInsertTableHandleClass);
}

public String getId(ConnectorMergeTableHandle mergeHandle)
{
return getId(mergeHandle, MaterializedHandleResolver::getMergeTableHandleClass);
}

public String getId(ConnectorPartitioningHandle partitioningHandle)
{
return getId(partitioningHandle, MaterializedHandleResolver::getPartitioningHandleClass);
Expand Down Expand Up @@ -148,6 +154,11 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass(Str
return resolverFor(id).getInsertTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
}

public Class<? extends ConnectorMergeTableHandle> getMergeTableHandleClass(String id)
{
return resolverFor(id).getMergeTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
}

public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass(String id)
{
return resolverFor(id).getPartitioningHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
Expand Down Expand Up @@ -188,6 +199,7 @@ private static class MaterializedHandleResolver
private final Optional<Class<? extends ConnectorIndexHandle>> indexHandle;
private final Optional<Class<? extends ConnectorOutputTableHandle>> outputTableHandle;
private final Optional<Class<? extends ConnectorInsertTableHandle>> insertTableHandle;
private final Optional<Class<? extends ConnectorMergeTableHandle>> mergeTableHandle;
private final Optional<Class<? extends ConnectorPartitioningHandle>> partitioningHandle;
private final Optional<Class<? extends ConnectorTransactionHandle>> transactionHandle;

Expand All @@ -200,6 +212,7 @@ public MaterializedHandleResolver(ConnectorHandleResolver resolver)
indexHandle = getHandleClass(resolver::getIndexHandleClass);
outputTableHandle = getHandleClass(resolver::getOutputTableHandleClass);
insertTableHandle = getHandleClass(resolver::getInsertTableHandleClass);
mergeTableHandle = getHandleClass(resolver::getMergeTableHandleClass);
partitioningHandle = getHandleClass(resolver::getPartitioningHandleClass);
transactionHandle = getHandleClass(resolver::getTransactionHandleClass);
}
Expand Down Expand Up @@ -249,6 +262,11 @@ public Optional<Class<? extends ConnectorInsertTableHandle>> getInsertTableHandl
return insertTableHandle;
}

public Optional<Class<? extends ConnectorMergeTableHandle>> getMergeTableHandleClass()
{
return mergeTableHandle;
}

public Optional<Class<? extends ConnectorPartitioningHandle>> getPartitioningHandleClass()
{
return partitioningHandle;
Expand Down
47 changes: 47 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/MergeHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ConnectorMergeTableHandle;

import static java.util.Objects.requireNonNull;

public final class MergeHandle
{
private final TableHandle tableHandle;
private final ConnectorMergeTableHandle connectorMergeHandle;

@JsonCreator
public MergeHandle(
@JsonProperty("tableHandle") TableHandle tableHandle,
@JsonProperty("connectorMergeHandle") ConnectorMergeTableHandle connectorMergeHandle)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.connectorMergeHandle = requireNonNull(connectorMergeHandle, "connectorMergeHandle is null");
}

@JsonProperty
public TableHandle getTableHandle()
{
return tableHandle;
}

@JsonProperty
public ConnectorMergeTableHandle getConnectorMergeHandle()
{
return connectorMergeHandle;
}
}
36 changes: 36 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.metadata;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.connector.CatalogName;
Expand All @@ -39,7 +40,9 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MergeDetails;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -112,6 +115,16 @@ public interface Metadata
*/
Optional<PartitioningHandle> getCommonPartitioning(Session session, PartitioningHandle left, PartitioningHandle right);

/**
* Return the column handles for the columns that must be present in order
* to perform the partitioning and/or bucketing required. By default, the table
* has no such columns.
*/
default List<ColumnHandle> getWriteRedistributionColumns(Session session, TableHandle table)
{
return ImmutableList.of();
}

Optional<Object> getInfo(Session session, TableHandle handle);

/**
Expand Down Expand Up @@ -348,6 +361,29 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
*/
void finishUpdate(Session session, TableHandle tableHandle, Collection<Slice> fragments);

/**
* Return the row update paradigm supported by the connector on the table or throw
* an exception if row change is not supported.
*/
RowChangeParadigm getRowChangeParadigm(Session session, TableHandle tableHandle);

/**
* Get the column handle that will generate row IDs for the merge operation.
* These IDs will be passed to the {@code storeMergedRows()} method of the
* {@link io.trino.spi.connector.ConnectorMergeSink} that created them.
*/
ColumnHandle getMergeRowIdColumnHandle(Session session, TableHandle tableHandle, MergeDetails mergeDetails);

/**
* Begin merge query
*/
MergeHandle beginMerge(Session session, TableHandle tableHandle, MergeDetails mergeDetails);

/**
* Finish merge query
*/
void finishMerge(Session session, MergeHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Returns a connector id for the specified catalog name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.trino.spi.connector.ConnectorCapabilities;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
Expand All @@ -80,7 +81,9 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MergeDetails;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
Expand Down Expand Up @@ -489,6 +492,15 @@ public Optional<PartitioningHandle> getCommonPartitioning(Session session, Parti
return commonHandle.map(handle -> new PartitioningHandle(Optional.of(catalogName), left.getTransactionHandle(), handle));
}

@Override
public List<ColumnHandle> getWriteRedistributionColumns(Session session, TableHandle table)
{
CatalogName catalogName = table.getCatalogName();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);
return metadata.getWriteRedistributionColumns(session.toConnectorSession(catalogName), table.getConnectorHandle());
}

@Override
public Optional<Object> getInfo(Session session, TableHandle handle)
{
Expand Down Expand Up @@ -936,6 +948,14 @@ public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tabl
return metadata.getUpdateRowIdColumnHandle(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), updatedColumns);
}

@Override
public ColumnHandle getMergeRowIdColumnHandle(Session session, TableHandle tableHandle, MergeDetails mergeDetails)
{
CatalogName catalogName = tableHandle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);
return metadata.getMergeRowIdColumnHandle(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), mergeDetails);
}

@Override
public boolean supportsMetadataDelete(Session session, TableHandle tableHandle)
{
Expand Down Expand Up @@ -1017,6 +1037,31 @@ public void finishUpdate(Session session, TableHandle tableHandle, Collection<Sl
metadata.finishUpdate(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments);
}

@Override
public RowChangeParadigm getRowChangeParadigm(Session session, TableHandle tableHandle)
{
CatalogName catalogName = tableHandle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);
return metadata.getRowChangeParadigm(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle());
}

@Override
public MergeHandle beginMerge(Session session, TableHandle tableHandle, MergeDetails mergeDetails)
{
CatalogName catalogName = tableHandle.getCatalogName();
ConnectorMetadata metadata = getMetadataForWrite(session, catalogName);
ConnectorMergeTableHandle newHandle = metadata.beginMerge(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), mergeDetails);
return new MergeHandle(tableHandle.withConnectorHandle(newHandle.getTableHandle()), newHandle);
}

@Override
public void finishMerge(Session session, MergeHandle mergeHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
CatalogName catalogName = mergeHandle.getTableHandle().getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);
metadata.finishMerge(session.toConnectorSession(catalogName), mergeHandle.getConnectorMergeHandle(), fragments, computedStatistics);
}

@Override
public Optional<CatalogName> getCatalogHandle(Session session, String catalogName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public String toString()
return catalogName + ":" + connectorHandle;
}

public TableHandle withConnectorHandle(ConnectorTableHandle connectorHandle)
{
return new TableHandle(catalogName, connectorHandle, transaction, layout);
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected enum State
protected State state = State.RUNNING;
protected long rowCount;
private boolean closed;
private ListenableFuture<Collection<Slice>> finishFuture;
protected ListenableFuture<Collection<Slice>> finishFuture;
private Supplier<Optional<UpdatablePageSource>> pageSource = Optional::empty;

public AbstractRowChangeOperator(OperatorContext operatorContext)
Expand Down Expand Up @@ -149,7 +149,7 @@ public void setPageSource(Supplier<Optional<UpdatablePageSource>> pageSource)
protected UpdatablePageSource pageSource()
{
Optional<UpdatablePageSource> source = pageSource.get();
checkState(source.isPresent(), "UpdatablePageSource not set");
checkState(source.isPresent(), "pageSource not set");
return source.get();
}
}
Loading