From bb43535cfc94a8dd6ba65be916905c0c0a2d003c Mon Sep 17 00:00:00 2001 From: subham611 Date: Sun, 12 May 2024 10:29:04 +0530 Subject: [PATCH 1/8] Add support for custom partitionCoalescer --- .../sql/catalyst/optimizer/Optimizer.scala | 8 +++++--- .../plans/logical/basicLogicalOperators.scala | 8 +++++++- .../scala/org/apache/spark/sql/Dataset.scala | 12 +++++------- .../spark/sql/execution/SparkStrategies.scala | 4 ++-- .../sql/execution/basicPhysicalOperators.scala | 17 ++++++++++++----- 5 files changed, 31 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dfc1e17c2a29..fcae302d25d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1088,7 +1088,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { case Project(l1, limit @ LocalLimit(_, p2 @ Project(l2, _))) if isRenaming(l1, l2) => val newProjectList = buildCleanedProjectList(l1, l2) limit.copy(child = p2.copy(projectList = newProjectList)) - case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) => + case Project(l1, r @ Repartition(_, _, p @ Project(l2, _), _)) if isRenaming(l1, l2) => r.copy(child = p.copy(projectList = buildCleanedProjectList(l1, p.projectList))) case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) => s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList))) @@ -1272,8 +1272,10 @@ object CollapseRepartition extends Rule[LogicalPlan] { // enables the shuffle. Returns the child node if the last numPartitions is bigger; // otherwise, keep unchanged. // 2) In the other cases, returns the top node with the child's child - case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match { - case (false, true) => if (r.numPartitions >= child.numPartitions) child else r + case r @ Repartition(_, _, child: RepartitionOperation, coalescer) => + (r.shuffle, child.shuffle) match { + case (false, true) => + if (coalescer.isEmpty && r.numPartitions >= child.numPartitions) child else r case _ => r.copy(child = child.child) } // Case 2: When a RepartitionByExpression has a child of global Sort, Repartition or diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 9242a06cf1d6..12c64cf53314 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.rdd.PartitionCoalescer import org.apache.spark.sql.catalyst.{AliasIdentifier, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} @@ -1841,9 +1842,14 @@ abstract class RepartitionOperation extends UnaryNode { * asked for `coalesce` or `repartition`. 
[[RepartitionByExpression]] is used when the consumer * of the output requires some specific ordering or distribution of the data. */ -case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) +case class Repartition(numPartitions: Int, + shuffle: Boolean, + child: LogicalPlan, + coalescer: Option[PartitionCoalescer] = None) extends RepartitionOperation { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") + require(!shuffle || coalescer.isEmpty, + "Custom coalescer is not allowed for repartition(shuffle=true)") override def partitioning: Partitioning = { require(shuffle, "Partitioning can only be used in shuffle.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3e843e64ebbf..c8c7c07abb32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -18,17 +18,14 @@ package org.apache.spark.sql import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} - import scala.annotation.varargs import scala.collection.mutable.{ArrayBuffer, HashSet} import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal - import org.apache.commons.lang3.StringUtils import org.apache.commons.text.StringEscapeUtils - import org.apache.spark.TaskContext import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable} import org.apache.spark.api.java.JavaRDD @@ -36,14 +33,14 @@ import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.api.r.RRDD import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{PartitionCoalescer, RDD} import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} +import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonGenerator} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -3832,8 +3829,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { - Repartition(numPartitions, shuffle = false, logicalPlan) + def coalesce(numPartitions: Int, + userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f0682e6b9afc..ca2d8370fe14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -939,11 +939,11 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr, 
planLater(left), planLater(right)) :: Nil - case r @ logical.Repartition(numPartitions, shuffle, child) => + case r @ logical.Repartition(numPartitions, shuffle, child, coalescer) => if (shuffle) { ShuffleExchangeExec(r.partitioning, planLater(child), REPARTITION_BY_NUM) :: Nil } else { - execution.CoalesceExec(numPartitions, planLater(child)) :: Nil + execution.CoalesceExec(numPartitions, planLater(child), coalescer) :: Nil } case logical.Sort(sortExprs, global, child) => execution.SortExec(sortExprs, global, planLater(child)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 083858e4fe80..0db01ce5f120 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.execution import java.util.concurrent.{Future => JFuture} import java.util.concurrent.TimeUnit._ - import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration - import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD} +import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences @@ -727,8 +725,17 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * you see ShuffleExchange. This will add a shuffle step, but means the * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). + * + * If you want to define how to coalesce partitions, you can set a custom strategy + * to coalesce partitions in `coalescer`. + * + * @param numPartitions Number of partitions this coalescer tries to reduce partitions into + * @param child the SparkPlan + * @param coalescer Optional coalescer that an user specifies */ -case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { +case class CoalesceExec(numPartitions: Int, + child: SparkPlan, + coalescer: Option[PartitionCoalescer]) extends UnaryExecNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = { @@ -743,7 +750,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN // `SinglePartition`. 
new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions) } else { - rdd.coalesce(numPartitions, shuffle = false) + rdd.coalesce(numPartitions, shuffle = false, coalescer) } } From db7163143752c831d725c190b3966b06a4ff85d3 Mon Sep 17 00:00:00 2001 From: subham611 Date: Sun, 12 May 2024 20:03:30 +0530 Subject: [PATCH 2/8] fix uts --- .../scala/org/apache/spark/rdd/RDDSuite.scala | 15 ++++-- .../scala/org/apache/spark/sql/Dataset.scala | 3 +- .../sql/JavaBeanDeserializationSuite.java | 4 +- .../org/apache/spark/sql/DatasetSuite.scala | 51 +++++++++++++++++-- .../spark/sql/execution/PlannerSuite.scala | 2 +- .../adaptive/AdaptiveQueryExecSuite.scala | 2 +- .../execution/python/PythonUDTFSuite.scala | 8 +-- 7 files changed, 70 insertions(+), 15 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index f2e20ff6c9e0..23c91dee64a3 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1331,8 +1331,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { * Took this class out of the test suite to prevent "Task not serializable" exceptions. */ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable { + + def getPartitions(parent: RDD[_]): Array[Partition] = { + parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions + } + + def getPartitionSize(partition: Partition): Long = { + partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength + } + override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = { - val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions + val partitions: Array[Partition] = getPartitions(parent) val groups = ArrayBuffer[PartitionGroup]() var currentGroup = new PartitionGroup() var currentSum = 0L @@ -1341,8 +1350,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria // sort partitions based on the size of the corresponding input splits partitions.sortWith((partition1, partition2) => { - val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength - val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength + val partition1Size = getPartitionSize(partition1) + val partition2Size = getPartitionSize(partition2) partition1Size < partition2Size }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c8c7c07abb32..eb69ca44173a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3830,7 +3830,8 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def coalesce(numPartitions: Int, - userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan { + userDefinedCoalescer: Option[PartitionCoalescer] = None): + Dataset[T] = withTypedPlan { Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java index a83041dc522c..99ea78d7bbf9 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java @@ -26,6 
+26,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.spark.rdd.DefaultPartitionCoalescer; import org.junit.jupiter.api.*; import org.apache.spark.api.java.function.MapFunction; @@ -39,6 +40,7 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.test.TestSparkSession; +import scala.Option; import scala.Tuple2; public class JavaBeanDeserializationSuite implements Serializable { @@ -565,7 +567,7 @@ public void testSPARK38823NoBeanReuse() { Dataset ds = spark.createDataFrame(items, Item.class) .as(encoder) - .coalesce(1); + .coalesce(1, Option.empty()); MapFunction mf = new MapFunction() { @Override diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 16a493b52909..9c2de0569b62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -19,19 +19,17 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} - import scala.collection.immutable.HashSet import scala.reflect.ClassTag import scala.util.Random - import org.apache.hadoop.fs.{Path, PathFilter} import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ - -import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskContext} +import org.apache.spark.{Partition, SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.TestUtils.withListener import org.apache.spark.internal.config.MAX_RESULT_SIZE +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD, SizeBasedCoalescer} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample} import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, ExpressionEncoder, OuterScopes} @@ -42,6 +40,7 @@ import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.expressions.UserDefinedFunction @@ -189,6 +188,39 @@ class DatasetSuite extends QueryTest data: _*) } + test("SPARK-19426: custom coalesce") { + withTempPath { path => + val maxSplitSize = 512 + val testData = (1 to 1000).map(i => ClassData(i.toString, i)) + testData.toDS().repartition(50).write.format("csv").save(path.toString) + + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> (maxSplitSize / 3).toString, + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0" + ) { + val ds = spark.read.format("csv") + .schema("a STRING, b INT") + .load(path.toString) + .as[ClassData] + + val coalescedDataSet = + ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(maxSplitSize))) + + assert(coalescedDataSet.rdd.partitions.length <= 50) + + val expectedPartitionCount = ds.rdd.partitions.size + val totalPartitionCount = coalescedDataSet.rdd.partitions.map 
{ p1 => + val splitSizes = p1.asInstanceOf[CoalescedRDDPartition].parents.map { p2 => + p2.asInstanceOf[FilePartition].files.map(_.length).sum + } + assert(splitSizes.sum <= maxSplitSize) + splitSizes.size + }.sum + assert(totalPartitionCount === expectedPartitionCount) + } + } + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkDataset( @@ -2893,3 +2925,14 @@ case class SaveModeArrayCase(modes: Array[SaveMode]) case class K1(a: Long) case class K2(a: Long, b: Long) + +class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) { + + override def getPartitions(parent: RDD[_]): Array[Partition] = { + parent.firstParent.partitions + } + + override def getPartitionSize(partition: Partition): Long = { + partition.asInstanceOf[FilePartition].files.map(x => x.length - x.start).sum + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 15de4c5cc5b2..4986bf26edc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -311,7 +311,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3) assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2) doubleRepartitioned.queryExecution.optimizedPlan match { - case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) => + case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _, _), _) => assert(numPartitions === 5) assert(shuffle === false) assert(shuffleChild) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index e7b375e55f17..7e48910c8c40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -3032,7 +3032,7 @@ private object TestProblematicCoalesceStrategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { case org.apache.spark.sql.catalyst.plans.logical.Repartition( - numPartitions, false, child) => + numPartitions, false, child, _) => TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil case _ => Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDTFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDTFSuite.scala index 1eaf1d24056d..42e5d4dc7758 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDTFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDTFSuite.scala @@ -159,7 +159,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession { .collectFirst { case r: Repartition => r }.get match { case Repartition( 1, true, SubqueryAlias( - _, _: LocalRelation)) => + _, _: LocalRelation), _) => case other => failure(other) } @@ -188,7 +188,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession { case Sort( _, false, Repartition( 1, true, SubqueryAlias( - _, _: LocalRelation))) => + _, _: LocalRelation), _)) => case other => failure(other) } @@ -217,7 +217,7 @@ class PythonUDTFSuite extends QueryTest 
with SharedSparkSession { |ORDER BY 1, 2 |""".stripMargin).queryExecution.analyzed plan.collectFirst { case r: Repartition => r } match { - case Some(Repartition(1, true, _)) => + case Some(Repartition(1, true, _, _)) => case _ => failure(plan) } @@ -235,7 +235,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession { |ORDER BY 1, 2 |""".stripMargin).queryExecution.analyzed plan.collectFirst { case r: Repartition => r } match { - case Some(Repartition(1, true, _)) => + case Some(Repartition(1, true, _, _)) => case _ => failure(plan) } From e8f089db033304e90fe117c607b3c5eb4b90f9fe Mon Sep 17 00:00:00 2001 From: subham611 Date: Sun, 12 May 2024 20:27:22 +0530 Subject: [PATCH 3/8] fix scala lint --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 5 ++++- .../apache/spark/sql/execution/basicPhysicalOperators.scala | 2 ++ .../org/apache/spark/sql/JavaBeanDeserializationSuite.java | 1 - 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index eb69ca44173a..d5ecbd80119f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -18,14 +18,17 @@ package org.apache.spark.sql import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} + import scala.annotation.varargs import scala.collection.mutable.{ArrayBuffer, HashSet} import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal + import org.apache.commons.lang3.StringUtils import org.apache.commons.text.StringEscapeUtils + import org.apache.spark.TaskContext import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable} import org.apache.spark.api.java.JavaRDD @@ -40,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonGenerator} +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 0db01ce5f120..d148122bfd44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -19,9 +19,11 @@ package org.apache.spark.sql.execution import java.util.concurrent.{Future => JFuture} import java.util.concurrent.TimeUnit._ + import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration + import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java index 99ea78d7bbf9..43e1269f0b17 100644 
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java @@ -26,7 +26,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.spark.rdd.DefaultPartitionCoalescer; import org.junit.jupiter.api.*; import org.apache.spark.api.java.function.MapFunction; From d52473d410d142ac5de5b676507760e82d5b8e83 Mon Sep 17 00:00:00 2001 From: subham611 Date: Sun, 12 May 2024 22:48:23 +0530 Subject: [PATCH 4/8] Fix lint issue --- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 9c2de0569b62..2624225290c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -19,13 +19,16 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} + import scala.collection.immutable.HashSet import scala.reflect.ClassTag import scala.util.Random + import org.apache.hadoop.fs.{Path, PathFilter} import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ + import org.apache.spark.{Partition, SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.TestUtils.withListener import org.apache.spark.internal.config.MAX_RESULT_SIZE From ba3c9631e14b73f7671a30c6ae846f3e12a4bbcb Mon Sep 17 00:00:00 2001 From: subham611 Date: Mon, 13 May 2024 11:21:00 +0530 Subject: [PATCH 5/8] Fix UT --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 23c91dee64a3..984decfc9834 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1369,9 +1369,7 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria while (index < partitions.length) { val partition = partitions(index) - val fileSplit = - partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] - val splitSize = fileSplit.getLength + val splitSize = getPartitionSize(partition) if (currentSum + splitSize < maxSize) { addPartition(partition, splitSize) } else { From 192f2f7afa1fa0f011510e5e10eb7de442cbaf26 Mon Sep 17 00:00:00 2001 From: subham611 Date: Mon, 13 May 2024 11:28:35 +0530 Subject: [PATCH 6/8] Fix UT --- .../scala/org/apache/spark/sql/Dataset.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index d5ecbd80119f..5507e3d03e61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3815,6 +3815,28 @@ class Dataset[T] private[sql]( repartitionByRange(None, partitionExprs) } + /** + * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions + * are requested. 
If a larger number of partitions is requested, it will stay at the current + * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in + * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not + * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. + * + * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, + * this may result in your computation taking place on fewer nodes than + * you like (e.g. one node in the case of numPartitions = 1). To avoid this, + * you can call repartition. This will add a shuffle step, but means the + * current upstream partitions will be executed in parallel (per whatever + * the current partitioning is). + * + * @group typedrel + * @since 1.6.0 + */ + def coalesce(numPartitions: Int): + Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan, None) + } + /** * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions * are requested. If a larger number of partitions is requested, it will stay at the current From fa6ed6f0a9e1a60ef5f17c6194260815acac6b5b Mon Sep 17 00:00:00 2001 From: subham611 Date: Mon, 13 May 2024 16:05:27 +0530 Subject: [PATCH 7/8] Adds UT in CollapseRepartitionSuite --- .../spark/sql/catalyst/dsl/package.scala | 6 ++-- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++-- .../plans/logical/basicLogicalOperators.scala | 7 +++++ .../optimizer/CollapseRepartitionSuite.scala | 30 +++++++++++++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 22 ++++---------- .../sql/JavaBeanDeserializationSuite.java | 3 +- 6 files changed, 50 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index f1904c2436ab..4a2f8ccf8319 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.catalyst import java.sql.{Date, Timestamp} import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} - import scala.language.implicitConversions - import org.apache.spark.api.java.function.FilterFunction +import org.apache.spark.rdd.PartitionCoalescer import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ @@ -504,6 +503,9 @@ package object dsl { def coalesce(num: Integer): LogicalPlan = Repartition(num, shuffle = false, logicalPlan) + def coalesce(num: Integer, coalescer: Option[PartitionCoalescer]): LogicalPlan = + Repartition(num, shuffle = false, logicalPlan, coalescer) + def repartition(num: Integer): LogicalPlan = Repartition(num, shuffle = true, logicalPlan) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fcae302d25d8..4c78d7c50e5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1268,9 +1268,9 @@ object CollapseRepartition extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(REPARTITION_OPERATION, REBALANCE_PARTITIONS), ruleId) { // Case 1: 
When a Repartition has a child of Repartition or RepartitionByExpression, - // 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child - // enables the shuffle. Returns the child node if the last numPartitions is bigger; - // otherwise, keep unchanged. + // 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified + // strategy), but the child enables the shuffle. Returns the child node if the last + // numPartitions is bigger; otherwise, keep unchanged. // 2) In the other cases, returns the top node with the child's child case r @ Repartition(_, _, child: RepartitionOperation, coalescer) => (r.shuffle, child.shuffle) match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 12c64cf53314..5c08652337c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1841,6 +1841,13 @@ abstract class RepartitionOperation extends UnaryNode { * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer * of the output requires some specific ordering or distribution of the data. + * If `shuffle` = false (`coalesce` cases), this logical plan can have an user-specified strategy + * to coalesce input partitions. + * + * @param numPartitions How many partitions to use in the output RDD + * @param shuffle Whether to shuffle when repartitioning + * @param child the LogicalPlan + * @param coalescer Optional coalescer that an user specifies */ case class Repartition(numPartitions: Int, shuffle: Boolean, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala index f9eb6d2e760c..eb589866609c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.rdd.SizeBasedCoalescer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -106,6 +107,33 @@ class CollapseRepartitionSuite extends PlanTest { comparePlans(optimized2, correctAnswer) } + test("SPARK-19426: coalesce partitions with custom coalescer") { + val coalescer = new DatasetSizeBasedPartitionCoalescer(20) + + val query1 = testRelation + .repartition(10) + .coalesce(20, Option(coalescer)) + + val optimized1 = Optimize.execute(query1.analyze) + val correctAnswer1 = testRelation + .repartition(10) + .coalesce(20, Option(coalescer)) + .analyze + + comparePlans(optimized1, correctAnswer1) + + val query2 = testRelation + .repartition(30) + .coalesce(20, Option(coalescer)) + + val optimized2 = Optimize.execute(query2.analyze) + val correctAnswer2 = testRelation + .repartition(30) + .coalesce(20, Option(coalescer)) + .analyze + comparePlans(optimized2, correctAnswer2) + } + test("distribute above repartition") { // Always respects the top 
distribute and removes useless repartition val query1 = testRelation @@ -222,4 +250,6 @@ class CollapseRepartitionSuite extends PlanTest { comparePlans(optimized, expected) } } + + class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5507e3d03e61..2a73c30bc261 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3832,31 +3832,19 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): - Dataset[T] = withTypedPlan { - Repartition(numPartitions, shuffle = false, logicalPlan, None) + def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan) } /** - * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions - * are requested. If a larger number of partitions is requested, it will stay at the current - * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in - * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not - * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. - * - * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, - * this may result in your computation taking place on fewer nodes than - * you like (e.g. one node in the case of numPartitions = 1). To avoid this, - * you can call repartition. This will add a shuffle step, but means the - * current upstream partitions will be executed in parallel (per whatever - * the current partitioning is). + * Returns a new Dataset that an user-defined `PartitionCoalescer` reduces into fewer partitions. + * `userDefinedCoalescer` is the same with a coalescer used in the `RDD` coalesce function. 
* * @group typedrel * @since 1.6.0 */ def coalesce(numPartitions: Int, - userDefinedCoalescer: Option[PartitionCoalescer] = None): - Dataset[T] = withTypedPlan { + userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan { Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java index 43e1269f0b17..a83041dc522c 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java @@ -39,7 +39,6 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.test.TestSparkSession; -import scala.Option; import scala.Tuple2; public class JavaBeanDeserializationSuite implements Serializable { @@ -566,7 +565,7 @@ public void testSPARK38823NoBeanReuse() { Dataset ds = spark.createDataFrame(items, Item.class) .as(encoder) - .coalesce(1, Option.empty()); + .coalesce(1); MapFunction mf = new MapFunction() { @Override From f3ebeb296cc8c4a96239ee438be0b49a826baa6a Mon Sep 17 00:00:00 2001 From: subham611 Date: Mon, 13 May 2024 16:06:47 +0530 Subject: [PATCH 8/8] Fix lint --- .../main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 4a2f8ccf8319..ef92b6ef9f4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst import java.sql.{Date, Timestamp} import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} + import scala.language.implicitConversions + import org.apache.spark.api.java.function.FilterFunction import org.apache.spark.rdd.PartitionCoalescer import org.apache.spark.sql.Encoder
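
For context, here is a minimal sketch of how the overload introduced by this series could be used from application code (spark-shell style). The `RoundRobinCoalescer` below is an illustrative stand-in for any `org.apache.spark.rdd.PartitionCoalescer` implementation, and the only API assumed beyond existing Spark developer APIs is the new `coalesce(numPartitions, userDefinedCoalescer)` overload added in these patches:

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
import org.apache.spark.sql.SparkSession

// Illustrative coalescer: spreads parent partitions round-robin over at most
// `maxPartitions` groups. Any PartitionCoalescer implementation could be used instead.
class RoundRobinCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val groups = Array.fill(maxPartitions)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (partition, i) =>
      groups(i % maxPartitions).partitions += partition
    }
    groups.filter(_.partitions.nonEmpty)
  }
}

val spark = SparkSession.builder().master("local[4]").getOrCreate()
import spark.implicits._

val ds = spark.range(0, 1000).map(_.toString).repartition(50)
// New overload from this patch series: a narrow (no-shuffle) coalesce whose
// partition grouping is decided by the user-supplied coalescer.
val coalesced = ds.coalesce(4, Some(new RoundRobinCoalescer))
assert(coalesced.rdd.getNumPartitions <= 4)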