diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d8f33a06123f8..ab238626efe9b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -262,7 +262,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
     val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
-    val msg = StatusUpdate(executorId, taskId, state, data, resources)
+    val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
+    val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
       taskResources.remove(taskId)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8d8a4592a3e5d..5776ae30d7066 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -400,7 +400,7 @@ private[spark] class Executor(
 
   class TaskRunner(
       execBackend: ExecutorBackend,
-      private val taskDescription: TaskDescription,
+      val taskDescription: TaskDescription,
       private val plugins: Option[PluginContainer])
     extends Runnable {
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6d2befec155e7..109c737344761 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -74,14 +74,20 @@ private[spark] object CoarseGrainedClusterMessages {
       taskId: Long,
       state: TaskState,
       data: SerializableBuffer,
+      taskCpus: Int,
       resources: Map[String, ResourceInformation] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   object StatusUpdate {
     /** Alternate factory method that takes a ByteBuffer directly for the data field */
-    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer,
+    def apply(
+        executorId: String,
+        taskId: Long,
+        state: TaskState,
+        data: ByteBuffer,
+        taskCpus: Int,
         resources: Map[String, ResourceInformation]): StatusUpdate = {
-      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), resources)
+      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), taskCpus, resources)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2d3cf2ebc4f5e..28015dd927914 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -152,14 +152,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case StatusUpdate(executorId, taskId, state, data, resources) =>
+      case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
-              val rpId = executorInfo.resourceProfileId
-              val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
-              val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
              executorInfo.freeCores += taskCpus
              resources.foreach { case (k, v) =>
                executorInfo.resourcesInfo.get(k).foreach { r =>
@@ -418,10 +415,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
          val executorData = executorDataMap(task.executorId)
          // Do resources allocation here. The allocated resources will get released after the task
          // finishes.
-          val rpId = executorData.resourceProfileId
-          val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
-          val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
-          executorData.freeCores -= taskCpus
+          executorData.freeCores -= task.cpus
          task.resources.foreach { case (rName, rInfo) =>
            assert(executorData.resourcesInfo.contains(rName))
            executorData.resourcesInfo(rName).acquire(rInfo.addresses)
@@ -745,6 +739,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
   }
 
+  // this function is for testing only
+  private[spark] def getExecutorAvailableCpus(
+      executorId: String): Option[Int] = synchronized {
+    executorDataMap.get(executorId).map(_.freeCores)
+  }
+
   // this function is for testing only
   def getExecutorResourceProfileId(executorId: String): Int = synchronized {
     val execDataOption = executorDataMap.get(executorId)
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 64789ca94e080..7b8b7cf4cddee 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -310,8 +310,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       1, mutable.Map.empty, mutable.Map.empty, mutable.Map.empty, new Properties, 1,
       Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
     val serializedTaskDescription = TaskDescription.encode(taskDescription)
-    backend.executor = mock[Executor]
     backend.rpcEnv.setupEndpoint("Executor 1", backend)
+    backend.executor = mock[Executor](CALLS_REAL_METHODS)
+    val executor = backend.executor
+    // Mock the executor.
+    val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "test-executor")
+    when(executor.threadPool).thenReturn(threadPool)
+    val runningTasks = new ConcurrentHashMap[Long, Executor#TaskRunner]
+    when(executor.runningTasks).thenAnswer(_ => runningTasks)
+    when(executor.conf).thenReturn(conf)
+
+    def getFakeTaskRunner(taskDescription: TaskDescription): Executor#TaskRunner = {
+      new executor.TaskRunner(backend, taskDescription, None) {
+        override def run(): Unit = {
+          logInfo(s"task ${taskDescription.taskId} runs.")
+        }
+
+        override def kill(interruptThread: Boolean, reason: String): Unit = {
+          logInfo(s"task ${taskDescription.taskId} killed.")
+        }
+      }
+    }
+
+    // Feed the fake task-runners to be executed by the executor.
+    doAnswer(_ => getFakeTaskRunner(taskDescription))
+      .when(executor).createTaskRunner(any(), any())
 
     // Launch a new task shall add an entry to `taskResources` map.
     backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 17c9f2e9dffa3..15c8ab16ec97c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -254,10 +254,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(exec3ResourceProfileId === rp.id)
 
     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskCpus = 1
     val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, mutable.Map.empty[String, Long],
       mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
-      new Properties(), 1, taskResources, bytebuffer)))
+      new Properties(), taskCpus, taskResources, bytebuffer)))
     val ts = backend.getTaskSchedulerImpl()
     when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)
