Skip to content

Commit ada7726

Browse files
committed
reviewer feedback
1 parent d8eb202 commit ada7726

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,14 @@ private[spark] class TaskSchedulerImpl(
163163
this.synchronized {
164164
val manager = createTaskSetManager(taskSet, maxTaskFailures)
165165
activeTaskSets(taskSet.id) = manager
166-
val taskSetsPerStage = activeTaskSets.values.filterNot(_.isZombie).groupBy(_.stageId)
167-
taskSetsPerStage.foreach { case (stage, taskSets) =>
168-
if (taskSets.size > 1) {
169-
throw new SparkIllegalStateException("more than one active taskSet for stage " + stage)
170-
}
166+
val stage = taskSet.stageId
167+
val conflictingTaskSet = activeTaskSets.exists { case (id, ts) =>
168+
// if the id matches, it really should be the same taskSet, but in some unit tests
169+
// we add new taskSets with the same id
170+
id != taskSet.id && !ts.isZombie && ts.stageId == stage
171+
}
172+
if (conflictingTaskSet) {
173+
throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage")
171174
}
172175
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
173176

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -554,8 +554,10 @@ class DAGSchedulerSuite
554554
assert(sparkListener.failedStages.size == 1)
555555
}
556556

557-
/** This tests the case where another FetchFailed comes in while the map stage is getting
558-
* re-run. */
557+
/**
558+
* This tests the case where another FetchFailed comes in while the map stage is getting
559+
* re-run.
560+
*/
559561
test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
560562
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
561563
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -607,15 +609,15 @@ class DAGSchedulerSuite
607609
createFakeTaskInfo(),
608610
null))
609611

610-
// Another ResubmitFailedStages event should not result result in another attempt for the map
612+
// Another ResubmitFailedStages event should not result in another attempt for the map
611613
// stage being run concurrently.
614+
// NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't
615+
// effect anything -- our calling it just makes *SURE* it gets called between the desired event
616+
// and our check.
612617
runEvent(ResubmitFailedStages)
613618
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
614619
assert(countSubmittedMapStageAttempts() === 2)
615620

616-
// NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't
617-
// effect anything -- our calling it just makes *SURE* it gets called between the desired event
618-
// and our check.
619621
}
620622

621623
/** This tests the case where a late FetchFailed comes in after the map stage has finished getting

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
141141
val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null)
142142
val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null)
143143
taskScheduler.submitTasks(attempt1)
144-
intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)}
144+
intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2) }
145+
146+
// OK to submit multiple if previous attempts are all zombie
147+
taskScheduler.activeTaskSets(attempt1.id).isZombie = true
148+
taskScheduler.submitTasks(attempt2)
149+
val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null)
150+
intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt3) }
151+
taskScheduler.activeTaskSets(attempt2.id).isZombie = true
152+
taskScheduler.submitTasks(attempt3)
145153
}
146154

147155
}

0 commit comments

Comments
 (0)