Commits (43)
c537b3a
SPARK-51272. Fix for the race condition in Scheduler causing failure …
Feb 21, 2025
765ae55
SPARK-51272. added a reliable bug test demonstrating the race conditi…
Feb 22, 2025
e9a4659
SPARK-51272. fixed comment
Feb 23, 2025
1da65be
Merge branch 'upstream-master' into SPARK-51272
Feb 27, 2025
fc918af
SPARK-51272: Reworked the test reproducing the race condition, by not…
Mar 4, 2025
d204a67
SPARK-51272: formatting change
Mar 4, 2025
5a940a7
SPARK-51272: added some explanation to what test is doing to get the …
Mar 4, 2025
ddeeff8
SPARK-51272: Given the insight by Attila that DagEventProcessing will…
Mar 4, 2025
c4e1fee
SPARK-51272: formatting changes.
Mar 4, 2025
f84af25
initial changes
Mar 6, 2025
ac963bf
SPARK-51272. incorporating review feedback
Mar 6, 2025
d652319
SPARK-51272. incorporating review feedback. removing explict invocati…
Mar 6, 2025
d8b4079
initial changes
Mar 6, 2025
a0c700c
SPARK-51272. rectified a comment and renamed the test
Mar 6, 2025
a930c9d
initial changes
Mar 7, 2025
9b153a3
SPARK-51272. incorporated review feedback to refactor test
Mar 7, 2025
70b7910
SPARK-51272. incorporated review feedback to refactor test
Mar 7, 2025
a5ddc9e
initial changes
Mar 7, 2025
001258a
initial changes
Mar 10, 2025
a819f4f
initial changes
Mar 10, 2025
d8eb1c3
SPARK-51272. implemented review feedback. refactored code. no longer …
Mar 10, 2025
1ec98f0
SPARK-51272. Fixed the situation where ResultStage is inDeterminate b…
Mar 11, 2025
69b66aa
initial config
Mar 12, 2025
c95a34d
Merge branch 'SPARK-51016-functional-bug-repro' into SPARK-51272
Mar 12, 2025
c5fbb45
Merge branch 'upstream-master' into SPARK-51272
Mar 12, 2025
3439bf9
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
abcd133
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
b8865af
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
784aad3
SPARK-51272. added another unit test which checks for all partitions …
Mar 12, 2025
cd041ee
SPARK-51272. formatting change
Mar 13, 2025
2c95271
SPARK-51272. Increased the number of iterations. Using two positions …
Mar 13, 2025
1014c6b
SPARK-51272. fixed scalastyle issue
Mar 13, 2025
b1a0593
SPARK-51272. Implementing review feedbackk. Making sql dependency in …
Mar 14, 2025
dc08a7b
SPARK-51272. Implementing review feedbackk. Making sql dependency in …
Mar 14, 2025
4f263d1
SPARK-51272. refactored the code so that findMissingPartitions code r…
Mar 14, 2025
3895fec
SPARK-51272. refactored the code, based on the review feedback. Refac…
Mar 15, 2025
2498169
SPARK-51272. Aborting the result stage if its number of missing parti…
Mar 15, 2025
9fd7114
SPARK-51272. reverted previous change of throwing abort exception
Mar 17, 2025
914123f
SPARK-51272. Fixed the tests code as per feedback to use direct depen…
Apr 3, 2025
00a4aad
SPARK-51272. Fixed the race in test causing inconsistent pass
Apr 3, 2025
2d3cb78
SPARK-51272. provided explanation for the HA test
Apr 11, 2025
47eab67
SPARK-51272. provided explanation for the HA test
Apr 14, 2025
f16187c
SPARK-51272. Made the assertions stronger by using separate hostnames…
Apr 15, 2025
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (239 changes: 143 additions, 96 deletions)
@@ -1898,25 +1898,51 @@ private[spark] class DAGScheduler(
// Make sure the task's accumulators are updated before any other processing happens, so that
// we can post a task end event before any jobs or stages are updated. The accumulators are
// only updated in certain cases.
event.reason match {
val (readLockTaken, isIndeterministicZombie) = event.reason match {
case Success =>
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case None => // Ignore update if task's job has finished.
}
case _ =>
updateAccumulators(event)
stage.acquireStageReadLock()
val isZombieIndeterminate =
(task.stageAttemptId < stage.latestInfo.attemptNumber()
&& stage.isIndeterminate) ||
stage.treatAllPartitionsMissing(task.stageAttemptId)
if (!isZombieIndeterminate) {
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case _ => // Ignore update if task's job has finished.
}
case _ => updateAccumulators(event)
}
}
case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
case _ =>
(true, isZombieIndeterminate)

case _: ExceptionFailure | _: TaskKilled =>
updateAccumulators(event)
(false, false)

case _ => (false, false)
}

try {
handleTaskCompletionInOptionalReadLock(event, task, stageId, stage, isIndeterministicZombie)
} finally {
if (readLockTaken) {
stage.releaseStageReadLock()
}
}
}

private def handleTaskCompletionInOptionalReadLock(
event: CompletionEvent,
task: Task[_], stageId: Int,
stage: Stage,
isIndeterministicZombie: Boolean): Unit = {
if (trackingCacheVisibility) {
// Update rdd blocks' visibility status.
blockManagerMaster.updateRDDBlockVisibility(
@@ -1936,7 +1962,7 @@
}

task match {
case rt: ResultTask[_, _] =>
case rt: ResultTask[_, _] if !isIndeterministicZombie =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
@@ -1984,7 +2010,7 @@
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
}

case smt: ShuffleMapTask =>
case smt: ShuffleMapTask if !isIndeterministicZombie =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completion for old attempt of indeterminate stage
val ignoreIndeterminate = stage.isIndeterminate &&
@@ -2017,6 +2043,8 @@
processShuffleMapStageCompletion(shuffleStage)
}
}

case _ => // ignore
}

case FetchFailed(bmAddress, shuffleId, _, mapIndex, reduceId, failureMessage) =>
@@ -2121,92 +2149,111 @@
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
// map tasks are re-tried.
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
if (mapStage.isIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
// in the stage chains that connect to the `mapStage`. To speed up the stage
// traversing, we collect the stages to rollback first. If a stage needs to
// rollback, all its succeeding stages need to rollback to.
val stagesToRollback = HashSet[Stage](mapStage)

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
stageChain.drop(1).foreach(s => stagesToRollback += s)
} else {
stageChain.head.parents.foreach { s =>
collectStagesToRollback(s :: stageChain)
val writeLockedStages = mutable.Buffer.empty[Stage]
try {
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
// map tasks are re-tried.
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
if (mapStage.isIndeterminate) {
// It's a little tricky to find all the succeeding stages of `mapStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
// in the stage chains that connect to the `mapStage`. To speed up the stage
// traversing, we collect the stages to rollback first. If a stage needs to
// rollback, all its succeeding stages need to rollback to.
val stagesToRollback = HashSet[Stage](mapStage)

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
stageChain.drop(1).foreach(s => stagesToRollback += s)
} else {
stageChain.head.parents.foreach { s =>
collectStagesToRollback(s :: stageChain)
}
}
}
}

def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}
def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))

// The stages will be rolled back after checking
val rollingBackStages = HashSet[Stage](mapStage)
stagesToRollback.foreach {
case mapStage: ShuffleMapStage =>
val numMissingPartitions = mapStage.findMissingPartitions().length
if (numMissingPartitions < mapStage.numTasks) {
if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
val reason = "A shuffle map stage with indeterminate output was failed " +
"and retried. However, Spark can only do this while using the new " +
"shuffle block fetching protocol. Please check the config " +
"'spark.shuffle.useOldFetchProtocol', see more detail in " +
"SPARK-27665 and SPARK-25341."
abortStage(mapStage, reason, None)
// The stages will be rolled back after checking
val rollingBackStages = HashSet[Stage](mapStage)
stagesToRollback.foreach {
case mapStage: ShuffleMapStage =>
if (mapStage.acquireStageWriteLock()) {
writeLockedStages += mapStage
}
val numMissingPartitions = mapStage.findMissingPartitions().length
if (numMissingPartitions < mapStage.numTasks) {
if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
val reason = "A shuffle map stage with indeterminate output was failed " +
"and retried. However, Spark can only do this while using the new " +
"shuffle block fetching protocol. Please check the config " +
"'spark.shuffle.useOldFetchProtocol', see more detail in " +
"SPARK-27665 and SPARK-25341."
abortStage(mapStage, reason, None)
} else {
rollingBackStages += mapStage
mapStage.markAttemptIdForAllPartitionsMissing(
mapStage.latestInfo.attemptNumber())
}
} else {
rollingBackStages += mapStage
mapStage.markAttemptIdForAllPartitionsMissing(
mapStage.latestInfo.attemptNumber())
}
}

case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
}
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
if (resultStage.acquireStageWriteLock()) {
writeLockedStages += resultStage
}
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
} else {
resultStage.markAttemptIdForAllPartitionsMissing(
resultStage.latestInfo.attemptNumber())
}

case _ =>
case _ =>
}
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
}
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
}

// We expect one executor failure to trigger many FetchFailures in rapid succession,
// but all of those task failures can typically be handled by a single resubmission of
// the failed stage. We avoid flooding the scheduler's event queue with resubmit
// messages by checking whether a resubmit is already in the event queue for the
// failed stage. If there is already a resubmit enqueued for a different failed
// stage, that event would also be sufficient to handle the current failed stage, but
// producing a resubmit for each failed stage makes debugging and logging a little
// simpler while not producing an overwhelming number of scheduler events.
logInfo(
log"Resubmitting ${MDC(STAGE, mapStage)} " +
log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " +
log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure")
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
},
DAGScheduler.RESUBMIT_TIMEOUT,
TimeUnit.MILLISECONDS
)
// We expect one executor failure to trigger many FetchFailures in rapid succession,
// but all of those task failures can typically be handled by a single resubmission of
// the failed stage. We avoid flooding the scheduler's event queue with resubmit
// messages by checking whether a resubmit is already in the event queue for the
// failed stage. If there is already a resubmit enqueued for a different failed
// stage, that event would also be sufficient to handle the current failed stage, but
// producing a resubmit for each failed stage makes debugging and logging a little
// simpler while not producing an overwhelming number of scheduler events.
logInfo(
log"Resubmitting ${MDC(STAGE, mapStage)} " +
log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " +
log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure")
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
},
DAGScheduler.RESUBMIT_TIMEOUT,
TimeUnit.MILLISECONDS
)
} finally {
writeLockedStages.foreach(_.releaseStageWriteLock())
}
}
}

@@ -2263,8 +2310,8 @@ private[spark] class DAGScheduler(
log"and there is a more recent attempt for that stage (attempt " +
log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running")
} else {
logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " +
log"as failed due to a barrier task failed.")
logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " +
log"as failed due to a barrier task failed.")
val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
failure.toErrorString
try {
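
The DAGScheduler changes above call several Stage methods (acquireStageReadLock, releaseStageReadLock, acquireStageWriteLock, releaseStageWriteLock, markAttemptIdForAllPartitionsMissing, treatAllPartitionsMissing) whose definitions are not part of this excerpt. Below is a minimal sketch of what such a per-stage locking surface could look like, assuming a reentrant read/write lock plus a set of attempt ids whose output must be treated as missing. The names match the call sites above, but the implementation is only an illustration, not the PR's actual Stage.scala change.

// Hypothetical sketch only: the PR's real Stage.scala change is not shown in this excerpt.
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

private[spark] trait StageAttemptLocking {
  // Completion events take the read lock; the fetch-failure rollback path takes the
  // write lock, so a rollback cannot interleave with a completion for the same stage.
  private val stageLock = new ReentrantReadWriteLock(true)

  // Attempt ids whose output must be discarded because the stage was rolled back
  // while those attempts were still running ("zombie" attempts).
  private val attemptsWithAllPartitionsMissing = mutable.Set.empty[Int]

  def acquireStageReadLock(): Unit = stageLock.readLock().lock()

  def releaseStageReadLock(): Unit = stageLock.readLock().unlock()

  // Returns true only if this call actually took the write lock, so the caller
  // knows whether it is responsible for releasing it later.
  def acquireStageWriteLock(): Boolean = {
    if (stageLock.writeLock().isHeldByCurrentThread) {
      false
    } else {
      stageLock.writeLock().lock()
      true
    }
  }

  def releaseStageWriteLock(): Unit = stageLock.writeLock().unlock()

  def markAttemptIdForAllPartitionsMissing(attemptId: Int): Unit = synchronized {
    attemptsWithAllPartitionsMissing += attemptId
  }

  def treatAllPartitionsMissing(stageAttemptId: Int): Boolean = synchronized {
    attemptsWithAllPartitionsMissing.contains(stageAttemptId)
  }
}

In the diff above, handleTaskCompletion wraps the Success branch in the read lock, while the indeterminate-rollback branch of the FetchFailed handler collects every stage whose write lock it newly acquired into writeLockedStages and releases them in the finally block.
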
core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -61,7 +61,12 @@ private[spark] class ResultStage(
*/
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
val allPartitions = (0 until job.numPartitions)
if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) {
allPartitions
} else {
allPartitions.filter(id => !job.finished(id))
}
}

override def toString: String = "ResultStage " + id
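
To make the intent of the isIndeterministicZombie flag and the findMissingPartitions change concrete, the check can be restated as a small standalone snippet. Plain values stand in for Task and Stage here; this is an illustration, not code from the PR.

// Illustration only: restates the predicate from the handleTaskCompletion hunk above.
object ZombieCheckExample extends App {
  def isZombieIndeterminate(
      taskStageAttemptId: Int,        // task.stageAttemptId
      latestAttemptNumber: Int,       // stage.latestInfo.attemptNumber()
      stageIsIndeterminate: Boolean,  // stage.isIndeterminate
      allPartitionsMissing: Int => Boolean): Boolean = {  // stage.treatAllPartitionsMissing
    (taskStageAttemptId < latestAttemptNumber && stageIsIndeterminate) ||
      allPartitionsMissing(taskStageAttemptId)
  }

  // A result task from attempt 0 finishes after an indeterminate rollback marked
  // attempt 0 as all-partitions-missing: its output is ignored rather than letting
  // the job complete with partitions computed from stale shuffle output.
  val stale = isZombieIndeterminate(
    taskStageAttemptId = 0,
    latestAttemptNumber = 1,
    stageIsIndeterminate = true,
    allPartitionsMissing = (attempt: Int) => attempt == 0)
  assert(stale)

  // Once the attempt is marked, findMissingPartitions also reports every partition
  // as missing, so the retried attempt recomputes the whole stage.
  val numPartitions = 4
  val finished = Array(true, false, true, false)
  val missing =
    if (stale) 0 until numPartitions
    else (0 until numPartitions).filter(id => !finished(id))
  println(missing.mkString(","))  // prints 0,1,2,3
}
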
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -90,8 +90,11 @@ private[spark] class ShuffleMapStage(

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) {
0 until numPartitions
} else {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
}
}
}
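
The ordering the per-stage read/write lock is meant to enforce can also be seen in isolation. The toy program below (not part of the PR) uses a plain ReentrantReadWriteLock to mimic it: a completion handler holding the read lock cannot be interleaved with the rollback that takes the write lock, and any completion processed after the rollback observes the all-partitions-missing mark.

// Toy demonstration only: mimics the completion-vs-rollback ordering, not PR code.
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantReadWriteLock

object LockOrderingDemo extends App {
  val lock = new ReentrantReadWriteLock(true)
  @volatile var attemptMarkedMissing = false
  val completionEntered = new CountDownLatch(1)

  val completion = new Thread(() => {
    lock.readLock().lock()
    completionEntered.countDown()
    try {
      // While the read lock is held, the rollback below is blocked, so this
      // completion is applied against a consistent view of the stage.
      assert(!attemptMarkedMissing)
      Thread.sleep(50)
    } finally lock.readLock().unlock()
  })

  val rollback = new Thread(() => {
    completionEntered.await()
    lock.writeLock().lock()
    try {
      attemptMarkedMissing = true // stands in for markAttemptIdForAllPartitionsMissing
    } finally lock.writeLock().unlock()
  })

  completion.start(); rollback.start()
  completion.join(); rollback.join()

  // A completion arriving after the rollback now sees the mark and is treated
  // as a zombie, so its output is ignored.
  lock.readLock().lock()
  try assert(attemptMarkedMissing) finally lock.readLock().unlock()
  println(s"attempt marked missing after rollback: $attemptMarkedMissing")
}
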