
Conversation

@mccheah commented Feb 4, 2020

## What changes were proposed in this pull request?

Disable radix sort in `ShuffleExchangeExec` when it performs a repartition.
In apache#20393, we fixed the nondeterministic result in the shuffle repartition case by performing a local sort before repartitioning.
But the newly added sort uses radix sort, which is wrong here: binary row data cannot be ordered by its computed prefix alone, so rows with equal prefixes may come out in either order. The sort output is therefore not deterministic across reruns, and the indeterminate shuffle output problem remains.
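To see why prefix-only comparison cannot totally order binary data, here is a minimal standalone sketch (this is not Spark's actual comparator; the 8-byte prefix and the sample records are illustrative):

```
import java.nio.ByteBuffer

// Interpret the first 8 bytes of a record as its sort prefix,
// mirroring what a prefix-based (radix) sort compares.
def prefix(record: Array[Byte]): Long =
  ByteBuffer.wrap(record, 0, 8).getLong

// Two distinct records that share the same 8-byte prefix.
val a = Array[Byte](0, 0, 0, 0, 0, 0, 0, 1, 10)
val b = Array[Byte](0, 0, 0, 0, 0, 0, 0, 1, 20)

// The prefixes are equal, so a prefix-only sort may emit a and b
// in either order across task retries...
assert(prefix(a) == prefix(b))
// ...while a full byte-wise comparison yields a total order.
assert(java.util.Arrays.compare(a, b) < 0)
```

Disabling radix sort forces the local sort to fall back to full record comparisons, which do give a total order over the binary rows.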

### Why are the changes needed?

Fix the correctness bug caused by a repartition after a shuffle.

### Does this PR introduce any user-facing change?

Yes, users will get the correct result when a repartition stage is rerun.

## How was this patch tested?

Tested with `local-cluster[5, 2, 5120]` using the integration test below; it returns the correct answer, 100000000.

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }
// Kill an executor in the stage that performs repartition(239).
val df = res.repartition(113).map { x => (x._1 + 1, x._2) }.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```
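`pkill -f -n java` kills the newest matching java process, taking down an executor mid-stage and forcing Spark to rerun the repartition stage. Since `spark.range(0, 10000 * 10000)` produces 100000000 distinct rows and each subsequent `map` keeps them distinct, `df.distinct.count()` must return exactly 100000000; before this fix, the nondeterministic local sort could cause the rerun to shuffle different data and the count to come out wrong.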

Closes apache#25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

This cherry-picks apache#25491

@mccheah merged commit 55a26e9 into master on Feb 4, 2020