Skip to content

Commit cbeaf9e

Browse files
committed
[SPARK-6376][SQL] Avoid eliminating subqueries until optimization
Previously it was okay to throw away subqueries after analysis, as we would never try to use that tree for resolution again. However, with eager analysis in `DataFrame`s this can cause errors for queries such as: ```scala val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count() ``` As a result, in this PR we defer the elimination of subqueries until the optimization phase. Author: Michael Armbrust <[email protected]> Closes #5160 from marmbrus/subqueriesInDfs and squashes the following commits: a9bb262 [Michael Armbrust] Update Optimizer.scala 27d25bf [Michael Armbrust] fix hive tests 9137e03 [Michael Armbrust] add type 81cd597 [Michael Armbrust] Avoid eliminating subqueries until optimization
1 parent: 046c1e2 · commit: cbeaf9e

File tree

9 files changed

+34
-17
lines changed

9 files changed

+34
-17
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
6464
UnresolvedHavingClauseAttributes ::
6565
TrimGroupingAliases ::
6666
typeCoercionRules ++
67-
extendedResolutionRules : _*),
68-
Batch("Remove SubQueries", fixedPoint,
69-
EliminateSubQueries)
67+
extendedResolutionRules : _*)
7068
)
7169

7270
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
2222
import scala.language.implicitConversions
2323
import scala.reflect.runtime.universe.{TypeTag, typeTag}
2424

25-
import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
25+
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
2626
import org.apache.spark.sql.catalyst.expressions._
2727
import org.apache.spark.sql.catalyst.plans.logical._
2828
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -289,7 +289,7 @@ package object dsl {
289289
InsertIntoTable(
290290
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
291291

292-
def analyze = analysis.SimpleAnalyzer(logicalPlan)
292+
def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
293293
}
294294

295295
object plans { // scalastyle:ignore

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.catalyst.optimizer
1919

2020
import scala.collection.immutable.HashSet
21+
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
2122
import org.apache.spark.sql.catalyst.expressions._
2223
import org.apache.spark.sql.catalyst.plans.Inner
2324
import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
3233

3334
object DefaultOptimizer extends Optimizer {
3435
val batches =
36+
// SubQueries are only needed for analysis and can be removed before execution.
37+
Batch("Remove SubQueries", FixedPoint(100),
38+
EliminateSubQueries) ::
3539
Batch("Combine Limits", FixedPoint(100),
3640
CombineLimits) ::
3741
Batch("ConstantFolding", FixedPoint(100),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.Logging
2121
import org.apache.spark.sql.AnalysisException
22-
import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
22+
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.plans.QueryPlan
2525
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
7373
* can do better should override this function.
7474
*/
7575
def sameResult(plan: LogicalPlan): Boolean = {
76-
plan.getClass == this.getClass &&
77-
plan.children.size == children.size && {
78-
logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
79-
cleanArgs == plan.cleanArgs
76+
val cleanLeft = EliminateSubQueries(this)
77+
val cleanRight = EliminateSubQueries(plan)
78+
79+
cleanLeft.getClass == cleanRight.getClass &&
80+
cleanLeft.children.size == cleanRight.children.size && {
81+
logDebug(
82+
s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
83+
cleanRight.cleanArgs == cleanLeft.cleanArgs
8084
} &&
81-
(plan.children, children).zipped.forall(_ sameResult _)
85+
(cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
8286
}
8387

8488
/** Args that have cleaned such that differences in expression id should not affect equality */

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
3232
val caseInsensitiveCatalog = new SimpleCatalog(false)
3333

3434
val caseSensitiveAnalyzer =
35-
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
35+
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
36+
override val extendedResolutionRules = EliminateSubQueries :: Nil
37+
}
3638
val caseInsensitiveAnalyzer =
37-
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
39+
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
40+
override val extendedResolutionRules = EliminateSubQueries :: Nil
41+
}
3842

3943
val checkAnalysis = new CheckAnalysis
4044

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
108108
)
109109
}
110110

111+
test("self join with aliases") {
112+
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
113+
checkAnswer(
114+
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
115+
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
116+
}
117+
111118
test("explode") {
112119
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
113120
val df2 =

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
3434
test("equi-join is hash-join") {
3535
val x = testData2.as("x")
3636
val y = testData2.as("y")
37-
val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
37+
val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
3838
val planned = planner.HashJoin(join)
3939
assert(planned.size === 1)
4040
}
@@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
109109
test("multiple-key equi-join is hash-join") {
110110
val x = testData2.as("x")
111111
val y = testData2.as("y")
112-
val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
112+
val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
113113
val planned = planner.HashJoin(join)
114114
assert(planned.size === 1)
115115
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
579579
Row(3) :: Row(4) :: Nil
580580
)
581581

582-
table("test_parquet_ctas").queryExecution.analyzed match {
582+
table("test_parquet_ctas").queryExecution.optimizedPlan match {
583583
case LogicalRelation(p: ParquetRelation2) => // OK
584584
case _ =>
585585
fail(

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -292,7 +292,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
292292
Seq(Row(1, "str1"))
293293
)
294294

295-
table("test_parquet_ctas").queryExecution.analyzed match {
295+
table("test_parquet_ctas").queryExecution.optimizedPlan match {
296296
case LogicalRelation(p: ParquetRelation2) => // OK
297297
case _ =>
298298
fail(

0 commit comments

Comments (0)