29 changes: 18 additions & 11 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1158,8 +1158,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
  * Took this class out of the test suite to prevent "Task not serializable" exceptions.
  */
 class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
+
+  def getPartitions(parent: RDD[_]): Array[Partition] = {
+    parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+  }
+
+  def getPartitionSize(partition: Partition): Long = {
+    partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+  }
+
   override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
-    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+    val partitions = getPartitions(parent)
     val groups = ArrayBuffer[PartitionGroup]()
     var currentGroup = new PartitionGroup()
     var currentSum = 0L
@@ -1168,8 +1177,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

     // sort partitions based on the size of the corresponding input splits
     partitions.sortWith((partition1, partition2) => {
-      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
-      val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      val partition1Size = getPartitionSize(partition1)
+      val partition2Size = getPartitionSize(partition2)
       partition1Size < partition2Size
     })

@@ -1185,23 +1194,21 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
       totalSum += splitSize
     }
 
-    while (index < partitions.size) {
+    while (index < partitions.length) {
       val partition = partitions(index)
-      val fileSplit =
-        partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
-      val splitSize = fileSplit.getLength
+      val splitSize = getPartitionSize(partition)
       if (currentSum + splitSize < maxSize) {
         addPartition(partition, splitSize)
         index += 1
-        if (index == partitions.size) {
-          updateGroups
+        if (index == partitions.length) {
+          updateGroups()
         }
       } else {
-        if (currentGroup.partitions.size == 0) {
+        if (currentGroup.partitions.isEmpty) {
           addPartition(partition, splitSize)
           index += 1
         } else {
-          updateGroups
+          updateGroups()
         }
       }
     }
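For context, the contract this test class implements is the RDD-side `PartitionCoalescer` developer API, which `RDD.coalesce(numPartitions, shuffle, partitionCoalescer)` already accepts in Spark 2.x. A minimal, self-contained sketch of a custom coalescer and of the existing RDD-level call shape follows; the `RoundRobinCoalescer` below is hypothetical and not part of this patch:

```scala
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Hypothetical example: distribute the parent's partitions round-robin over at most
// `maxPartitions` groups; each PartitionGroup becomes one partition of the coalesced RDD.
class RoundRobinCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val numGroups = math.max(1, math.min(maxPartitions, parent.partitions.length))
    val groups = Array.fill(numGroups)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (partition, i) =>
      groups(i % numGroups).partitions += partition
    }
    groups
  }
}

// Existing RDD-level usage (Spark 2.x):
//   val coalesced = rdd.coalesce(4, shuffle = false, Option(new RoundRobinCoalescer))
```

Extracting `getPartitions` and `getPartitionSize` above is what lets DatasetSuite, later in this diff, subclass `SizeBasedCoalescer` for `FileScanRDD`/`FilePartition` inputs instead of `HadoopRDD`/`HadoopPartition`.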
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.rdd.PartitionCoalescer
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
@@ -862,6 +863,19 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 }
 
+/**
+ * Returns a new RDD that has at most `numPartitions` partitions. A [[PartitionCoalescer]] can be
+ * supplied to customize how the partitions are coalesced.
+ */
+case class PartitionCoalesce(
+    numPartitions: Int,
+    partitionCoalescer: Option[PartitionCoalescer],
+    child: LogicalPlan) extends UnaryNode {
+  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
+  override def output: Seq[Attribute] = child.output
+}
+
+
 /**
  * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
  * information about the number of partitions during execution. Used when a specific ordering or
25 changes: 20 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -32,7 +32,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PartitionCoalescer, RDD}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.CatalogRelation
@@ -2590,7 +2590,7 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
+   * Returns a new Dataset that has at most `numPartitions` partitions.
    * are requested. If a larger number of partitions is requested, it will stay at the current
    * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
    * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
@@ -2603,12 +2603,27 @@ class Dataset[T] private[sql](
    * current upstream partitions will be executed in parallel (per whatever
    * the current partitioning is).
    *
+   * A [[PartitionCoalescer]] can also be supplied, allowing the partitioning behavior to be
  [Review comment (Member)]: It sounds like this trait cannot be generated as-is in Java. Simply wrapping it in `...` would be fine.
+   * customized, similar to [[RDD.coalesce]].
  [Review comment (Member)]: I think it should be [[org.apache.spark.rdd.RDD##coalesce]].
+   *
+   * @group typedrel
+   * @since 2.2.0
+   */
+  def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] =
+    withTypedPlan {
+      PartitionCoalesce(numPartitions, partitionCoalescer, logicalPlan)
+    }
+
+  /**
+   * Returns a new Dataset that has exactly `numPartitions` partitions.
+   * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
+   * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+   * the 100 new partitions will claim 10 of the current partitions.
+   *
    * @group typedrel
    * @since 1.6.0
    */
-  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = false, logicalPlan)
-  }
+  def coalesce(numPartitions: Int): Dataset[T] = coalesce(numPartitions, None)
 
   /**
    * Returns a new Dataset that contains only the unique rows from this Dataset.
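Assuming the overload above lands as written, callers pass the coalescer through the new `Option[PartitionCoalescer]` parameter. A minimal sketch of the call shape; the `coalesceWith` helper is only illustrative:

```scala
import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.sql.Dataset

// Illustrative helper: apply any PartitionCoalescer (for example the SizeBasedCoalescer
// refactored in RDDSuite earlier in this diff) to a Dataset via the new overload.
def coalesceWith[T](ds: Dataset[T], coalescer: PartitionCoalescer, numPartitions: Int): Dataset[T] =
  ds.coalesce(numPartitions, Some(coalescer))

// The existing single-argument form now simply delegates:
//   ds.coalesce(n)  ==  ds.coalesce(n, None)
```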
@@ -395,8 +395,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         if (shuffle) {
           ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
-          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
+          execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil
         }
+      case logical.PartitionCoalesce(numPartitions, partitionCoalescer, child) =>
+        execution.CoalesceExec(numPartitions, planLater(child), partitionCoalescer) :: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
@@ -571,7 +571,10 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * current upstream partitions will be executed in parallel (per whatever
  * the current partitioning is).
  */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(
+    numPartitions: Int,
+    child: SparkPlan,
+    partitionCoalescer: Option[PartitionCoalescer]) extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {
override def outputPartitioning: Partitioning = {
Expand All @@ -580,7 +583,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
}

protected override def doExecute(): RDD[InternalRow] = {
child.execute().coalesce(numPartitions, shuffle = false)
child.execute().coalesce(numPartitions, shuffle = false, partitionCoalescer)
}
}

62 changes: 61 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,9 +20,14 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.mapred.FileSplit
+
+import org.apache.spark.Partition
+import org.apache.spark.rdd._
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
+import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
@@ -118,6 +123,46 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       data: _*)
   }
 
+  test("coalesce, custom") {
+
+    val maxSplitSize = 512
+    // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]], we first
+    // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
+    withTempPath { path =>
+      val data = (1 to 1000).map(i => ClassData(i.toString, i))
+      data.toDS().repartition(50).write.format("csv").save(path.toString)
+
+      val schema = StructType(Seq($"a".string, $"b".int))
+
+      withSQLConf(
+        SQLConf.FILES_MAX_PARTITION_BYTES.key -> "200",
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
+
+        val ds = spark.read.format("csv")
+          .schema(schema)
+          .load(path.toString)
+          .as[ClassData]
+
+        val coalescedDataSet =
+          ds.coalesce(4,
+            partitionCoalescer = Option(new DataSetSizeBasedPartitionCoalescer(maxSplitSize)))
+
+        assert(coalescedDataSet.rdd.partitions.length <= 50)
+
+        var totalPartitionCount = 0L
+        coalescedDataSet.rdd.partitions.foreach(partition => {
+          var splitSizeSum = 0L
+          partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => {
+            splitSizeSum +=
+              partition.asInstanceOf[FilePartition].files.map(_.length).sum
+            totalPartitionCount += 1
+          })
+          assert(splitSizeSum <= maxSplitSize)
+        })
+      }
+    }
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkDataset(
@@ -1266,3 +1311,18 @@ case class CircularReferenceClassB(cls: CircularReferenceClassA)
 case class CircularReferenceClassC(ar: Array[CircularReferenceClassC])
 case class CircularReferenceClassD(map: Map[String, CircularReferenceClassE])
 case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD])
+
+
+class DataSetSizeBasedPartitionCoalescer(maxSize: Int) extends
+  SizeBasedCoalescer(maxSize) {
+
+  override def getPartitions(parent: RDD[_]): Array[Partition] = {
+    parent.firstParent.asInstanceOf[FileScanRDD].partitions
+  }
+
+  override def getPartitionSize(partition: Partition): Long = {
+    val res = partition.asInstanceOf[FilePartition].files.map(
+      x => x.length - x.start).sum
+    res
+  }
+}
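Outside the test, the same helper could presumably be wired up end to end as sketched below, assuming this patch's `coalesce(numPartitions, partitionCoalescer)` overload and the `ClassData(a: String, b: Int)` case class that DatasetSuite already defines; the `compactCsv` helper itself is hypothetical:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Sketch: read CSV files (such as the ones the test above writes out) and coalesce them so that
// each output partition groups input splits whose combined size, as measured by
// DataSetSizeBasedPartitionCoalescer over the underlying FilePartitions, stays near maxSplitSize.
def compactCsv(spark: SparkSession, path: String, maxSplitSize: Int): Dataset[ClassData] = {
  import spark.implicits._
  val schema = StructType(Seq(StructField("a", StringType), StructField("b", IntegerType)))
  spark.read
    .schema(schema)
    .csv(path)
    .as[ClassData]
    .coalesce(4, Some(new DataSetSizeBasedPartitionCoalescer(maxSplitSize)))
}
```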