Commit 6a4e023

[SPARK-20941][SQL] Fix SubqueryExec Reuse
Before this PR, subquery reuse did not work. There were three issues:

- Subquery reuse does not work.
- It shares the same SQLConf (`spark.sql.exchange.reuse`) with Exchange Reuse.
- No test case covers the subquery reuse rule.

This PR fixes all three issues:

- Ignore the physical operator `SubqueryExec` when comparing two plans.
- Add a dedicated conf, `spark.sql.subquery.reuse`, for controlling Subquery Reuse.
- Add a test case verifying the behavior.

N/A

Author: Xiao Li <[email protected]>

Closes #18169 from gatorsmile/subqueryReuse.

(cherry picked from commit f7cf209)
Signed-off-by: Xiao Li <[email protected]>
1 parent 4ab7b82 commit 6a4e023
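
The new flag can be flipped at runtime like any other SQL conf. Below is a minimal, hedged sketch of checking the effect in a spark-shell session; the conf key comes from this patch, while the `testData` table is assumed to be registered (as it is in the test suite change further down):

    // Sketch only: `spark` is the usual spark-shell SparkSession and
    // `testData` is an assumed registered view with an integer `key` column.
    spark.conf.set("spark.sql.subquery.reuse", "false")
    val df = spark.sql(
      """
        |SELECT key, (SELECT avg(key) FROM testData)
        |FROM testData
        |WHERE key > (SELECT avg(key) FROM testData)
      """.stripMargin)
    // With reuse disabled, each identical scalar subquery is planned and run
    // independently; with the default of true, ReuseSubquery collapses them
    // into one shared SubqueryExec.
    df.queryExecution.executedPlan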

File tree

4 files changed (+47, -1 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
@@ -552,6 +552,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
+    .internal()
+    .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
+    .booleanConf
+    .createWithDefault(true)
+
   val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
     buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
       .internal()
@@ -921,6 +927,8 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
+  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
+
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 3 additions & 0 deletions
@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
+  // Ignore this wrapper for canonicalizing.
+  override lazy val canonicalized: SparkPlan = child.canonicalized
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 1 addition & 1 deletion
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
+    if (!conf.subqueryReuseEnabled) {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
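
The context line above ("Build a hash map using schema of subqueries...") names the strategy the rule uses to stay cheap: bucket candidate subqueries by output schema, so the expensive `sameResult` comparison only runs within a bucket rather than across all N*N pairs. A rough, self-contained sketch of that idea (`Schema` and `Subquery` are stand-in types, not Spark's API):

    import scala.collection.mutable

    // Stand-in types for illustration only.
    case class Schema(fields: Seq[String])
    trait Subquery {
      def schema: Schema
      def sameResult(other: Subquery): Boolean
    }

    object SubqueryDedup {
      // Replace each duplicate with the first equivalent subquery seen.
      def deduplicate(found: Seq[Subquery]): Seq[Subquery] = {
        val buckets = mutable.HashMap.empty[Schema, mutable.ArrayBuffer[Subquery]]
        found.map { sub =>
          val candidates =
            buckets.getOrElseUpdate(sub.schema, mutable.ArrayBuffer.empty)
          candidates.find(_.sameResult(sub)) match {
            case Some(existing) => existing // reuse: bucket-local comparisons only
            case None =>
              candidates += sub
              sub
          }
        }
      }
    }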

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 35 additions & 0 deletions
@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
@@ -708,6 +711,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
   }
 
+  test("Verify spark.sql.subquery.reuse") {
+    Seq(true, false).foreach { reuse =>
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+        val df = sql(
+          """
+            |SELECT key, (SELECT avg(key) FROM testData)
+            |FROM testData
+            |WHERE key > (SELECT avg(key) FROM testData)
+            |ORDER BY key
+            |LIMIT 3
+          """.stripMargin)
+
+        checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
+
+        val subqueries = ArrayBuffer.empty[SubqueryExec]
+        df.queryExecution.executedPlan.transformAllExpressions {
+          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+            subqueries += plan
+            s
+        }
+
+        assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
+
+        if (reuse) {
+          assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
+        } else {
+          assert(subqueries.distinct.size == 2, "Reuse is not expected")
+        }
+      }
+    }
+  }
+
   test("cartesian product join") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       checkAnswer(

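A note on why the test probes `subqueries.distinct.size`: when reuse kicks in, `ReuseSubquery` replaces the second subquery's plan with the very `SubqueryExec` instance chosen for the first, so collecting both and deduplicating leaves one element; independently planned subqueries differ (for instance, by their generated names) and stay distinct. A toy illustration of that invariant in plain Scala, with no Spark types:

    // Hedged illustration: reuse means the same node appears twice.
    case class Node(id: Int)
    val shared = Node(1)
    val withReuse = Seq(shared, shared)        // one plan, referenced twice
    val withoutReuse = Seq(Node(1), Node(2))   // two independently built plans
    assert(withReuse.distinct.size == 1)
    assert(withoutReuse.distinct.size == 2)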