Commit 0ad343a

Added physical plan for DDL and commands to ensure the "exactly once" semantics
1 parent fe78b8b commit 0ad343a

10 files changed: +166 -138 lines

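The "exactly once" guarantee rests on a plain Scala `lazy val`: the new `PhysicalCommand` trait introduced in execution/commands.scala below keeps a command's side effect in a lazy field, whose initializer the runtime evaluates at most once. A minimal, self-contained sketch of that mechanism (illustration only, not code from this commit):

object ExactlyOnceDemo extends App {
  var evaluations = 0

  // Stand-in for PhysicalCommand.sideEffectResult: the initializer is the side effect.
  lazy val sideEffectResult: Seq[String] = {
    evaluations += 1
    Seq("done")
  }

  sideEffectResult                        // first access: the initializer runs
  sideEffectResult                        // later accesses: cached, nothing runs
  println(s"evaluations = $evaluations")  // prints: evaluations = 1
}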
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 10 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+  def output: Seq[Attribute] = Seq.empty
 }
 
 /**
  * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
  * commands that are passed directly to another system.
  */
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+  override def output =
+    Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
 
 /**
  * Commands of the form "SET (key) (= value)".
  */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
+    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
 }
 
 /**
  * Returned by a parser when the user only wants to see what query plan would be executed, without
  * actually performing the execution.
  */
 case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+  override def output =
+    Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
 }
 
 /**
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-

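Replacing bare `AttributeReference`s with `BoundReference`s pins each output column of a command to a fixed ordinal, so the rows a physical command produces can be read positionally without name resolution. A toy illustration of that idea (`MiniRow` and `BoundRef` are invented stand-ins, not the catalyst classes):

// A bound reference evaluates by position rather than by resolving an attribute name.
final case class MiniRow(values: Array[Any])
final case class BoundRef(ordinal: Int, name: String) {
  def eval(input: MiniRow): Any = input.values(ordinal)
}

object BoundRefDemo extends App {
  val output = Seq(BoundRef(0, "key"), BoundRef(1, "value"))
  val row    = MiniRow(Array("spark.sql.shuffle.partitions", "200"))
  output.foreach(ref => println(s"${ref.name} = ${ref.eval(row)}"))
}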
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-  
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 4 additions & 32 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -280,35 +273,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly. To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +302,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * TODO: We only support primitive types, add support for nested types.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
     val dataType = obj.getClass match {
       case c: Class[_] if c == classOf[java.lang.String] => StringType
       case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType

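With `eagerlyProcess` gone, `toRdd` is uniform for commands and ordinary queries alike, and every stage of `QueryExecution` is just a lazy field built from the previous one. A self-contained sketch of how such a lazy chain evaluates each stage exactly once, on demand (stage names mirror `QueryExecution`; the string values are stand-ins for real plans):

object PipelineSketch extends App {
  def trace[A](name: String)(a: A): A = { println(s"running $name"); a }

  lazy val analyzed      = trace("analyzer")("analyzed plan")
  lazy val optimizedPlan = trace("optimizer")(s"$analyzed -> optimized")
  lazy val executedPlan  = trace("prepareForExecution")(s"$optimizedPlan -> physical")
  lazy val toRdd         = trace("execute")(s"$executedPlan -> rdd")

  println("nothing has run yet")
  toRdd  // triggers the whole chain; each stage runs exactly once
  toRdd  // fully cached; nothing prints
}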
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 8 additions & 0 deletions
@@ -50,6 +50,14 @@ private[sql] trait SchemaRDDLike {
   @DeveloperApi
   lazy val queryExecution = sqlContext.executePlan(logicalPlan)
 
+  logicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization
+    // to happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+    case _ =>
+  }
+
   override def toString =
     s"""${super.toString}
        |== Query Plan ==

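This constructor-time forcing is what makes command queries take effect as soon as the SchemaRDD is created, while ordinary queries stay lazy. A usage sketch, assuming an existing SparkContext `sc` and a table `src` already registered in the catalog:

val ctx = new org.apache.spark.sql.SQLContext(sc)

ctx.sql("SET spark.sql.shuffle.partitions=10")  // command: side effect fires here, eagerly
val query = ctx.sql("SELECT * FROM src")        // query: only the plan is built, nothing runs
query.collect()                                 // execution happens on this action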
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 52 additions & 27 deletions
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
+trait PhysicalCommand {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this
+   * field can be used as the contents of the corresponding RDD generated from the physical plan
+   * of this command.
+   *
+   * The `execute()` method of all the physical command classes should reference
+   * `sideEffectResult` so that the command can be executed eagerly right after the command query
+   * is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-                             (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommandPhysical(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with PhysicalCommand {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array.empty[(String, String)]
+
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
+
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
+  override def otherCopyArgs = context :: Nil
 }
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-                                 (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommandPhysical(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with PhysicalCommand {
+
+  // The "EXPLAIN" command doesn't actually cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = child.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }
 
   override def otherCopyArgs = context :: Nil
@@ -71,18 +95,19 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  */
 @DeveloperApi
 case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+  extends LeafNode with PhysicalCommand {
 
-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
    } else {
      context.uncacheTable(tableName)
    }
+    Seq.empty[Any]
  }
 
   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
 

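The trait makes new eager commands mechanical to add: put the side effect in `sideEffectResult` and have `execute()` touch that field before producing rows. A hypothetical command following the same pattern (`SideEffectCommandPhysical` is invented for illustration and is not part of this commit):

case class SideEffectCommandPhysical(run: () => Unit, output: Seq[Attribute])(
    @transient context: SQLContext)
  extends LeafNode with PhysicalCommand {

  // The thunk runs at most once, on first access to sideEffectResult.
  override protected[sql] lazy val sideEffectResult: Seq[Any] = {
    run()
    Seq.empty[Any]
  }

  def execute(): RDD[Row] = {
    sideEffectResult  // referenced so that re-executing the plan never repeats the effect
    context.emptyResult
  }

  override def otherCopyArgs = context :: Nil
}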
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 8 additions & 8 deletions
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
       Seq((2147483645.0,1),(2.0,2)))
   }
-  
+
   test("count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
       (3, "C"),
       (4, "D")))
   }
-  
+
   test("system function upper()") {
     checkAnswer(
       sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
       (2, "ABC"),
       (3, null)))
   }
-  
+
   test("system function lower()") {
     checkAnswer(
       sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Seq(s"$testKey=$testVal"),
-        Seq(s"${testKey + testKey}=${testVal + testVal}"))
+        Seq(testKey, testVal),
+        Seq(testKey + testKey, testVal + testVal))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Seq(Seq(s"$nonexistentKey is undefined"))
+      Seq(Seq(nonexistentKey, "<undefined>"))
     )
     clear()
   }

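The updated expectations reflect the new two-column shape of SET results: one (key, value) row per setting instead of a single "key=value" string. A sketch of the observable difference, assuming a context `ctx`:

ctx.sql("SET spark.custom.key=hello")
ctx.sql("SET spark.custom.key").collect()
// before this commit: one row with a single string column, "spark.custom.key=hello"
// after this commit:  one row with two columns, ("spark.custom.key", "hello")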