diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java index ea8bee223d96..34fa55590fda 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java @@ -15,10 +15,12 @@ import io.trino.spi.metrics.Metrics; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; class BasicOperatorStats { + private final boolean inputOperator; private final long totalDrivers; private final long inputPositions; private final double sumSquaredInputPositions; @@ -26,12 +28,14 @@ class BasicOperatorStats private final Metrics connectorMetrics; public BasicOperatorStats( + boolean inputOperator, long totalDrivers, long inputPositions, double sumSquaredInputPositions, Metrics metrics, Metrics connectorMetrics) { + this.inputOperator = inputOperator; this.totalDrivers = totalDrivers; this.inputPositions = inputPositions; this.sumSquaredInputPositions = sumSquaredInputPositions; @@ -39,6 +43,11 @@ public BasicOperatorStats( this.connectorMetrics = requireNonNull(connectorMetrics, "connectorMetrics is null"); } + public boolean isInputOperator() + { + return inputOperator; + } + public long getTotalDrivers() { return totalDrivers; @@ -66,7 +75,9 @@ public Metrics getConnectorMetrics() public static BasicOperatorStats merge(BasicOperatorStats first, BasicOperatorStats second) { + checkArgument(first.inputOperator == second.inputOperator); return new BasicOperatorStats( + first.inputOperator, first.totalDrivers + second.totalDrivers, first.inputPositions + second.inputPositions, first.sumSquaredInputPositions + second.sumSquaredInputPositions, diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java index d6c7659a22b6..e696f0d34162 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.util.MoreMaps.mergeMaps; import static java.lang.Double.max; @@ -105,6 +106,14 @@ public Set getOperatorTypes() return operatorStats.keySet(); } + public Set getInputOperatorTypes() + { + return operatorStats.entrySet().stream() + .filter(entry -> entry.getValue().isInputOperator()) + .map(Map.Entry::getKey) + .collect(toImmutableSet()); + } + public long getPlanNodeInputPositions() { return planNodeInputPositions; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java index f8ab93317359..a094c8dd92bb 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java @@ -106,19 +106,19 @@ private static List getPlanNodeStats(TaskStats taskStats) planNodeCpuMillis.merge(planNodeId, cpuMillis, Long::sum); planNodeBlockedMillis.merge(planNodeId, operatorStats.getBlockedWall().toMillis(), Long::sum); + planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); - // A pipeline like hash build before join might link to another "internal" pipelines which provide actual input for this plan node - if (operatorStats.getPlanNodeId().equals(inputPlanNode) && !pipelineStats.isInputPipeline()) { - continue; - } - if (processedNodes.contains(planNodeId)) { - continue; - } + boolean inputOperator = !processedNodes.contains(planNodeId) + // A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink + && (!operatorStats.getPlanNodeId().equals(inputPlanNode) || pipelineStats.isInputPipeline()) + // HACK: skip dynamic filtering operator as for broadcast join it will be executed by single node only + && !operatorStats.getOperatorType().equals("DynamicFilterSourceOperator"); basicOperatorStats.merge(planNodeId, ImmutableMap.of( operatorStats.getOperatorType(), new BasicOperatorStats( + inputOperator, operatorStats.getTotalDrivers(), operatorStats.getInputPositions(), operatorStats.getSumSquaredInputPositions(), @@ -126,10 +126,11 @@ private static List getPlanNodeStats(TaskStats taskStats) operatorStats.getConnectorMetrics())), (map1, map2) -> mergeMaps(map1, map2, BasicOperatorStats::merge)); - planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); - planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); - planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); - processedNodes.add(planNodeId); + if (inputOperator) { + planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); + planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); + processedNodes.add(planNodeId); + } } // Gather output statistics diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java index 898386e3c4d6..f9a271b69e72 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java @@ -160,15 +160,19 @@ private void printMetrics(StringBuilder output, String label, Function translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes()); - for (String operator : translatedOperatorTypes.keySet()) { - String translatedOperatorType = translatedOperatorTypes.get(operator); + Set operatorTypes = stats.getOperatorTypes(); + for (String operator : operatorTypes) { Metrics metrics = metricsGetter.apply(stats.getOperatorStats().get(operator)); if (metrics.getMetrics().isEmpty()) { continue; } - output.append(translatedOperatorType + label).append("\n"); + if (operatorTypes.size() > 1) { + output.append(operator + " " + label + "\n"); + } + else { + output.append(label + "\n"); + } Map> sortedMap = new TreeMap<>(metrics.getMetrics()); sortedMap.forEach((name, metric) -> output.append(format(" '%s' = %s\n", name, metric))); } @@ -178,7 +182,7 @@ private void printDistributions(StringBuilder output, PlanNodeStats stats) { Map inputAverages = stats.getOperatorInputPositionsAverages(); Map inputStdDevs = stats.getOperatorInputPositionsStdDevs(); - Map translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes()); + Map translatedOperatorTypes = translateOperatorTypes(stats.getInputOperatorTypes()); for (String operator : translatedOperatorTypes.keySet()) { String translatedOperatorType = translatedOperatorTypes.get(operator); @@ -247,13 +251,6 @@ private static Map translateOperatorTypes(Set operators) "HashBuilderOperator", "Right (build) "); } - if (operators.contains("LookupJoinOperator") && operators.contains("DynamicFilterSourceOperator")) { - // join plan node - return ImmutableMap.of( - "LookupJoinOperator", "Left (probe) ", - "DynamicFilterSourceOperator", "Right (build) "); - } - return ImmutableMap.of(); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java index 776e9216699e..15ccc8004ac9 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java @@ -225,7 +225,9 @@ public void testExplainAnalyzeVerbose() "EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a", "'Input rows distribution' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", "'CPU time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", - "'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}"); + "'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", + // make sure that individual operator stats are reported + "TaskOutputOperator metrics"); } @Test