
Commit b373a88

Davies Liu authored and yhuai committed
[SPARK-13415][SQL] Visualize subquery in SQL web UI
## What changes were proposed in this pull request?

This PR adds visualization of subqueries in the SQL web UI and also improves the explain output for subqueries, especially when they are used together with whole-stage codegen. For example:

```python
>>> sqlContext.range(100).registerTempTable("range")
>>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias(('id / subquery#9), None)]
:  +- 'SubqueryAlias subquery#9
:     +- 'Project [unresolvedalias('sum('id), None)]
:        +- 'UnresolvedRelation `range`, None
+- 'Filter ('id > subquery#8)
   :  +- 'SubqueryAlias subquery#8
   :     +- 'GlobalLimit 1
   :        +- 'LocalLimit 1
   :           +- 'Project [unresolvedalias('id, None)]
   :              +- 'UnresolvedRelation `range`, None
   +- 'UnresolvedRelation `range`, None

== Analyzed Logical Plan ==
(id / scalarsubquery()): double
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- SubqueryAlias range
:           +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- SubqueryAlias range
   :                 +- Range 0, 100, 1, 4, [id#0L]
   +- SubqueryAlias range
      +- Range 0, 100, 1, 4, [id#0L]

== Optimized Logical Plan ==
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- Range 0, 100, 1, 4, [id#0L]
   +- Range 0, 100, 1, 4, [id#0L]

== Physical Plan ==
WholeStageCodegen
:  +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:     :  +- Subquery subquery#9
:     :     +- WholeStageCodegen
:     :        :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L])
:     :        :     +- INPUT
:     :        +- Exchange SinglePartition, None
:     :           +- WholeStageCodegen
:     :              :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L])
:     :              :     +- Range 0, 1, 4, 100, [id#0L]
:     +- Filter (id#0L > subquery#8)
:        :  +- Subquery subquery#8
:        :     +- CollectLimit 1
:        :        +- WholeStageCodegen
:        :           :  +- Project [id#0L]
:        :           :     +- Range 0, 1, 4, 100, [id#0L]
:        +- Range 0, 1, 4, 100, [id#0L]
```

The web UI looks like:

![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png)

This PR also changes the tree structure of WholeStageCodegen to make it consistent with other operators. Before this change, WholeStageCodegen and InputAdapter both held references to the same plans, so the shared plans could be updated through one reference without the other being notified, causing problems; this was discovered by apache#11403.

## How was this patch tested?

Existing tests, plus manual tests with the example query, checking the explain output and the web UI.

Author: Davies Liu <[email protected]>

Closes apache#11417 from davies/viz_subquery.
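The dual-reference problem described above is easy to reproduce with any immutable tree. Here is a self-contained toy sketch (stand-in case classes, not Spark's `SparkPlan`) of the old `WholeStageCodegen(plan, inputs)` shape, where the same subtree is reachable through two fields:

```scala
// Toy model of a node that stores the same subtree twice.
case class Op(name: String, children: Seq[Op] = Nil)
case class Stage(plan: Op, inputs: Seq[Op])

// Rewrite every Scan inside `plan` only, the way a tree transform would.
def rewrite(op: Op): Op =
  if (op.name == "Scan") Op("ColumnarScan")
  else op.copy(children = op.children.map(rewrite))

val scan = Op("Scan")
val stage = Stage(plan = Op("Filter", Seq(scan)), inputs = Seq(scan))
val updated = stage.copy(plan = rewrite(stage.plan))

println(updated.plan.children.head.name) // ColumnarScan
println(updated.inputs.head.name)        // Scan -- the stale duplicate
```

Storing the subtree once, as the new `WholeStageCodegen(child)` does, removes the possibility of the two views diverging.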
1 parent ad0de99 commit b373a88

9 files changed, +166 -127 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 7 additions & 3 deletions
```diff
@@ -229,8 +229,12 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {

   override def simpleString: String = statePrefix + super.simpleString

-  override def treeChildren: Seq[PlanType] = {
-    val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
-    children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType])
+  /**
+   * All the subqueries of current plan.
+   */
+  def subqueries: Seq[PlanType] = {
+    expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
   }
+
+  override def innerChildren: Seq[PlanType] = subqueries
 }
```
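The new `subqueries` method is a standard expression-tree collection. A minimal sketch of the pattern with toy stand-ins for `Expression` and `SubqueryExpression` (not Spark's classes):

```scala
// Toy expression tree with the same collect-based traversal that
// QueryPlan.subqueries relies on.
sealed trait Expr {
  def children: Seq[Expr] = Nil
  def collect[B](pf: PartialFunction[Expr, B]): Seq[B] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}
case class Literal(value: Long) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  override def children: Seq[Expr] = Seq(left, right)
}
// `plan` is a String here; in Spark it is the subquery's plan tree.
case class ScalarSubquery(plan: String) extends Expr

val expressions: Seq[Expr] =
  Seq(GreaterThan(Literal(1), ScalarSubquery("GlobalLimit 1 -> Range")))

// Mirrors: expressions.flatMap(_.collect { case e: SubqueryExpression => e.plan })
val subqueries = expressions.flatMap(_.collect { case s: ScalarSubquery => s.plan })
println(subqueries) // List(GlobalLimit 1 -> Range)
```

Because `innerChildren` now returns these plans, every `QueryPlan` (logical or physical) renders its subqueries in its tree string for free.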

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 49 additions & 0 deletions
```diff
@@ -447,9 +447,52 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {

   /**
    * All the nodes that will be used to generate tree string.
+   *
+   * For example:
+   *
+   *   WholeStageCodegen
+   *   +-- SortMergeJoin
+   *       |-- InputAdapter
+   *       |   +-- Sort
+   *       +-- InputAdapter
+   *           +-- Sort
+   *
+   * the treeChildren of WholeStageCodegen will be Seq(Sort, Sort), it will generate a tree string
+   * like this:
+   *
+   *   WholeStageCodegen
+   *   : +- SortMergeJoin
+   *   :    :- INPUT
+   *   :    :- INPUT
+   *   :- Sort
+   *   :- Sort
    */
   protected def treeChildren: Seq[BaseType] = children

+  /**
+   * All the nodes that are parts of this node.
+   *
+   * For example:
+   *
+   *   WholeStageCodegen
+   *   +- SortMergeJoin
+   *      |-- InputAdapter
+   *      |   +-- Sort
+   *      +-- InputAdapter
+   *          +-- Sort
+   *
+   * the innerChildren of WholeStageCodegen will be Seq(SortMergeJoin), it will generate a tree
+   * string like this:
+   *
+   *   WholeStageCodegen
+   *   : +- SortMergeJoin
+   *   :    :- INPUT
+   *   :    :- INPUT
+   *   :- Sort
+   *   :- Sort
+   */
+  protected def innerChildren: Seq[BaseType] = Nil
+
   /**
    * Appends the string represent of this node and its children to the given StringBuilder.
    *
@@ -472,6 +515,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     builder.append(simpleString)
     builder.append("\n")

+    if (innerChildren.nonEmpty) {
+      innerChildren.init.foreach(_.generateTreeString(
+        depth + 2, lastChildren :+ false :+ false, builder))
+      innerChildren.last.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
+    }
+
     if (treeChildren.nonEmpty) {
       treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
       treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
```
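To make the two hooks concrete, here is a self-contained sketch of the rendering scheme (a toy `Node`, not Spark's `TreeNode`): innerChildren are drawn two levels deep behind a `:` guard column, treeChildren one level deep.

```scala
// Toy reimplementation of generateTreeString's layout rules, mirroring the
// patched TreeNode: innerChildren at depth + 2, treeChildren at depth + 1.
case class Node(
    name: String,
    children: Seq[Node] = Nil,
    inner: Seq[Node] = Nil) {

  def treeChildren: Seq[Node] = children
  def innerChildren: Seq[Node] = inner

  def generateTreeString(
      depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = {
    if (depth > 0) {
      lastChildren.init.foreach { isLast =>
        builder.append(if (isLast) "   " else ":  ")
      }
      builder.append(if (lastChildren.last) "+- " else ":- ")
    }
    builder.append(name).append("\n")

    if (innerChildren.nonEmpty) {
      innerChildren.init.foreach(
        _.generateTreeString(depth + 2, lastChildren :+ false :+ false, builder))
      innerChildren.last.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
    }
    if (treeChildren.nonEmpty) {
      treeChildren.init.foreach(
        _.generateTreeString(depth + 1, lastChildren :+ false, builder))
      treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
    }
    builder
  }
}

val join = Node("SortMergeJoin", children = Seq(Node("INPUT"), Node("INPUT")))
val wsc = Node("WholeStageCodegen",
  children = Seq(Node("Sort"), Node("Sort")), inner = Seq(join))
print(wsc.generateTreeString(0, Nil, new StringBuilder))
// WholeStageCodegen
// :  +- SortMergeJoin
// :     :- INPUT
// :     +- INPUT
// :- Sort
// +- Sort
```

The extra `false` pushed onto `lastChildren` before an inner child's own flag is what keeps the `:` rail in front of every line of the inner subtree.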

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 2 additions & 5 deletions
```diff
@@ -36,11 +36,8 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {

   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
-    val children = plan match {
-      case WholeStageCodegen(child, _) => child :: Nil
-      case InputAdapter(child) => child :: Nil
-      case plan => plan.children
-    }
+
+    val children = plan.children ++ plan.subqueries
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
```
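With subqueries treated as ordinary children, the recursive conversion to `SparkPlanInfo` picks them up with no special cases. A rough sketch of the shape of that recursion (simplified stand-in types; the real `SparkPlanInfo` also carries a simple string and metrics):

```scala
// Simplified plan -> UI-info conversion: subquery plans become regular
// children of the node that owns them, which is what lets the web UI
// draw them as part of the graph.
case class Plan(
    nodeName: String, children: Seq[Plan] = Nil, subqueries: Seq[Plan] = Nil)
case class PlanInfo(nodeName: String, children: Seq[PlanInfo])

def fromPlan(p: Plan): PlanInfo =
  PlanInfo(p.nodeName, (p.children ++ p.subqueries).map(fromPlan))

val filter = Plan(
  "Filter",
  children = Seq(Plan("Range")),
  subqueries = Seq(Plan("Subquery", children = Seq(Plan("CollectLimit")))))

println(fromPlan(filter))
// PlanInfo(Filter,List(PlanInfo(Range,List()),
//   PlanInfo(Subquery,List(PlanInfo(CollectLimit,List())))))
```

The old special cases for WholeStageCodegen and InputAdapter disappear precisely because both are now ordinary unary nodes whose child is their only child.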

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

Lines changed: 45 additions & 68 deletions
```diff
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution

-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
@@ -29,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, BuildLeft, BuildRight, SortMergeJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
 import org.apache.spark.sql.execution.metric.LongSQLMetricValue

 /**
@@ -163,16 +161,12 @@ trait CodegenSupport extends SparkPlan {
  * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
  * an RDD iterator of InternalRow.
  */
-case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
+case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport {

   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering

-  override def doPrepare(): Unit = {
-    child.prepare()
-  }
-
   override def doExecute(): RDD[InternalRow] = {
     child.execute()
   }
@@ -181,8 +175,6 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
     child.doExecuteBroadcast()
   }

-  override def supportCodegen: Boolean = false
-
   override def upstreams(): Seq[RDD[InternalRow]] = {
     child.execute() :: Nil
   }
@@ -210,6 +202,8 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
   }

   override def simpleString: String = "INPUT"
+
+  override def treeChildren: Seq[SparkPlan] = Nil
 }

 /**
@@ -243,30 +237,23 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
  * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
  * used to generated code for BoundReference.
  */
-case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
-  extends SparkPlan with CodegenSupport {
-
-  override def supportCodegen: Boolean = false
-
-  override def output: Seq[Attribute] = plan.output
-  override def outputPartitioning: Partitioning = plan.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
+case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport {

-  override def doPrepare(): Unit = {
-    plan.prepare()
-  }
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

   override def doExecute(): RDD[InternalRow] = {
     val ctx = new CodegenContext
-    val code = plan.produce(ctx, this)
+    val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
     val references = ctx.references.toArray
     val source = s"""
       public Object generate(Object[] references) {
        return new GeneratedIterator(references);
       }

       /** Codegened pipeline for:
-       * ${toCommentSafeString(plan.treeString.trim)}
+       * ${toCommentSafeString(child.treeString.trim)}
       */
       class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

@@ -294,7 +281,7 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     // println(s"${CodeFormatter.format(cleanedSource)}")
     CodeGenerator.compile(cleanedSource)

-    val rdds = plan.upstreams()
+    val rdds = child.asInstanceOf[CodegenSupport].upstreams()
     assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
     if (rdds.length == 1) {
       rdds.head.mapPartitions { iter =>
@@ -361,34 +348,17 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     }
   }

-  private[sql] override def resetMetrics(): Unit = {
-    plan.foreach(_.resetMetrics())
+  override def innerChildren: Seq[SparkPlan] = {
+    child :: Nil
   }

-  override def generateTreeString(
-      depth: Int,
-      lastChildren: Seq[Boolean],
-      builder: StringBuilder): StringBuilder = {
-    if (depth > 0) {
-      lastChildren.init.foreach { isLast =>
-        val prefixFragment = if (isLast) "   " else ":  "
-        builder.append(prefixFragment)
-      }
-
-      val branch = if (lastChildren.last) "+- " else ":- "
-      builder.append(branch)
-    }
-
-    builder.append(simpleString)
-    builder.append("\n")
-
-    plan.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
-    if (children.nonEmpty) {
-      children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
-      children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
-    }
+  private def collectInputs(plan: SparkPlan): Seq[SparkPlan] = plan match {
+    case InputAdapter(c) => c :: Nil
+    case other => other.children.flatMap(collectInputs)
+  }

-    builder
+  override def treeChildren: Seq[SparkPlan] = {
+    collectInputs(child)
   }

   override def simpleString: String = "WholeStageCodegen"
@@ -416,27 +386,34 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Rule[SparkPlan] {
     case _ => false
   }

+  /**
+   * Inserts a InputAdapter on top of those that do not support codegen.
+   */
+  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
+    case j @ SortMergeJoin(_, _, _, left, right) =>
+      // The children of SortMergeJoin should do codegen separately.
+      j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
+        right = InputAdapter(insertWholeStageCodegen(right)))
+    case p if !supportCodegen(p) =>
+      // collapse them recursively
+      InputAdapter(insertWholeStageCodegen(p))
+    case p =>
+      p.withNewChildren(p.children.map(insertInputAdapter))
+  }
+
+  /**
+   * Inserts a WholeStageCodegen on top of those that support codegen.
+   */
+  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
+    case plan: CodegenSupport if supportCodegen(plan) =>
+      WholeStageCodegen(insertInputAdapter(plan))
+    case other =>
+      other.withNewChildren(other.children.map(insertWholeStageCodegen))
+  }
+
   def apply(plan: SparkPlan): SparkPlan = {
     if (sqlContext.conf.wholeStageEnabled) {
-      plan.transform {
-        case plan: CodegenSupport if supportCodegen(plan) =>
-          var inputs = ArrayBuffer[SparkPlan]()
-          val combined = plan.transform {
-            // The build side can't be compiled together
-            case b @ BroadcastHashJoin(_, _, _, BuildLeft, _, left, right) =>
-              b.copy(left = apply(left))
-            case b @ BroadcastHashJoin(_, _, _, BuildRight, _, left, right) =>
-              b.copy(right = apply(right))
-            case j @ SortMergeJoin(_, _, _, left, right) =>
-              // The children of SortMergeJoin should do codegen separately.
-              j.copy(left = apply(left), right = apply(right))
-            case p if !supportCodegen(p) =>
-              val input = apply(p) // collapse them recursively
-              inputs += input
-              InputAdapter(input)
-          }.asInstanceOf[CodegenSupport]
-          WholeStageCodegen(combined, inputs)
-      }
+      insertWholeStageCodegen(plan)
     } else {
       plan
     }
```
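The rewritten rule is a pair of mutually recursive inserters. A self-contained toy version of the control flow (stand-in plan types; the SortMergeJoin special case is omitted):

```scala
// Toy mutual recursion mirroring the new CollapseCodegenStages:
// codegen-capable subtrees get a Stage (WholeStageCodegen) on top; each
// non-codegen boundary gets an Adapter (InputAdapter), below which
// stage insertion restarts.
object CollapseDemo {
  sealed trait P { def children: Seq[P]; def withChildren(cs: Seq[P]): P }
  case class Op(name: String, codegen: Boolean, children: Seq[P] = Nil) extends P {
    def withChildren(cs: Seq[P]): P = copy(children = cs)
  }
  case class Adapter(child: P) extends P {
    def children: Seq[P] = Seq(child)
    def withChildren(cs: Seq[P]): P = copy(child = cs.head)
  }
  case class Stage(child: P) extends P {
    def children: Seq[P] = Seq(child)
    def withChildren(cs: Seq[P]): P = copy(child = cs.head)
  }

  def insertAdapter(p: P): P = p match {
    case op: Op if !op.codegen => Adapter(insertStage(op))
    case other => other.withChildren(other.children.map(insertAdapter))
  }
  def insertStage(p: P): P = p match {
    case op: Op if op.codegen => Stage(insertAdapter(op))
    case other => other.withChildren(other.children.map(insertStage))
  }

  def main(args: Array[String]): Unit = {
    val plan = Op("Project", codegen = true,
      children = Seq(Op("Exchange", codegen = false,
        children = Seq(Op("Range", codegen = true)))))
    println(insertStage(plan))
    // Stage(Op(Project,true,List(Adapter(Op(Exchange,false,
    //   List(Stage(Op(Range,true,List()))))))))
  }
}
```

This mirrors the design change in the real rule: stage boundaries are now expressed structurally by `WholeStageCodegen` and `InputAdapter` nodes, instead of the old `transform` that accumulated inputs in a mutable `ArrayBuffer`.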

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 19 additions & 4 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
@@ -68,7 +69,7 @@ package object debug {
     }
   }

-  private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
+  private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode with CodegenSupport {
     def output: Seq[Attribute] = child.output

     implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
@@ -86,10 +87,11 @@ package object debug {
     /**
      * A collection of metrics for each column of output.
      * @param elementTypes the actual runtime types for the output. Useful when there are bugs
-     *                     causing the wrong data to be projected.
+     *        causing the wrong data to be projected.
      */
     case class ColumnMetrics(
-      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+        elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+
     val tupleCount: Accumulator[Int] = sparkContext.accumulator[Int](0)

     val numColumns: Int = child.output.size
@@ -98,7 +100,7 @@ package object debug {
     def dumpStats(): Unit = {
       logDebug(s"== ${child.simpleString} ==")
       logDebug(s"Tuples output: ${tupleCount.value}")
-      child.output.zip(columnStats).foreach { case(attr, metric) =>
+      child.output.zip(columnStats).foreach { case (attr, metric) =>
         val actualDataTypes = metric.elementTypes.value.mkString("{", ",", "}")
         logDebug(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
       }
@@ -108,6 +110,7 @@ package object debug {
       child.execute().mapPartitions { iter =>
         new Iterator[InternalRow] {
           def hasNext: Boolean = iter.hasNext
+
           def next(): InternalRow = {
             val currentRow = iter.next()
             tupleCount += 1
@@ -124,5 +127,17 @@ package object debug {
         }
       }
     }
+
+    override def upstreams(): Seq[RDD[InternalRow]] = {
+      child.asInstanceOf[CodegenSupport].upstreams()
+    }
+
+    override def doProduce(ctx: CodegenContext): String = {
+      child.asInstanceOf[CodegenSupport].produce(ctx, this)
+    }
+
+    override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+      consume(ctx, input)
+    }
   }
 }
```
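With DebugNode implementing CodegenSupport, wrapping a plan for debugging no longer breaks a whole-stage pipeline: it simply forwards produce/consume to its child. Usage is unchanged; a quick spark-shell sketch from this era (assuming `sqlContext` is in scope, as spark-shell provides):

```scala
import org.apache.spark.sql.execution.debug._

sqlContext.range(100).registerTempTable("range")
val df = sqlContext.sql("select id from range where id > 10")
// Wraps every operator in a DebugNode, runs the query, and prints
// per-operator tuple counts plus the runtime types seen in each column.
df.debug()
```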
