Skip to content

Commit 399a211

Browse files
committed
fix
1 parent d0f36bc commit 399a211

File tree

5 files changed

+43
-1
lines changed

5 files changed

+43
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,12 @@ object SQLConf {
552552
.booleanConf
553553
.createWithDefault(true)
554554

555+
val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
556+
.internal()
557+
.doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
558+
.booleanConf
559+
.createWithDefault(true)
560+
555561
val STATE_STORE_PROVIDER_CLASS =
556562
buildConf("spark.sql.streaming.stateStore.providerClass")
557563
.internal()
@@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging {
932938

933939
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
934940

941+
def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
942+
935943
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
936944

937945
def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors._
2525
import org.apache.spark.sql.catalyst.expressions._
2626
import org.apache.spark.sql.catalyst.expressions.aggregate._
2727
import org.apache.spark.sql.catalyst.expressions.codegen._
28+
import org.apache.spark.sql.catalyst.plans.QueryPlan
2829
import org.apache.spark.sql.catalyst.plans.physical._
2930
import org.apache.spark.sql.execution._
3031
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
595595
*/
596596
case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
597597

598+
// Ignore this wrapper for canonicalizing.
599+
override lazy val canonicalized: SparkPlan = child.canonicalized
600+
598601
override lazy val metrics = Map(
599602
"dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
600603
"collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
156156
case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
157157

158158
def apply(plan: SparkPlan): SparkPlan = {
159-
if (!conf.exchangeReuseEnabled) {
159+
if (!conf.subqueryReuseEnabled) {
160160
return plan
161161
}
162162
// Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
2323
import java.sql.Timestamp
2424
import java.util.concurrent.atomic.AtomicBoolean
2525

26+
import scala.collection.mutable.ArrayBuffer
27+
2628
import org.apache.spark.{AccumulatorSuite, SparkException}
2729
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
2830
import org.apache.spark.sql.catalyst.util.StringUtils
31+
import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
2932
import org.apache.spark.sql.execution.aggregate
3033
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
3134
import org.apache.spark.sql.functions._
@@ -700,6 +703,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
700703
row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
701704
}
702705

706+
test("Verify spark.sql.subquery.reuse") {
707+
Seq("true", "false").foreach { reuse =>
708+
withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse) {
709+
val df = sql(
710+
"""
711+
|SELECT key, (SELECT avg(key) FROM testData)
712+
|FROM testData
713+
|WHERE key > (SELECT avg(key) FROM testData)
714+
""".stripMargin)
715+
val subqueries = ArrayBuffer.empty[SubqueryExec]
716+
df.queryExecution.executedPlan.transformAllExpressions {
717+
case s @ ScalarSubquery(plan: SubqueryExec, _) =>
718+
subqueries += plan
719+
s
720+
}
721+
722+
assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
723+
724+
if (reuse.toBoolean) {
725+
assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
726+
} else {
727+
assert(subqueries.distinct.size == 2, "Reuse is not expected")
728+
}
729+
}
730+
}
731+
}
732+
703733
test("cartesian product join") {
704734
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
705735
checkAnswer(

0 commit comments

Comments
 (0)