Skip to content
Merged
37 changes: 23 additions & 14 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.sql.analyzer.QueryType.DESCRIBE;
import static io.trino.sql.analyzer.QueryType.EXPLAIN;
import static java.lang.Boolean.FALSE;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -202,8 +205,7 @@ public class Analysis
private Optional<TableHandle> analyzeTarget = Optional.empty();
private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();

// for describe input and describe output
private final boolean isDescribe;
private final QueryType queryType;

// for recursive view detection
private final Deque<Table> tablesForView = new ArrayDeque<>();
Expand All @@ -213,11 +215,11 @@ public class Analysis
private final Multimap<Field, SourceColumn> originColumnDetails = ArrayListMultimap.create();
private final Multimap<NodeRef<Expression>, Field> fieldLineage = ArrayListMultimap.create();

public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, QueryType queryType)
{
this.root = root;
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
this.isDescribe = isDescribe;
this.queryType = requireNonNull(queryType, "queryType is null");
}

public Statement getStatement()
Expand All @@ -238,23 +240,25 @@ public Optional<Output> getTarget()
});
}

public void setUpdateType(String updateType, QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<OutputColumn>> targetColumns)
public void setUpdateType(String updateType)
{
this.updateType = updateType;
this.target = Optional.of(new UpdateTarget(targetName, targetTable, targetColumns));
if (queryType != EXPLAIN) {
this.updateType = updateType;
}
}

public void resetUpdateType()
public void setUpdateTarget(QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<OutputColumn>> targetColumns)
{
this.updateType = null;
this.target = Optional.empty();
this.target = Optional.of(new UpdateTarget(targetName, targetTable, targetColumns));
}

public boolean isUpdateTarget(Table table)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used when creating a TableScanNode to set the updateTarget flag. Is it intentional that after this change we set the flag to true for ANALYZE, CREATE MATERIALIZED VIEW etc., while before this change it was set only for UPDATE and DELETE queries?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be still set only for UPDATE and DELETE queries - as targetTable is set only for those two types of queries and isUpdateTarget will return false if the targetTable is Optional#empty

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

{
return ("DELETE".equals(updateType) || "UPDATE".equals(updateType)) &&
target.orElseThrow(() -> new IllegalStateException("Update target not set"))
.getTable().orElseThrow(() -> new IllegalStateException("Table reference not set in update target")) == table; // intentional comparison by reference
requireNonNull(table, "table is null");
return target
.flatMap(UpdateTarget::getTable)
.map(tableReference -> tableReference == table) // intentional comparison by reference
.orElse(FALSE);
}

public boolean isSkipMaterializedViewRefresh()
Expand Down Expand Up @@ -840,9 +844,14 @@ public Map<NodeRef<Parameter>, Expression> getParameters()
return parameters;
}

public QueryType getQueryType()
{
return queryType;
}

public boolean isDescribe()
{
return isDescribe;
return queryType == DESCRIBE;
}

