
Commit d177485

wangyum authored and GitHub Enterprise committed

[HADP-55008] Repartition before writing also support sort columns (apache#546)

1 parent eda8544 · commit d177485

File tree

3 files changed: +45, −8 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 2 deletions
@@ -5136,10 +5136,10 @@ object SQLConf {
       .createWithDefault(2000)
 
   val AUTO_REPARTITION_BEFORE_WRITING_ENABLED =
-    buildConf("spark.carmel.sql.repartition.writing.enabled")
+    buildConf("spark.sql.repartition.writing.enabled")
       .internal()
       .doc("When true, add a shuffle before writing data into partitioned table or bucket table.")
-      .version("4.0.0")
+      .version("3.5.0")
       .booleanConf
       .createWithDefault(false)

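The renamed conf keeps its default of false, so the rule has to be switched on explicitly. A minimal sketch of toggling it for a session (assuming a local SparkSession; the conf is marked internal, so it is not in the public configuration docs):

// Sketch: enable the repartition-before-writing rule for this session.
// The key matches the renamed conf above; the rest is generic setup.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("repartition-before-writing")
  .master("local[*]")
  .getOrCreate()

spark.conf.set("spark.sql.repartition.writing.enabled", "true")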
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RepartitionBeforeWriting.scala

Lines changed: 14 additions & 5 deletions
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, HasPartitionExpressions, LogicalPlan, RebalancePartitions, RepartitionByExpression, RepartitionOperation}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, HasPartitionExpressions, LogicalPlan, RebalancePartitions, RepartitionByExpression, RepartitionOperation, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -34,6 +34,7 @@ object RepartitionBeforeWriting extends Rule[LogicalPlan] {
 
   def buildRepartition(
       attributes: Seq[Attribute],
+      sortCols: Seq[Attribute],
       query: LogicalPlan,
       numPartitions: Option[Int] = None): LogicalPlan = {
     query.collectFirst { case r: RepartitionOperation => r } match {
@@ -43,7 +44,11 @@ object RepartitionBeforeWriting extends Rule[LogicalPlan] {
       case _ =>
         val repartitionOper = new RepartitionByExpression(attributes, query, numPartitions, None)
         repartitionOper.setTagValue(LogicalPlan.REMOVABLE_PLAN_TAG, true)
-        repartitionOper
+        if (sortCols.isEmpty) {
+          repartitionOper
+        } else {
+          Sort(sortCols.map(SortOrder(_, Ascending)), global = false, repartitionOper)
+        }
     }
   }
 
@@ -72,7 +77,7 @@ object RepartitionBeforeWriting extends Rule[LogicalPlan] {
       numPartitions: Option[Int] = None): LogicalPlan = {
     val attributes =
       query.outputSet.filter(attr => parts.exists(part => query.conf.resolver(part, attr.name)))
-    buildRepartition(attributes.toSeq, query, numPartitions)
+    buildRepartition(attributes.toSeq, Nil, query, numPartitions)
   }
 
   private def buildRebalanceForV2(
@@ -100,7 +105,11 @@ object RepartitionBeforeWriting extends Rule[LogicalPlan] {
       if (bucketColumns.isEmpty) {
         i
       } else {
-        i.copy(query = buildRepartition(bucketColumns, query, Some(bucket.numBuckets)))
+        val sortCols =
+          bucket.sortColumnNames
+            .map { col => query.resolve(Seq(col), resolver).get.toAttribute }
+            .filterNot(_.foldable)
+        i.copy(query = buildRepartition(bucketColumns, sortCols, query, Some(bucket.numBuckets)))
       }
 
     case i @ InsertIntoHadoopFsRelationCommand(_, sp, _, pc, None, _, _, query, _, _, _, _)
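When sort columns are present, buildRepartition now returns Sort(sortCols.map(SortOrder(_, Ascending)), global = false, repartitionOper), i.e. a per-partition sort stacked on top of the injected shuffle rather than a global ordering, so no extra range exchange is introduced. A rough DataFrame analogue of that plan shape (a sketch only, with made-up column names, reusing the spark session from the sketch above; this is not the rule itself):

// Hash-partition by the bucket column, then order rows within each partition
// by the sort column -- the same shape the rule injects below a bucketed write.
import org.apache.spark.sql.functions.col

val input = spark.range(0, 1000)
  .selectExpr("id AS a", "id % 10 AS b", "id % 7 AS c")

val shaped = input
  .repartition(8, col("b"))          // ~ RepartitionByExpression on bucket columns
  .sortWithinPartitions(col("c"))    // ~ Sort(..., global = false, ...) on sort columns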

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala

Lines changed: 29 additions & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{InsertOperationLockUtil, QueryTest, _}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.{CommandResultExec, SortExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.hive.execution.HiveTempPath
@@ -962,6 +962,28 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         assert(exprs.map(_.asInstanceOf[NamedExpression].name).mkString(", ") == "a")
       }
     }
+
+    withTable("part_tab") {
+      sql(s"CREATE TABLE part_tab(a int, b int) $dataSource PARTITIONED BY (b)")
+      val df = sql(
+        """
+          |INSERT INTO part_tab
+          |SELECT a,
+          |       b
+          |FROM src_tab distribute by b,
+          |                           cast(rand() * 10 AS int)
+          |""".stripMargin)
+
+      df.collect()
+      val adaptivePlan =
+        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      val shuffles = collect(adaptivePlan) { case s: ShuffleExchangeExec => s }
+      assert(shuffles.length == 1)
+      shuffles.foreach { s =>
+        assert(s.outputPartitioning.asInstanceOf[HashPartitioning].expressions.size === 2)
+      }
+    }
   }
 }
@@ -997,6 +1019,12 @@
       } else {
         assert(exprs.map(_.asInstanceOf[NamedExpression].name).mkString(", ") == "a")
       }
+
+      val sorts = collect(adaptivePlan) { case s: SortExec => s }
+      sorts.foreach { s =>
+        assert(s.sortOrder.size === 2)
+        assert(s.sortOrder.flatMap(_.references).map(_.name).mkString(", ") === "c, a")
+      }
     }
   }
 }
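For reference, a hypothetical end-to-end run of the bucketed-table path the suite exercises: with the conf enabled, an INSERT into a table declared CLUSTERED BY (b) SORTED BY (c, a) should plan a single shuffle on the bucket column plus a non-global sort on c and a, matching the "c, a" assertion above. Table and column names are illustrative, and the spark session comes from the earlier sketch:

// Hypothetical setup mirroring the new assertions: one ShuffleExchangeExec on
// the bucket column, and SortExec over the declared sort columns (c, a).
spark.conf.set("spark.sql.repartition.writing.enabled", "true")

spark.sql(
  """
    |CREATE TABLE bucketed_tab(a INT, b INT, c INT)
    |USING parquet
    |CLUSTERED BY (b) SORTED BY (c, a) INTO 8 BUCKETS
    |""".stripMargin)

spark.sql(
  """
    |INSERT INTO bucketed_tab
    |SELECT id AS a, id % 10 AS b, id % 7 AS c
    |FROM range(1000)
    |""".stripMargin)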
