Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.function.AggregationFunctionMetadata;
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.OperatorType;
Expand Down Expand Up @@ -385,6 +386,16 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles);

/**
* Push update into connector
*/
Optional<TableHandle> applyUpdate(Session session, TableHandle tableHandle, Map<ColumnHandle, Constant> assignments);

/**
* Execute update in connector
*/
OptionalLong executeUpdate(Session session, TableHandle tableHandle);

/**
* Push delete into connector
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.Variable;
import io.trino.spi.function.AggregationFunctionMetadata;
import io.trino.spi.function.AggregationFunctionMetadata.AggregationFunctionMetadataBuilder;
Expand Down Expand Up @@ -1183,6 +1184,27 @@ public Optional<PartitioningHandle> getUpdateLayout(Session session, TableHandle
.map(partitioning -> new PartitioningHandle(Optional.of(catalogHandle), Optional.of(transactionHandle), partitioning));
}

@Override
public Optional<TableHandle> applyUpdate(Session session, TableHandle table, Map<ColumnHandle, Constant> assignments)
{
CatalogHandle catalogHandle = table.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);

ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.applyUpdate(connectorSession, table.getConnectorHandle(), assignments)
.map(newHandle -> new TableHandle(catalogHandle, newHandle, table.getTransaction()));
}

@Override
public OptionalLong executeUpdate(Session session, TableHandle table)
{
CatalogHandle catalogHandle = table.getCatalogHandle();
ConnectorMetadata metadata = getMetadataForWrite(session, catalogHandle);
ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);

return metadata.executeUpdate(connectorSession, table.getConnectorHandle());
}

@Override
public Optional<TableHandle> applyDelete(Session session, TableHandle table)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.metadata.Metadata;
import io.trino.metadata.TableHandle;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.BlockBuilder;
Expand All @@ -30,36 +27,39 @@
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public class TableDeleteOperator
public class TableMutationOperator
implements Operator
{
public static final List<Type> TYPES = ImmutableList.of(BIGINT);

public static class TableDeleteOperatorFactory
private final OperatorContext operatorContext;
private final Operation operation;
private boolean finished;

public static class TableMutationOperatorFactory
implements OperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
private final Metadata metadata;
private final Session session;
private final TableHandle tableHandle;
private final Operation operation;
private boolean closed;

public TableDeleteOperatorFactory(int operatorId, PlanNodeId planNodeId, Metadata metadata, Session session, TableHandle tableHandle)
public TableMutationOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
Operation operation)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.operation = requireNonNull(operation, "operation is null");
}

@Override
public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableDeleteOperator.class.getSimpleName());
return new TableDeleteOperator(context, metadata, session, tableHandle);
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableMutationOperator.class.getSimpleName());
return new TableMutationOperator(context, operation);
}

@Override
Expand All @@ -71,23 +71,17 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new TableDeleteOperatorFactory(operatorId, planNodeId, metadata, session, tableHandle);
return new TableMutationOperatorFactory(
operatorId,
planNodeId,
operation);
}
}

private final OperatorContext operatorContext;
private final Metadata metadata;
private final Session session;
private final TableHandle tableHandle;

private boolean finished;

public TableDeleteOperator(OperatorContext operatorContext, Metadata metadata, Session session, TableHandle tableHandle)
public TableMutationOperator(OperatorContext operatorContext, Operation operation)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.operation = requireNonNull(operation, "operation is null");
}

@Override
Expand All @@ -97,9 +91,7 @@ public OperatorContext getOperatorContext()
}

@Override
public void finish()
{
}
public void finish() {}

@Override
public boolean isFinished()
Expand Down Expand Up @@ -127,19 +119,29 @@ public Page getOutput()
}
finished = true;

OptionalLong rowsDeletedCount = metadata.executeDelete(session, tableHandle);
OptionalLong rowsUpdatedCount = operation.execute();

return buildUpdatedCountPage(rowsUpdatedCount);
}

private Page buildUpdatedCountPage(OptionalLong count)
{
// output page will only be constructed once,
// so a new PageBuilder is constructed (instead of using PageBuilder.reset)
PageBuilder page = new PageBuilder(1, TYPES);
BlockBuilder rowsBuilder = page.getBlockBuilder(0);
page.declarePosition();
if (rowsDeletedCount.isPresent()) {
BIGINT.writeLong(rowsBuilder, rowsDeletedCount.getAsLong());
if (count.isPresent()) {
BIGINT.writeLong(rowsBuilder, count.getAsLong());
}
else {
rowsBuilder.appendNull();
}
return page.build();
}

public interface Operation
{
OptionalLong execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@
import io.trino.operator.SpatialJoinOperator.SpatialJoinOperatorFactory;
import io.trino.operator.StatisticsWriterOperator.StatisticsWriterOperatorFactory;
import io.trino.operator.StreamingAggregationOperator;
import io.trino.operator.TableDeleteOperator.TableDeleteOperatorFactory;
import io.trino.operator.TableFunctionOperator.TableFunctionOperatorFactory;
import io.trino.operator.TableMutationOperator.TableMutationOperatorFactory;
import io.trino.operator.TableScanOperator.TableScanOperatorFactory;
import io.trino.operator.TaskContext;
import io.trino.operator.TopNOperator;
Expand Down Expand Up @@ -233,6 +233,7 @@
import io.trino.sql.planner.plan.TableFunctionNode.PassThroughSpecification;
import io.trino.sql.planner.plan.TableFunctionProcessorNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableUpdateNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.TableWriterNode.MergeTarget;
import io.trino.sql.planner.plan.TableWriterNode.TableExecuteTarget;
Expand Down Expand Up @@ -3572,7 +3573,15 @@ public PhysicalOperation visitMergeProcessor(MergeProcessorNode node, LocalExecu
@Override
public PhysicalOperation visitTableDelete(TableDeleteNode node, LocalExecutionPlanContext context)
{
OperatorFactory operatorFactory = new TableDeleteOperatorFactory(context.getNextOperatorId(), node.getId(), metadata, session, node.getTarget());
OperatorFactory operatorFactory = new TableMutationOperatorFactory(context.getNextOperatorId(), node.getId(), () -> metadata.executeDelete(session, node.getTarget()));

return new PhysicalOperation(operatorFactory, makeLayout(node), context);
}

@Override
public PhysicalOperation visitTableUpdate(TableUpdateNode node, LocalExecutionPlanContext context)
{
OperatorFactory operatorFactory = new TableMutationOperatorFactory(context.getNextOperatorId(), node.getId(), () -> metadata.executeUpdate(session, node.getTarget()));

return new PhysicalOperation(operatorFactory, makeLayout(node), context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.sql.planner.plan.TableFunctionNode;
import io.trino.sql.planner.plan.TableFunctionProcessorNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableUpdateNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.transaction.TransactionManager;
Expand Down Expand Up @@ -301,6 +302,13 @@ public PlanNode visitTableDelete(TableDeleteNode node, RewriteContext<FragmentPr
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitTableUpdate(TableUpdateNode node, RewriteContext<FragmentProperties> context)
{
context.get().setCoordinatorOnlyDistribution();
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProperties> context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import io.trino.sql.planner.iterative.rule.PushLimitThroughSemiJoin;
import io.trino.sql.planner.iterative.rule.PushLimitThroughUnion;
import io.trino.sql.planner.iterative.rule.PushMergeWriterDeleteIntoConnector;
import io.trino.sql.planner.iterative.rule.PushMergeWriterUpdateIntoConnector;
import io.trino.sql.planner.iterative.rule.PushOffsetThroughProject;
import io.trino.sql.planner.iterative.rule.PushPartialAggregationThroughExchange;
import io.trino.sql.planner.iterative.rule.PushPartialAggregationThroughJoin;
Expand Down Expand Up @@ -796,6 +797,7 @@ public PlanOptimizers(
ImmutableSet.of(
// Must run before AddExchanges
new PushMergeWriterDeleteIntoConnector(metadata),
new PushMergeWriterUpdateIntoConnector(plannerContext, typeAnalyzer, metadata),
new DetermineTableScanNodePartitioning(metadata, nodePartitioningManager, taskCountEstimator),
// Must run after join reordering because join reordering creates
// new join nodes without JoinNode.maySkipOutputDuplicates flag set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.TableFunctionProcessorNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableUpdateNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.TopNNode;
import io.trino.sql.planner.plan.TopNRankingNode;
Expand Down Expand Up @@ -437,6 +438,13 @@ public Map<PlanNodeId, SplitSource> visitTableDelete(TableDeleteNode node, Void
return ImmutableMap.of();
}

@Override
public Map<PlanNodeId, SplitSource> visitTableUpdate(TableUpdateNode node, Void context)
{
// node does not have splits
return ImmutableMap.of();
}

@Override
public Map<PlanNodeId, SplitSource> visitTableExecute(TableExecuteNode node, Void context)
{
Expand Down
Loading