@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {

case object RetrieveSparkProps extends CoarseGrainedClusterMessage

case object RetrieveCurrentExecutorIdCounter extends CoarseGrainedClusterMessage
andrewor14 (Contributor):

I would call this RetrieveMaxExecutorId. Without context the reader has no idea what executorIdCounter is.

Author (Contributor):

@andrewor14 Yeah, you are right. I will fix it soon.
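
A sketch of the agreed rename, using the reviewer's suggested name (a hypothetical follow-up, not part of this commit):

    // Hypothetical follow-up: same message, renamed so readers don't need
    // to know what executorIdCounter is.
    case object RetrieveMaxExecutorId extends CoarseGrainedClusterMessage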


// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

@@ -78,6 +78,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]

// The max executor ID seen so far; used when the ApplicationMaster re-registers.
private var currentExecutorIdCounter = 0

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

@@ -155,6 +158,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < Integer.parseInt(executorId)) {
  currentExecutorIdCounter = Integer.parseInt(executorId)
}
andrewor14 (Contributor):

this is kind of awkward. You don't need to keep track of another variable; just compute the max executor ID when the AM asks for it. You already have all the information you need in executorDataMap.
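
A sketch of what this suggestion could look like, assuming executorDataMap's keys are the numeric executor IDs (the author's objection below is that removed executors drop out of this map):

    // Derive the max ID on demand instead of tracking a separate counter.
    val maxExecutorId =
      if (executorDataMap.isEmpty) 0
      else executorDataMap.keys.map(_.toInt).max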

Author (Contributor):

@andrewor14 Thanks for reviewing it. As I understand it, we can't get the max executor ID from executorDataMap: when the AM fails, all executors are disconnected and removed, and as the code in CoarseGrainedSchedulerBackend.removeExecutor shows, their entries are removed from executorDataMap as well.
So I think the executor information in executorDataMap is not complete. What do you think?
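
For reference, the removal the author is describing looks roughly like this (a paraphrase of the removeExecutor behavior, not a quote from this diff):

    // Inside CoarseGrainedSchedulerBackend.removeExecutor (paraphrased):
    // a lost executor's entry is dropped, so the map alone cannot
    // reconstruct the highest ID ever assigned.
    executorDataMap -= executorId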

vanzin (Contributor):

yes, with dynamic allocation, for example, the executor with the max known id may be gone already.

minor: executorId.toInt instead of Integer.parseInt
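
With that minor change, the guard above would read (same behavior, just the idiomatic Scala conversion):

    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }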

Author (Contributor):

@vanzin Thanks for your comments. I will optimize it.

if (numPendingExecutors > 0) {
  numPendingExecutors -= 1
  logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -184,6 +190,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

case RetrieveSparkProps =>
  context.reply(sparkProperties)

case RetrieveCurrentExecutorIdCounter =>
  context.reply(currentExecutorIdCounter)
}

// Make fake resource offers on all executors
@@ -292,6 +292,7 @@ private[spark] class ApplicationMaster(
historyAddress,
securityMgr)

allocator.initExecutorIdCounter()
allocator.allocateResources()
reporterThread = launchReporterThread()
}
@@ -33,11 +33,14 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveCurrentExecutorIdCounter
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.Utils

/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -168,6 +171,24 @@ private[yarn] class YarnAllocator(
.toSeq
}

/**
 * Initialize `executorIdCounter` by asking the driver for its current value.
 */
def initExecutorIdCounter(): Unit = {
andrewor14 (Contributor):

actually, you can just initialize executorIdCounter to the value you get from the driver + 1 right?

/**
 * [Add huge comment related to SPARK-12864 to explain why we need to do this here.]
 */
private var executorCounter: Int = {
  driverRef.askWithRetry[Int](RetrieveMaxExecutorId) + 1
}

then you don't need this method

  val port = sparkConf.getInt("spark.yarn.am.port", 0)
  SparkHadoopUtil.get.runAsSparkUser { () =>
    val init = RpcEnv.create(
      "executorIdCounterInit",
      Utils.localHostName,
      port,
      sparkConf,
      new SecurityManager(sparkConf))
    val driver = init.setupEndpointRefByURI(driverUrl)
andrewor14 (Contributor):

why create another endpoint here? Can't we just use driverRef?

Author (Contributor):

Hi @andrewor14, driverRef doesn't work in this case. As I understand it, driverRef (endpoint name YarnScheduler) sends messages to and receives messages from YarnSchedulerEndpoint, while we need to get the max executor ID from CoarseGrainedSchedulerBackend.DriverEndpoint, whose endpoint name is CoarseGrainedScheduler.

So I think we do need a method to initialize executorIdCounter, and as you said, we should add a big comment referencing SPARK-12864 there to explain why. What's your opinion?

jerryshao (Contributor):

YarnSchedulerBackend extends CoarseGrainedSchedulerBackend, so what you mention can be achieved; look at how other messages inside that class are handled. Creating another endpoint here is unnecessary and weird.

Author (Contributor):

Hi @jerryshao, thanks for your comments. I see what you mean; I will fix it soon. Thanks a lot.

    executorIdCounter = driver.askWithRetry[Integer](RetrieveCurrentExecutorIdCounter)
    init.shutdown()
  }
}
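
Taken together, the reviewers' feedback suggests the whole method could collapse into a field initializer on the existing driver reference; a hedged sketch reusing the names from the discussion above, not the merged change:

    // Hypothetical consolidation: no extra RpcEnv, no init method.
    // SPARK-12864: after an AM restart, resume counting from the driver's
    // current max executor ID so newly requested executors get fresh IDs.
    private var executorIdCounter: Int =
      driverRef.askWithRetry[Int](RetrieveCurrentExecutorIdCounter)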

/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will