Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.planprinter.Anonymizer;
import io.trino.sql.planner.planprinter.CounterBasedAnonymizer;
import io.trino.sql.planner.planprinter.NoOpAnonymizer;
import io.trino.sql.planner.planprinter.ValuePrinter;
import io.trino.transaction.TransactionId;
import org.joda.time.DateTime;
Expand All @@ -86,6 +89,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.execution.QueryState.QUEUED;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.sql.planner.planprinter.PlanPrinter.jsonDistributedPlan;
import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan;
import static java.lang.Math.max;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -159,12 +163,13 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo)
ImmutableList.of(),
queryInfo.getSelf(),
Optional.empty(),
Optional.empty(),
Optional.empty())));
}

public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure)
{
eventListenerManager.queryCompleted(new QueryCompletedEvent(
eventListenerManager.queryCompleted(requiresAnonymizedPlan -> new QueryCompletedEvent(
new QueryMetadata(
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
Expand All @@ -176,6 +181,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
ImmutableList.of(),
queryInfo.getSelf(),
Optional.empty(),
Optional.empty(),
Optional.empty()),
new QueryStatistics(
ofMillis(0),
Expand Down Expand Up @@ -233,9 +239,9 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
public void queryCompletedEvent(QueryInfo queryInfo)
{
QueryStats queryStats = queryInfo.getQueryStats();
eventListenerManager.queryCompleted(
eventListenerManager.queryCompleted(requiresAnonymizedPlan ->
new QueryCompletedEvent(
createQueryMetadata(queryInfo),
createQueryMetadata(queryInfo, requiresAnonymizedPlan),
createQueryStatistics(queryInfo),
createQueryContext(
queryInfo.getSession(),
Expand All @@ -252,8 +258,9 @@ public void queryCompletedEvent(QueryInfo queryInfo)
logQueryTimeline(queryInfo);
}

private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
private QueryMetadata createQueryMetadata(QueryInfo queryInfo, boolean requiresAnonymizedPlan)
{
Anonymizer anonymizer = requiresAnonymizedPlan ? new CounterBasedAnonymizer() : new NoOpAnonymizer();
return new QueryMetadata(
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
Expand All @@ -264,7 +271,8 @@ private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
queryInfo.getReferencedTables(),
queryInfo.getRoutines(),
queryInfo.getSelf(),
createTextQueryPlan(queryInfo),
createTextQueryPlan(queryInfo, anonymizer),
createJsonQueryPlan(queryInfo, anonymizer),
queryInfo.getOutputStage().flatMap(stage -> stageInfoCodec.toJsonWithLengthLimit(stage, maxJsonLimit)));
}

Expand Down Expand Up @@ -345,15 +353,16 @@ private QueryContext createQueryContext(SessionRepresentation session, Optional<
retryPolicy.toString());
}

private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
private Optional<String> createTextQueryPlan(QueryInfo queryInfo, Anonymizer anonymizer)
{
try {
if (queryInfo.getOutputStage().isPresent()) {
return Optional.of(textDistributedPlan(
queryInfo.getOutputStage().get(),
queryInfo.getQueryStats(),
new ValuePrinter(metadata, functionManager, queryInfo.getSession().toSession(sessionPropertyManager)),
false));
false,
anonymizer));
}
}
catch (Exception e) {
Expand All @@ -364,6 +373,26 @@ private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
return Optional.empty();
}

private Optional<String> createJsonQueryPlan(QueryInfo queryInfo, Anonymizer anonymizer)
{
try {
if (queryInfo.getOutputStage().isPresent()) {
return Optional.of(jsonDistributedPlan(
queryInfo.getOutputStage().get(),
queryInfo.getSession().toSession(sessionPropertyManager),
metadata,
functionManager,
anonymizer));
}
}
catch (Exception e) {
// Sometimes it is expected to fail. For example if generated plan is too long.
// Don't fail to create event if the plan cannot be created.
log.warn(e, "Error creating anonymized json plan for query %s", queryInfo.getQueryId());
}
return Optional.empty();
}

private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)
{
Multimap<FragmentNode, OperatorStats> planNodeStats = extractPlanNodeStats(queryInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.eventlistener;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -131,14 +132,15 @@ private static Map<String, String> loadEventListenerProperties(File configFile)
}
}

public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
for (EventListener listener : configuredEventListeners.get()) {
QueryCompletedEvent event = queryCompletedEventProvider.apply(listener.requiresAnonymizedPlan());
try {
listener.queryCompleted(queryCompletedEvent);
listener.queryCompleted(event);
}
catch (Throwable e) {
log.warn(e, "Failed to publish QueryCompletedEvent for query %s", queryCompletedEvent.getMetadata().getQueryId());
log.warn(e, "Failed to publish QueryCompletedEvent for query %s", event.getMetadata().getQueryId());
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/TableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,34 @@

import javax.annotation.concurrent.Immutable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

@Immutable
public class TableInfo
{
private final Optional<String> connectorName;
private final QualifiedObjectName tableName;
private final TupleDomain<ColumnHandle> predicate;

@JsonCreator
public TableInfo(
@JsonProperty("connectorName") Optional<String> connectorName,
@JsonProperty("tableName") QualifiedObjectName tableName,
@JsonProperty("predicate") TupleDomain<ColumnHandle> predicate)
{
this.connectorName = requireNonNull(connectorName, "connectorName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.predicate = requireNonNull(predicate, "predicate is null");
}

@JsonProperty
public Optional<String> getConnectorName()
{
return connectorName;
}

@JsonProperty
public QualifiedObjectName getTableName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.execution.StageInfo;
import io.trino.execution.TableInfo;
import io.trino.execution.TaskId;
import io.trino.metadata.CatalogInfo;
import io.trino.metadata.Metadata;
import io.trino.metadata.TableProperties;
import io.trino.metadata.TableSchema;
Expand Down Expand Up @@ -142,7 +143,11 @@ private static TableInfo getTableInfo(Session session, Metadata metadata, TableS
{
TableSchema tableSchema = metadata.getTableSchema(session, node.getTable());
TableProperties tableProperties = metadata.getTableProperties(session, node.getTable());
return new TableInfo(tableSchema.getQualifiedName(), tableProperties.getPredicate());
Optional<String> connectorName = metadata.listCatalogs(session).stream()
.filter(catalogInfo -> catalogInfo.getCatalogName().equals(tableSchema.getCatalogName()))
.map(CatalogInfo::getConnectorName)
.findFirst();
return new TableInfo(connectorName, tableSchema.getQualifiedName(), tableProperties.getPredicate());
}

private static StageId getStageId(QueryId queryId, PlanFragmentId fragmentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static io.trino.sql.analyzer.QueryType.EXPLAIN;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should it be separate PR?

import static io.trino.sql.planner.LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED;
import static io.trino.sql.planner.planprinter.IoPlanPrinter.textIoPlan;
import static io.trino.sql.planner.planprinter.PlanPrinter.jsonDistributedPlan;
import static io.trino.sql.planner.planprinter.PlanPrinter.jsonLogicalPlan;
import static io.trino.util.StatementUtils.isDataDefinitionStatement;
import static java.lang.String.format;
Expand Down Expand Up @@ -147,6 +148,8 @@ public String getJsonPlan(Session session, Statement statement, Type planType, L
plan = getLogicalPlan(session, statement, parameters, warningCollector);
return jsonLogicalPlan(plan.getRoot(), session, plan.getTypes(), plannerContext.getMetadata(), plannerContext.getFunctionManager(), plan.getStatsAndCosts());
case DISTRIBUTED:
SubPlan subPlan = getDistributedPlan(session, statement, parameters, warningCollector);
return jsonDistributedPlan(subPlan, plannerContext.getMetadata(), plannerContext.getFunctionManager(), session);
case VALIDATE:
// unsupported
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public SystemPartitionFunction getFunction()
return function;
}

public String getPartitioningName()
{
return partitioning.name();
}

@Override
public boolean isSingleNode()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner.planprinter;

import io.trino.execution.TableInfo;
import io.trino.metadata.IndexHandle;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableExecuteHandle;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.Symbol;
import io.trino.sql.tree.Expression;

import static io.trino.sql.planner.Partitioning.ArgumentBinding;
import static io.trino.sql.planner.plan.StatisticsWriterNode.WriteStatisticsTarget;
import static io.trino.sql.planner.plan.TableWriterNode.WriterTarget;

/**
* An interface for anonymizing the plan in {@link PlanPrinter}
*/
public interface Anonymizer
{
String anonymize(Type type, String value);

String anonymize(Symbol symbol);

String anonymizeColumn(String column);

String anonymize(Expression expression);

String anonymize(ColumnHandle columnHandle);

String anonymize(QualifiedObjectName objectName);

String anonymize(ArgumentBinding argument);

String anonymize(IndexHandle indexHandle);

String anonymize(TableHandle tableHandle, TableInfo tableInfo);

String anonymize(PartitioningHandle partitioningHandle);

String anonymize(WriterTarget writerTarget);

String anonymize(WriteStatisticsTarget writerTarget);

String anonymize(TableHandle tableHandle);

String anonymize(TableExecuteHandle tableHandle);
}
Loading