Commit 6c33839

Author: himadripal
SPJ: fix bucket reducer function
Minimized the test; removed the order by.
Parent: 25b03f9

2 files changed: +60, -10 lines

sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala (+57, -7)
@@ -1390,7 +1390,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-44647: test join key is subset of cluster key " +
+  test("SPARK-44647: SPJ: test join key is subset of cluster key " +
     "with push values and partially-clustered") {
     val table1 = "tab1e1"
     val table2 = "table2"
@@ -1487,7 +1487,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets") {
+  test("SPARK-47094: SPJ: Support compatible buckets") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1580,11 +1580,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         assert(shuffles.isEmpty, "SPJ should be triggered")
 
-        val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+        val partions = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
           partitions.length)
         val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
           Math.min(table1buckets2, table2buckets2)
-        assert(scans == Seq(expectedBuckets, expectedBuckets))
+        assert(partions == Seq(expectedBuckets, expectedBuckets))
 
         checkAnswer(df, Seq(
           Row(0, 0, "aa", "aa"),
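Note on the renamed value: `partions` (sic, as committed) counts the input partitions of each scan. In this scenario one side's bucket count divides the other's, so SPJ coalesces each join key down to the smaller count and the expected partition count is a product of per-key minimums. A minimal standalone sketch of that arithmetic; the variable names mirror the test, but the concrete bucket counts here are assumptions for illustration:

object ExpectedPartitionsSketch extends App {
  // Hypothetical bucket counts, chosen so one side's count divides the
  // other's on each join key (as in the compatible-buckets test).
  val table1buckets1 = 4; val table2buckets1 = 2 // first join key
  val table1buckets2 = 6; val table2buckets2 = 3 // second join key

  // Each key is coalesced down to the smaller bucket count, so each scan
  // should expose min(4, 2) * min(6, 3) = 6 input partitions.
  val expectedBuckets = math.min(table1buckets1, table2buckets1) *
    math.min(table1buckets2, table2buckets2)
  println(expectedBuckets) // 6
}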
@@ -1647,7 +1647,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets with common divisor") {
+  test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1744,9 +1744,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           partitions.length)
 
         def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
-        val expectedBuckets = gcd(table1buckets1, table2buckets1) *
+        val expectedPartitions = gcd(table1buckets1, table2buckets1) *
           gcd(table1buckets2, table2buckets2)
-        assert(scans == Seq(expectedBuckets, expectedBuckets))
+        assert(scans == Seq(expectedPartitions, expectedPartitions))
 
         checkAnswer(df, Seq(
           Row(0, 0, "aa", "aa"),
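When neither bucket count divides the other but they share a common divisor greater than one, both sides can still be coalesced to their gcd, which is what the `expectedPartitions` rename makes explicit. A small standalone sketch of the same arithmetic; the gcd helper is the one the test defines, while the bucket counts are made up for illustration:

object GcdPartitionsSketch extends App {
  // Same gcd helper as the test suite.
  def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt

  // Hypothetical counts: 4 and 6 coalesce to gcd 2, 9 and 6 to gcd 3,
  // so each scan should expose 2 * 3 = 6 input partitions.
  val expectedPartitions = gcd(4, 6) * gcd(9, 6)
  println(expectedPartitions) // 6
}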
@@ -1809,6 +1809,56 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+
+  test("SPARK-47094: SPJ: Does not trigger when incompatible number of buckets on both side") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      (2, 3),
+      (3, 4)
+    ).foreach {
+      case (table1buckets1, table2buckets1) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, columns2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(1, 0, 'ab'), " + // duplicate partition key
+            "(2, 2, 'ac'), " +
+            "(3, 3, 'ad'), " +
+            "(4, 2, 'bc') "
+
+          sql(insertStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.nonEmpty, "SPJ should not be triggered")
+          }
+        }
+    }
+  }
+
   test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
     val table1 = "tab1e1"
     val table2 = "table2"
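The new negative test exercises the coprime pairs (2, 3) and (3, 4): their gcd is 1, so there is no non-trivial common layout for the two sides to coalesce to, and the storage-partitioned join must fall back to a shuffle. A minimal standalone check of that premise (the gcd helper mirrors the one in the suite):

object IncompatibleBucketsSketch extends App {
  def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt

  // The coprime pairs from the test: their only common divisor is 1,
  // so there is no usable reducer and SPJ is not triggered.
  Seq((2, 3), (3, 4)).foreach { case (b1, b2) =>
    assert(gcd(b1, b2) == 1)
  }
}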

sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala (+3, -3)
@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
 
     if (otherFunc == BucketFunction) {
       val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
-      if (gcd != thisNumBuckets) {
-        return BucketReducer(thisNumBuckets, gcd)
+      if (gcd > 1 && gcd != thisNumBuckets) {
+        return BucketReducer(gcd)
       }
     }
     null
@@ -111,7 +111,7 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
   private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
 }
 
-case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends Reducer[Int, Int] {
+case class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
   override def reduce(bucket: Int): Int = bucket % divisor
 }
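This is the substantive fix. Previously any gcd different from thisNumBuckets produced a reducer, including gcd == 1 for coprime bucket counts; since bucket % 1 is always 0, that collapsed every bucket onto a single partition and wrongly reported coprime layouts as compatible. With the gcd > 1 guard, the function returns null instead, so no reducer is offered and the planner falls back to a shuffle. A standalone sketch of the reducer semantics; this BucketReducer mirrors the test helper above rather than extending Spark's Reducer trait:

// Simplified stand-in for the test's BucketReducer (no Spark dependency).
case class BucketReducer(divisor: Int) {
  def reduce(bucket: Int): Int = bucket % divisor
}

object BucketReducerSketch extends App {
  // gcd(4, 6) = 2: bucket ids from both sides fold onto the common
  // layout {0, 1}, so matching rows land in aligned partitions.
  val reducer = BucketReducer(2)
  println((0 until 6).map(reducer.reduce)) // Vector(0, 1, 0, 1, 0, 1)

  // With gcd = 1 every bucket would map to 0: a degenerate one-partition
  // layout. The fixed code now returns no reducer in that case.
  val degenerate = BucketReducer(1)
  println((0 until 6).map(degenerate.reduce)) // Vector(0, 0, 0, 0, 0, 0)
}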
