
Commit 986e56c

Andrew Or authored and jeanlyn committed
[SPARK-7469] [SQL] DAG visualization: show SQL query operators
The DAG visualization currently displays only low-level Spark primitives (e.g. `map`, `reduceByKey`, `filter` etc.). For SQL, these aren't particularly useful. Instead, we should display higher level physical operators (e.g. `Filter`, `Exchange`, `ShuffleHashJoin`). cc marmbrus

-----------------
**Before**
<img src="https://issues.apache.org/jira/secure/attachment/12731586/before.png" width="600px"/>

-----------------
**After** (Pay attention to the words)
<img src="https://issues.apache.org/jira/secure/attachment/12731587/after.png" width="600px"/>

-----------------
Author: Andrew Or <[email protected]>

Closes apache#5999 from andrewor14/dag-viz-sql and squashes the following commits:

0db23a4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
1e211db [Andrew Or] Update comment
0d49fd6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
ffd237a [Andrew Or] Fix style
202dac1 [Andrew Or] Make ignoreParent false by default
e61b1ab [Andrew Or] Visualize SQL operators, not low-level Spark primitives
569034a [Andrew Or] Add a flag to ignore parent settings and scopes
1 parent 4197a47 commit 986e56c

28 files changed: +79 -56 lines
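For context on what the visualization now labels, here is a minimal sketch (not part of this commit) of a query whose physical plan contains operators such as `Filter`, `Exchange`, and `Aggregate` — the names that now appear in the DAG visualization instead of low-level RDD transformations. The object name, app name, sample data, and master setting are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative only: run a small DataFrame query and print its physical plan.
// The operator names shown by explain() (Filter, Exchange, Aggregate, ...) are
// the same names this commit surfaces as named clusters in the DAG visualization.
object DagVizSqlSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dag-viz-sql-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = Seq(("eng", 30), ("sales", 25), ("eng", 41)).toDF("dept", "age")

    // Prints the physical plan; triggering an action afterwards would populate
    // the DAG visualization in the web UI with these operator names.
    df.filter("age > 21").groupBy("dept").count().explain()

    sc.stop()
  }
}
```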

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 14 additions & 6 deletions
@@ -102,25 +102,33 @@ private[spark] object RDDOperationScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    *
-   * If nesting is allowed, this concatenates the previous scope with the new one in a way that
-   * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
-   * this method executed in the body will have no effect.
+   * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
+   * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
+   *
+   * Additionally, the caller of this method may optionally ignore the configurations and scopes
+   * set by the higher level caller. In this case, this method will ignore the parent caller's
+   * intention to disallow nesting, and the new scope instantiated will not have a parent. This
+   * is useful for scoping physical operations in Spark SQL, for instance.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       name: String,
-      allowNesting: Boolean)(body: => T): T = {
+      allowNesting: Boolean,
+      ignoreParent: Boolean = false)(body: => T): T = {
     // Save the old scope to restore it later
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     val oldScopeJson = sc.getLocalProperty(scopeKey)
     val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
     val oldNoOverride = sc.getLocalProperty(noOverrideKey)
     try {
-      // Set the scope only if the higher level caller allows us to do so
-      if (sc.getLocalProperty(noOverrideKey) == null) {
+      if (ignoreParent) {
+        // Ignore all parent settings and scopes and start afresh with our own root scope
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
+      } else if (sc.getLocalProperty(noOverrideKey) == null) {
+        // Otherwise, set the scope only if the higher level caller allows us to do so
         sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
       }
       // Optionally disallow the child body to override our scope
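A minimal sketch of how the new `ignoreParent` flag behaves. Since `withScope` is `private[spark]`, the sketch assumes the caller lives inside the `org.apache.spark` namespace (as `SparkPlan` does); the `ScopeSketch` object, its method, the operator name, and the package placement are hypothetical.

```scala
package org.apache.spark.sql.execution

import org.apache.spark.SparkContext
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.Row

// Hypothetical helper: with ignoreParent = true, the body gets a fresh root
// scope named "SomeOperator", regardless of any scope or no-override flag the
// calling code may have set. RDDs created inside the block carry that scope,
// which is what the DAG visualization renders as a named cluster.
private[sql] object ScopeSketch {
  def runScoped(sc: SparkContext, rows: RDD[Row]): RDD[Row] = {
    RDDOperationScope.withScope(sc, "SomeOperator", allowNesting = false, ignoreParent = true) {
      rows.filter(_ != null)
    }
  }
}
```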

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(

   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)

sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ case class Aggregate(
     }
   }

-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ case class Exchange(
       serializer
     }

-  override def execute(): RDD[Row] = attachTree(this , "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon

 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 }

 /** Logical plan node for scanning data from a local collection. */

sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)

-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ case class Generate(

   val boundGenerator = BindReferences.bindReference(generator, child.output)

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (join) {
       child.execute().mapPartitions { iter =>
         val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ case class GeneratedAggregate(

   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e

   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd


   override def executeCollect(): Array[Row] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 15 additions & 4 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,14 +79,25 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

   /**
-   * Runs this query returning the result as an RDD.
+   * Returns the result of this query as an RDD[Row] by delegating to doExecute
+   * after adding query plan information to created RDDs for visualization.
+   * Concrete implementations of SparkPlan should override doExecute instead.
    */
-  def execute(): RDD[Row]
+  final def execute(): RDD[Row] = {
+    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
+      doExecute()
+    }
+  }

   /**
-   * Runs this query returning the result as an array.
+   * Overridden by concrete implementations of SparkPlan.
+   * Produces the result of the query as an RDD[Row]
    */
+  protected def doExecute(): RDD[Row]

+  /**
+   * Runs this query returning the result as an array.
+   */
   def executeCollect(): Array[Row] = {
     execute().mapPartitions { iter =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
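To illustrate the new contract, here is a hypothetical physical operator modeled on Spark SQL's existing Filter node. `ExampleFilter` and its placement in `org.apache.spark.sql.execution` are assumptions for illustration; the only real API it relies on is the `doExecute()` hook introduced by this commit and the `newPredicate` helper already defined on `SparkPlan`.

```scala
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Hypothetical operator modeled on Spark SQL's Filter node. Under the new
// contract, implementations override doExecute(); the final execute() in
// SparkPlan wraps it in an RDDOperationScope named after the node, so the
// DAG visualization labels the resulting RDDs "ExampleFilter".
case class ExampleFilter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
    // newPredicate compiles the expression into a Row => Boolean predicate
    iter.filter(newPredicate(condition, child.output))
  }
}
```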
