Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b950c07f3d8..9e339db4fb4 100644
index b950c07f3d8..6875582e0aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture}
Expand Down Expand Up @@ -190,27 +190,30 @@ index b950c07f3d8..9e339db4fb4 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler(
@@ -1850,7 +1870,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler(
@@ -1861,7 +1882,17 @@ private[spark] class DAGScheduler(

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ if (isCelebornShuffleIndeterminate) {
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read stage
+ }
+ }
+ })
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bd2823bcac1..e97218b046b 100644
index bd2823bcac1..996cf8662f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture}
Expand Down Expand Up @@ -190,27 +190,30 @@ index bd2823bcac1..e97218b046b 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
@@ -1921,7 +1941,7 @@ private[spark] class DAGScheduler(
@@ -1921,7 +1941,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
@@ -1932,7 +1952,15 @@ private[spark] class DAGScheduler(
@@ -1932,7 +1953,17 @@ private[spark] class DAGScheduler(

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ if (isCelebornShuffleIndeterminate) {
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read stage
+ }
+ }
+ })
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 26be8c72bbc..4323b6d1a75 100644
index 26be8c72bbc..b29692e8f12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture}
Expand Down Expand Up @@ -190,27 +190,30 @@ index 26be8c72bbc..4323b6d1a75 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
@@ -1977,7 +1997,7 @@ private[spark] class DAGScheduler(
@@ -1977,7 +1997,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
@@ -1988,7 +2008,15 @@ private[spark] class DAGScheduler(
@@ -1988,7 +2009,17 @@ private[spark] class DAGScheduler(

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ if (isCelebornShuffleIndeterminate) {
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read stage
+ }
+ }
+ })
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ index 00000000000..5e190c512df
+
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 89d16e57934..36ce50093c0 100644
index 89d16e57934..8b9ae779be2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture}
Expand Down Expand Up @@ -202,27 +202,30 @@ index 89d16e57934..36ce50093c0 100644
if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
@@ -2042,7 +2065,7 @@ private[spark] class DAGScheduler(
@@ -2042,7 +2065,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
- if (mapStage.isIndeterminate) {
+ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+ val isCelebornShuffleIndeterminate = CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+ if (mapStage.isIndeterminate || isCelebornShuffleIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
@@ -2053,7 +2076,15 @@ private[spark] class DAGScheduler(
@@ -2053,7 +2077,17 @@ private[spark] class DAGScheduler(

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
- stageChain.drop(1).foreach(s => stagesToRollback += s)
+ stageChain.drop(1).foreach(s => {
+ stagesToRollback += s
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ if (isCelebornShuffleIndeterminate) {
+ s match {
+ case currentMapStage: ShuffleMapStage =>
+ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
+ case _: ResultStage =>
+ // do nothing, should abort celeborn skewed read stage
+ }
+ }
+ })
} else {
Expand Down
Loading