@@ -273,7 +274,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     // make sure that `availableAddrs` below won't change
     when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
     backend.driverEndpoint.send(
-      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
+      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus, taskResources))
 
     eventually(timeout(5 seconds)) {
       execResources = backend.getExecutorAvailableResources("1")
@@ -361,10 +362,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(exec3ResourceProfileId === rp.id)
 
     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskCpus = 1
     val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, mutable.Map.empty[String, Long],
       mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
-      new Properties(), 1, taskResources, bytebuffer)))
+      new Properties(), taskCpus, taskResources, bytebuffer)))
     val ts = backend.getTaskSchedulerImpl()
     when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)
@@ -380,7 +382,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     // make sure that `availableAddrs` below won't change
     when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
     backend.driverEndpoint.send(
-      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
+      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus, taskResources))
 
     eventually(timeout(5 seconds)) {
       execResources = backend.getExecutorAvailableResources("1")
@@ -403,6 +405,92 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
       "Our unexpected executor does not have a request time.")
   }
 
+  test("SPARK-41848: executor cores should be decreased based on taskCpus") {
+    val testStartTime = System.currentTimeMillis()
+
+    val execCores = 3
+    val conf = new SparkConf()
+      .set(EXECUTOR_CORES, execCores)
+      .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+      .set(EXECUTOR_INSTANCES, 0)
+      .setMaster(
+        "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+      .setAppName("test")
+
+    sc = new SparkContext(conf)
+
+    val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+    // Request execs in the default profile.
+    backend.requestExecutors(1)
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})
+
+    var executorAddedCount: Int = 0
+    val infos = mutable.ArrayBuffer[ExecutorInfo]()
+    val listener = new SparkListener() {
+      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+        // Lets check that the exec allocation times "make sense"
+        val info = executorAdded.executorInfo
+        infos += info
+        executorAddedCount += 1
+      }
+    }
+
+    sc.addSparkListener(listener)
+
+    val ts = backend.getTaskSchedulerImpl()
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, execCores, Map.empty, Map.empty,
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    backend.driverEndpoint.send(LaunchedExecutor("1"))
+    eventually(timeout(5 seconds)) {
+      assert(backend.getExecutorAvailableCpus("1").contains(3))
+    }
+
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
+    val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
+    val buffer = new SerializableBuffer(bytebuffer)
+
+    val defaultRp = ResourceProfile.getOrCreateDefaultProfile(conf)
+    assert(ResourceProfile.getTaskCpusOrDefaultForProfile(defaultRp, conf) == 1)
+    // Task cpus can be different from default resource profile when TaskResourceProfile is used.
+    val taskCpus = 2
+    val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
+      "t1", 0, 1, mutable.Map.empty[String, Long],
+      mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
+      new Properties(), taskCpus, Map.empty, bytebuffer)))
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)
+
+    backend.driverEndpoint.send(ReviveOffers)
+
+    eventually(timeout(5 seconds)) {
+      assert(backend.getExecutorAvailableCpus("1").contains(1))
+    }
+
+    // To avoid allocating any resources immediately after releasing the resource from the task to
+    // make sure that executor's available cpus below won't change
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
+    backend.driverEndpoint.send(
+      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus))
+
+    eventually(timeout(5 seconds)) {
+      assert(backend.getExecutorAvailableCpus("1").contains(3))
+    }
+    sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+    assert(executorAddedCount === 1)
+    infos.foreach { info =>
+      info.requestTime.map { t =>
+        assert(t > 0,
+          "Exec request times don't make sense")
+        assert(t >= testStartTime,
+          "Exec allocation and request times don't make sense")
+        assert(t <= info.registrationTime.get,
+          "Exec allocation and request times don't make sense")
+      }
+    }
+  }
+
   private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
     sc.submitJob(