diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index e51db79e9fe9..69599aef38b4 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -100,6 +100,8 @@ Optional getTableHandleForExecute( void finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState); + void executeTableExecute(Session session, TableExecuteHandle handle); + TableProperties getTableProperties(Session session, TableHandle handle); /** diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 8353be2b8d03..bd0060075cb1 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -363,6 +363,14 @@ public void finishTableExecute(Session session, TableExecuteHandle tableExecuteH metadata.finishTableExecute(session.toConnectorSession(catalogName), tableExecuteHandle.getConnectorHandle(), fragments, tableExecuteState); } + @Override + public void executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle) + { + CatalogName catalogName = tableExecuteHandle.getCatalogName(); + ConnectorMetadata metadata = getMetadata(session, catalogName); + metadata.executeTableExecute(session.toConnectorSession(catalogName), tableExecuteHandle.getConnectorHandle()); + } + @Override public Optional getSystemTable(Session session, QualifiedObjectName tableName) { diff --git a/core/trino-main/src/main/java/io/trino/metadata/TableExecuteHandle.java b/core/trino-main/src/main/java/io/trino/metadata/TableExecuteHandle.java index bd3b30683b64..bada9877474c 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/TableExecuteHandle.java +++ b/core/trino-main/src/main/java/io/trino/metadata/TableExecuteHandle.java @@ -91,6 +91,6 @@ public int hashCode() @Override public String toString() { - return "Execute[" + catalogName + ":" + connectorHandle + "]"; + return catalogName + ":" + connectorHandle; } } diff --git a/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java b/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java new file mode 100644 index 000000000000..e31db7611b28 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java @@ -0,0 +1,141 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator; + +import io.trino.Session; +import io.trino.metadata.Metadata; +import io.trino.metadata.TableExecuteHandle; +import io.trino.spi.Page; +import io.trino.sql.planner.plan.PlanNodeId; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class SimpleTableExecuteOperator + implements Operator +{ + private static final Page PAGE = new Page(0); + + public static class SimpleTableExecuteOperatorOperatorFactory + implements OperatorFactory + { + private final int operatorId; + private final PlanNodeId planNodeId; + private final Metadata metadata; + private final Session session; + private final TableExecuteHandle executeHandle; + private boolean closed; + + public SimpleTableExecuteOperatorOperatorFactory( + int operatorId, + PlanNodeId planNodeId, + Metadata metadata, + Session session, + TableExecuteHandle executeHandle) + { + this.operatorId = operatorId; + this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); + this.metadata = requireNonNull(metadata, "planNodeId is null"); + this.session = requireNonNull(session, "planNodeId is null"); + this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); + } + + @Override + public Operator createOperator(DriverContext driverContext) + { + checkState(!closed, "Factory is already closed"); + OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, SimpleTableExecuteOperator.class.getSimpleName()); + return new SimpleTableExecuteOperator( + context, + metadata, + session, + executeHandle); + } + + @Override + public void noMoreOperators() + { + closed = true; + } + + @Override + public OperatorFactory duplicate() + { + return new SimpleTableExecuteOperatorOperatorFactory( + operatorId, + planNodeId, + metadata, + session, + executeHandle); + } + } + + private final OperatorContext operatorContext; + private final Metadata metadata; + private final Session session; + private final TableExecuteHandle executeHandle; + + private boolean finished; + + public SimpleTableExecuteOperator( + OperatorContext operatorContext, + Metadata metadata, + Session session, + TableExecuteHandle executeHandle) + { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.session = requireNonNull(session, "session is null"); + this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public boolean needsInput() + { + return false; + } + + @Override + public void addInput(Page page) + { + throw new UnsupportedOperationException(); + } + + @Override + public Page getOutput() + { + if (finished) { + return null; + } + + metadata.executeTableExecute(session, executeHandle); + finished = true; + return PAGE; + } + + @Override + public void finish() {} + + @Override + public boolean isFinished() + { + return finished; + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java index 48f1702c8b22..53dd4ed7a6cd 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java @@ -114,6 +114,7 @@ public class Analysis private String updateType; private Optional target = Optional.empty(); private boolean skipMaterializedViewRefresh; + private Optional tableExecuteReadsData; private final Map, Query> namedQueries = new LinkedHashMap<>(); @@ -276,6 +277,16 @@ public void setSkipMaterializedViewRefresh(boolean skipMaterializedViewRefresh) this.skipMaterializedViewRefresh = skipMaterializedViewRefresh; } + public boolean isTableExecuteReadsData() + { + return tableExecuteReadsData.orElseThrow(() -> new IllegalStateException("tableExecuteReadsData not set")); + } + + public void setTableExecuteReadsData(boolean readsData) + { + this.tableExecuteReadsData = Optional.of(readsData); + } + public void setAggregates(QuerySpecification node, List aggregates) { this.aggregates.put(NodeRef.of(node), ImmutableList.copyOf(aggregates)); diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 7bf6e3d439ad..92f11936fea1 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1096,6 +1096,7 @@ protected Scope visitTableExecute(TableExecute node, Optional scope) tableProperties) .orElseThrow(() -> semanticException(NOT_SUPPORTED, node, "Procedure '%s' cannot be executed on table '%s'", procedureName, tableName)); + analysis.setTableExecuteReadsData(procedureMetadata.getExecutionMode().isReadsData()); analysis.setTableExecuteHandle(executeHandle); analysis.setUpdateType("ALTER TABLE EXECUTE"); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 6e32576f630e..c9cec05ec5af 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -83,6 +83,7 @@ import io.trino.operator.ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory; import io.trino.operator.SetBuilderOperator.SetBuilderOperatorFactory; import io.trino.operator.SetBuilderOperator.SetSupplier; +import io.trino.operator.SimpleTableExecuteOperator.SimpleTableExecuteOperatorOperatorFactory; import io.trino.operator.SourceOperatorFactory; import io.trino.operator.SpatialIndexBuilderOperator.SpatialIndexBuilderOperatorFactory; import io.trino.operator.SpatialIndexBuilderOperator.SpatialPredicate; @@ -209,6 +210,7 @@ import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticAggregationsDescriptor; @@ -3327,6 +3329,21 @@ private List createColumnValueAndRowIdChannels(List outputSymbo return Arrays.asList(columnValueAndRowIdChannels); } + @Override + public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node, LocalExecutionPlanContext context) + { + context.setDriverInstanceCount(1); + SimpleTableExecuteOperatorOperatorFactory operatorFactory = + new SimpleTableExecuteOperatorOperatorFactory( + context.getNextOperatorId(), + node.getId(), + metadata, + session, + node.getExecuteHandle()); + + return new PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION); + } + @Override public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index f70b3bc418fe..93dc6401fb54 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -60,6 +60,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.ProjectNode; import io.trino.sql.planner.plan.RefreshMaterializedViewNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.StatisticAggregations; import io.trino.sql.planner.plan.StatisticsWriterNode; import io.trino.sql.planner.plan.TableExecuteNode; @@ -826,10 +827,18 @@ private static Map, Symbol> buildLambdaDeclar private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute statement) { Table table = statement.getTable(); - TableHandle tableHandle = analysis.getTableHandle(table); QualifiedObjectName tableName = createQualifiedObjectName(session, statement, table.getName()); TableExecuteHandle executeHandle = analysis.getTableExecuteHandle().orElseThrow(); + if (!analysis.isTableExecuteReadsData()) { + SimpleTableExecuteNode node = new SimpleTableExecuteNode( + idAllocator.getNextId(), + symbolAllocator.newSymbol("rows", BIGINT), + executeHandle); + return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty()); + } + + TableHandle tableHandle = analysis.getTableHandle(table); RelationPlan tableScanPlan = createRelationPlan(analysis, table); PlanBuilder sourcePlanBuilder = newPlanBuilder(tableScanPlan, analysis, ImmutableMap.of(), ImmutableMap.of()); if (statement.getWhere().isPresent()) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java index 199007992068..7a8418126eb8 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java @@ -46,6 +46,7 @@ import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SimplePlanRewriter; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.StatisticsWriterNode; import io.trino.sql.planner.plan.TableDeleteNode; import io.trino.sql.planner.plan.TableFinishNode; @@ -301,6 +302,13 @@ public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteCont return context.defaultRewrite(node, context.get()); } + @Override + public PlanNode visitSimpleTableExecuteNode(SimpleTableExecuteNode node, RewriteContext context) + { + context.get().setCoordinatorOnlyDistribution(); + return context.defaultRewrite(node, context.get()); + } + @Override public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index 5495d62cd4a4..25998a6bfda5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -61,6 +61,7 @@ import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticsWriterNode; @@ -600,6 +601,16 @@ public PlanWithProperties visitTableExecute(TableExecuteNode node, PreferredProp return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties); } + @Override + public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, PreferredProperties context) + { + return new PlanWithProperties( + node, + ActualProperties.builder() + .global(singleStreamPartition()) + .build()); + } + private PlanWithProperties visitTableWriter(PlanNode node, Optional partitioningScheme, PlanNode source, PreferredProperties preferredProperties) { PlanWithProperties newSource = source.accept(this, preferredProperties); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java index 3192ad55cec9..a1a9f59b17e0 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java @@ -49,6 +49,7 @@ import io.trino.sql.planner.plan.ProjectNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticsWriterNode; @@ -580,6 +581,12 @@ public PlanWithProperties visitTopNRanking(TopNRankingNode node, StreamPreferred return planAndEnforceChildren(node, requiredProperties, requiredProperties); } + @Override + public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, StreamPreferredProperties context) + { + return planAndEnforceChildren(node, singleStream(), singleStream()); + } + // // Table Writer and Table Execute // diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java index 47a2b2453650..26aa328c898b 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java @@ -63,6 +63,7 @@ import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticsWriterNode; @@ -481,6 +482,15 @@ public ActualProperties visitTableExecute(TableExecuteNode node, List inputProperties) + { + // metadata operations always run on the coordinator + return ActualProperties.builder() + .global(coordinatorSingleStreamPartition()) + .build(); + } + @Override public ActualProperties visitJoin(JoinNode node, List inputProperties) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java index ab85e1ed2ca7..01b89d74aaa5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java @@ -54,6 +54,7 @@ import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticsWriterNode; @@ -451,6 +452,12 @@ public StreamProperties visitTableExecute(TableExecuteNode node, List context) + { + return StreamProperties.singleStream(); + } + @Override public StreamProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, List inputProperties) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java index e3069e83758e..bc97d123799d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -65,6 +65,7 @@ import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; import io.trino.sql.planner.plan.SimplePlanRewriter; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticsWriterNode; @@ -647,6 +648,21 @@ public PlanAndMappings visitTableExecute(TableExecuteNode node, UnaliasContext c return new PlanAndMappings(rewrittenTableExecute, mapping); } + @Override + public PlanAndMappings visitSimpleTableExecuteNode(SimpleTableExecuteNode node, UnaliasContext context) + { + Map mapping = new HashMap<>(context.getCorrelationMapping()); + SymbolMapper mapper = symbolMapper(mapping); + Symbol newOutput = mapper.map(node.getOutput()); + + return new PlanAndMappings( + new SimpleTableExecuteNode( + node.getId(), + newOutput, + node.getExecuteHandle()), + mapping); + } + @Override public PlanAndMappings visitStatisticsWriterNode(StatisticsWriterNode node, UnaliasContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java index 779ced97f7b9..c523d36cefaa 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java @@ -52,6 +52,7 @@ @JsonSubTypes.Type(value = DeleteNode.class, name = "delete"), @JsonSubTypes.Type(value = UpdateNode.class, name = "update"), @JsonSubTypes.Type(value = TableExecuteNode.class, name = "tableExecute"), + @JsonSubTypes.Type(value = SimpleTableExecuteNode.class, name = "simpleTableExecuteNode"), @JsonSubTypes.Type(value = TableDeleteNode.class, name = "tableDelete"), @JsonSubTypes.Type(value = TableFinishNode.class, name = "tablecommit"), @JsonSubTypes.Type(value = UnnestNode.class, name = "unnest"), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java index 742f8eee4b7f..118d5189fd95 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java @@ -144,6 +144,11 @@ public R visitTableExecute(TableExecuteNode node, C context) return visitPlan(node, context); } + public R visitSimpleTableExecuteNode(SimpleTableExecuteNode node, C context) + { + return visitPlan(node, context); + } + public R visitTableDelete(TableDeleteNode node, C context) { return visitPlan(node, context); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java new file mode 100644 index 000000000000..cba07cbd5866 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.planner.plan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.trino.metadata.TableExecuteHandle; +import io.trino.sql.planner.Symbol; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class SimpleTableExecuteNode + extends PlanNode +{ + private final Symbol output; + private final TableExecuteHandle executeHandle; + + @JsonCreator + public SimpleTableExecuteNode( + @JsonProperty("id") PlanNodeId id, + @JsonProperty("output") Symbol output, + @JsonProperty("executeHandle") TableExecuteHandle executeHandle) + { + super(id); + this.output = requireNonNull(output, "output is null"); + this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); + } + + @Override + @JsonProperty + public List getSources() + { + return ImmutableList.of(); + } + + @Override + public List getOutputSymbols() + { + return ImmutableList.of(output); + } + + @JsonProperty + public Symbol getOutput() + { + return output; + } + + @Override + public PlanNode replaceChildren(List newChildren) + { + checkArgument(newChildren.isEmpty(), "newChildren should be empty"); + return this; + } + + @JsonProperty + public TableExecuteHandle getExecuteHandle() + { + return executeHandle; + } + + @Override + public R accept(PlanVisitor visitor, C context) + { + return visitor.visitSimpleTableExecuteNode(this, context); + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index f3a73f0942a0..bcc8d82bb923 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -86,6 +86,7 @@ import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticAggregations; @@ -1389,6 +1390,14 @@ public Void visitTableExecute(TableExecuteNode node, Void context) return processChildren(node, context); } + @Override + public Void visitSimpleTableExecuteNode(SimpleTableExecuteNode node, Void context) + { + addNode(node, "SimpleTableExecute", format("[%s]", node.getExecuteHandle())); + + return processChildren(node, context); + } + @Override public Void visitTableDelete(TableDeleteNode node, Void context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java index 4bcfe17a66cb..df03b11d9ad6 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java @@ -54,6 +54,7 @@ import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; import io.trino.sql.planner.plan.SetOperationNode; +import io.trino.sql.planner.plan.SimpleTableExecuteNode; import io.trino.sql.planner.plan.SortNode; import io.trino.sql.planner.plan.SpatialJoinNode; import io.trino.sql.planner.plan.StatisticAggregationsDescriptor; @@ -657,6 +658,12 @@ public Void visitTableExecute(TableExecuteNode node, Set boundSymbols) return null; } + @Override + public Void visitSimpleTableExecuteNode(SimpleTableExecuteNode node, Set context) + { + return null; + } + @Override public Void visitTableDelete(TableDeleteNode node, Set boundSymbols) { diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index e3e8a5667f0b..6929bcdfe726 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -149,6 +149,12 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle throw new UnsupportedOperationException(); } + @Override + public void executeTableExecute(Session session, TableExecuteHandle handle) + { + throw new UnsupportedOperationException(); + } + @Override public Optional getSystemTable(Session session, QualifiedObjectName tableName) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 72c2e091024c..b0962bb55f6a 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -147,6 +147,14 @@ default void finishTableExecute(ConnectorSession session, ConnectorTableExecuteH throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getTableHandleForExecute() is implemented without finishTableExecute()"); } + /** + * Execute a {@link TableProcedureExecutionMode#coordinatorOnly() coordinator-only} table procedure. + */ + default void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata executeTableExecute() is not implemented"); + } + /** * Returns the system table for the specified table name, if one exists. * The system tables handled via {@link #getSystemTable} differ form those returned by {@link Connector#getSystemTables()}. diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/TableProcedureExecutionMode.java b/core/trino-spi/src/main/java/io/trino/spi/connector/TableProcedureExecutionMode.java index 29d4841792b7..29fd820cc332 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/TableProcedureExecutionMode.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/TableProcedureExecutionMode.java @@ -18,15 +18,8 @@ public final class TableProcedureExecutionMode private final boolean readsData; private final boolean supportsFilter; - public TableProcedureExecutionMode(boolean readsData, boolean supportsFilter) + private TableProcedureExecutionMode(boolean readsData, boolean supportsFilter) { - if (!readsData) { - // TODO currently only table procedures which process data are supported - // this is temporary check to be dropped when execution flow will be added for - // table procedures which do not read data - throw new IllegalArgumentException("procedures that do not read data are not supported yet"); - } - if (!readsData) { if (supportsFilter) { throw new IllegalArgumentException("filtering not supported if table data is not processed"); @@ -46,6 +39,10 @@ public boolean supportsFilter() return supportsFilter; } + /** + * Table procedure that does not read any table data and only executes on the coordinator. + * Such procedures are useful for custom DDL-type operations. + */ public static TableProcedureExecutionMode coordinatorOnly() { return new TableProcedureExecutionMode(false, false); diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 061534fa3d5e..1f108382e6aa 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -207,6 +207,14 @@ public Optional getTableHandleForExecute(ConnectorS } } + @Override + public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.executeTableExecute(session, tableExecuteHandle); + } + } + @Override public Optional getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {