Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.operator.TaskStats;
import io.trino.server.BasicQueryInfo;
import io.trino.spi.QueryId;
import io.trino.spi.eventlistener.OutputColumnMetadata;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
import io.trino.spi.eventlistener.QueryCreatedEvent;
Expand All @@ -56,6 +57,7 @@
import io.trino.spi.eventlistener.StageCpuDistribution;
import io.trino.spi.resourcegroups.QueryType;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.sql.analyzer.Analysis;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
Expand All @@ -77,6 +79,7 @@
import java.util.stream.Collectors;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.execution.QueryState.QUEUED;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan;
Expand Down Expand Up @@ -363,15 +366,21 @@ private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)
.map(TableFinishInfo.class::cast)
.findFirst();

Optional<List<OutputColumnMetadata>> outputColumnsMetadata = queryInfo.getOutput().get().getColumns()
.map(columns -> columns.stream()
.map(column -> new OutputColumnMetadata(
column.getColumn().getName(),
column.getSourceColumns().stream()
.map(Analysis.SourceColumn::getColumnDetail)
.collect(toImmutableSet())))
.collect(toImmutableList()));

output = Optional.of(
new QueryOutputMetadata(
queryInfo.getOutput().get().getCatalogName(),
queryInfo.getOutput().get().getSchema(),
queryInfo.getOutput().get().getTable(),
queryInfo.getOutput().get().getColumns()
.map(columns -> columns.stream()
.map(Column::getName)
.collect(toImmutableList())),
outputColumnsMetadata,
tableFinishInfo.map(TableFinishInfo::getConnectorOutputMetadata),
tableFinishInfo.map(TableFinishInfo::isJsonLengthLimitExceeded)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.Session;
import io.trino.connector.CatalogName;
Expand All @@ -32,6 +33,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeNotFoundException;
import io.trino.sql.analyzer.Output;
import io.trino.sql.analyzer.OutputColumn;
import io.trino.sql.tree.ColumnDefinition;
import io.trino.sql.tree.CreateTable;
import io.trino.sql.tree.Expression;
Expand Down Expand Up @@ -248,7 +250,7 @@ else if (element instanceof LikeClause) {
tableName.getSchemaName(),
tableName.getObjectName(),
Optional.of(tableMetadata.getColumns().stream()
.map(column -> new Column(column.getName(), column.getType().toString()))
.map(column -> new OutputColumn(new Column(column.getName(), column.getType().toString()), ImmutableSet.of()))
.collect(toImmutableList()))));
return immediateFuture(null);
}
Expand Down
96 changes: 82 additions & 14 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
*/
package io.trino.sql.analyzer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Streams;
import io.trino.execution.Column;
import io.trino.metadata.NewTableLayout;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.ResolvedFunction;
Expand All @@ -33,6 +34,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.eventlistener.ColumnDetail;
import io.trino.spi.eventlistener.ColumnInfo;
import io.trino.spi.eventlistener.RoutineInfo;
import io.trino.spi.eventlistener.TableInfo;
Expand Down Expand Up @@ -122,9 +124,6 @@ public class Analysis
// a map of users to the columns per table that they access
private final Map<AccessControlInfo, Map<QualifiedObjectName, Set<String>>> tableColumnReferences = new LinkedHashMap<>();

// Track referenced fields from source relation node
private final Multimap<NodeRef<? extends Node>, Field> referencedFields = HashMultimap.create();

private final Map<NodeRef<QuerySpecification>, List<FunctionCall>> aggregates = new LinkedHashMap<>();
private final Map<NodeRef<OrderBy>, List<Expression>> orderByAggregates = new LinkedHashMap<>();
private final Map<NodeRef<QuerySpecification>, GroupingSetAnalysis> groupingSets = new LinkedHashMap<>();
Expand Down Expand Up @@ -195,6 +194,8 @@ public class Analysis

// row id field for update/delete queries
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();
private final Multimap<Field, SourceColumn> originColumnDetails = ArrayListMultimap.create();
private final Multimap<NodeRef<Expression>, Field> fieldLineage = ArrayListMultimap.create();

public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
{
Expand All @@ -221,7 +222,7 @@ public Optional<Output> getTarget()
});
}

public void setUpdateType(String updateType, QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<Column>> targetColumns)
public void setUpdateType(String updateType, QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<OutputColumn>> targetColumns)
{
this.updateType = updateType;
this.target = Optional.of(new UpdateTarget(targetName, targetTable, targetColumns));
Expand Down Expand Up @@ -851,11 +852,6 @@ public void addEmptyColumnReferencesForTable(AccessControl accessControl, Identi
tableColumnReferences.computeIfAbsent(accessControlInfo, k -> new LinkedHashMap<>()).computeIfAbsent(table, k -> new HashSet<>());
}

public void addReferencedFields(Multimap<NodeRef<Node>, Field> references)
{
referencedFields.putAll(references);
}

public Map<AccessControlInfo, Map<QualifiedObjectName, Set<String>>> getTableColumnReferences()
{
return tableColumnReferences;
Expand Down Expand Up @@ -966,6 +962,28 @@ public List<RoutineInfo> getRoutines()
.collect(toImmutableList());
}

public void addSourceColumns(Field field, Set<SourceColumn> sourceColumn)
{
originColumnDetails.putAll(field, sourceColumn);
}

public Set<SourceColumn> getSourceColumns(Field field)
{
return ImmutableSet.copyOf(originColumnDetails.get(field));
}

public void addExpressionFields(Expression expression, Collection<Field> fields)
{
fieldLineage.putAll(NodeRef.of(expression), fields);
}

public Set<SourceColumn> getExpressionSourceColumns(Expression expression)
{
return fieldLineage.get(NodeRef.of(expression)).stream()
.flatMap(field -> getSourceColumns(field).stream())
.collect(toImmutableSet());
}

public void setRowIdField(Table table, FieldReference field)
{
rowIdField.put(NodeRef.of(table), field);
Expand Down Expand Up @@ -1488,6 +1506,56 @@ public Scope getAccessControlScope()
}
}

