Closed

Changes from all commits (43 commits)
- c537b3a (Feb 21, 2025): SPARK-51272. Fix for the race condition in Scheduler causing failure …
- 765ae55 (Feb 22, 2025): SPARK-51272. added a reliable bug test demonstrating the race conditi…
- e9a4659 (Feb 23, 2025): SPARK-51272. fixed comment
- 1da65be (Feb 27, 2025): Merge branch 'upstream-master' into SPARK-51272
- fc918af (Mar 4, 2025): SPARK-51272: Reworked the test reproducing the race condition, by not…
- d204a67 (Mar 4, 2025): SPARK-51272: formatting change
- 5a940a7 (Mar 4, 2025): SPARK-51272: added some explanation to what test is doing to get the …
- ddeeff8 (Mar 4, 2025): SPARK-51272: Given the insight by Attila that DagEventProcessing will…
- c4e1fee (Mar 4, 2025): SPARK-51272: formatting changes.
- f84af25 (Mar 6, 2025): initial changes
- ac963bf (Mar 6, 2025): SPARK-51272. incorporating review feedback
- d652319 (Mar 6, 2025): SPARK-51272. incorporating review feedback. removing explict invocati…
- d8b4079 (Mar 6, 2025): initial changes
- a0c700c (Mar 6, 2025): SPARK-51272. rectified a comment and renamed the test
- a930c9d (Mar 7, 2025): initial changes
- 9b153a3 (Mar 7, 2025): SPARK-51272. incorporated review feedback to refactor test
- 70b7910 (Mar 7, 2025): SPARK-51272. incorporated review feedback to refactor test
- a5ddc9e (Mar 7, 2025): initial changes
- 001258a (Mar 10, 2025): initial changes
- a819f4f (Mar 10, 2025): initial changes
- d8eb1c3 (Mar 10, 2025): SPARK-51272. implemented review feedback. refactored code. no longer …
- 1ec98f0 (Mar 11, 2025): SPARK-51272. Fixed the situation where ResultStage is inDeterminate b…
- 69b66aa (Mar 12, 2025): initial config
- c95a34d (Mar 12, 2025): Merge branch 'SPARK-51016-functional-bug-repro' into SPARK-51272
- c5fbb45 (Mar 12, 2025): Merge branch 'upstream-master' into SPARK-51272
- 3439bf9 (Mar 12, 2025): SPARK-51272. added disabled functional test to reproduce the issue. N…
- abcd133 (Mar 12, 2025): SPARK-51272. added disabled functional test to reproduce the issue. N…
- b8865af (Mar 12, 2025): SPARK-51272. added disabled functional test to reproduce the issue. N…
- 784aad3 (Mar 12, 2025): SPARK-51272. added another unit test which checks for all partitions …
- cd041ee (Mar 13, 2025): SPARK-51272. formatting change
- 2c95271 (Mar 13, 2025): SPARK-51272. Increased the number of iterations. Using two positions …
- 1014c6b (Mar 13, 2025): SPARK-51272. fixed scalastyle issue
- b1a0593 (Mar 14, 2025): SPARK-51272. Implementing review feedbackk. Making sql dependency in …
- dc08a7b (Mar 14, 2025): SPARK-51272. Implementing review feedbackk. Making sql dependency in …
- 4f263d1 (Mar 14, 2025): SPARK-51272. refactored the code so that findMissingPartitions code r…
- 3895fec (Mar 15, 2025): SPARK-51272. refactored the code, based on the review feedback. Refac…
- 2498169 (Mar 15, 2025): SPARK-51272. Aborting the result stage if its number of missing parti…
- 9fd7114 (Mar 17, 2025): SPARK-51272. reverted previous change of throwing abort exception
- 914123f (Apr 3, 2025): SPARK-51272. Fixed the tests code as per feedback to use direct depen…
- 00a4aad (Apr 3, 2025): SPARK-51272. Fixed the race in test causing inconsistent pass
- 2d3cb78 (Apr 11, 2025): SPARK-51272. provided explanation for the HA test
- 47eab67 (Apr 14, 2025): SPARK-51272. provided explanation for the HA test
- f16187c (Apr 15, 2025): SPARK-51272. Made the assertions stronger by using separate hostnames…
89 changes: 63 additions & 26 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1873,15 +1873,28 @@ private[spark] class DAGScheduler(
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
val stageOption = stageIdToStage.get(task.stageId)
val isIndeterministicZombie = event.reason match {
case Success if stageOption.isDefined =>
val stage = stageOption.get
(task.stageAttemptId < stage.latestInfo.attemptNumber()
&& stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)

case _ => false
}

outputCommitCoordinator.taskCompleted(
stageId,
task.stageAttemptId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
if (isIndeterministicZombie) {
TaskKilled(reason = "Indeterminate stage needs all tasks to be retried")
} else {
event.reason
})

if (!stageIdToStage.contains(task.stageId)) {
if (stageOption.isEmpty) {
// The stage may have already finished when we get this event -- e.g. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can chose to handle it. For instance, some listeners are
@@ -1893,34 +1906,37 @@ private[spark] class DAGScheduler(
return
}

val stage = stageIdToStage(task.stageId)
val stage = stageOption.get

// Make sure the task's accumulators are updated before any other processing happens, so that
// we can post a task end event before any jobs or stages are updated. The accumulators are
// only updated in certain cases.
event.reason match {
case Success =>
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case None => // Ignore update if task's job has finished.
}
case _ =>
updateAccumulators(event)
if (!isIndeterministicZombie) {
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case _ => // Ignore update if task's job has finished.
}
case _ => updateAccumulators(event)
}
}

case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)

case _ =>
}
if (trackingCacheVisibility) {
// Update rdd blocks' visibility status.
blockManagerMaster.updateRDDBlockVisibility(
event.taskInfo.taskId, visible = event.reason == Success)
event.taskInfo.taskId, visible = event.reason == Success && !isIndeterministicZombie)
}

postTaskEnd(event)
@@ -1936,7 +1952,7 @@ private[spark] class DAGScheduler(
}

task match {
case rt: ResultTask[_, _] =>
case rt: ResultTask[_, _] if !isIndeterministicZombie =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
@@ -1984,7 +2000,7 @@ private[spark] class DAGScheduler(
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
}

case smt: ShuffleMapTask =>
case smt: ShuffleMapTask if !isIndeterministicZombie =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completion for old attempt of indeterminate stage
val ignoreIndeterminate = stage.isIndeterminate &&
@@ -2017,6 +2033,8 @@ private[spark] class DAGScheduler(
processShuffleMapStageCompletion(shuffleStage)
}
}

case _ => // ignore
}

case FetchFailed(bmAddress, shuffleId, _, mapIndex, reduceId, failureMessage) =>
@@ -2121,6 +2139,12 @@ private[spark] class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
@@ -2147,13 +2171,6 @@ private[spark] class DAGScheduler(
}
}

def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))

// The stages will be rolled back after checking
@@ -2171,21 +2188,41 @@ private[spark] class DAGScheduler(
abortStage(mapStage, reason, None)
} else {
rollingBackStages += mapStage
mapOutputTracker.unregisterAllMapAndMergeOutput(
mapStage.shuffleDep.shuffleId)
}
} else {
mapOutputTracker.unregisterAllMapAndMergeOutput(
mapStage.shuffleDep.shuffleId)
@mridulm (Contributor), Apr 12, 2025:

Add if/else for SHUFFLE_USE_OLD_FETCH_PROTOCOL.
Orthogonally, given we are in 4.0, perhaps it is time to drop SHUFFLE_USE_OLD_FETCH_PROTOCOL. Thoughts, @attilapiros?

}

case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
} else {
resultStage.markAllPartitionsMissing()
Contributor:

(Here and in other places) If ResultStage does not have any missing tasks, why are we failing it?

Author:

We are at this line, as per my understanding, because the first task of the current stage (the result stage) has failed, and the ResultStage depends, directly or indirectly, on an indeterminate stage (that is, the ResultStage is itself an indeterminate stage). The call resultStage.markAllPartitionsMissing() sets a flag on this stage so that, while the stage is waiting to be resubmitted for re-execution (a separate thread adds it back to the event queue), any successful task arriving for some other partition in that window is rejected. If such a result were accepted, that would be the race, because it would otherwise result in a re-fetch of only some of the partitions, not all of them.
This flag is checked at line 1881:
&& stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)
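For illustration, here is a minimal, self-contained sketch of the guard described above. The method names (markAllPartitionsMissing, shouldDiscardResult, makeNewStageAttempt) mirror the PR, but the class and everything else are simplified assumptions, so treat it as a model of the race window rather than the actual ResultStage/DAGScheduler code:

```scala
// Simplified, assumed model of the race guard; not the Spark scheduler code.
object DiscardResultSketch {

  // Drastically reduced stand-in for ResultStage: it only tracks the latest
  // attempt number and the attempt whose late results must be dropped.
  final class SimpleResultStage {
    @volatile private var latestAttempt: Int = 0
    @volatile private var discardResultsForAttemptId: Int = -1

    def latestAttemptNumber: Int = latestAttempt

    // Called when the whole stage must be recomputed: results produced by the
    // current (or any older) attempt become stale.
    def markAllPartitionsMissing(): Unit =
      discardResultsForAttemptId = latestAttempt

    // Called when the stage is resubmitted; the guard is reset for the new attempt.
    def makeNewStageAttempt(): Unit = {
      latestAttempt += 1
      discardResultsForAttemptId = -1
    }

    def shouldDiscardResult(attemptId: Int): Boolean =
      discardResultsForAttemptId >= attemptId
  }

  def main(args: Array[String]): Unit = {
    val stage = new SimpleResultStage

    // Fetch failure on an indeterminate result stage: retry all partitions.
    stage.markAllPartitionsMissing()

    // Window before resubmission: a late Success from the old attempt arrives.
    val lateTaskAttempt = stage.latestAttemptNumber
    assert(stage.shouldDiscardResult(lateTaskAttempt),
      "a late result from the old attempt must be discarded")

    // The stage is resubmitted; results from the new attempt are accepted again.
    stage.makeNewStageAttempt()
    assert(!stage.shouldDiscardResult(stage.latestAttemptNumber))
    println("sketch ok")
  }
}
```

The point of the sketch is the ordering: a Success event that still carries the old attempt id is rejected while the stage sits in the event queue, and the guard is cleared only once the new attempt is created.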

}

case _ =>
}
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
} else if (failedStage.isIndeterminate) {
failedStage match {
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
} else {
resultStage.markAllPartitionsMissing()
}

case _ =>
Contributor:

failedStage has already been marked as failed.

Given mapStage is not indeterminate, but only failedStage (the 'reducer' stage, which is a ResultStage) is, we only need to recompute the failed partitions for it, not all partitions.

Author:

This is the case where the ResultStage is failing on its first task. The shuffle that caused the result stage to fail is determinate, but the ResultStage (in the case of a join) depends on two shuffle stages, and the other shuffle stage is indeterminate, which makes the ResultStage indeterminate as well.
At this point it is not yet known whether the other, indeterminate shuffle stage has also failed. If it has, and one of the result tasks is marked successful, or not all partitions are retried, we will get wrong results (the commented-out HA test can fail sporadically because of this problem).
So in this case also, the behaviour needs to be the same as in the previous one (where the failing shuffle stage is indeterminate).
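To make the shape concrete, here is a rough sketch of such a job (assumed names and data, not the PR's test). The final stage of the count reads from two shuffles; the right-hand parent goes through repartition, which is the usual way an upstream stage becomes indeterminate. Running it locally will not reproduce the failure; it only illustrates the dependency shape discussed here:

```scala
// Sketch of a result stage that depends on one determinate and one
// indeterminate shuffle; names and data are made up for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object IndeterminateJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("indeterminate-join-sketch").setMaster("local[2]"))

    // Determinate parent: plain keyed data.
    val left = sc.parallelize(1 to 100, 4).map(i => (i % 10, i))

    // Indeterminate parent: repartition distributes rows in a non-reproducible
    // order, so a retry of this lineage can produce different shuffle output.
    val right = sc.parallelize(1 to 100, 4).repartition(4).map(i => (i % 10, -i))

    // The ResultStage of this count depends on both shuffles; because one
    // parent is indeterminate, a fetch failure there may require re-running
    // all of its partitions rather than only the failed ones.
    println(left.join(right).count())

    sc.stop()
  }
}
```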

}
}

// We expect one executor failure to trigger many FetchFailures in rapid succession,
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -38,6 +38,9 @@ private[spark] class ResultStage(
resourceProfileId: Int)
extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite, resourceProfileId) {

@volatile
private var discardResultsForAttemptId: Int = -1

/**
* The active job for this result stage. Will be empty if the job has already finished
* (e.g., because the job was cancelled).
@@ -54,6 +57,14 @@ private[spark] class ResultStage(
_activeJob = None
}

override def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
super.makeNewStageAttempt(numPartitionsToCompute, taskLocalityPreferences)
// clear the attemptId set in the attemptIdAllPartitionsMissing
discardResultsForAttemptId = -1
}

/**
* Returns the sequence of partition ids that are missing (i.e. needs to be computed).
*
@@ -64,5 +75,16 @@ private[spark] class ResultStage(
(0 until job.numPartitions).filter(id => !job.finished(id))
}

def markAllPartitionsMissing(): Unit = {
this.discardResultsForAttemptId = this.latestInfo.attemptNumber()
val job = activeJob.get
for (id <- 0 until job.numPartitions) {
job.finished(id) = false
}
}

override def shouldDiscardResult(attemptId: Int): Boolean =
this.discardResultsForAttemptId >= attemptId
Contributor:

Assuming both use cases for markAllPartitionsMissing are not required (based on the discussion above), we can remove these changes.

Author:

Unless we decide to aggressively abort the query even when it is the first task that is failing (and the ResultStage is indeterminate), this piece of code and the two above are, in my opinion, needed to avoid the race condition.
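As an aside, if the alternative of aborting were taken, the remedy that generateErrorMessage points users to is checkpointing before the indeterminate shuffle. A minimal, hypothetical example of that workaround (the app name, checkpoint directory, and dataset are made up):

```scala
// Hypothetical user-side workaround from generateErrorMessage: checkpoint the
// RDD before repartition so the shuffle input is reproducible across retries.
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointBeforeRepartition {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed path

    val base = sc.parallelize(1 to 1000, numSlices = 8).map(i => (i % 10, i))
    base.checkpoint() // mark for reliable materialization; happens on the first action

    val repartitioned = base.repartition(4) // the otherwise indeterminate shuffle
    println(repartitioned.values.sum())

    sc.stop()
  }
}
```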


override def toString: String = "ResultStage " + id
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -131,4 +131,6 @@ private[scheduler] abstract class Stage(
def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

def shouldDiscardResult(attemptId: Int): Boolean = false
}