From ba8bf28985bd0d80377649a3db40c9d8b3040583 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Wed, 14 Jun 2017 23:08:06 +0800 Subject: [PATCH 1/3] Refactoring RetrieveLastAllocatedExecutorId --- .../spark/ExecutorAllocationManager.scala | 4 --- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 10 +++---- .../spark/deploy/yarn/ApplicationMaster.scala | 14 +++++++++- .../spark/deploy/yarn/YarnAllocator.scala | 22 +++------------ .../spark/deploy/yarn/YarnRMClient.scala | 5 ++-- .../cluster/YarnSchedulerBackend.scala | 27 ++++++++++++++++--- .../deploy/yarn/YarnAllocatorSuite.scala | 3 ++- 8 files changed, 50 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index fcc72ff49276..a04c0cc4872c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,10 +249,6 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { - initializing = true - numExecutorsTarget = initialNumExecutors - numExecutorsToAdd = 1 - executorsPendingToRemove.clear() removeTimes.clear() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 89a9ad6811e1..f4e287bf5ddf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages { ioEncryptionKey: Option[Array[Byte]]) extends CoarseGrainedClusterMessage - case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage + case object GetAMInitialState extends CoarseGrainedClusterMessage // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0b396b794ddc..be82db84e42e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp @GuardedBy("CoarseGrainedSchedulerBackend.this") protected var localityAwareTasks = 0 - // The num of current max ExecutorId used to re-register appMaster - @volatile protected var currentExecutorIdCounter = 0 - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -184,9 +181,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) - if (currentExecutorIdCounter < executorId.toInt) { - currentExecutorIdCounter = executorId.toInt - } + setCurrentExecutorIdCounter(executorId.toInt) if (numPendingExecutors > 0) { numPendingExecutors -= 1 logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") @@ -654,6 +649,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp defaultAskTimeout.awaitResult(response) } + // Set the num of current max ExecutorId used to re-register appMaster + protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {} + /** * Kill the given list of executors through the cluster manager. * @return whether the kill request is acknowledged. diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ce290c399d9f..bc23e7c9fc11 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -388,6 +388,14 @@ private[spark] class ApplicationMaster( dummyRunner.launchContextDebugInfo() } + /** + * (executorIdCounter, requestExecutors) should be the initial state + * or the last state AM restart. + * + * @see SPARK-12864, SPARK-20079 + */ + val (executorIdCounter, requestExecutors) = + driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState) allocator = client.register(driverUrl, driverRef, yarnConf, @@ -395,7 +403,11 @@ private[spark] class ApplicationMaster( uiAddress, historyAddress, securityMgr, - localResources) + localResources, + executorIdCounter) + if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) { + amEndpoint.send(requestExecutors) + } allocator.allocateResources() reporterThread = launchReporterThread() diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index ed77a6e4a1c7..00a75d32bc38 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -40,7 +40,6 @@ import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** @@ -65,7 +64,8 @@ private[yarn] class YarnAllocator( appAttemptId: ApplicationAttemptId, securityMgr: SecurityManager, localResources: Map[String, LocalResource], - resolver: SparkRackResolver) + resolver: SparkRackResolver, + private var executorIdCounter: Int) extends Logging { import YarnAllocator._ @@ -82,22 +82,6 @@ private[yarn] class YarnAllocator( @volatile private var numExecutorsRunning = 0 - /** - * Used to generate a unique ID per executor - * - * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then - * the id of new executor will start from 1, this will conflict with the executor has - * already created before. So, we should initialize the `executorIdCounter` by getting - * the max executorId from driver. - * - * And this situation of executorId conflict is just in yarn client mode, so this is an issue - * in yarn client mode. For more details, can check in jira. - * - * @see SPARK-12864 - */ - private var executorIdCounter: Int = - driverRef.askSync[Int](RetrieveLastAllocatedExecutorId) - // Queue to store the timestamp of failed executors private val failedExecutorsTimeStamps = new Queue[Long]() @@ -163,6 +147,8 @@ private[yarn] class YarnAllocator( clock = newClock } + def getTargetNumExecutors: Int = targetNumExecutors + def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = synchronized { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 72f4d273ab53..4af2b163189e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -58,7 +58,8 @@ private[spark] class YarnRMClient extends Logging { uiAddress: Option[String], uiHistoryAddress: String, securityMgr: SecurityManager, - localResources: Map[String, LocalResource] + localResources: Map[String, LocalResource], + executorIdCounter: Int ): YarnAllocator = { amClient = AMRMClient.createAMRMClient() amClient.init(conf) @@ -75,7 +76,7 @@ private[spark] class YarnRMClient extends Logging { registered = true } new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr, - localResources, new SparkRackResolver()) + localResources, new SparkRackResolver(), executorIdCounter) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 8452f4377419..3c6253ff8218 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -31,7 +31,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.{RpcUtils, ThreadUtils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} /** * Abstract Yarn scheduler backend that contains common logic @@ -72,6 +72,12 @@ private[spark] abstract class YarnSchedulerBackend( // Flag to specify whether this schedulerBackend should be reset. private var shouldResetOnAmRegister = false + private val currentState = new CurrentAMState(0, + RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty)) + + protected class CurrentAMState( + var executorIdCounter: Int, + var requestExecutors: RequestExecutors) /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -140,7 +146,20 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { - yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal)) + val requestExecutors = prepareRequestExecutors(requestedTotal) + val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors) + setCurrentRequestExecutors(requestExecutors) + future + } + + override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized { + if (currentState.executorIdCounter < executorId.toInt) { + currentState.executorIdCounter = executorId.toInt + } + } + + def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized { + currentState.requestExecutors = requestExecutors } /** @@ -312,8 +331,8 @@ private[spark] abstract class YarnSchedulerBackend( context.reply(false) } - case RetrieveLastAllocatedExecutorId => - context.reply(currentExecutorIdCounter) + case GetAMInitialState => + context.reply((currentState.executorIdCounter, currentState.requestExecutors)) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 97b0e8aca333..3b6bdbcb3cfa 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -101,7 +101,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter appAttemptId, new SecurityManager(sparkConf), Map(), - new MockResolver()) + new MockResolver(), + 0) } def createContainer(host: String): Container = { From 1496b78d2bcd2003b23307f767c57c0dc2818e16 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 17 Jul 2017 14:36:55 -0700 Subject: [PATCH 2/3] [SPARK-20079][yarn] Fix client AM not allocating executors aftert restart. The main goal of this change is to avoid the situation described in the bug, where an AM restart in the middle of a job may cause no new executors to be allocated because of faulty logic in the reset path. The change does two things: - fixes the executor alloc manager's reset() so that it does not stop allocation after a reset() in the middle of a job - re-orders the initialization of the YarnAllocator class so that it fetches the current executor ID before triggering the reset() above. This ensures both that the new allocator gets new requests for executors, and that it starts from the correct executor id. Tested with unit tests and by manually causing AM restarts while running jobs using spark-shell in YARN mode. --- .../spark/ExecutorAllocationManager.scala | 24 +++++-- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 10 +-- .../spark/deploy/yarn/ApplicationMaster.scala | 72 ++++++------------- .../spark/deploy/yarn/YarnAllocator.scala | 22 ++++-- .../spark/deploy/yarn/YarnRMClient.scala | 5 +- .../cluster/YarnSchedulerBackend.scala | 36 ++-------- .../deploy/yarn/YarnAllocatorSuite.scala | 3 +- 8 files changed, 74 insertions(+), 100 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index a04c0cc4872c..9a0e3b555789 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.control.ControlThrowable +import scala.util.control.{ControlThrowable, NonFatal} import com.codahale.metrics.{Gauge, MetricRegistry} @@ -245,10 +245,15 @@ private[spark] class ExecutorAllocationManager( } /** - * Reset the allocation manager to the initial state. Currently this will only be called in - * yarn-client mode when AM re-registers after a failure. + * Reset the allocation manager when the cluster manager loses track of the driver's state. + * This is currently only done in YARN client mode, when the AM is restarted. + * + * This method forgets about any state about existing executors, and forces the scheduler to + * re-evaluate the number of needed executors the next time it's run. */ def reset(): Unit = synchronized { + addTime = 0L + numExecutorsTarget = initialNumExecutors executorsPendingToRemove.clear() removeTimes.clear() } @@ -372,8 +377,17 @@ private[spark] class ExecutorAllocationManager( return 0 } - val addRequestAcknowledged = testing || - client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) + val addRequestAcknowledged = try { + testing || + client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) + } catch { + case NonFatal(e) => + // Use INFO level so the error it doesn't show up by default in shells. Errors here are more + // commonly caused by YARN AM restarts, which is a recoverable issue, and generate a lot of + // noisy output. + logInfo("Error reaching cluster manager.", e) + false + } if (addRequestAcknowledged) { val executorsString = "executor" + { if (delta > 1) "s" else "" } logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index f4e287bf5ddf..89a9ad6811e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages { ioEncryptionKey: Option[Array[Byte]]) extends CoarseGrainedClusterMessage - case object GetAMInitialState extends CoarseGrainedClusterMessage + case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index be82db84e42e..0b396b794ddc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -93,6 +93,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp @GuardedBy("CoarseGrainedSchedulerBackend.this") protected var localityAwareTasks = 0 + // The num of current max ExecutorId used to re-register appMaster + @volatile protected var currentExecutorIdCounter = 0 + class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -181,7 +184,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) - setCurrentExecutorIdCounter(executorId.toInt) + if (currentExecutorIdCounter < executorId.toInt) { + currentExecutorIdCounter = executorId.toInt + } if (numPendingExecutors > 0) { numPendingExecutors -= 1 logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") @@ -649,9 +654,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp defaultAskTimeout.awaitResult(response) } - // Set the num of current max ExecutorId used to re-register appMaster - protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {} - /** * Kill the given list of executors through the cluster manager. * @return whether the kill request is acknowledged. diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bc23e7c9fc11..8f148717c27a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -127,7 +127,6 @@ private[spark] class ApplicationMaster( private var nextAllocationInterval = initialAllocationInterval private var rpcEnv: RpcEnv = null - private var amEndpoint: RpcEndpointRef = _ // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. private val sparkContextPromise = Promise[SparkContext]() @@ -388,14 +387,6 @@ private[spark] class ApplicationMaster( dummyRunner.launchContextDebugInfo() } - /** - * (executorIdCounter, requestExecutors) should be the initial state - * or the last state AM restart. - * - * @see SPARK-12864, SPARK-20079 - */ - val (executorIdCounter, requestExecutors) = - driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState) allocator = client.register(driverUrl, driverRef, yarnConf, @@ -403,38 +394,28 @@ private[spark] class ApplicationMaster( uiAddress, historyAddress, securityMgr, - localResources, - executorIdCounter) - if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) { - amEndpoint.send(requestExecutors) - } + localResources) + + // Initialize the AM endpoint *after* the allocator has been initialized. This ensures + // that when the driver sends an initial executor request (e.g. after an AM restart), + // the allocator is ready to service requests. + rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef)) allocator.allocateResources() reporterThread = launchReporterThread() } /** - * Create an [[RpcEndpoint]] that communicates with the driver. - * - * In cluster mode, the AM and the driver belong to same process - * so the AMEndpoint need not monitor lifecycle of the driver. - * - * @return A reference to the driver's RPC endpoint. + * @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend. */ - private def runAMEndpoint( - host: String, - port: String, - isClusterMode: Boolean): RpcEndpointRef = { - val driverEndpoint = rpcEnv.setupEndpointRef( + private def createSchedulerRef(host: String, port: String): RpcEndpointRef = { + rpcEnv.setupEndpointRef( RpcAddress(host, port.toInt), YarnSchedulerBackend.ENDPOINT_NAME) - amEndpoint = - rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode)) - driverEndpoint } private def runDriver(securityMgr: SecurityManager): Unit = { - addAmIpFilter() + addAmIpFilter(None) userClassThread = startUserApplication() // This a bit hacky, but we need to wait until the spark.driver.port property has @@ -446,10 +427,9 @@ private[spark] class ApplicationMaster( Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { rpcEnv = sc.env.rpcEnv - val driverRef = runAMEndpoint( + val driverRef = createSchedulerRef( sc.getConf.get("spark.driver.host"), - sc.getConf.get("spark.driver.port"), - isClusterMode = true) + sc.getConf.get("spark.driver.port")) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) } else { // Sanity check; should never happen in normal operation, since sc should only be null @@ -474,7 +454,7 @@ private[spark] class ApplicationMaster( rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr, clientMode = true) val driverRef = waitForSparkDriver() - addAmIpFilter() + addAmIpFilter(Some(driverRef)) registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"), securityMgr) @@ -622,20 +602,21 @@ private[spark] class ApplicationMaster( sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - - runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false) + createSchedulerRef(driverHost, driverPort.toString) } /** Add the Yarn IP filter that is required for properly securing the UI. */ - private def addAmIpFilter() = { + private def addAmIpFilter(driver: Option[RpcEndpointRef]) = { val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" val params = client.getAmIpFilterParams(yarnConf, proxyBase) - if (isClusterMode) { - System.setProperty("spark.ui.filters", amFilter) - params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } - } else { - amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase)) + driver match { + case Some(d) => + d.send(AddWebUIFilter(amFilter, params.toMap, proxyBase)) + + case None => + System.setProperty("spark.ui.filters", amFilter) + params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } } } @@ -706,20 +687,13 @@ private[spark] class ApplicationMaster( /** * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. */ - private class AMEndpoint( - override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean) + private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef) extends RpcEndpoint with Logging { override def onStart(): Unit = { driver.send(RegisterClusterManager(self)) } - override def receive: PartialFunction[Any, Unit] = { - case x: AddWebUIFilter => - logInfo(s"Add WebUI Filter. $x") - driver.send(x) - } - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: RequestExecutors => Option(allocator) match { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 00a75d32bc38..ed77a6e4a1c7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -40,6 +40,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** @@ -64,8 +65,7 @@ private[yarn] class YarnAllocator( appAttemptId: ApplicationAttemptId, securityMgr: SecurityManager, localResources: Map[String, LocalResource], - resolver: SparkRackResolver, - private var executorIdCounter: Int) + resolver: SparkRackResolver) extends Logging { import YarnAllocator._ @@ -82,6 +82,22 @@ private[yarn] class YarnAllocator( @volatile private var numExecutorsRunning = 0 + /** + * Used to generate a unique ID per executor + * + * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then + * the id of new executor will start from 1, this will conflict with the executor has + * already created before. So, we should initialize the `executorIdCounter` by getting + * the max executorId from driver. + * + * And this situation of executorId conflict is just in yarn client mode, so this is an issue + * in yarn client mode. For more details, can check in jira. + * + * @see SPARK-12864 + */ + private var executorIdCounter: Int = + driverRef.askSync[Int](RetrieveLastAllocatedExecutorId) + // Queue to store the timestamp of failed executors private val failedExecutorsTimeStamps = new Queue[Long]() @@ -147,8 +163,6 @@ private[yarn] class YarnAllocator( clock = newClock } - def getTargetNumExecutors: Int = targetNumExecutors - def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = synchronized { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 4af2b163189e..72f4d273ab53 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -58,8 +58,7 @@ private[spark] class YarnRMClient extends Logging { uiAddress: Option[String], uiHistoryAddress: String, securityMgr: SecurityManager, - localResources: Map[String, LocalResource], - executorIdCounter: Int + localResources: Map[String, LocalResource] ): YarnAllocator = { amClient = AMRMClient.createAMRMClient() amClient.init(conf) @@ -76,7 +75,7 @@ private[spark] class YarnRMClient extends Logging { registered = true } new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr, - localResources, new SparkRackResolver(), executorIdCounter) + localResources, new SparkRackResolver()) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 3c6253ff8218..6e87e609db14 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -69,15 +69,6 @@ private[spark] abstract class YarnSchedulerBackend( /** Scheduler extension services. */ private val services: SchedulerExtensionServices = new SchedulerExtensionServices() - // Flag to specify whether this schedulerBackend should be reset. - private var shouldResetOnAmRegister = false - - private val currentState = new CurrentAMState(0, - RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty)) - - protected class CurrentAMState( - var executorIdCounter: Int, - var requestExecutors: RequestExecutors) /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -146,20 +137,7 @@ private[spark] abstract class YarnSchedulerBackend( * This includes executors already pending or running. */ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { - val requestExecutors = prepareRequestExecutors(requestedTotal) - val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors) - setCurrentRequestExecutors(requestExecutors) - future - } - - override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized { - if (currentState.executorIdCounter < executorId.toInt) { - currentState.executorIdCounter = executorId.toInt - } - } - - def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized { - currentState.requestExecutors = requestExecutors + yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal)) } /** @@ -281,13 +259,7 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) - if (!shouldResetOnAmRegister) { - shouldResetOnAmRegister = true - } else { - // AM is already registered before, this potentially means that AM failed and - // a new one registered after the failure. This will only happen in yarn-client mode. - reset() - } + reset() case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) @@ -331,8 +303,8 @@ private[spark] abstract class YarnSchedulerBackend( context.reply(false) } - case GetAMInitialState => - context.reply((currentState.executorIdCounter, currentState.requestExecutors)) + case RetrieveLastAllocatedExecutorId => + context.reply(currentExecutorIdCounter) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 3b6bdbcb3cfa..97b0e8aca333 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -101,8 +101,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter appAttemptId, new SecurityManager(sparkConf), Map(), - new MockResolver(), - 0) + new MockResolver()) } def createContainer(host: String): Container = { From 56abc80d3bce0bfae0f7549cd848afddc6538dc7 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 25 Jul 2017 10:45:03 -0700 Subject: [PATCH 3/3] Remove unneeded import. --- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 6e87e609db14..415a29fd887e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -31,7 +31,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, ThreadUtils} /** * Abstract Yarn scheduler backend that contains common logic