public static class SourceColumn
Comment thread
Praveen2112 marked this conversation as resolved.
Outdated
{
private final QualifiedObjectName tableName;
private final String columnName;

@JsonCreator
public SourceColumn(@JsonProperty("tableName") QualifiedObjectName tableName, @JsonProperty("columnName") String columnName)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.columnName = requireNonNull(columnName, "columnName is null");
}

@JsonProperty
public QualifiedObjectName getTableName()
{
return tableName;
}

@JsonProperty
public String getColumnName()
{
return columnName;
}

public ColumnDetail getColumnDetail()
{
return new ColumnDetail(tableName.getCatalogName(), tableName.getSchemaName(), tableName.getObjectName(), columnName);
}

@Override
public int hashCode()
{
return Objects.hash(tableName, columnName);
}

@Override
public boolean equals(Object obj)
{
if (obj == this) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
SourceColumn entry = (SourceColumn) obj;
return Objects.equals(tableName, entry.tableName) &&
Objects.equals(columnName, entry.columnName);
}
}

private static class RoutineEntry
{
private final ResolvedFunction function;
Expand All @@ -1514,9 +1582,9 @@ private static class UpdateTarget
{
private final QualifiedObjectName name;
private final Optional<Table> table;
private final Optional<List<Column>> columns;
private final Optional<List<OutputColumn>> columns;

public UpdateTarget(QualifiedObjectName name, Optional<Table> table, Optional<List<Column>> columns)
public UpdateTarget(QualifiedObjectName name, Optional<Table> table, Optional<List<OutputColumn>> columns)
{
this.name = requireNonNull(name, "name is null");
this.table = requireNonNull(table, "table is null");
Expand All @@ -1533,7 +1601,7 @@ public Optional<Table> getTable()
return table;
}

public Optional<List<Column>> getColumns()
public Optional<List<OutputColumn>> getColumns()
{
return columns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public class ExpressionAnalyzer
private final CorrelationSupport correlationSupport;
private final Function<Expression, Type> getPreanalyzedType;
private final Function<FunctionCall, ResolvedWindow> getResolvedWindow;
private final List<Field> sourceFields = new ArrayList<>();

public ExpressionAnalyzer(
Metadata metadata,
Expand Down Expand Up @@ -401,6 +402,11 @@ public Multimap<NodeRef<Node>, Field> getReferencedFields()
return referencedFields;
}

public List<Field> getSourceFields()
{
return sourceFields;
}

private class Visitor
extends StackableAstVisitor<Type, Context>
{
Expand Down Expand Up @@ -507,6 +513,8 @@ private Type handleResolvedField(Expression node, ResolvedField resolvedField, S
tableColumnReferences.put(field.getOriginTable().get(), field.getOriginColumnName().get());
}

sourceFields.add(field);

fieldId.getRelationId()
.getSourceNode()
.ifPresent(source -> referencedFields.put(NodeRef.of(source), field));
Expand Down Expand Up @@ -1571,6 +1579,8 @@ else if (previousNode instanceof QuantifiedComparisonExpression) {
scalarSubqueries.add(NodeRef.of(node));
}

sourceFields.add(queryScope.getRelationType().getFieldByIndex(0));

Type type = getOnlyElement(queryScope.getRelationType().getVisibleFields()).getType();
return setExpressionType(node, type);
}
Expand Down Expand Up @@ -1973,6 +1983,7 @@ public static ExpressionAnalysis analyzeExpression(
analyzer.analyze(expression, scope);

updateAnalysis(analysis, analyzer, session, accessControl);
analysis.addExpressionFields(expression, analyzer.getSourceFields());

return new ExpressionAnalysis(
analyzer.getExpressionTypes(),
Expand Down Expand Up @@ -2030,7 +2041,6 @@ private static void updateAnalysis(Analysis analysis, ExpressionAnalyzer analyze
analysis.addColumnReferences(analyzer.getColumnReferences());
analysis.addLambdaArgumentReferences(analyzer.getLambdaArgumentReferences());
analysis.addTableColumnReferences(accessControl, session.getIdentity(), analyzer.getTableColumnReferences());
analysis.addReferencedFields(analyzer.getReferencedFields());
}

public static ExpressionAnalyzer create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.execution.Column;

import javax.annotation.concurrent.Immutable;

Expand All @@ -32,14 +31,14 @@ public final class Output
private final String catalogName;
private final String schema;
private final String table;
private final Optional<List<Column>> columns;
private final Optional<List<OutputColumn>> columns;

@JsonCreator
public Output(
@JsonProperty("catalogName") String catalogName,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("columns") Optional<List<Column>> columns)
@JsonProperty("columns") Optional<List<OutputColumn>> columns)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.schema = requireNonNull(schema, "schema is null");
Expand All @@ -66,7 +65,7 @@ public String getTable()
}

@JsonProperty
public Optional<List<Column>> getColumns()
public Optional<List<OutputColumn>> getColumns()
{
return columns;
}
Expand Down
Loading