@@ -1022,84 +1022,83 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
+  test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by expressions " +
+    "when AQE is enabled") {
     Seq(true, false).foreach { enableAQE =>
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
         SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "6",
-        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+
         val df1 = spark.range(10).repartition($"id")
-        val df2 = spark.range(10).repartition(10, $"id")
-        val df3 = spark.range(10).repartition(10)
-        val df4 = spark.range(10).repartitionByRange(10, $"id".asc)
+        val df2 = spark.range(10).repartition($"id" + 1)
 
         val partitionsNum1 = df1.rdd.collectPartitions().length
+        val partitionsNum2 = df2.rdd.collectPartitions().length
+
         if (enableAQE) {
-          assert(partitionsNum1 < 6)
+          assert(partitionsNum1 < 10)
+          assert(partitionsNum2 < 10)
 
+          // repartition obeys initialPartitionNum when adaptiveExecutionEnabled
           val plan = df1.queryExecution.executedPlan
           assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
           val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect {
             case s: ShuffleExchangeExec => s
           }
           assert(shuffle.size == 1)
-          assert(shuffle(0).outputPartitioning.numPartitions == 7)
+          assert(shuffle(0).outputPartitioning.numPartitions == 10)
         } else {
-          assert(partitionsNum1 === 6)
+          assert(partitionsNum1 === 10)
+          assert(partitionsNum2 === 10)
         }
 
-        assert(df2.rdd.collectPartitions().length == 10)
+
+        // Don't coalesce partitions if the number of partitions is specified.
+        val df3 = spark.range(10).repartition(10, $"id")
+        val df4 = spark.range(10).repartition(10)
         assert(df3.rdd.collectPartitions().length == 10)
         assert(df4.rdd.collectPartitions().length == 10)
       }
     }
   }
 
-  test("SPARK-32056 coalesce partitions for repartition by expressions when AQE is enabled") {
+  test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by range " +
+    "when AQE is enabled") {
     Seq(true, false).foreach { enableAQE =>
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
         SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10",
         SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
 
-        val partitionsNum1 = (1 to 10).toDF.repartition($"value")
-          .rdd.collectPartitions().length
-        val partitionsNum2 = (1 to 10).toDF.repartition($"value" + 1)
-          .rdd.collectPartitions().length
+        val df1 = spark.range(10).toDF.repartitionByRange($"id".asc)
+        val df2 = spark.range(10).toDF.repartitionByRange(($"id" + 1).asc)
 
-        if (enableAQE) {
-          assert(partitionsNum1 < 10)
-          assert(partitionsNum2 < 10)
-        } else {
-          assert(partitionsNum1 === 10)
-          assert(partitionsNum2 === 10)
-        }
-      }
-    }
-  }
-
-  test("SPARK-32056 coalesce partitions for repartition by range when AQE is enabled") {
-    Seq(true, false).foreach { enableAQE =>
-      withSQLConf(
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
-        SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
-
-        val partitionsNum1 = (1 to 10).toDF.repartitionByRange($"value".asc)
-          .rdd.collectPartitions().length
-        val partitionsNum2 = (1 to 10).toDF.repartitionByRange(($"value" + 1).asc)
-          .rdd.collectPartitions().length
+        val partitionsNum1 = df1.rdd.collectPartitions().length
+        val partitionsNum2 = df2.rdd.collectPartitions().length
 
         if (enableAQE) {
           assert(partitionsNum1 < 10)
           assert(partitionsNum2 < 10)
+
+          // repartition obeys initialPartitionNum when adaptiveExecutionEnabled
+          val plan = df1.queryExecution.executedPlan
+          assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+          val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect {
+            case s: ShuffleExchangeExec => s
+          }
+          assert(shuffle.size == 1)
+          assert(shuffle(0).outputPartitioning.numPartitions == 10)
         } else {
           assert(partitionsNum1 === 10)
           assert(partitionsNum2 === 10)
         }
+
+        // Don't coalesce partitions if the number of partitions is specified.
+        val df3 = spark.range(10).repartitionByRange(10, $"id".asc)
+        assert(df3.rdd.collectPartitions().length == 10)
       }
     }
   }
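
To try the asserted behavior outside the suite, here is a minimal standalone sketch; the object name `AqeRepartitionDemo`, the `local[2]` master, and the printed output are illustrative assumptions, not part of this PR. It sets the same four configs the tests apply via `withSQLConf`, then contrasts `repartition($"id")` (coalesced by AQE) with `repartition(10, $"id")` (explicit count, so coalescing is skipped).

```scala
// Minimal sketch of the behavior the tests above assert.
// Assumptions (not from the PR): object name, local[2] master, println output.
import org.apache.spark.sql.SparkSession

object AqeRepartitionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("aqe-repartition-demo")
      // The same four configs the tests set, with AQE enabled.
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "10")
      .config("spark.sql.shuffle.partitions", "10")
      .getOrCreate()
    import spark.implicits._

    // Repartition by expression: the shuffle starts from initialPartitionNum (10)
    // and AQE coalesces the tiny dataset down to fewer than 10 partitions.
    val coalesced = spark.range(10).repartition($"id").rdd.getNumPartitions

    // Explicit partition count: coalescing is skipped, exactly 10 partitions remain.
    val pinned = spark.range(10).repartition(10, $"id").rdd.getNumPartitions

    println(s"coalesced = $coalesced, pinned = $pinned") // e.g. coalesced = 1, pinned = 10
    spark.stop()
  }
}
```

With `spark.sql.adaptive.enabled` set to `false`, both counts come back as 10, which is the `else` branch the tests assert.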