Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,39 @@

import io.trino.spi.metrics.Metrics;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

class BasicOperatorStats
{
private final boolean inputOperator;
private final long totalDrivers;
private final long inputPositions;
private final double sumSquaredInputPositions;
private final Metrics metrics;
private final Metrics connectorMetrics;

/**
 * Creates an immutable snapshot of per-operator statistics used when aggregating
 * stats for a plan node.
 *
 * @param inputOperator whether this operator is treated as providing the plan node's
 *        input rows; presumably used to decide which operators contribute to
 *        plan-node input counts — confirm against the stats-collection caller
 * @param totalDrivers number of drivers that executed this operator
 * @param inputPositions total input row count observed by this operator
 * @param sumSquaredInputPositions sum of squared per-driver input positions,
 *        used for input-distribution (std-dev) calculations
 * @param metrics operator-level metrics, must not be null
 * @param connectorMetrics connector-reported metrics, must not be null
 */
public BasicOperatorStats(
boolean inputOperator,
long totalDrivers,
long inputPositions,
double sumSquaredInputPositions,
Metrics metrics,
Metrics connectorMetrics)
{
this.inputOperator = inputOperator;
this.totalDrivers = totalDrivers;
this.inputPositions = inputPositions;
this.sumSquaredInputPositions = sumSquaredInputPositions;
this.metrics = requireNonNull(metrics, "metrics is null");
this.connectorMetrics = requireNonNull(connectorMetrics, "connectorMetrics is null");
}

/**
 * Returns whether this operator was classified as an input operator for its plan node.
 */
public boolean isInputOperator()
{
return inputOperator;
}

public long getTotalDrivers()
{
return totalDrivers;
Expand Down Expand Up @@ -66,7 +75,9 @@ public Metrics getConnectorMetrics()

public static BasicOperatorStats merge(BasicOperatorStats first, BasicOperatorStats second)
{
checkArgument(first.inputOperator == second.inputOperator);
return new BasicOperatorStats(
first.inputOperator,
first.totalDrivers + second.totalDrivers,
first.inputPositions + second.inputPositions,
first.sumSquaredInputPositions + second.sumSquaredInputPositions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.util.MoreMaps.mergeMaps;
import static java.lang.Double.max;
Expand Down Expand Up @@ -105,6 +106,14 @@ public Set<String> getOperatorTypes()
return operatorStats.keySet();
}

/**
 * Returns the operator types for this plan node whose stats were marked as input operators.
 */
public Set<String> getInputOperatorTypes()
{
return operatorStats.keySet().stream()
        .filter(operatorType -> operatorStats.get(operatorType).isInputOperator())
        .collect(toImmutableSet());
}

public long getPlanNodeInputPositions()
{
return planNodeInputPositions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,30 +106,31 @@ private static List<PlanNodeStats> getPlanNodeStats(TaskStats taskStats)
planNodeCpuMillis.merge(planNodeId, cpuMillis, Long::sum);

planNodeBlockedMillis.merge(planNodeId, operatorStats.getBlockedWall().toMillis(), Long::sum);
planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum);

// A pipeline like hash build before join might link to other "internal" pipelines which provide the actual input for this plan node
if (operatorStats.getPlanNodeId().equals(inputPlanNode) && !pipelineStats.isInputPipeline()) {
continue;
}
if (processedNodes.contains(planNodeId)) {
continue;
}
boolean inputOperator = !processedNodes.contains(planNodeId)
// A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink
&& (!operatorStats.getPlanNodeId().equals(inputPlanNode) || pipelineStats.isInputPipeline())
// HACK: skip dynamic filtering operator as for broadcast join it will be executed by single node only
&& !operatorStats.getOperatorType().equals("DynamicFilterSourceOperator");

basicOperatorStats.merge(planNodeId,
ImmutableMap.of(
operatorStats.getOperatorType(),
new BasicOperatorStats(
inputOperator,
operatorStats.getTotalDrivers(),
operatorStats.getInputPositions(),
operatorStats.getSumSquaredInputPositions(),
operatorStats.getMetrics(),
operatorStats.getConnectorMetrics())),
(map1, map2) -> mergeMaps(map1, map2, BasicOperatorStats::merge));

planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum);
planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum);
planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum);
processedNodes.add(planNodeId);
if (inputOperator) {
planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum);
planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum);
processedNodes.add(planNodeId);
}
}

// Gather output statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,19 @@ private void printMetrics(StringBuilder output, String label, Function<BasicOper
return;
}

Map<String, String> translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes());
for (String operator : translatedOperatorTypes.keySet()) {
String translatedOperatorType = translatedOperatorTypes.get(operator);
Set<String> operatorTypes = stats.getOperatorTypes();
for (String operator : operatorTypes) {
Metrics metrics = metricsGetter.apply(stats.getOperatorStats().get(operator));
if (metrics.getMetrics().isEmpty()) {
continue;
}

output.append(translatedOperatorType + label).append("\n");
if (operatorTypes.size() > 1) {
output.append(operator + " " + label + "\n");
}
else {
output.append(label + "\n");
}
Map<String, Metric<?>> sortedMap = new TreeMap<>(metrics.getMetrics());
sortedMap.forEach((name, metric) -> output.append(format(" '%s' = %s\n", name, metric)));
}
Expand All @@ -178,7 +182,7 @@ private void printDistributions(StringBuilder output, PlanNodeStats stats)
{
Map<String, Double> inputAverages = stats.getOperatorInputPositionsAverages();
Map<String, Double> inputStdDevs = stats.getOperatorInputPositionsStdDevs();
Map<String, String> translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes());
Map<String, String> translatedOperatorTypes = translateOperatorTypes(stats.getInputOperatorTypes());

for (String operator : translatedOperatorTypes.keySet()) {
String translatedOperatorType = translatedOperatorTypes.get(operator);
Expand Down Expand Up @@ -247,13 +251,6 @@ private static Map<String, String> translateOperatorTypes(Set<String> operators)
"HashBuilderOperator", "Right (build) ");
}

if (operators.contains("LookupJoinOperator") && operators.contains("DynamicFilterSourceOperator")) {
// join plan node
return ImmutableMap.of(
"LookupJoinOperator", "Left (probe) ",
"DynamicFilterSourceOperator", "Right (build) ");
}

return ImmutableMap.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ public void testExplainAnalyzeVerbose()
"EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a",
"'Input rows distribution' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}",
"'CPU time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}",
"'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}");
"'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}",
// make sure that individual operator stats are reported
"TaskOutputOperator metrics");
}

@Test
Expand Down