@@ -262,7 +262,8 @@ private[spark] class CoarseGrainedExecutorBackend(

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
- val msg = StatusUpdate(executorId, taskId, state, data, resources)
+ val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
+ val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
}
@@ -400,7 +400,7 @@ private[spark] class Executor(

class TaskRunner(
execBackend: ExecutorBackend,
- private val taskDescription: TaskDescription,
+ val taskDescription: TaskDescription,
private val plugins: Option[PluginContainer])
extends Runnable {

@@ -74,14 +74,20 @@ private[spark] object CoarseGrainedClusterMessages {
taskId: Long,
state: TaskState,
data: SerializableBuffer,
+ taskCpus: Int,
resources: Map[String, ResourceInformation] = Map.empty)
extends CoarseGrainedClusterMessage

object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer,
+ def apply(
+ executorId: String,
+ taskId: Long,
+ state: TaskState,
+ data: ByteBuffer,
+ taskCpus: Int,
resources: Map[String, ResourceInformation]): StatusUpdate = {
- StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), resources)
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), taskCpus, resources)
}
}

@@ -152,14 +152,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

override def receive: PartialFunction[Any, Unit] = {
- case StatusUpdate(executorId, taskId, state, data, resources) =>
+ case StatusUpdate(executorId, taskId, state, data, taskCpus, resources) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
- val rpId = executorInfo.resourceProfileId
- val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
- val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorInfo.freeCores += taskCpus
resources.foreach { case (k, v) =>
executorInfo.resourcesInfo.get(k).foreach { r =>
@@ -418,10 +415,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorData = executorDataMap(task.executorId)
// Do resources allocation here. The allocated resources will get released after the task
// finishes.
- val rpId = executorData.resourceProfileId
- val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
- val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
- executorData.freeCores -= taskCpus
+ executorData.freeCores -= task.cpus
task.resources.foreach { case (rName, rInfo) =>
assert(executorData.resourcesInfo.contains(rName))
executorData.resourcesInfo(rName).acquire(rInfo.addresses)
@@ -745,6 +739,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
}

+ // this function is for testing only
+ private[spark] def getExecutorAvailableCpus(
+ executorId: String): Option[Int] = synchronized {
+ executorDataMap.get(executorId).map(_.freeCores)
+ }
+
// this function is for testing only
def getExecutorResourceProfileId(executorId: String): Int = synchronized {
val execDataOption = executorDataMap.get(executorId)
@@ -310,8 +310,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
1, mutable.Map.empty, mutable.Map.empty, mutable.Map.empty, new Properties, 1,
Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
val serializedTaskDescription = TaskDescription.encode(taskDescription)
- backend.executor = mock[Executor]
backend.rpcEnv.setupEndpoint("Executor 1", backend)
+ backend.executor = mock[Executor](CALLS_REAL_METHODS)
+ val executor = backend.executor
+ // Mock the executor.
+ val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "test-executor")
+ when(executor.threadPool).thenReturn(threadPool)
+ val runningTasks = new ConcurrentHashMap[Long, Executor#TaskRunner]
+ when(executor.runningTasks).thenAnswer(_ => runningTasks)
+ when(executor.conf).thenReturn(conf)
+
+ def getFakeTaskRunner(taskDescription: TaskDescription): Executor#TaskRunner = {
+ new executor.TaskRunner(backend, taskDescription, None) {
+ override def run(): Unit = {
+ logInfo(s"task ${taskDescription.taskId} runs.")
+ }
+
+ override def kill(interruptThread: Boolean, reason: String): Unit = {
+ logInfo(s"task ${taskDescription.taskId} killed.")
+ }
+ }
+ }
+
+ // Feed the fake task-runners to be executed by the executor.
+ doAnswer(_ => getFakeTaskRunner(taskDescription))
+ .when(executor).createTaskRunner(any(), any())

// Launch a new task shall add an entry to `taskResources` map.
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
@@ -254,10 +254,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(exec3ResourceProfileId === rp.id)

val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+ val taskCpus = 1
val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long],
mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
- new Properties(), 1, taskResources, bytebuffer)))
+ new Properties(), taskCpus, taskResources, bytebuffer)))
val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

@@ -273,7 +274,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
// make sure that `availableAddrs` below won't change
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
backend.driverEndpoint.send(
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus, taskResources))

eventually(timeout(5 seconds)) {
execResources = backend.getExecutorAvailableResources("1")
@@ -361,10 +362,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(exec3ResourceProfileId === rp.id)

val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+ val taskCpus = 1
val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long],
mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
- new Properties(), 1, taskResources, bytebuffer)))
+ new Properties(), taskCpus, taskResources, bytebuffer)))
val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

@@ -380,7 +382,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
// make sure that `availableAddrs` below won't change
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
backend.driverEndpoint.send(
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus, taskResources))

eventually(timeout(5 seconds)) {
execResources = backend.getExecutorAvailableResources("1")
@@ -403,6 +405,92 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
"Our unexpected executor does not have a request time.")
}

test("SPARK-41848: executor cores should be decreased based on taskCpus") {
val testStartTime = System.currentTimeMillis()

val execCores = 3
val conf = new SparkConf()
.set(EXECUTOR_CORES, execCores)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
.set(EXECUTOR_INSTANCES, 0)
.setMaster(
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
.setAppName("test")

sc = new SparkContext(conf)

val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
// Request execs in the default profile.
backend.requestExecutors(1)
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})

var executorAddedCount: Int = 0
val infos = mutable.ArrayBuffer[ExecutorInfo]()
val listener = new SparkListener() {
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
// Lets check that the exec allocation times "make sense"
val info = executorAdded.executorInfo
infos += info
executorAddedCount += 1
}
}

sc.addSparkListener(listener)

val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("1", mockEndpointRef, mockAddress.host, execCores, Map.empty, Map.empty,
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.send(LaunchedExecutor("1"))
eventually(timeout(5 seconds)) {
assert(backend.getExecutorAvailableCpus("1").contains(3))
}

val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
val buffer = new SerializableBuffer(bytebuffer)

val defaultRp = ResourceProfile.getOrCreateDefaultProfile(conf)
assert(ResourceProfile.getTaskCpusOrDefaultForProfile(defaultRp, conf) == 1)
// Task cpus can be different from default resource profile when TaskResourceProfile is used.
val taskCpus = 2
val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long],
mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
new Properties(), taskCpus, Map.empty, bytebuffer)))
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

backend.driverEndpoint.send(ReviveOffers)

eventually(timeout(5 seconds)) {
assert(backend.getExecutorAvailableCpus("1").contains(1))
}

// To avoid allocating any resources immediately after releasing the resource from the task to
// make sure that executor's available cpus below won't change
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
backend.driverEndpoint.send(
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskCpus))

eventually(timeout(5 seconds)) {
assert(backend.getExecutorAvailableCpus("1").contains(3))
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 1)
infos.foreach { info =>
info.requestTime.map { t =>
assert(t > 0,
"Exec request times don't make sense")
assert(t >= testStartTime,
"Exec allocation and request times don't make sense")
assert(t <= info.registrationTime.get,
"Exec allocation and request times don't make sense")
}
}
}

private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
sc.submitJob(