diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch index 0e3be7a8ef8..509105cf1e2 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch @@ -135,7 +135,7 @@ index 00000000000..5e190c512df + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index b950c07f3d8..9e339db4fb4 100644 +index b950c07f3d8..6875582e0aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture} @@ -190,27 +190,30 @@ index b950c07f3d8..9e339db4fb4 100644 if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + -@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler( +@@ -1850,7 +1870,8 @@ private[spark] class DAGScheduler( // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. - if (mapStage.isIndeterminate) { -+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) { ++ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId) ++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) { // It's a little tricky to find all the succeeding stages of `mapStage`, because // each stage only know its parents not children. Here we traverse the stages from // the leaf nodes (the result stages of active jobs), and rollback all the stages -@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler( +@@ -1861,7 +1882,17 @@ private[spark] class DAGScheduler( def collectStagesToRollback(stageChain: List[Stage]): Unit = { if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) + stageChain.drop(1).foreach(s => { + stagesToRollback += s -+ s match { -+ case currentMapStage: ShuffleMapStage => -+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) -+ case _: ResultStage => ++ if (isCelebornShuffleIndeterminate) { ++ s match { ++ case currentMapStage: ShuffleMapStage => ++ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) ++ case _: ResultStage => + // do nothing, should abort celeborn skewed read stage ++ } + } + }) } else { diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch index 6bb8be96652..44c3f8a9707 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch @@ -135,7 +135,7 @@ index 00000000000..5e190c512df + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index bd2823bcac1..e97218b046b 100644 +index bd2823bcac1..996cf8662f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture} @@ -190,27 +190,30 @@ index bd2823bcac1..e97218b046b 100644 if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + -@@ -1921,7 +1941,7 @@ private[spark] class DAGScheduler( +@@ -1921,7 +1941,8 @@ private[spark] class DAGScheduler( // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. - if (mapStage.isIndeterminate) { -+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) { ++ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId) ++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) { // It's a little tricky to find all the succeeding stages of `mapStage`, because // each stage only know its parents not children. Here we traverse the stages from // the leaf nodes (the result stages of active jobs), and rollback all the stages -@@ -1932,7 +1952,15 @@ private[spark] class DAGScheduler( +@@ -1932,7 +1953,17 @@ private[spark] class DAGScheduler( def collectStagesToRollback(stageChain: List[Stage]): Unit = { if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) + stageChain.drop(1).foreach(s => { + stagesToRollback += s -+ s match { -+ case currentMapStage: ShuffleMapStage => -+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) -+ case _: ResultStage => ++ if (isCelebornShuffleIndeterminate) { ++ s match { ++ case currentMapStage: ShuffleMapStage => ++ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) ++ case _: ResultStage => + // do nothing, should abort celeborn skewed read stage ++ } + } + }) } else { diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch index 9f38d8026b4..27f7f418806 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch @@ -135,7 +135,7 @@ index 00000000000..5e190c512df + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index 26be8c72bbc..4323b6d1a75 100644 +index 26be8c72bbc..b29692e8f12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture} @@ -190,27 +190,30 @@ index 26be8c72bbc..4323b6d1a75 100644 if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + -@@ -1977,7 +1997,7 @@ private[spark] class DAGScheduler( +@@ -1977,7 +1997,8 @@ private[spark] class DAGScheduler( // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. - if (mapStage.isIndeterminate) { -+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) { ++ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId) ++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) { // It's a little tricky to find all the succeeding stages of `mapStage`, because // each stage only know its parents not children. Here we traverse the stages from // the leaf nodes (the result stages of active jobs), and rollback all the stages -@@ -1988,7 +2008,15 @@ private[spark] class DAGScheduler( +@@ -1988,7 +2009,17 @@ private[spark] class DAGScheduler( def collectStagesToRollback(stageChain: List[Stage]): Unit = { if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) + stageChain.drop(1).foreach(s => { + stagesToRollback += s -+ s match { -+ case currentMapStage: ShuffleMapStage => -+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) -+ case _: ResultStage => ++ if (isCelebornShuffleIndeterminate) { ++ s match { ++ case currentMapStage: ShuffleMapStage => ++ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) ++ case _: ResultStage => + // do nothing, should abort celeborn skewed read stage ++ } + } + }) } else { diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch index 71d0f98595a..d49a6c2c4c4 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch @@ -135,7 +135,7 @@ index 00000000000..5e190c512df + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index 89d16e57934..36ce50093c0 100644 +index 89d16e57934..8b9ae779be2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture} @@ -202,27 +202,30 @@ index 89d16e57934..36ce50093c0 100644 if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + -@@ -2042,7 +2065,7 @@ private[spark] class DAGScheduler( +@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler( // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. - if (mapStage.isIndeterminate) { -+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) { ++ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId) ++ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) { // It's a little tricky to find all the succeeding stages of `mapStage`, because // each stage only know its parents not children. Here we traverse the stages from // the leaf nodes (the result stages of active jobs), and rollback all the stages -@@ -2053,7 +2076,15 @@ private[spark] class DAGScheduler( +@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler( def collectStagesToRollback(stageChain: List[Stage]): Unit = { if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) + stageChain.drop(1).foreach(s => { + stagesToRollback += s -+ s match { -+ case currentMapStage: ShuffleMapStage => -+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) -+ case _: ResultStage => ++ if (isCelebornShuffleIndeterminate) { ++ s match { ++ case currentMapStage: ShuffleMapStage => ++ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId) ++ case _: ResultStage => + // do nothing, should abort celeborn skewed read stage ++ } + } + }) } else {