public void setJoinUsing(Join node, JoinUsingAnalysis analysis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractAggregateFunctions;
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractExpressions;
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractWindowExpressions;
import static io.trino.sql.analyzer.QueryType.OTHERS;
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -80,13 +81,13 @@ public Analyzer(

public Analysis analyze(Statement statement)
{
return analyze(statement, false);
return analyze(statement, OTHERS);
}

public Analysis analyze(Statement statement, boolean isDescribe)
public Analysis analyze(Statement statement, QueryType queryType)
{
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, groupProvider, accessControl, warningCollector, statsCalculator);
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, queryType);
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, groupProvider, accessControl, session, warningCollector, CorrelationSupport.ALLOWED);
analyzer.analyze(rewrittenStatement, Optional.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2539,9 +2539,9 @@ public static ExpressionAnalysis analyzeExpressions(
Iterable<Expression> expressions,
Map<NodeRef<Parameter>, Expression> parameters,
WarningCollector warningCollector,
boolean isDescribe)
QueryType queryType)
{
Analysis analysis = new Analysis(null, parameters, isDescribe);
Analysis analysis = new Analysis(null, parameters, queryType);
ExpressionAnalyzer analyzer = create(analysis, session, metadata, sqlParser, groupProvider, accessControl, types, warningCollector);
for (Expression expression : expressions) {
analyzer.analyze(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.sql.ParameterUtils.parameterExtractor;
import static io.trino.sql.analyzer.QueryType.EXPLAIN;
import static io.trino.sql.planner.LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED;
import static io.trino.sql.planner.planprinter.IoPlanPrinter.textIoPlan;
import static java.lang.String.format;
Expand Down Expand Up @@ -118,7 +119,7 @@ public QueryExplainer(
public Analysis analyze(Session session, Statement statement, List<Expression> parameters, WarningCollector warningCollector)
{
Analyzer analyzer = new Analyzer(session, metadata, sqlParser, groupProvider, accessControl, Optional.of(this), parameters, parameterExtractor(statement, parameters), warningCollector, statsCalculator);
return analyzer.analyze(statement);
return analyzer.analyze(statement, EXPLAIN);
}

public String getPlan(Session session, Statement statement, Type planType, List<Expression> parameters, WarningCollector warningCollector)
Expand Down
22 changes: 22 additions & 0 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/QueryType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.analyzer;

public enum QueryType
{
DESCRIBE,
EXPLAIN,
OTHERS,
/**/;
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)
.map(Type::toString),
Column::new);

analysis.setUpdateType(
"INSERT",
analysis.setUpdateType("INSERT");
analysis.setUpdateTarget(
targetTable,
Optional.empty(),
Optional.of(Streams.zip(
Expand All @@ -526,11 +526,11 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
}

accessControl.checkCanRefreshMaterializedView(session.toSecurityContext(), name);
analysis.setUpdateType("REFRESH MATERIALIZED VIEW");

if (metadata.delegateMaterializedViewRefreshToConnector(session, name)) {
analysis.setDelegatedRefreshMaterializedView(name);
analysis.setUpdateType(
"REFRESH MATERIALIZED VIEW",
analysis.setUpdateTarget(
name,
Optional.empty(),
Optional.empty());
Expand Down Expand Up @@ -590,8 +590,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
.map(Type::toString),
Column::new);

analysis.setUpdateType(
"REFRESH MATERIALIZED VIEW",
analysis.setUpdateTarget(
targetTable,
Optional.empty(),
Optional.of(Streams.zip(
Expand Down Expand Up @@ -699,7 +698,8 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
Scope tableScope = analyzer.analyzeForUpdate(table, scope, UpdateKind.DELETE);
node.getWhere().ifPresent(where -> analyzeWhere(node, tableScope, where));

analysis.setUpdateType("DELETE", tableName, Optional.of(table), Optional.empty());
analysis.setUpdateType("DELETE");
analysis.setUpdateTarget(tableName, Optional.of(table), Optional.empty());

return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
Expand All @@ -708,7 +708,8 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
protected Scope visitAnalyze(Analyze node, Optional<Scope> scope)
{
QualifiedObjectName tableName = createQualifiedObjectName(session, node, node.getTableName());
analysis.setUpdateType("ANALYZE", tableName, Optional.empty(), Optional.empty());
analysis.setUpdateType("ANALYZE");
analysis.setUpdateTarget(tableName, Optional.empty(), Optional.empty());

// verify the target table exists and it's not a view
if (metadata.getView(session, tableName).isPresent()) {
Expand Down Expand Up @@ -763,7 +764,8 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
Optional.empty(),
node.isWithData(),
true));
analysis.setUpdateType("CREATE TABLE", targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
throw semanticException(TABLE_ALREADY_EXISTS, node, "Destination table '%s' already exists", targetTable);
Expand Down Expand Up @@ -846,8 +848,8 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
node.isWithData(),
false));

analysis.setUpdateType(
"CREATE TABLE",
analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(
targetTable,
Optional.empty(),
Optional.of(outputColumns.build()));
Expand All @@ -869,8 +871,8 @@ protected Scope visitCreateView(CreateView node, Optional<Scope> scope)

validateColumns(node, queryScope.getRelationType());

analysis.setUpdateType(
"CREATE VIEW",
analysis.setUpdateType("CREATE VIEW");
analysis.setUpdateTarget(
viewName,
Optional.empty(),
Optional.of(queryScope.getRelationType().getVisibleFields().stream()
Expand Down Expand Up @@ -1069,8 +1071,8 @@ protected Scope visitCreateMaterializedView(CreateMaterializedView node, Optiona

validateColumns(node, queryScope.getRelationType());

analysis.setUpdateType(
"CREATE MATERIALIZED VIEW",
analysis.setUpdateType("CREATE MATERIALIZED VIEW");
analysis.setUpdateTarget(
viewName,
Optional.empty(),
Optional.of(
Expand Down Expand Up @@ -1153,15 +1155,13 @@ private void validateColumnAliasesCount(List<Identifier> columnAliases, int sour
protected Scope visitExplain(Explain node, Optional<Scope> scope)
{
process(node.getStatement(), scope);
analysis.resetUpdateType();
return createAndAssignScope(node, scope, Field.newUnqualified("Query Plan", VARCHAR));
}

@Override
protected Scope visitExplainAnalyze(ExplainAnalyze node, Optional<Scope> scope)
{
process(node.getStatement(), scope);
analysis.resetUpdateType();
return createAndAssignScope(node, scope, Field.newUnqualified("Query Plan", VARCHAR));
}

Expand Down Expand Up @@ -1904,7 +1904,7 @@ protected Scope visitSampledRelation(SampledRelation relation, Optional<Scope> s
ImmutableList.of(samplePercentage),
analysis.getParameters(),
WarningCollector.NOOP,
analysis.isDescribe())
analysis.getQueryType())
.getExpressionTypes();

Type samplePercentageType = expressionTypes.get(NodeRef.of(samplePercentage));
Expand Down Expand Up @@ -2313,8 +2313,8 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
analysis.recordSubqueries(update, analyses.get(index));
}

analysis.setUpdateType(
"UPDATE",
analysis.setUpdateType("UPDATE");
analysis.setUpdateTarget(
tableName,
Optional.of(table),
Optional.of(updatedColumns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;

import static io.trino.sql.analyzer.ExpressionAnalyzer.analyzeExpressions;
import static io.trino.sql.analyzer.QueryType.OTHERS;

/**
* This class is to facilitate obtaining the type of an expression and its subexpressions
Expand Down Expand Up @@ -59,7 +60,7 @@ public Map<NodeRef<Expression>, Type> getTypes(Session session, TypeProvider inp
expressions,
ImmutableMap.of(),
WarningCollector.NOOP,
false)
OTHERS)
.getExpressionTypes();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import io.trino.sql.planner.PlanNodeIdAllocator;
Comment thread
Praveen2112 marked this conversation as resolved.
Outdated
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.TypeProvider;
import io.trino.sql.planner.plan.AssignUniqueId;
import io.trino.sql.planner.plan.DeleteNode;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.MarkDistinctNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.SemiJoinNode;
Expand All @@ -48,8 +50,8 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
import static io.trino.sql.planner.plan.ChildReplacer.replaceChildren;
import static io.trino.sql.planner.planprinter.PlanPrinter.textLogicalPlan;
import static java.util.stream.Collectors.toSet;
Expand Down Expand Up @@ -254,7 +256,9 @@ private WriterTarget createWriterTarget(WriterTarget target)
private TableHandle findTableScanHandle(PlanNode node)
{
if (node instanceof TableScanNode) {
return ((TableScanNode) node).getTable();
TableScanNode tableScanNode = (TableScanNode) node;
checkArgument(((TableScanNode) node).isUpdateTarget(), "TableScanNode should be an updatable target");
return tableScanNode.getTable();
}
if (node instanceof FilterNode) {
return findTableScanHandle(((FilterNode) node).getSource());
Expand All @@ -267,9 +271,13 @@ private TableHandle findTableScanHandle(PlanNode node)
}
if (node instanceof JoinNode) {
JoinNode joinNode = (JoinNode) node;
Comment thread
Praveen2112 marked this conversation as resolved.
Outdated
Comment thread
Praveen2112 marked this conversation as resolved.
Outdated
if (joinNode.getType() == JoinNode.Type.INNER && isAtMostScalar(joinNode.getRight())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would love to see in the commit description why this check is no longer needed :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Why this check is removed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now with support for correlated queries for assignment...we would translate them to a LEFT JOIN with a non SCALAR build source.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to top level discussion @ #8286 (comment)

return findTableScanHandle(joinNode.getLeft());
}
return findTableScanHandle(joinNode.getLeft());
}
if (node instanceof AssignUniqueId) {
return findTableScanHandle(((AssignUniqueId) node).getSource());
}
if (node instanceof MarkDistinctNode) {
return findTableScanHandle(((MarkDistinctNode) node).getSource());
}
throw new IllegalArgumentException("Invalid descendant for DeleteNode or UpdateNode: " + node.getClass().getName());
}
Expand Down Expand Up @@ -303,11 +311,16 @@ private PlanNode rewriteModifyTableScan(PlanNode node, TableHandle handle)
return replaceChildren(node, ImmutableList.of(source, ((SemiJoinNode) node).getFilteringSource()));
}
if (node instanceof JoinNode) {
JoinNode joinNode = (JoinNode) node;
if (joinNode.getType() == JoinNode.Type.INNER && isAtMostScalar(joinNode.getRight())) {
PlanNode source = rewriteModifyTableScan(joinNode.getLeft(), handle);
return replaceChildren(node, ImmutableList.of(source, joinNode.getRight()));
}
PlanNode source = rewriteModifyTableScan(((JoinNode) node).getLeft(), handle);
return replaceChildren(node, ImmutableList.of(source, ((JoinNode) node).getRight()));
}
if (node instanceof AssignUniqueId) {
PlanNode source = rewriteModifyTableScan(((AssignUniqueId) node).getSource(), handle);
return replaceChildren(node, ImmutableList.of(source));
}
if (node instanceof MarkDistinctNode) {
PlanNode source = rewriteModifyTableScan(((MarkDistinctNode) node).getSource(), handle);
return replaceChildren(node, ImmutableList.of(source));
}
throw new IllegalArgumentException("Invalid descendant for DeleteNode or UpdateNode: " + node.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProp
}

return ActualProperties.builderFrom(properties)
.global(partitionedOn(ARBITRARY_DISTRIBUTION, ImmutableList.of(node.getIdColumn()), Optional.empty()))
.global(partitionedOn(ARBITRARY_DISTRIBUTION, ImmutableList.of(node.getIdColumn()), Optional.of(ImmutableList.of(node.getIdColumn()))))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from cmt msg

The uniqueId generated is based on StageId and TaskId so we could
specify stream partitioning for AssignUniqueId

I think that it doesn't matter how the ID is generated.
All we need to know is that the idColumn is populated with a value unique for every row.
If it's unique, it's also partitioned (both node-level and stream-level).

please update the commit message.

.local(newLocalProperties.build())
.build();
}
Expand Down
Loading