@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {

case object RetrieveSparkProps extends CoarseGrainedClusterMessage

case object RetrieveMaxExecutorId extends CoarseGrainedClusterMessage
Contributor

We seem to be intermixing "current" and "Max". "Max" makes me think that this is some limit, so can we change things to be consistent and perhaps use CurrentExecutorId or LargestAllocated?

Contributor Author

@tgravescs Thanks for reviewing. This variable is the max executorId from the previous AM, and it is only queried when a new AM initializes. Our intention is to get the max executorId across all executors of the previous AM, so I think maxExecutorId is okay.
@andrewor14 What's your opinion?

Contributor

I understand what the variable is, but readers looking at just this message without other context wouldn't necessarily know. When I look at the name, I think it's giving me some limit for the max executor id. For instance, we have many configs that set a max for something (e.g. spark.reducer.maxSizeInFlight, spark.shuffle.io.maxRetries, etc.). That is why I would like the name clarified.

If we change the calling context, then it's not just the max executor id for the last AM; it's the last executor id that was allocated. So perhaps rename it to RetrieveLastAllocatedExecutorId.
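For illustration only, the suggested rename would just swap the message name in CoarseGrainedClusterMessages; a minimal sketch, not part of the patch as posted:

    // Hypothetical rename of the message object suggested above
    case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage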

Contributor

how about MaxKnownExecutorId?

Contributor Author

@tgravescs OK, I see what you mean. Thanks a lot.
Is the name RetrieveLastAllocatedExecutorId OK? @vanzin What's your opinion?


// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

@@ -78,6 +78,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]

// The current max executor id, used when the AM re-registers so new executor ids don't collide
protected var currentExecutorIdCounter = 0

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

@@ -155,6 +158,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < Integer.parseInt(executorId)) {
currentExecutorIdCounter = Integer.parseInt(executorId)
}
Contributor

this is kind of awkward. You don't need to keep track of another variable; just compute the max executor ID when the AM asks for it. You already have all the information you need in executorDataMap.
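A rough sketch of what that suggestion might look like (hypothetical code; as the follow-up comments note, it breaks down once the executors holding the largest ids have already been removed from executorDataMap):

    // Hypothetical handler: compute the max id from executorDataMap on demand
    // instead of maintaining a separate currentExecutorIdCounter.
    case RetrieveMaxExecutorId =>
      val maxId =
        if (executorDataMap.isEmpty) 0
        else executorDataMap.keys.map(_.toInt).max
      context.reply(maxId)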

Contributor Author

@andrewor14 Thanks for reviewing it. From my understanding, we can't get the max executor ID from executorDataMap. When the AM fails, all of its executors are disconnected and removed; at that point, as the code in CoarseGrainedSchedulerBackend.removeExecutor shows, their entries in executorDataMap are removed as well.
So I think the executor information in executorDataMap is not complete. What do you think?

Contributor

yes, with dynamic allocation, for example, the executor with the max known id may be gone already.

minor: executorId.toInt instead of Integer.parseInt
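Applied to the registration block above, that minor suggestion would read roughly as follows (same behavior, just the more idiomatic conversion):

    // Track the largest executor id seen so far, using executorId.toInt
    val newId = executorId.toInt
    if (currentExecutorIdCounter < newId) {
      currentExecutorIdCounter = newId
    }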

Contributor Author

@vanzin Thanks for your comments. I will optimize it.

if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveMaxExecutorId
import org.apache.spark.util.ThreadUtils

/**
@@ -81,8 +82,20 @@ private[yarn] class YarnAllocator(
new ConcurrentHashMap[ContainerId, java.lang.Boolean])

@volatile private var numExecutorsRunning = 0
// Used to generate a unique ID per executor
private var executorIdCounter = 0

/**
* Used to generate a unique ID per executor
*
* Init `executorIdCounter`. When the AM restarts, `executorIdCounter` is reset to 0. Then
* the ids of new executors will start from 1 again, which conflicts with executors that were
Contributor

I think we need to clarify this to say it is required for client mode, when the driver isn't running on YARN. This isn't an issue in cluster mode.

Contributor Author

@tgravescs I think we can clarify this in the SPARK-12864 issue. @andrewor14 What's your opinion?

Contributor

I would prefer to do it here in this comment, to better describe the situation in which this is needed. It should only be a line or two, and I personally much prefer that to pointing at JIRAs, unless a big discussion or background is required, in which case the JIRA makes more sense.

Contributor Author

@tgravescs Ok, I will do it soon. Thanks a lot.

* already created before. So we should initialize `executorIdCounter` by getting
* the max executorId from the driver.
*
* @see SPARK-12864
*/
private var executorIdCounter: Int = {
Contributor

So I think there is a race condition here. For instance, if the AM dies right after it sends the launch command for a new executor, and the new AM comes up and gets the currentExecutorId right before that executor registers, this number is going to be off by one (or possibly more than one if launch commands were sent for many). What happens in this case? Do we have a check to make sure we don't allow an executor to register with the same id?

Contributor Author

@tgravescs Thanks for your comment. Yes, in this situation, when the AM comes up, if the executor has already registered, its information must already be set in executorDataMap. And the executor registration process has a check to make sure we don't allow an executor to register with the same id.
You can see it in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L139

Contributor

Great, thanks

Contributor

There is a check in CoarseGrainedSchedulerBackend:

      case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
        if (executorDataMap.contains(executorId)) {
          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))

That would cause the container failure count to go up, though. Probably ok? I don't see a way to fix that race without updating the driver with a synchronous RPC every time a new container is launched.

driverRef.askWithRetry[Int](RetrieveMaxExecutorId) + 1
Contributor

Is it necessary to request the executor id every time a new executor is allocated?

Contributor Author

Hi @jerryshao, thanks for reviewing. Allocating a new executor won't execute that code. It only requests the executor id once, when the YarnAllocator is being created.
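In other words, pieced together from the diff fragments above, the RPC lives in the field initializer and runs exactly once when the allocator is constructed (sketch of the patch as posted):

    // Runs once, during YarnAllocator construction; not on every allocation.
    private var executorIdCounter: Int = {
      driverRef.askWithRetry[Int](RetrieveMaxExecutorId) + 1
    }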

Contributor

I see, thanks.

Contributor

I think here we should also make sure driverRef is already established, in both yarn-client and yarn-cluster mode. Did you test locally?

Contributor Author

Yes, but only in yarn-client mode. Is it necessary to test in yarn-cluster mode? @jerryshao

Contributor Author

Hi @jerryshao, I just ran a test, and it works in yarn-cluster mode. From my understanding, driverRef is created by the ApplicationMaster, so by this time the driverEndpoint has already started. What's your opinion? Thanks a lot.

Contributor

Great. Another thing is that currently executor ids start from 1, not 0; we'd better keep that convention.

Contributor Author

I see, thanks a lot.

Contributor

I'm also a bit on the fence about whether we should do this for cluster mode. We don't need to, so it's just extra overhead, but at the same time it's nice not having a bunch of if checks. @vanzin, opinion?

Contributor

I don't think it makes a difference; it's just one extra RPC call during initialization. In cluster mode that's a local call which should also be much cheaper.

An alternative would be to change the RegisterClusterManager message so that it returns this data to the AM (see AMEndpoint.onStart in ApplicationMaster.scala).
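That alternative is not part of this patch; under those assumptions (the reply message shape and field names here are hypothetical), it might look roughly like this:

    // Hypothetical reply message carrying the last allocated id
    case class RegisteredClusterManager(lastAllocatedExecutorId: Int)
      extends CoarseGrainedClusterMessage

    // Driver side (YarnSchedulerBackend): piggyback the id on the existing handshake
    case RegisterClusterManager(am) =>
      amEndpoint = Option(am)
      context.reply(RegisteredClusterManager(currentExecutorIdCounter))

    // AM side (AMEndpoint.onStart): read the id back while registering
    // executorIdCounter = driver.askWithRetry[RegisteredClusterManager](
    //   RegisterClusterManager(self)).lastAllocatedExecutorId + 1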

}
@volatile private var numExecutorsFailed = 0

@volatile private var targetNumExecutors =
@@ -287,6 +287,9 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}

case RetrieveMaxExecutorId =>
context.reply(currentExecutorIdCounter)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {