Skip to content

Commit 5550fdb

Browse files
xingchaozhGitHub Enterprise
authored andcommitted
[CARMEL-5997] Support more sql patterns for deciding bucketed scan dynamically (#975)
* [CARMEL-5997] Support more sql patterns for deciding bucketed scan dynamically * Fix ut
1 parent 0fe346f commit 5550fdb

File tree

2 files changed

+108
-15
lines changed

2 files changed

+108
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package org.apache.spark.sql.execution.bucketing
1919

2020
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
2121
import org.apache.spark.sql.catalyst.rules.Rule
22-
import org.apache.spark.sql.execution._
22+
import org.apache.spark.sql.execution.{CoalesceExec, FileSourceScanExec, FilterExec, LocalLimitExec, ProjectExec, RebucketingExec, SortExec, SparkPlan, UnionExec, WindowSortLimitExec}
2323
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
2424
import org.apache.spark.sql.execution.exchange.Exchange
25+
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, BroadcastRangeJoinExec}
26+
import org.apache.spark.sql.internal.SQLConf
2527

2628
/**
2729
* Disable unnecessary bucketed table scan based on actual physical query plan.
@@ -81,7 +83,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
8183
*
8284
* @param withInterestingPartition The traversed plan has operator with interesting partition.
8385
* @param withExchange The traversed plan has [[Exchange]] operator.
84-
* @param withAllowedNode The traversed plan has only [[isAllowedUnaryExecNode]] operators.
86+
* @param withAllowedNode The traversed plan has only [[isAllowedExecNode]] operators.
8587
*/
8688
private def disableBucketWithInterestingPartition(
8789
plan: SparkPlan,
@@ -114,7 +116,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
114116
_,
115117
withInterestingPartition,
116118
withExchange,
117-
withAllowedNode && isAllowedUnaryExecNode(o)))
119+
withAllowedNode && isAllowedExecNode(o)))
118120
}
119121
}
120122

@@ -131,14 +133,20 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
131133
}
132134

