Closed
Changes from all commits
66 commits
3445646
SPARK-51016. Fixing the code of isInDeterminate boolean for a Stage
Feb 20, 2025
da11f69
Merge branch 'upstream-master' into SPARK-51016
Feb 20, 2025
3ce5ad3
SPARK-51016. updating comment
Feb 20, 2025
c537b3a
SPARK-51272. Fix for the race condition in Scheduler causing failure …
Feb 21, 2025
765ae55
SPARK-51272. added a reliable bug test demonstrating the race conditi…
Feb 22, 2025
e9a4659
SPARK-51272. fixed comment
Feb 23, 2025
5de7160
Merge branch 'upstream-master' into SPARK-51016
Feb 27, 2025
1da65be
Merge branch 'upstream-master' into SPARK-51272
Feb 27, 2025
85d66c2
Merge branch 'upstream-master' into SPARK-51016
Feb 28, 2025
038e14d
SPARK-51016. Refactored the code so as to pass the boolean hasInDeter…
Feb 28, 2025
7f2d2bc
SPARK-51016. Refactored the code so as to pass the boolean hasInDeter…
Feb 28, 2025
9cec75e
SPARK-51016. fixed test failures after refactoring
Feb 28, 2025
c715348
Merge branch 'upstream-master' into SPARK-51016
Mar 1, 2025
7878a60
SPARK-51016. refactored the tests to use inner joins instead of Left …
Mar 3, 2025
fc918af
SPARK-51272: Reworked the test reproducing the race condition, by not…
Mar 4, 2025
d204a67
SPARK-51272: formatting change
Mar 4, 2025
5a940a7
SPARK-51272: added some explanation to what test is doing to get the …
Mar 4, 2025
ddeeff8
SPARK-51272: Given the insight by Attila that DagEventProcessing will…
Mar 4, 2025
c4e1fee
SPARK-51272: formatting changes.
Mar 4, 2025
f84af25
initial changes
Mar 6, 2025
ac963bf
SPARK-51272. incorporating review feedback
Mar 6, 2025
d652319
SPARK-51272. incorporating review feedback. removing explicit invocati…
Mar 6, 2025
d8b4079
initial changes
Mar 6, 2025
a0c700c
SPARK-51272. rectified a comment and renamed the test
Mar 6, 2025
a930c9d
initial changes
Mar 7, 2025
9b153a3
SPARK-51272. incorporated review feedback to refactor test
Mar 7, 2025
70b7910
SPARK-51272. incorporated review feedback to refactor test
Mar 7, 2025
a5ddc9e
initial changes
Mar 7, 2025
001258a
initial changes
Mar 10, 2025
a819f4f
initial changes
Mar 10, 2025
d8eb1c3
SPARK-51272. implemented review feedback. refactored code. no longer …
Mar 10, 2025
1ec98f0
SPARK-51272. Fixed the situation where ResultStage is inDeterminate b…
Mar 11, 2025
69b66aa
initial config
Mar 12, 2025
c95a34d
Merge branch 'SPARK-51016-functional-bug-repro' into SPARK-51272
Mar 12, 2025
c5fbb45
Merge branch 'upstream-master' into SPARK-51272
Mar 12, 2025
3439bf9
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
abcd133
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
b8865af
SPARK-51272. added disabled functional test to reproduce the issue. N…
Mar 12, 2025
784aad3
SPARK-51272. added another unit test which checks for all partitions …
Mar 12, 2025
cd041ee
SPARK-51272. formatting change
Mar 13, 2025
b4c541d
Merge branch 'SPARK-51016' into SPARK-51272--51016-combined
Mar 13, 2025
f63138b
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 13, 2025
d77bcd8
SPARK-51272. SPARK-51016. combined PRs with HA Test enabled
Mar 13, 2025
2c95271
SPARK-51272. Increased the number of iterations. Using two positions …
Mar 13, 2025
2ec8333
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 13, 2025
1a93cef
Merge branch 'upstream-master' into SPARK-51016
Mar 13, 2025
1014c6b
SPARK-51272. fixed scalastyle issue
Mar 13, 2025
c1faef4
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 13, 2025
b1a0593
SPARK-51272. Implementing review feedback. Making sql dependency in …
Mar 14, 2025
dc08a7b
SPARK-51272. Implementing review feedback. Making sql dependency in …
Mar 14, 2025
4f263d1
SPARK-51272. refactored the code so that findMissingPartitions code r…
Mar 14, 2025
e5eb08f
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 14, 2025
3895fec
SPARK-51272. refactored the code, based on the review feedback. Refac…
Mar 15, 2025
012081f
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 15, 2025
2498169
SPARK-51272. Aborting the result stage if its number of missing parti…
Mar 15, 2025
b417a67
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 15, 2025
9fd7114
SPARK-51272. reverted previous change of throwing abort exception
Mar 17, 2025
c79518e
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Mar 17, 2025
54f18fe
SPARK-51016. Implemented review feedback. Thank you @squito
Mar 19, 2025
56e572a
SPARK-51016. Implemented review feedback from @squito and @mridulm
Mar 20, 2025
b9bd30b
SPARK-51016. Implemented review feedback from @squito and @mridulm
Mar 20, 2025
f7a6721
SPARK-51016. Implemented review feedback from @squito and @mridulm
Mar 20, 2025
914123f
SPARK-51272. Fixed the tests code as per feedback to use direct depen…
Apr 3, 2025
00a4aad
SPARK-51272. Fixed the race in test causing inconsistent pass
Apr 3, 2025
c416ff6
Merge branch 'SPARK-51016' into SPARK-51272--51016-combined
Apr 3, 2025
1c8350c
Merge branch 'SPARK-51272' into SPARK-51272--51016-combined
Apr 3, 2025
90 changes: 64 additions & 26 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1554,6 +1554,7 @@ private[spark] class DAGScheduler(
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()

case _ =>
}

@@ -1873,15 +1874,28 @@ private[spark] class DAGScheduler(
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
val stageOption = stageIdToStage.get(task.stageId)
val isIndeterministicZombie = event.reason match {
case Success if stageOption.isDefined =>
val stage = stageOption.get
(task.stageAttemptId < stage.latestInfo.attemptNumber()
&& stage.isIndeterminate) || stage.shouldDiscardResult(task.stageAttemptId)

case _ => false
}

outputCommitCoordinator.taskCompleted(
stageId,
task.stageAttemptId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
if (isIndeterministicZombie) {
TaskKilled(reason = "Indeterminate stage needs all tasks to be retried")
} else {
event.reason
})

if (!stageIdToStage.contains(task.stageId)) {
if (stageOption.isEmpty) {
// The stage may have already finished when we get this event -- e.g. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
are properly notified and can choose to handle it. For instance, some listeners are
@@ -1893,34 +1907,37 @@
return
}

val stage = stageIdToStage(task.stageId)
val stage = stageOption.get

// Make sure the task's accumulators are updated before any other processing happens, so that
// we can post a task end event before any jobs or stages are updated. The accumulators are
// only updated in certain cases.
event.reason match {
case Success =>
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case None => // Ignore update if task's job has finished.
}
case _ =>
updateAccumulators(event)
if (!isIndeterministicZombie) {
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case _ => // Ignore update if task's job has finished.
}
case _ => updateAccumulators(event)
}
}

case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)

case _ =>
}
if (trackingCacheVisibility) {
// Update rdd blocks' visibility status.
blockManagerMaster.updateRDDBlockVisibility(
event.taskInfo.taskId, visible = event.reason == Success)
event.taskInfo.taskId, visible = event.reason == Success && !isIndeterministicZombie)
}

postTaskEnd(event)
@@ -1936,7 +1953,7 @@
}

task match {
case rt: ResultTask[_, _] =>
case rt: ResultTask[_, _] if !isIndeterministicZombie =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
@@ -1984,7 +2001,7 @@
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
}

case smt: ShuffleMapTask =>
case smt: ShuffleMapTask if !isIndeterministicZombie =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
// Ignore task completion for old attempt of indeterminate stage
val ignoreIndeterminate = stage.isIndeterminate &&
@@ -2017,6 +2034,8 @@
processShuffleMapStageCompletion(shuffleStage)
}
}

case _ => // ignore
}

case FetchFailed(bmAddress, shuffleId, _, mapIndex, reduceId, failureMessage) =>
@@ -2121,6 +2140,12 @@
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
@@ -2147,13 +2172,6 @@
}
}

def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))

// The stages will be rolled back after checking
@@ -2171,21 +2189,41 @@
abortStage(mapStage, reason, None)
} else {
rollingBackStages += mapStage
mapOutputTracker.unregisterAllMapAndMergeOutput(
mapStage.shuffleDep.shuffleId)
}
} else {
mapOutputTracker.unregisterAllMapAndMergeOutput(
mapStage.shuffleDep.shuffleId)
}

case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
} else {
resultStage.markAllPartitionsMissing()
}

case _ =>
}
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
} else if (failedStage.isIndeterminate) {
failedStage match {
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
} else {
resultStage.markAllPartitionsMissing()
}

case _ =>
}
}

// We expect one executor failure to trigger many FetchFailures in rapid succession,
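The error message introduced above advises eliminating the indeterminacy by checkpointing the RDD before repartitioning it. A minimal user-side sketch of that workaround follows; it is not part of this PR, and the application name and checkpoint directory are assumptions.

// Hypothetical user job illustrating the workaround named in the error message
// above: checkpoint the RDD before repartitioning it, so that a retried shuffle
// map stage re-reads identical input data.
import org.apache.spark.sql.SparkSession

object CheckpointBeforeRepartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("checkpoint-before-repartition").getOrCreate()
    val sc = spark.sparkContext
    sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed path

    // An RDD whose partition contents can differ between retries, e.g. one
    // derived from a shuffle with no deterministic ordering.
    val rdd = sc.parallelize(1 to 1000000, numSlices = 8)
      .map(i => (i % 100, i))
      .groupByKey()
      .flatMap { case (_, values) => values }

    rdd.checkpoint() // pin the data to reliable storage
    rdd.count()      // run an action so the checkpoint is actually materialized

    // The repartition now reads from the checkpointed, deterministic data.
    println(rdd.repartition(16).map(_ * 2L).sum())
    spark.stop()
  }
}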
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -38,6 +38,9 @@ private[spark] class ResultStage(
resourceProfileId: Int)
extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite, resourceProfileId) {

@volatile
private var discardResultsForAttemptId: Int = -1

/**
* The active job for this result stage. Will be empty if the job has already finished
* (e.g., because the job was cancelled).
@@ -54,6 +57,14 @@
_activeJob = None
}

override def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
super.makeNewStageAttempt(numPartitionsToCompute, taskLocalityPreferences)
// clear the attemptId set in the attemptIdAllPartitionsMissing
discardResultsForAttemptId = -1
}

/**
* Returns the sequence of partition ids that are missing (i.e. needs to be computed).
*
@@ -64,5 +75,16 @@
(0 until job.numPartitions).filter(id => !job.finished(id))
}

def markAllPartitionsMissing(): Unit = {
this.discardResultsForAttemptId = this.latestInfo.attemptNumber()
val job = activeJob.get
for (id <- 0 until job.numPartitions) {
job.finished(id) = false
}
}

override def shouldDiscardResult(attemptId: Int): Boolean =
this.discardResultsForAttemptId >= attemptId

override def toString: String = "ResultStage " + id
}
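The new discardResultsForAttemptId field drives the rollback of an indeterminate ResultStage: markAllPartitionsMissing records the attempt being rolled back so its late results are dropped, and makeNewStageAttempt clears the marker so the next attempt's results are accepted. A rough standalone sketch of that lifecycle, using a simplified stand-in for the real class:

// Simplified model of the ResultStage change above; the names mirror the PR
// but this class is only an illustration, not scheduler code.
object ResultStageRollbackSketch {
  final class StageSketch(numPartitions: Int) {
    private var latestAttempt: Int = 0
    private val finished = Array.fill(numPartitions)(false)
    @volatile private var discardResultsForAttemptId: Int = -1

    def markAllPartitionsMissing(): Unit = {
      // Remember which attempt was rolled back so its straggler results are ignored.
      discardResultsForAttemptId = latestAttempt
      (0 until numPartitions).foreach(i => finished(i) = false)
    }

    def makeNewStageAttempt(): Unit = {
      latestAttempt += 1
      discardResultsForAttemptId = -1 // results of the new attempt count again
    }

    def shouldDiscardResult(attemptId: Int): Boolean =
      discardResultsForAttemptId >= attemptId
  }

  def main(args: Array[String]): Unit = {
    val stage = new StageSketch(numPartitions = 4)
    stage.markAllPartitionsMissing()
    assert(stage.shouldDiscardResult(0))  // late results from attempt 0 are dropped
    stage.makeNewStageAttempt()
    assert(!stage.shouldDiscardResult(1)) // attempt 1 results are accepted
  }
}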
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -131,4 +131,6 @@ private[scheduler] abstract class Stage(
def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

def shouldDiscardResult(attemptId: Int): Boolean = false
}
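Putting the pieces together: handleTaskCompletion in DAGScheduler.scala combines the stage attempt comparison, the stage's indeterminacy, and the new shouldDiscardResult hook into a single zombie check. Below is a standalone sketch of that predicate with the decision inputs passed in explicitly; it is an illustration, not the actual scheduler code.

// A successful task is treated as an "indeterministic zombie" -- and its result
// discarded -- if either of the two conditions below holds.
object ZombieCheckSketch {
  def isIndeterministicZombie(
      taskSucceeded: Boolean,
      taskStageAttemptId: Int,
      latestStageAttempt: Int,
      stageIsIndeterminate: Boolean,
      shouldDiscardResult: Int => Boolean): Boolean =
    taskSucceeded &&
      // The task belongs to an older attempt of an indeterminate stage ...
      ((taskStageAttemptId < latestStageAttempt && stageIsIndeterminate) ||
        // ... or the ResultStage has marked this attempt's results as discardable.
        shouldDiscardResult(taskStageAttemptId))

  def main(args: Array[String]): Unit = {
    // A task from attempt 0 finishes after an indeterminate stage was retried as attempt 1.
    assert(isIndeterministicZombie(
      taskSucceeded = true, taskStageAttemptId = 0, latestStageAttempt = 1,
      stageIsIndeterminate = true, shouldDiscardResult = _ => false))
    // A task from the current attempt of a deterministic stage is kept.
    assert(!isIndeterministicZombie(
      taskSucceeded = true, taskStageAttemptId = 1, latestStageAttempt = 1,
      stageIsIndeterminate = false, shouldDiscardResult = _ => false))
  }
}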