
Commit f47fecf

Merge branch 'master' into toStringNPE

2 parents: 76199c6 + a9ec033

File tree: 9 files changed, +332 -66 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.{StringType, StructType}
 import org.apache.spark.sql.catalyst.trees

 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -102,7 +102,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output = Seq.empty
+  def output: Seq[Attribute] = Seq.empty
 }

 /**
@@ -115,7 +115,9 @@ case class NativeCommand(cmd: String) extends Command
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
-case class ExplainCommand(plan: LogicalPlan) extends Command
+case class ExplainCommand(plan: LogicalPlan) extends Command {
+  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+}

 /**
  * A logical plan node with single child.
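
With this override, every EXPLAIN result carries the same one-column schema: a single non-nullable string attribute named "plan". A small hedged sketch of that attribute in isolation, using only the types the diff already imports; the printed values follow from the constructor arguments:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.StringType

    // The schema an ExplainCommand now declares: one string column called "plan".
    // The empty second argument list picks up a fresh expression id by default.
    val planAttr = AttributeReference("plan", StringType, nullable = false)()
    println(planAttr.name)       // plan
    println(planAttr.dataType)   // StringType
    println(planAttr.nullable)   // false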

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 6 additions & 5 deletions
@@ -191,6 +191,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val sparkContext = self.sparkContext

     val strategies: Seq[Strategy] =
+      CommandStrategy(self) ::
       TakeOrdered ::
       PartialAggregation ::
       LeftSemiJoin ::
@@ -256,6 +257,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
       Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
   }

+  // TODO: or should we make QueryExecution protected[sql]?
+  protected[sql] def mkQueryExecution(plan: LogicalPlan) = new QueryExecution {
+    val logical = plan
+  }
+
   /**
    * The primary workflow for executing relational queries using Spark. Designed to allow easy
    * access to the intermediate phases of query execution for developers.
@@ -285,11 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }

   /**
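
The new mkQueryExecution helper lets other code in this commit plan an arbitrary logical plan without declaring an anonymous QueryExecution inline. A hedged sketch of the two call patterns it enables; the method is protected[sql], so this only compiles inside the org.apache.spark.sql packages, and `context` and `plan` are placeholder names for a SQLContext and a LogicalPlan:

    // Hypothetical call site inside the sql package.
    val qe = context.mkQueryExecution(plan)
    qe.executedPlan          // what CommandStrategy hands to ExplainCommandPhysical below
    qe.toString.split("\n")  // what HiveContext.stringResult now returns for EXPLAIN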

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 11 additions & 0 deletions
@@ -233,4 +233,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  // TODO: this should be merged with SPARK-1508's SetCommandStrategy
+  case class CommandStrategy(context: SQLContext) extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.ExplainCommand(child) =>
+        val qe = context.mkQueryExecution(child)
+        Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+      case _ => Nil
+    }
+  }
+
 }
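
Like every Strategy, CommandStrategy is a partial planner: it produces a physical plan only for the nodes it recognizes and returns Nil for everything else, so the remaining entries in `strategies` get their turn. A hedged illustration with placeholder names (`ctx` for a SQLContext, `query` for an ordinary LogicalPlan):

    val commands = CommandStrategy(ctx)
    commands(logical.ExplainCommand(query))   // Seq(ExplainCommandPhysical(...))
    commands(query)                           // Nil: some other strategy must plan it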

sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 10 additions & 23 deletions
@@ -17,29 +17,16 @@

 package org.apache.spark.sql.execution

-private[sql] object DebugQuery {
-  def apply(plan: SparkPlan): SparkPlan = {
-    val visited = new collection.mutable.HashSet[Long]()
-    plan transform {
-      case s: SparkPlan if !visited.contains(s.id) =>
-        visited += s.id
-        DebugNode(s)
-    }
-  }
-}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

-private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
-  def references = Set.empty
-  def output = child.output
-  def execute() = {
-    val childRdd = child.execute()
-    println(
-      s"""
-        |=========================
-        |${child.simpleString}
-        |=========================
-      """.stripMargin)
-    childRdd.foreach(println(_))
-    childRdd
+case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
+    (@transient context: SQLContext) extends UnaryNode {
+  def execute(): RDD[Row] = {
+    val planString = new GenericRow(Array[Any](child.toString))
+    context.sparkContext.parallelize(Seq(planString))
   }
+
+  override def otherCopyArgs = context :: Nil
 }
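
ExplainCommandPhysical never touches its child's data: it renders the already-planned child to a string on the driver and ships that back as a one-row RDD. A self-contained sketch of the same pattern using only plain Spark types; the plan text is a placeholder for child.toString:

    import org.apache.spark.{SparkConf, SparkContext}

    // Stand-alone sketch of the one-row "result set" pattern used by execute().
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("explain-sketch"))
    val planText = "Project [key#0]\n HiveTableScan ..."   // placeholder for child.toString
    val result = sc.parallelize(Seq(planText))              // a single-element RDD
    assert(result.collect().toSeq == Seq(planText))
    sc.stop()
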
Lines changed: 119 additions & 0 deletions (new file)

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkContext._
+import org.apache.spark.sql.{SchemaRDD, Row}
+
+/**
+ * :: DeveloperApi ::
+ * Contains methods for debugging query execution.
+ *
+ * Usage:
+ * {{{
+ *   sql("SELECT key FROM src").debug
+ * }}}
+ */
+package object debug {
+
+  /**
+   * :: DeveloperApi ::
+   * Augments SchemaRDDs with debug methods.
+   */
+  @DeveloperApi
+  implicit class DebugQuery(query: SchemaRDD) {
+    def debug(implicit sc: SparkContext): Unit = {
+      val plan = query.queryExecution.executedPlan
+      val visited = new collection.mutable.HashSet[Long]()
+      val debugPlan = plan transform {
+        case s: SparkPlan if !visited.contains(s.id) =>
+          visited += s.id
+          DebugNode(sc, s)
+      }
+      println(s"Results returned: ${debugPlan.execute().count()}")
+      debugPlan.foreach {
+        case d: DebugNode => d.dumpStats()
+        case _ =>
+      }
+    }
+  }
+
+  private[sql] case class DebugNode(
+      @transient sparkContext: SparkContext,
+      child: SparkPlan) extends UnaryNode {
+    def references = Set.empty
+
+    def output = child.output
+
+    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
+      def zero(initialValue: HashSet[String]): HashSet[String] = {
+        initialValue.clear()
+        initialValue
+      }
+
+      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
+        v1 ++= v2
+        v1
+      }
+    }
+
+    /**
+     * A collection of stats for each column of output.
+     * @param elementTypes the actual runtime types for the output.  Useful when there are bugs
+     *        causing the wrong data to be projected.
+     */
+    case class ColumnStat(
+      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+    val tupleCount = sparkContext.accumulator[Int](0)
+
+    val numColumns = child.output.size
+    val columnStats = Array.fill(child.output.size)(new ColumnStat())
+
+    def dumpStats(): Unit = {
+      println(s"== ${child.simpleString} ==")
+      println(s"Tuples output: ${tupleCount.value}")
+      child.output.zip(columnStats).foreach { case (attr, stat) =>
+        val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
+        println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
+      }
+    }
+
+    def execute() = {
+      child.execute().mapPartitions { iter =>
+        new Iterator[Row] {
+          def hasNext = iter.hasNext
+          def next() = {
+            val currentRow = iter.next()
+            tupleCount += 1
+            var i = 0
+            while (i < numColumns) {
+              val value = currentRow(i)
+              columnStats(i).elementTypes += HashSet(value.getClass.getName)
+              i += 1
+            }
+            currentRow
+          }
+        }
+      }
+    }
+  }
+}
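
As the scaladoc shows, the whole user-facing surface of the new package is the implicit `.debug` method. A hedged usage sketch; TestHive is simply one convenient way to get a SparkContext, an hql interface, and the `src` table, and the implicit SparkContext is required by debug()'s signature:

    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.hive.test.TestHive

    implicit val sc = TestHive.sparkContext     // debug() takes the SparkContext implicitly
    TestHive.hql("SELECT key FROM src").debug   // prints per-operator tuple counts and observed column types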

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 2 additions & 1 deletion
@@ -218,6 +218,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val hiveContext = self

     override val strategies: Seq[Strategy] = Seq(
+      CommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
       HiveTableScans,
@@ -304,7 +305,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     */
    def stringResult(): Seq[String] = analyzed match {
      case NativeCommand(cmd) => runSqlHive(cmd)
-     case ExplainCommand(plan) => new QueryExecution { val logical = plan }.toString.split("\n")
+     case ExplainCommand(plan) => mkQueryExecution(plan).toString.split("\n")
      case query =>
        val result: Seq[Seq[Any]] = toRdd.collect().toSeq
        // We need the types so we can output struct field names

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 12 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution

 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive

 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
@@ -159,4 +160,15 @@ class HiveQuerySuite extends HiveComparisonTest {
     hql("SHOW TABLES").toString
     hql("SELECT * FROM src").toString
   }
+
+  test("SPARK-1704: Explain commands as a SchemaRDD") {
+    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    val rdd = hql("explain select key, count(value) from src group by key")
+    assert(rdd.collect().size == 1)
+    assert(rdd.toString.contains("ExplainCommand"))
+    assert(rdd.filter(row => row.toString.contains("ExplainCommand")).collect().size == 0,
+      "actual contents of the result should be the plans of the query to be explained")
+    TestHive.reset()
+  }
+
 }
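
The assertions above pin down the user-visible contract: the SchemaRDD's own plan (its toString) mentions ExplainCommand, while the collected data contains only the plans of the query being explained. A hedged interactive version of the same check, reusing the suite's TestHive imports:

    import org.apache.spark.sql.hive.test.TestHive._

    val explained = hql("explain select key, count(value) from src group by key")
    println(explained.toString)                             // mentions ExplainCommand
    explained.collect().foreach(row => println(row(0)))     // the explained query's own plans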