133135
/**
134-
* Check if the operator is allowed single-child operator.
136+
* Check if the operator is allowed operator.
135137
* We may revisit this method later as we probably can
136138
* remove this restriction to allow arbitrary operator between
137139
* bucketed table scan and operator with interesting partition.
138140
*/
139-
private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {
141+
private def isAllowedExecNode(plan: SparkPlan): Boolean = {
140142
plan match {
141-
case _: SortExec | _: ProjectExec | _: FilterExec => true
143+
case _: SortExec | _: ProjectExec | _: FilterExec |
144+
_: BroadcastHashJoinExec |
145+
_: BroadcastNestedLoopJoinExec |
146+
_: BroadcastRangeJoinExec |
147+
_: UnionExec |
148+
_: WindowSortLimitExec |
149+
_: LocalLimitExec => true
142150
case partialAgg: BaseAggregateExec =>
143151
partialAgg.requiredChildDistributionExpressions.isEmpty
144152
case _ => false

sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
7272

7373
def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int,
7474
logEnabled: Boolean = false): Unit = {
75-
val plan = sql(query).queryExecution.executedPlan
75+
val df = sql(query)
76+
df.collect()
77+
78+
val plan = df.queryExecution.executedPlan
7679

7780
if (logEnabled) {
7881
// scalastyle:off println
@@ -157,7 +160,8 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
157160
// }
158161

159162
test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test") {
160-
withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "true") {
163+
withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
164+
SQLConf.ENABLE_COALESCE.key -> "true") {
161165
withTable("t1", "t2", "t3") {
162166
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
163167
df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
@@ -174,7 +178,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
174178
"""
175179
SELECT /*+ broadcast(t1) merge(t3)*/ * FROM t1 JOIN t2 JOIN t3
176180
ON t1.i = t2.i AND t2.i = t3.i
177-
""".stripMargin, 3, 3), // TODO 3->2 if ENABLE_REBUCKETING=false
181+
""".stripMargin, 2, 3),
178182
(
179183
"""
180184
SELECT /*+ merge(t1) broadcast(t3)*/ * FROM t1 JOIN t2 JOIN t3
@@ -184,7 +188,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
184188
"""
185189
SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
186190
ON t1.i = t2.i AND t2.i = t3.i
187-
""".stripMargin, 3, 3), // TODO 3->2 if ENABLE_REBUCKETING=false
191+
""".stripMargin, 3, 3),
188192
// Multiple joins on non-bucketed columns
189193
(
190194
"""
@@ -195,7 +199,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
195199
"""
196200
SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
197201
ON t1.i = t2.j AND t2.j = t3.i
198-
""".stripMargin, 2, 3), // TODO 2->1 if ENABLE_REBUCKETING=false
202+
""".stripMargin, 2, 3),
199203
(
200204
"""
201205
SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
@@ -280,12 +284,26 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
280284
(SELECT t2.i FROM t2 GROUP BY t2.i)
281285
""".stripMargin, 1, 2),
282286
// Non-allowed operator in sub-plan
287+
(
288+
"""
289+
SELECT j, COUNT(*)
290+
FROM (SELECT * FROM (SELECT i, j FROM t1 DISTRIBUTE BY i, j) ORDER BY i, j LIMIT 10)
291+
GROUP BY j
292+
""".stripMargin, 1, 1),
293+
// Union all in sub-plan
283294
(
284295
"""
285296
SELECT COUNT(*)
286297
FROM (SELECT t1.i FROM t1 UNION ALL SELECT t2.i FROM t2)
287298
GROUP BY i
288-
""".stripMargin, 2, 2),
299+
""".stripMargin, 0, 2),
300+
// Union in sub-plan
301+
(
302+
"""
303+
SELECT COUNT(*)
304+
FROM (SELECT t1.i FROM t1 UNION SELECT t2.i FROM t2)
305+
GROUP BY i
306+
""".stripMargin, 0, 2),
289307
// Multiple [[Exchange]] in sub-plan
290308
(
291309
"""
@@ -299,14 +317,81 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
299317
GROUP BY j
300318
""".stripMargin, 0, 1),
301319
// No bucketed table scan in plan
320+
(
321+
"""
322+
SELECT i, j, COUNT(*)
323+
FROM (SELECT t1.j, t3.i FROM t1 JOIN t3 ON t1.j = t3.j)
324+
GROUP BY i, j
325+
""".stripMargin, 0, 0),
326+
327+
// Broadcast hash join in plan
328+
(
329+
"""
330+
SELECT i, j, COUNT(*)
331+
FROM (SELECT t1.j, t3.i FROM t1 JOIN t3 ON t1.i = t3.j)
332+
GROUP BY i, j
333+
""".stripMargin, 0, 1),
334+
335+
// Broadcast nested loop join in plan
336+
(
337+
"""
338+
SELECT i, j, COUNT(*)
339+
FROM (SELECT t1.j, t3.i FROM t1 JOIN t3 ON t1.i != t3.j)
340+
GROUP BY i, j
341+
""".stripMargin, 0, 1),
342+
343+
// Broadcast range join in plan
344+
(
345+
"""
346+
SELECT i, j, COUNT(*)
347+
FROM (SELECT t1.j, t3.i FROM t1 JOIN t3 ON t1.i between t3.i and t3.j)
348+
GROUP BY i, j
349+
""".stripMargin, 0, 1),
350+
351+
// Local limit
302352
(
303353
"""
304354
SELECT j, COUNT(*)
305-
FROM (SELECT t1.j FROM t1 JOIN t3 ON t1.j = t3.j)
355+
FROM (SELECT t1.j FROM t1 JOIN t3 ON t1.i = t3.j limit 200)
306356
GROUP BY j
307-
""".stripMargin, 0, 0)
357+
""".stripMargin, 0, 1),
358+
359+
// WindowSortLimitExec
360+
(
361+
"""
362+
SELECT * FROM
363+
(SELECT j, row_number() OVER (partition by j order by i) rn, i
364+
FROM t1) t
365+
WHERE rn <= 3 order by j, rn, i
366+
""".stripMargin, 0, 1),
367+
368+
(
369+
"""
370+
SELECT * FROM
371+
(SELECT j, row_number() OVER (partition by j order by i) rn, i
372+
FROM t1) t
373+
WHERE rn <= 3
374+
""".stripMargin, 0, 1),
375+
376+
(
377+
"""
378+
SELECT * FROM
379+
(SELECT j, row_number() OVER (partition by j order by i) rn, i
380+
FROM t1 order by j) t
381+
WHERE rn <= 3
382+
""".stripMargin, 0, 1),
383+
384+
(
385+
"""
386+
SELECT * FROM
387+
(SELECT j, row_number() OVER (partition by j order by i) rn, i
388+
FROM t1 order by j) t
389+
WHERE rn <= 3 order by j, rn, i
390+
""".stripMargin, 0, 1)
308391
).foreach { case (query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled) =>
309-
checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
392+
withSQLConf(SQLConf.RANGE_JOIN_ENABLED.key -> "true") {
393+
checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
394+
}
310395
}
311396
}
312397
}

0 commit comments

Comments
 (0)