@@ -92,9 +92,10 @@ object ShufflePartitionsUtil extends Logging {
     var coalescedSize = 0L
     var i = 0

-    def createPartitionSpec(): Unit = {
-      // Skip empty inputs, as it is a waste to launch an empty task.
-      if (coalescedSize > 0) {
+    def createPartitionSpec(last: Boolean = false): Unit = {
+      // Skip empty inputs, as it is a waste to launch an empty task
+      // unless all inputs are empty
+      if (coalescedSize > 0 || (last && partitionSpecs.isEmpty)) {
         partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
       }
     }

@cloud-fan (Contributor) commented on Jun 24, 2020:

    So you want to create at least one partition? This doesn't match the PR description.

The PR author (Member) replied:

    Yes, one partition if all partitions are empty. This creates one partition spec at the end, when no partition specs have been created.
@@ -120,7 +121,7 @@ object ShufflePartitionsUtil extends Logging {
       }
       i += 1
     }
-    createPartitionSpec()
+    createPartitionSpec(last = true)
     partitionSpecs
   }

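To make the behavior discussed in the review thread concrete, here is a minimal, self-contained sketch of the coalescing loop with the new `last` flag. It is not Spark's actual implementation: `CoalescedSpec`, `coalesce`, and the splitting condition are simplified stand-ins, and only the `createSpec` logic mirrors the diff above.

object CoalesceSketch {
  final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int)

  /** Group `partitionSizes` into specs of roughly `targetSize` bytes each. */
  def coalesce(partitionSizes: Array[Long], targetSize: Long): Seq[CoalescedSpec] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[CoalescedSpec]
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0

    def createSpec(last: Boolean = false): Unit = {
      // Skip empty inputs, as launching an empty task is a waste. The exception:
      // when every input is empty, emit a single spec at the very end so the
      // coalesced stage still has one (empty) partition.
      if (coalescedSize > 0 || (last && specs.isEmpty)) {
        specs += CoalescedSpec(latestSplitPoint, i)
      }
    }

    while (i < partitionSizes.length) {
      // Close the current group once adding the next partition would exceed the target.
      if (i > 0 && coalescedSize + partitionSizes(i) > targetSize) {
        createSpec()
        latestSplitPoint = i
        coalescedSize = 0L
      }
      coalescedSize += partitionSizes(i)
      i += 1
    }
    createSpec(last = true) // mirrors `createPartitionSpec(last = true)` in the diff
    specs.toSeq
  }

  def main(args: Array[String]): Unit = {
    // All-empty input: exactly one spec covering reducer indices [0, 5).
    println(coalesce(Array(0L, 0L, 0L, 0L, 0L), targetSize = 100))
    // A trailing all-empty group is still skipped once at least one spec exists.
    println(coalesce(Array(110L, 0L, 0L), targetSize = 100))
  }
}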
@@ -200,7 +200,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
     checkEstimation(
       Array(bytesByPartitionId1, bytesByPartitionId2),
-      Seq.empty, targetSize, minNumPartitions)
+      Array(CoalescedPartitionSpec(0, 5)), targetSize, minNumPartitions)
   }


@@ -243,21 +243,23 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     }
   }

-  test("do not create partition spec for 0-size partitions") {
+  test("do not create partition spec for 0-size partitions except all partitions are empty") {
     val targetSize = 100
     val minNumPartitions = 2
+    val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))

     {
       // 1 shuffle: All bytes per partition are 0, no partition spec created.
       val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
-      checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize)
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
     }

     {
       // 2 shuffles: All bytes per partition are 0, no partition spec created.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
+      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2),
+        expectedPartitionSpecs, targetSize)
     }

     {
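As a hedged illustration of the updated expectation (the `CoalescedSpec` name below is a stand-in, not a Spark identifier): when every byte count is zero across all shuffles, the coalescer is now expected to return one spec spanning all reducer partitions instead of an empty sequence.

object EmptyShuffleExpectation {
  final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int)

  def main(args: Array[String]): Unit = {
    val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
    val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)

    // Per-reducer totals across both shuffles are all zero...
    val totals = bytesByPartitionId1.zip(bytesByPartitionId2).map { case (a, b) => a + b }
    assert(totals.forall(_ == 0L))

    // ...so the expected result is a single spec covering reducer indices [0, 5),
    // mirroring `Array(CoalescedPartitionSpec(0, 5))` in the test above.
    val expected = Seq(CoalescedSpec(0, bytesByPartitionId1.length))
    println(expected)
  }
}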
@@ -202,7 +202,7 @@ class AdaptiveQueryExecSuite
     }
   }

-  test("Empty stage coalesced to 0-partition RDD") {
+  test("Empty stage coalesced to 1-partition RDD") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
@@ -213,23 +213,31 @@ class AdaptiveQueryExecSuite
       checkAnswer(testDf, Seq())
       val plan = testDf.queryExecution.executedPlan
       assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+      assert(plan.execute().getNumPartitions == 1)
       val coalescedReaders = collect(plan) {
         case r: CustomShuffleReaderExec => r
       }
-      assert(coalescedReaders.length == 2)
-      coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+      assert(coalescedReaders.length == 3, s"$plan")
+      coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1))
     }

     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
       val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
       checkAnswer(testDf, Seq())
       val plan = testDf.queryExecution.executedPlan
       assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      assert(plan.execute().getNumPartitions == 1)
       val coalescedReaders = collect(plan) {
         case r: CustomShuffleReaderExec => r
       }
-      assert(coalescedReaders.length == 2, s"$plan")
-      coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+      assert(coalescedReaders.length == 3, s"$plan")
+      coalescedReaders.foreach { r =>
+        if (r.isLocalReader) {
+          assert(r.partitionSpecs.length == 2)
+        } else {
+          assert(r.partitionSpecs.length == 1)
+        }
+      }
     }
   }
 }
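For readers who want to reproduce the end-to-end behavior outside the test harness, here is a hedged, standalone sketch. The DataFrames and the local SparkSession are illustrative stand-ins (the suite's `df1`/`df2` fixtures are defined outside this hunk), and the printed partition count reflects the expected single coalesced partition under this change, not a guarantee on every Spark version.

import org.apache.spark.sql.SparkSession

object EmptyStageCoalesceRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("empty-stage-coalesce")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Stand-ins for the suite's df1/df2 fixtures.
    val df1 = spark.range(10).select($"id", ($"id" % 3).as("a"))
    val df2 = spark.range(10).select($"id", ($"id" % 3).as("b"))

    // Both filters remove every row, so all shuffle partitions are empty.
    val result = df1.where($"a" > 10).join(df2.where($"b" > 10), "id").groupBy($"a").count()
    assert(result.collect().isEmpty)

    // With this change, the fully empty, coalesced stage should still execute as a
    // single partition rather than a 0-partition RDD.
    println(result.rdd.getNumPartitions)

    spark.stop()
  }
}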