19 changes: 13 additions & 6 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1331,8 +1331,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
* Took this class out of the test suite to prevent "Task not serializable" exceptions.
*/
class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

def getPartitions(parent: RDD[_]): Array[Partition] = {
parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
}

def getPartitionSize(partition: Partition): Long = {
partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength
}
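For context, a minimal sketch of how a coalescer like this plugs into the core RDD API (assumptions: a live SparkContext `sc` and an input path; `SizeBasedCoalescer` casts its parent to `HadoopRDD`, so the input must come straight from `sc.hadoopFile` rather than from a transformed RDD):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

import org.apache.spark.rdd.SizeBasedCoalescer

// HadoopRDD-backed input; each partition corresponds to one input split.
val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/logs")

// Group splits so that each output partition holds at most ~64 MB of input.
// SizeBasedCoalescer groups purely by size and ignores the numPartitions hint.
val coalesced = rdd.coalesce(
  numPartitions = 4,
  shuffle = false,
  partitionCoalescer = Some(new SizeBasedCoalescer(64 * 1024 * 1024)))
```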

override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
val partitions: Array[Partition] = getPartitions(parent)
val groups = ArrayBuffer[PartitionGroup]()
var currentGroup = new PartitionGroup()
var currentSum = 0L
@@ -1341,8 +1350,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

// sort partitions based on the size of the corresponding input splits
partitions.sortWith((partition1, partition2) => {
val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
val partition1Size = getPartitionSize(partition1)
val partition2Size = getPartitionSize(partition2)
partition1Size < partition2Size
})

@@ -1360,9 +1369,7 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

while (index < partitions.length) {
val partition = partitions(index)
val fileSplit =
partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
val splitSize = fileSplit.getLength
val splitSize = getPartitionSize(partition)
if (currentSum + splitSize < maxSize) {
addPartition(partition, splitSize)
} else {
@@ -23,6 +23,7 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import scala.language.implicitConversions

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -504,6 +505,9 @@ package object dsl {
def coalesce(num: Integer): LogicalPlan =
Repartition(num, shuffle = false, logicalPlan)

def coalesce(num: Integer, coalescer: Option[PartitionCoalescer]): LogicalPlan =
Repartition(num, shuffle = false, logicalPlan, coalescer)
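This overload lets optimizer tests attach a coalescer when building plans; for example (names `testRelation` and `coalescer` as defined in CollapseRepartitionSuite below):

```scala
// Builds Repartition(20, shuffle = false, testRelation, Some(coalescer)).
val plan = testRelation.coalesce(20, Option(coalescer)).analyze
```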

def repartition(num: Integer): LogicalPlan =
Repartition(num, shuffle = true, logicalPlan)

@@ -1088,7 +1088,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
case Project(l1, limit @ LocalLimit(_, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
val newProjectList = buildCleanedProjectList(l1, l2)
limit.copy(child = p2.copy(projectList = newProjectList))
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _), _)) if isRenaming(l1, l2) =>
r.copy(child = p.copy(projectList = buildCleanedProjectList(l1, p.projectList)))
case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList)))
@@ -1268,12 +1268,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsAnyPattern(REPARTITION_OPERATION, REBALANCE_PARTITIONS), ruleId) {
// Case 1: When a Repartition has a child of Repartition or RepartitionByExpression,
// 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child
// enables the shuffle. Returns the child node if the last numPartitions is bigger;
// otherwise, keep unchanged.
// 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified
// strategy), but the child enables the shuffle. Returns the child node if the last
// numPartitions is bigger; otherwise, keep unchanged.
// 2) In the other cases, returns the top node with the child's child
case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match {
case (false, true) => if (r.numPartitions >= child.numPartitions) child else r
case r @ Repartition(_, _, child: RepartitionOperation, coalescer) =>
(r.shuffle, child.shuffle) match {
case (false, true) =>
if (coalescer.isEmpty && r.numPartitions >= child.numPartitions) child else r
case _ => r.copy(child = child.child)
}
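The net effect, mirroring the new CollapseRepartitionSuite cases below (a sketch using that suite's `testRelation` and `coalescer`): a plain no-shuffle coalesce over a shuffling repartition can still be absorbed, but one carrying a user-specified coalescer must be kept, since only the coalescer knows how the output partitions are formed.

```scala
// Without a coalescer: coalesce(20) requests at least as many partitions as
// repartition(10) already produces, so the rule keeps only the child.
testRelation.repartition(10).coalesce(20)
// => testRelation.repartition(10)

// With a user-specified coalescer the plan is left unchanged.
testRelation.repartition(10).coalesce(20, Some(coalescer))
// => testRelation.repartition(10).coalesce(20, Some(coalescer))
```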
// Case 2: When a RepartitionByExpression has a child of global Sort, Repartition or
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.sql.catalyst.{AliasIdentifier, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -1840,10 +1841,22 @@ abstract class RepartitionOperation extends UnaryNode {
* [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
* of the output requires some specific ordering or distribution of the data.
* If `shuffle` = false (the `coalesce` case), this logical plan can carry a user-specified
* strategy for coalescing input partitions.
*
* @param numPartitions How many partitions to use in the output RDD
* @param shuffle Whether to shuffle when repartitioning
* @param child the LogicalPlan
* @param coalescer Optional user-specified coalescer
*/
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
case class Repartition(numPartitions: Int,
shuffle: Boolean,
child: LogicalPlan,
coalescer: Option[PartitionCoalescer] = None)
extends RepartitionOperation {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
require(!shuffle || coalescer.isEmpty,
"Custom coalescer is not allowed for repartition(shuffle=true)")

override def partitioning: Partitioning = {
require(shuffle, "Partitioning can only be used in shuffle.")
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.rdd.SizeBasedCoalescer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -106,6 +107,33 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized2, correctAnswer)
}

test("SPARK-19426: coalesce partitions with custom coalescer") {
val coalescer = new DatasetSizeBasedPartitionCoalescer(20)

val query1 = testRelation
.repartition(10)
.coalesce(20, Option(coalescer))

val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation
.repartition(10)
.coalesce(20, Option(coalescer))
.analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.repartition(30)
.coalesce(20, Option(coalescer))

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation
.repartition(30)
.coalesce(20, Option(coalescer))
.analyze
comparePlans(optimized2, correctAnswer2)
}

test("distribute above repartition") {
// Always respects the top distribute and removes useless repartition
val query1 = testRelation
@@ -222,4 +250,6 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized, expected)
}
}

class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) {}
}
14 changes: 13 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.function._
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.api.r.RRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{PartitionCoalescer, RDD}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -3836,6 +3836,18 @@ class Dataset[T] private[sql](
Repartition(numPartitions, shuffle = false, logicalPlan)
}

/**
* Returns a new Dataset whose partitions are reduced by a user-defined `PartitionCoalescer`.
* `userDefinedCoalescer` is the same kind of coalescer that the `RDD` `coalesce` method accepts.
*
* @group typedrel
* @since 1.6.0
*/
def coalesce(numPartitions: Int,
userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer)
}
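A usage sketch (assumptions: a SparkSession `spark`, a CSV directory at `/tmp/coalesce-demo`, and the size-based test coalescer added to DatasetSuite below; any `PartitionCoalescer` implementation can be passed):

```scala
val df = spark.read.schema("a STRING, b INT").csv("/tmp/coalesce-demo")

// Merge the scan's file partitions so each output partition reads at most ~512 bytes,
// instead of coalescing by a fixed partition count alone.
val coalesced = df.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(512)))
```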

/**
* Returns a new Dataset that contains only the unique rows from this Dataset.
* This is an alias for `dropDuplicates`.
@@ -939,11 +939,11 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr,
planLater(left), planLater(right)) :: Nil

case r @ logical.Repartition(numPartitions, shuffle, child) =>
case r @ logical.Repartition(numPartitions, shuffle, child, coalescer) =>
if (shuffle) {
ShuffleExchangeExec(r.partitioning, planLater(child), REPARTITION_BY_NUM) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
execution.CoalesceExec(numPartitions, planLater(child), coalescer) :: Nil
}
case logical.Sort(sortExprs, global, child) =>
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
@@ -25,7 +25,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
@@ -727,8 +727,17 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
* you see ShuffleExchange. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* To control how partitions are coalesced, a custom strategy can be supplied via
* `coalescer`.
*
* @param numPartitions Number of partitions to coalesce the input into
* @param child the SparkPlan
* @param coalescer Optional user-specified coalescer
*/
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
case class CoalesceExec(numPartitions: Int,
child: SparkPlan,
coalescer: Option[PartitionCoalescer]) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = {
@@ -743,7 +752,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
// `SinglePartition`.
new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
} else {
rdd.coalesce(numPartitions, shuffle = false)
rdd.coalesce(numPartitions, shuffle = false, coalescer)
}
}
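For reference, the core overload this delegates to (sketched from memory of RDD.scala; treat the signature in the source as authoritative):

```scala
def coalesce(
    numPartitions: Int,
    shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null): RDD[T]
```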

48 changes: 47 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -29,9 +29,10 @@ import org.scalatest.Assertions._
import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.TableDrivenPropertyChecks._

import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskContext}
import org.apache.spark.{Partition, SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskContext}
import org.apache.spark.TestUtils.withListener
import org.apache.spark.internal.config.MAX_RESULT_SIZE
import org.apache.spark.rdd.{CoalescedRDDPartition, RDD, SizeBasedCoalescer}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, ExpressionEncoder, OuterScopes}
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.expressions.UserDefinedFunction
@@ -189,6 +191,39 @@ class DatasetSuite extends QueryTest
data: _*)
}

test("SPARK-19426: custom coalesce") {
withTempPath { path =>
val maxSplitSize = 512
val testData = (1 to 1000).map(i => ClassData(i.toString, i))
testData.toDS().repartition(50).write.format("csv").save(path.toString)

withSQLConf(
SQLConf.FILES_MAX_PARTITION_BYTES.key -> (maxSplitSize / 3).toString,
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0"
) {
val ds = spark.read.format("csv")
.schema("a STRING, b INT")
.load(path.toString)
.as[ClassData]

val coalescedDataSet =
ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(maxSplitSize)))

assert(coalescedDataSet.rdd.partitions.length <= 50)

val expectedPartitionCount = ds.rdd.partitions.size
val totalPartitionCount = coalescedDataSet.rdd.partitions.map { p1 =>
val splitSizes = p1.asInstanceOf[CoalescedRDDPartition].parents.map { p2 =>
p2.asInstanceOf[FilePartition].files.map(_.length).sum
}
assert(splitSizes.sum <= maxSplitSize)
splitSizes.size
}.sum
assert(totalPartitionCount === expectedPartitionCount)
}
}
}

test("as tuple") {
val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
checkDataset(
@@ -2893,3 +2928,14 @@ case class SaveModeArrayCase(modes: Array[SaveMode])

case class K1(a: Long)
case class K2(a: Long, b: Long)

class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) {

override def getPartitions(parent: RDD[_]): Array[Partition] = {
parent.firstParent.partitions
}

override def getPartitionSize(partition: Partition): Long = {
partition.asInstanceOf[FilePartition].files.map(x => x.length - x.start).sum
}
}
@@ -311,7 +311,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
doubleRepartitioned.queryExecution.optimizedPlan match {
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _, _), _) =>
assert(numPartitions === 5)
assert(shuffle === false)
assert(shuffleChild)
@@ -3032,7 +3032,7 @@ private object TestProblematicCoalesceStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case org.apache.spark.sql.catalyst.plans.logical.Repartition(
numPartitions, false, child) =>
numPartitions, false, child, _) =>
TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
case _ => Nil
}
@@ -159,7 +159,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession {
.collectFirst { case r: Repartition => r }.get match {
case Repartition(
1, true, SubqueryAlias(
_, _: LocalRelation)) =>
_, _: LocalRelation), _) =>
case other =>
failure(other)
}
@@ -188,7 +188,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession {
case Sort(
_, false, Repartition(
1, true, SubqueryAlias(
_, _: LocalRelation))) =>
_, _: LocalRelation), _)) =>
case other =>
failure(other)
}
@@ -217,7 +217,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession {
|ORDER BY 1, 2
|""".stripMargin).queryExecution.analyzed
plan.collectFirst { case r: Repartition => r } match {
case Some(Repartition(1, true, _)) =>
case Some(Repartition(1, true, _, _)) =>
case _ =>
failure(plan)
}
@@ -235,7 +235,7 @@ class PythonUDTFSuite extends QueryTest with SharedSparkSession {
|ORDER BY 1, 2
|""".stripMargin).queryExecution.analyzed
plan.collectFirst { case r: Repartition => r } match {
case Some(Repartition(1, true, _)) =>
case Some(Repartition(1, true, _, _)) =>
case _ =>
failure(plan)
}