From b2a1592e5fe40490cc238a2ca2996ca1ebe49a5c Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 30 Sep 2022 12:56:00 +0200 Subject: [PATCH 1/3] Adjust comment to provide a better, real example --- .../trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java index f8ab93317359..ba60d6d0fd79 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java @@ -107,7 +107,7 @@ private static List getPlanNodeStats(TaskStats taskStats) planNodeBlockedMillis.merge(planNodeId, operatorStats.getBlockedWall().toMillis(), Long::sum); - // A pipeline like hash build before join might link to another "internal" pipelines which provide actual input for this plan node + // A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink if (operatorStats.getPlanNodeId().equals(inputPlanNode) && !pipelineStats.isInputPipeline()) { continue; } From 7768da6a56e8c36e08b4d6328f6e7bcbe1b42cd8 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 30 Sep 2022 15:26:17 +0200 Subject: [PATCH 2/3] Print stats for each individual operator in EXPLAIN ANALYZE VERBOSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Example Fragment 2 [HASH] CPU: 41.22s, Scheduled: 1.08m, Blocked 2.61m (Input: 48.15s, Output: 0.00ns), Input: 119972104 rows (2.01GB); per task: avg.: 119972104.00 std.dev.: 0.00, Output: 8 rows ( Output layout: [count_19] Output partitioning: SINGLE [] Aggregate[type = PARTIAL] │ Layout: [count_19:bigint] │ CPU: 1.30s (1.92%), Scheduled: 2.00s (1.20%), Blocked: 0.00ns (0.00%), Output: 8 rows (72B) │ AggregationOperator metrics: │ 'CPU time distribution (s)' = {count=8.00, p01=0.15, p05=0.15, p10=0.15, p25=0.15, p50=0.17, p75=0.18, p90=0.18, p95=0.18, p99=0.18, min=0.15, max=0.18} │ 'Input rows distribution' = {count=8.00, p01=36809359.00, p05=36809359.00, p10=36809359.00, p25=37080874.00, p50=37387039.00, p75=38224448.00, p90=38731833.00, p95=3 │ 'Wall time distribution (s)' = {count=8.00, p01=0.20, p05=0.20, p10=0.20, p25=0.23, p50=0.27, p75=0.27, p90=0.32, p95=0.32, p99=0.32, min=0.20, max=0.32} │ TaskOutputOperator metrics: │ 'CPU time distribution (s)' = {count=8.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.00, p90=0.00, p95=0.00, p99=0.00, min=0.00, max=0.00} │ 'Input rows distribution' = {count=8.00, p01=1.00, p05=1.00, p10=1.00, p25=1.00, p50=1.00, p75=1.00, p90=1.00, p95=1.00, p99=1.00, min=1.00, max=1.00} │ 'Wall time distribution (s)' = {count=8.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.00, p90=0.00, p95=0.00, p99=0.00, min=0.00, max=0.00} │ Input avg.: 37485342.50 rows, Input std.dev.: 1.71% │ count_19 := count("orderkey") └─ InnerJoin[criteria = ("orderkey" = "orderkey_0"), hash = [$hashvalue, $hashvalue_21], distribution = PARTITIONED] │ Layout: [orderkey:bigint] │ Reorder joins cost : {rows: 239034200 (2.00GB), cpu: 5.52G, memory: 514.86MB, network: 1.01GB} │ Estimates: {rows: 239034200 (2.00GB), cpu: 4.01G, memory: 1.01GB, network: 0B} │ CPU: 29.98s (44.16%), Scheduled: 43.86s (26.35%), Blocked: 1.20m (14.13%), Output: 299882740 rows (1.62GB) │ LookupJoinOperator metrics: │ 'CPU time distribution (s)' = {count=8.00, p01=2.38, p05=2.38, p10=2.38, p25=2.42, p50=2.44, p75=2.50, p90=2.51, p95=2.51, p99=2.51, min=2.38, max=2.51} │ 'Input rows distribution' = {count=8.00, p01=7363095.00, p05=7363095.00, p10=7363095.00, p25=7416595.00, p50=7479924.00, p75=7646558.00, p90=7746970.00, p95=77469 │ 'Wall time distribution (s)' = {count=8.00, p01=3.14, p05=3.14, p10=3.14, p25=3.19, p50=3.20, p75=3.26, p90=3.27, p95=3.27, p99=3.27, min=3.14, max=3.27} │ DynamicFilterSourceOperator metrics: │ 'CPU time distribution (s)' = {count=8.00, p01=0.04, p05=0.04, p10=0.04, p25=0.04, p50=0.04, p75=0.04, p90=0.05, p95=0.05, p99=0.05, min=0.04, max=0.05} │ 'Input rows distribution' = {count=8.00, p01=7489515.00, p05=7489515.00, p10=7489515.00, p25=7495257.00, p50=7496362.00, p75=7506635.00, p90=7510105.00, p95=75101 │ 'Wall time distribution (s)' = {count=8.00, p01=0.12, p05=0.12, p10=0.12, p25=0.14, p50=0.16, p75=0.19, p90=0.21, p95=0.21, p99=0.21, min=0.12, max=0.21} │ HashBuilderOperator metrics: │ 'CPU time distribution (s)' = {count=8.00, p01=1.21, p05=1.21, p10=1.21, p25=1.23, p50=1.24, p75=1.32, p90=1.40, p95=1.40, p99=1.40, min=1.21, max=1.40} │ 'Input rows distribution' = {count=8.00, p01=7489515.00, p05=7489515.00, p10=7489515.00, p25=7495257.00, p50=7496362.00, p75=7506635.00, p90=7510105.00, p95=75101 │ 'Wall time distribution (s)' = {count=8.00, p01=2.00, p05=2.00, p10=2.00, p25=2.04, p50=2.14, p75=2.21, p90=2.28, p95=2.28, p99=2.28, min=2.00, max=2.28} │ Left (probe) Input avg.: 7498256.50 rows, Input std.dev.: 1.72% │ Right (build) Input avg.: 7498256.50 rows, Input std.dev.: 0.09% │ Collisions avg.: 472153.48 (15.58% est.), Collisions std.dev.: 0.33% │ Distribution: PARTITIONED │ dynamicFilterAssignments = {orderkey_0 -> #df_401} --- .../planprinter/BasicOperatorStats.java | 11 +++++++++ .../planner/planprinter/PlanNodeStats.java | 9 ++++++++ .../planprinter/PlanNodeStatsSummarizer.java | 23 ++++++++++--------- .../sql/planner/planprinter/TextRenderer.java | 21 ++++++++--------- .../AbstractDistributedEngineOnlyQueries.java | 4 +++- 5 files changed, 44 insertions(+), 24 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java index ea8bee223d96..34fa55590fda 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/BasicOperatorStats.java @@ -15,10 +15,12 @@ import io.trino.spi.metrics.Metrics; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; class BasicOperatorStats { + private final boolean inputOperator; private final long totalDrivers; private final long inputPositions; private final double sumSquaredInputPositions; @@ -26,12 +28,14 @@ class BasicOperatorStats private final Metrics connectorMetrics; public BasicOperatorStats( + boolean inputOperator, long totalDrivers, long inputPositions, double sumSquaredInputPositions, Metrics metrics, Metrics connectorMetrics) { + this.inputOperator = inputOperator; this.totalDrivers = totalDrivers; this.inputPositions = inputPositions; this.sumSquaredInputPositions = sumSquaredInputPositions; @@ -39,6 +43,11 @@ public BasicOperatorStats( this.connectorMetrics = requireNonNull(connectorMetrics, "connectorMetrics is null"); } + public boolean isInputOperator() + { + return inputOperator; + } + public long getTotalDrivers() { return totalDrivers; @@ -66,7 +75,9 @@ public Metrics getConnectorMetrics() public static BasicOperatorStats merge(BasicOperatorStats first, BasicOperatorStats second) { + checkArgument(first.inputOperator == second.inputOperator); return new BasicOperatorStats( + first.inputOperator, first.totalDrivers + second.totalDrivers, first.inputPositions + second.inputPositions, first.sumSquaredInputPositions + second.sumSquaredInputPositions, diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java index d6c7659a22b6..e696f0d34162 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStats.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.util.MoreMaps.mergeMaps; import static java.lang.Double.max; @@ -105,6 +106,14 @@ public Set getOperatorTypes() return operatorStats.keySet(); } + public Set getInputOperatorTypes() + { + return operatorStats.entrySet().stream() + .filter(entry -> entry.getValue().isInputOperator()) + .map(Map.Entry::getKey) + .collect(toImmutableSet()); + } + public long getPlanNodeInputPositions() { return planNodeInputPositions; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java index ba60d6d0fd79..9ef49f91ac62 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java @@ -107,18 +107,17 @@ private static List getPlanNodeStats(TaskStats taskStats) planNodeBlockedMillis.merge(planNodeId, operatorStats.getBlockedWall().toMillis(), Long::sum); - // A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink - if (operatorStats.getPlanNodeId().equals(inputPlanNode) && !pipelineStats.isInputPipeline()) { - continue; - } - if (processedNodes.contains(planNodeId)) { - continue; - } + boolean inputOperator = !processedNodes.contains(planNodeId) + // A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink + && (!operatorStats.getPlanNodeId().equals(inputPlanNode) || pipelineStats.isInputPipeline()) + // HACK: skip dynamic filtering operator as for broadcast join it will be executed by single node only + && !operatorStats.getOperatorType().equals("DynamicFilterSourceOperator"); basicOperatorStats.merge(planNodeId, ImmutableMap.of( operatorStats.getOperatorType(), new BasicOperatorStats( + inputOperator, operatorStats.getTotalDrivers(), operatorStats.getInputPositions(), operatorStats.getSumSquaredInputPositions(), @@ -126,10 +125,12 @@ private static List getPlanNodeStats(TaskStats taskStats) operatorStats.getConnectorMetrics())), (map1, map2) -> mergeMaps(map1, map2, BasicOperatorStats::merge)); - planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); - planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); - planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); - processedNodes.add(planNodeId); + if (inputOperator) { + planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); + planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); + planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); + processedNodes.add(planNodeId); + } } // Gather output statistics diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java index 898386e3c4d6..f9a271b69e72 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java @@ -160,15 +160,19 @@ private void printMetrics(StringBuilder output, String label, Function translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes()); - for (String operator : translatedOperatorTypes.keySet()) { - String translatedOperatorType = translatedOperatorTypes.get(operator); + Set operatorTypes = stats.getOperatorTypes(); + for (String operator : operatorTypes) { Metrics metrics = metricsGetter.apply(stats.getOperatorStats().get(operator)); if (metrics.getMetrics().isEmpty()) { continue; } - output.append(translatedOperatorType + label).append("\n"); + if (operatorTypes.size() > 1) { + output.append(operator + " " + label + "\n"); + } + else { + output.append(label + "\n"); + } Map> sortedMap = new TreeMap<>(metrics.getMetrics()); sortedMap.forEach((name, metric) -> output.append(format(" '%s' = %s\n", name, metric))); } @@ -178,7 +182,7 @@ private void printDistributions(StringBuilder output, PlanNodeStats stats) { Map inputAverages = stats.getOperatorInputPositionsAverages(); Map inputStdDevs = stats.getOperatorInputPositionsStdDevs(); - Map translatedOperatorTypes = translateOperatorTypes(stats.getOperatorTypes()); + Map translatedOperatorTypes = translateOperatorTypes(stats.getInputOperatorTypes()); for (String operator : translatedOperatorTypes.keySet()) { String translatedOperatorType = translatedOperatorTypes.get(operator); @@ -247,13 +251,6 @@ private static Map translateOperatorTypes(Set operators) "HashBuilderOperator", "Right (build) "); } - if (operators.contains("LookupJoinOperator") && operators.contains("DynamicFilterSourceOperator")) { - // join plan node - return ImmutableMap.of( - "LookupJoinOperator", "Left (probe) ", - "DynamicFilterSourceOperator", "Right (build) "); - } - return ImmutableMap.of(); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java index 776e9216699e..15ccc8004ac9 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java @@ -225,7 +225,9 @@ public void testExplainAnalyzeVerbose() "EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a", "'Input rows distribution' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", "'CPU time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", - "'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}"); + "'Wall time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", + // make sure that individual operator stats are reported + "TaskOutputOperator metrics"); } @Test From c819fc7d177a7885aa83bfa6568e2edd3aa8c21a Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 30 Sep 2022 15:31:49 +0200 Subject: [PATCH 3/3] Sum spilled bytes for all operators of a plan node --- .../trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java index 9ef49f91ac62..a094c8dd92bb 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanNodeStatsSummarizer.java @@ -106,6 +106,7 @@ private static List getPlanNodeStats(TaskStats taskStats) planNodeCpuMillis.merge(planNodeId, cpuMillis, Long::sum); planNodeBlockedMillis.merge(planNodeId, operatorStats.getBlockedWall().toMillis(), Long::sum); + planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); boolean inputOperator = !processedNodes.contains(planNodeId) // A plan node like LocalExchange consists of LocalExchangeSource which links to another pipeline containing LocalExchangeSink @@ -128,7 +129,6 @@ private static List getPlanNodeStats(TaskStats taskStats) if (inputOperator) { planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); - planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum); processedNodes.add(planNodeId); } }