Commit e61b1ab

Author: Andrew Or

Visualize SQL operators, not low-level Spark primitives

1 parent 569034a · commit e61b1ab

28 files changed: +58 -52 lines changed

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 7 additions & 9 deletions
@@ -121,21 +121,19 @@ private[spark] object RDDOperationScope {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     val oldScopeJson = sc.getLocalProperty(scopeKey)
+    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
     val oldNoOverride = sc.getLocalProperty(noOverrideKey)
     try {
       if (ignoreParent) {
         // Ignore all parent settings and scopes and start afresh with our own root scope
         sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
-      } else {
+      } else if (sc.getLocalProperty(noOverrideKey) == null) {
         // Otherwise, set the scope only if the higher level caller allows us to do so
-        if (sc.getLocalProperty(noOverrideKey) == null) {
-          val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
-          sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
-        }
-        // Optionally disallow the child body to override our scope
-        if (!allowNesting) {
-          sc.setLocalProperty(noOverrideKey, "true")
-        }
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
+      }
+      // Optionally disallow the child body to override our scope
+      if (!allowNesting) {
+        sc.setLocalProperty(noOverrideKey, "true")
       }
       body
     } finally {
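
The refactored withScope computes oldScope before entering the try block and folds the no-override check into an else if, so the allowNesting flag is now applied whichever branch runs. A minimal sketch of how a caller inside Spark might use it follows; the scope name and RDD body are illustrative and not part of this commit, and withScope is private[spark], so such a call has to live inside Spark itself.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDOperationScope

// Illustrative sketch only, not part of this commit.
def runUnderScope(sc: SparkContext): Long = {
  // Every RDD created inside the body is tagged with the "MyOperator" scope, so the
  // DAG visualization shows a single "MyOperator" node instead of each low-level RDD.
  // allowNesting = false stops code inside the body from installing its own scopes;
  // ignoreParent = true starts a fresh root scope, mirroring how SparkPlan.execute()
  // now calls withScope(sparkContext, nodeName, false, true).
  RDDOperationScope.withScope(sc, "MyOperator", allowNesting = false, ignoreParent = true) {
    sc.parallelize(1 to 100).map(_ * 2).filter(_ % 3 == 0).count()
  }
}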

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(

   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)

sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ case class Aggregate(
     }
   }

-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ case class Exchange(
       serializer
   }

-  override def execute(): RDD[Row] = attachTree(this , "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon

 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 }

 /** Logical plan node for scanning data from a local collection. */

sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)

-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ case class Generate(

   val boundGenerator = BindReferences.bindReference(generator, child.output)

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (join) {
       child.execute().mapPartitions { iter =>
         val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ case class GeneratedAggregate(

   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e

   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd


   override def executeCollect(): Array[Row] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 11 additions & 3 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._

@@ -81,12 +81,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an RDD.
    */
-  def execute(): RDD[Row]
+  final def execute(): RDD[Row] = {
+    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
+      doExecute()
+    }
+  }

   /**
-   * Runs this query returning the result as an array.
+   * Runs this query returning the result as an RDD.
    */
+  protected def doExecute(): RDD[Row]

+  /**
+   * Runs this query returning the result as an array.
+   */
   def executeCollect(): Array[Row] = {
     execute().mapPartitions { iter =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
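
With this change, execute() becomes a final template method on SparkPlan: it wraps doExecute() in an RDDOperationScope named after the operator (nodeName), which is what makes the DAG visualization group an operator's RDDs under a single SQL node. Concrete physical operators now override doExecute() instead, as in the files above. A hypothetical operator following the new contract could look like the sketch below; the operator itself is illustrative and not from this commit, and the imports reflect my reading of the Spark 1.x execution package rather than anything shown here.

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Hypothetical operator, for illustration only. It implements doExecute(); the
// final execute() inherited from SparkPlan wraps this body in a scope named after
// the operator, so the UI shows one "SampleFilter" node rather than the
// mapPartitions RDD it creates.
case class SampleFilter(condition: Expression, child: SparkPlan) extends UnaryNode {

  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      // newPredicate (defined on SparkPlan) compiles the expression into Row => Boolean
      iter.filter(newPredicate(condition, child.output))
    }
  }
}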
