From bc06c1dbdd4982025d493140e079f6497430a86b Mon Sep 17 00:00:00 2001 From: Unknown Date: Thu, 10 Oct 2019 18:11:02 +0200 Subject: [PATCH 1/2] [SPARK-29431][WebUI] Improve Sql tab cached dataframes --- .../spark/sql/execution/SparkPlanInfo.scala | 13 +++++- .../sql/execution/ui/SparkPlanGraph.scala | 2 +- .../sql/execution/ui/SparkPlanInfoSuite.scala | 46 +++++++++++++++++++ 3 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 5b72ec058e127..9dc541ef19dda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, LocalShuffleReaderExec, QueryStageExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -33,7 +34,8 @@ class SparkPlanInfo( val simpleString: String, val children: Seq[SparkPlanInfo], val metadata: Map[String, String], - val metrics: Seq[SQLMetricInfo]) { + val metrics: Seq[SQLMetricInfo], + val relation: Seq[SparkPlanInfo] = Seq()) { override def hashCode(): Int = { // hashCode of simpleString should be good enough to distinguish the plans from each other @@ -67,11 +69,18 @@ private[execution] object SparkPlanInfo { case fileScan: FileSourceScanExec => fileScan.metadata case _ => Map[String, String]() } + + val relation = plan match { + case inMemTab: InMemoryTableScanExec => Seq(fromSparkPlan(inMemTab.relation.cachedPlan)) + case _ => Seq() + } + new SparkPlanInfo( plan.nodeName, plan.simpleString(SQLConf.get.maxToStringFields), children.map(fromSparkPlan), metadata, - metrics) + metrics, + relation) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index d31d77840b802..30f1b3e65f602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -140,7 +140,7 @@ object SparkPlanGraph { if (parent != null) { edges += SparkPlanGraphEdge(node.id, parent.id) } - planInfo.children.foreach( + planInfo.children.union(planInfo.relation).foreach( buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph, exchanges)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala new file mode 100644 index 0000000000000..8588e0fcbb260 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.test.SharedSparkSession + +class SparkPlanInfoSuite extends SharedSparkSession{ + + import testImplicits._ + + def vaidateSparkPlanInfoRelation(sparkPlanInfo: SparkPlanInfo): Unit = { + sparkPlanInfo.nodeName match { + case "InMemoryTableScan" => assert(sparkPlanInfo.relation.length == 1) + case _ => + assert(sparkPlanInfo.relation.length == 0) + sparkPlanInfo.children.foreach(vaidateSparkPlanInfoRelation) + } + } + + test("SparkPlanInfo creation from SparkPlan with InMemoryTableScan node") { + val dfWithCache = Seq( + (1, 1), + (2, 2) + ).toDF().filter("_1 > 1").cache().repartition(10) + + val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan) + + vaidateSparkPlanInfoRelation(planInfoResult) + } +} From c383de4aed07e7b8e97fd6e83bc8b2896ff2f3f3 Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Sat, 15 Feb 2020 13:48:16 +0100 Subject: [PATCH 2/2] [SPARK-29431][WebUI] Add inmemorytablescan relation to sparkInfo child nodes --- .../apache/spark/sql/execution/SparkPlanInfo.scala | 13 +++---------- .../spark/sql/execution/ui/SparkPlanGraph.scala | 2 +- .../spark/sql/execution/ui/SparkPlanInfoSuite.scala | 10 ++++------ 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 9dc541ef19dda..a8a7d665dbce1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -34,8 +34,7 @@ class SparkPlanInfo( val simpleString: String, val children: Seq[SparkPlanInfo], val metadata: Map[String, String], - val metrics: Seq[SQLMetricInfo], - val relation: Seq[SparkPlanInfo] = Seq()) { + val metrics: Seq[SQLMetricInfo]) { override def hashCode(): Int = { // hashCode of simpleString should be good enough to distinguish the plans from each other @@ -58,6 +57,7 @@ private[execution] object SparkPlanInfo { case ReusedSubqueryExec(child) => child :: Nil case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil case stage: QueryStageExec => stage.plan :: Nil + case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => @@ -69,18 +69,11 @@ private[execution] object SparkPlanInfo { case fileScan: FileSourceScanExec => fileScan.metadata case _ => Map[String, String]() } - - val relation = plan match { - case inMemTab: InMemoryTableScanExec => Seq(fromSparkPlan(inMemTab.relation.cachedPlan)) - case _ => Seq() - } - new SparkPlanInfo( plan.nodeName, plan.simpleString(SQLConf.get.maxToStringFields), children.map(fromSparkPlan), metadata, - metrics, - relation) + metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 30f1b3e65f602..d31d77840b802 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -140,7 +140,7 @@ object SparkPlanGraph { if (parent != null) { edges += SparkPlanGraphEdge(node.id, parent.id) } - planInfo.children.union(planInfo.relation).foreach( + planInfo.children.foreach( buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph, exchanges)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala index 8588e0fcbb260..a702e00ff9f92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -24,12 +24,10 @@ class SparkPlanInfoSuite extends SharedSparkSession{ import testImplicits._ - def vaidateSparkPlanInfoRelation(sparkPlanInfo: SparkPlanInfo): Unit = { + def vaidateSparkPlanInfo(sparkPlanInfo: SparkPlanInfo): Unit = { sparkPlanInfo.nodeName match { - case "InMemoryTableScan" => assert(sparkPlanInfo.relation.length == 1) - case _ => - assert(sparkPlanInfo.relation.length == 0) - sparkPlanInfo.children.foreach(vaidateSparkPlanInfoRelation) + case "InMemoryTableScan" => assert(sparkPlanInfo.children.length == 1) + case _ => sparkPlanInfo.children.foreach(vaidateSparkPlanInfo) } } @@ -41,6 +39,6 @@ class SparkPlanInfoSuite extends SharedSparkSession{ val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan) - vaidateSparkPlanInfoRelation(planInfoResult) + vaidateSparkPlanInfo(planInfoResult) } }