diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7d9c1c6f96f6c..9add1e05805eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -95,9 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   // Executors we have requested the cluster manager to kill that have not died yet; maps
   // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
-  // be considered an app-related failure).
+  // be considered an app-related failure). Visible for testing only.
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private val executorsPendingToRemove = new HashMap[String, Boolean]
+  private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // A map to store hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -492,12 +492,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
+   * Visible for testing only.
    *
    */
-  protected def reset(): Unit = {
+  protected[scheduler] def reset(): Unit = {
     val executors: Set[String] = synchronized {
       requestedTotalExecutors = 0
       numPendingExecutors = 0
-      executorsPendingToRemove.clear()
       executorDataMap.keys.toSet
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1d648320fc80c..ab64e3b167440 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -21,18 +21,22 @@ import java.util.{Properties, Random}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 
 import org.apache.hadoop.fs.FileAlreadyExistsException
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.Assertions._
+import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -179,7 +183,12 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
   override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
 }
 
-class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with PrivateMethodTester
+  with Eventually
+  with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
 
   private val conf = new SparkConf
@@ -1894,4 +1903,58 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     manager.handleFailedTask(offerResult.get.taskId, TaskState.FAILED, reason)
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
+
+  test("SPARK-30359: don't clean executorsPendingToRemove " +
+    "at the beginning of CoarseGrainedSchedulerBackend.reset") {
+    val conf = new SparkConf()
+      // use local-cluster mode in order to get a CoarseGrainedSchedulerBackend
+      .setMaster("local-cluster[2, 1, 2048]")
+      // allow at most two executors
+      .set("spark.cores.max", "2")
+      .setAppName("CoarseGrainedSchedulerBackend.reset")
+    sc = new SparkContext(conf)
+    val sched = sc.taskScheduler
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val Seq(exec0, exec1) = backend.getExecutorIds()
+
+    val taskSet = FakeTask.createTaskSet(2)
+    val stageId = taskSet.stageId
+    val stageAttemptId = taskSet.stageAttemptId
+    sched.submitTasks(taskSet)
+    val taskSetManagers = PrivateMethod[mutable.HashMap[Int, mutable.HashMap[Int, TaskSetManager]]](
+      Symbol("taskSetsByStageIdAndAttempt"))
+    // get the TaskSetManager
+    val manager = sched.invokePrivate(taskSetManagers()).get(stageId).get(stageAttemptId)
+
+    val task0 = manager.resourceOffer(exec0, "localhost", TaskLocality.NO_PREF)
+    val task1 = manager.resourceOffer(exec1, "localhost", TaskLocality.NO_PREF)
+    assert(task0.isDefined && task1.isDefined)
+    val (taskId0, index0) = (task0.get.taskId, task0.get.index)
+    val (taskId1, index1) = (task1.get.taskId, task1.get.index)
+    // set up two running tasks
+    assert(manager.taskInfos(taskId0).running)
+    assert(manager.taskInfos(taskId0).executorId === exec0)
+    assert(manager.taskInfos(taskId1).running)
+    assert(manager.taskInfos(taskId1).executorId === exec1)
+
+    val numFailures = PrivateMethod[Array[Int]](Symbol("numFailures"))
+    // no task failures yet
+    assert(manager.invokePrivate(numFailures())(index0) === 0)
+    assert(manager.invokePrivate(numFailures())(index1) === 0)
+
+    // failures on exec1 should count toward task failures, but failures on exec0 should not
+    backend.executorsPendingToRemove(exec0) = true
+    backend.executorsPendingToRemove(exec1) = false
+
+    backend.reset()
+
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      // executorsPendingToRemove should eventually be empty after reset()
+      assert(backend.executorsPendingToRemove.isEmpty)
+      assert(manager.invokePrivate(numFailures())(index0) === 0)
+      assert(manager.invokePrivate(numFailures())(index1) === 1)
+    }
+  }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index dda8172fb6369..471ee58d05cb8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -198,7 +198,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * and re-registered itself to driver after a failure. The stale state in driver should be
    * cleaned.
    */
-  override protected def reset(): Unit = {
+  override protected[scheduler] def reset(): Unit = {
     super.reset()
     sc.executorAllocationManager.foreach(_.reset())
   }
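
To make the intent of the change above concrete, here is a minimal, self-contained sketch. It is not Spark's actual CoarseGrainedSchedulerBackend: the ResetSketch object, the removeExecutor body, and the failuresCounted counter are assumptions for illustration only. It shows why reset() should not clear executorsPendingToRemove up front: the per-executor removal path still needs the flag to tell a driver-requested kill apart from an app-related failure, and the map drains as each executor is removed, which is what the new TaskSetManagerSuite test asserts with eventually.

import scala.collection.mutable

object ResetSketch {
  // executor ID -> "was the kill requested by the driver?" (true = don't count as an app failure)
  private val executorsPendingToRemove = mutable.HashMap.empty[String, Boolean]
  private val executorDataMap = mutable.HashMap.empty[String, String] // executor ID -> host (placeholder)
  private var failuresCounted = 0

  private def removeExecutor(executorId: String): Unit = {
    // the pending-to-remove flag is consumed here, one executor at a time
    val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
    if (!killedByDriver) failuresCounted += 1
    executorDataMap.remove(executorId)
  }

  def reset(): Unit = {
    // note: no executorsPendingToRemove.clear() here; clearing it up front would make every
    // executor removed below look like an app-related failure
    val executors = executorDataMap.keys.toSet
    executors.foreach(removeExecutor)
  }

  def main(args: Array[String]): Unit = {
    executorDataMap ++= Seq("0" -> "host0", "1" -> "host1")
    executorsPendingToRemove("0") = true  // driver asked to kill executor 0
    executorsPendingToRemove("1") = false // executor 1's loss should count as a failure
    reset()
    assert(executorsPendingToRemove.isEmpty)
    assert(failuresCounted == 1) // only executor 1 is counted, mirroring the new test
    println(s"failuresCounted = $failuresCounted")
  }
}

Running the sketch prints failuresCounted = 1: the executor the driver explicitly killed does not contribute a failure, while the other one does, matching the two numFailures assertions in the new test.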