diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0b66d1cf08eac..86536bf8a09c3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.util.Collections import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern import scala.collection.mutable @@ -85,7 +86,9 @@ private[yarn] class YarnAllocator( private val releasedContainers = Collections.newSetFromMap[ContainerId]( new ConcurrentHashMap[ContainerId, java.lang.Boolean]) - @volatile private var numExecutorsRunning = 0 + private val numExecutorsRunning = new AtomicInteger(0) + + private val numExecutorsStarting = new AtomicInteger(0) /** * Used to generate a unique ID per executor @@ -180,7 +183,7 @@ private[yarn] class YarnAllocator( clock = newClock } - def getNumExecutorsRunning: Int = numExecutorsRunning + def getNumExecutorsRunning: Int = numExecutorsRunning.get() def getNumExecutorsFailed: Int = synchronized { val endTime = clock.getTimeMillis() @@ -242,7 +245,7 @@ private[yarn] class YarnAllocator( if (executorIdToContainer.contains(executorId)) { val container = executorIdToContainer.get(executorId).get internalReleaseContainer(container) - numExecutorsRunning -= 1 + numExecutorsRunning.decrementAndGet() } else { logWarning(s"Attempted to kill unknown executor $executorId!") } @@ -267,10 +270,12 @@ private[yarn] class YarnAllocator( val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { - logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s." + logDebug(("Allocated containers: %d. Current executor count: %d. " + + "Launching executor count: %d. Cluster resources: %s.") .format( allocatedContainers.size, - numExecutorsRunning, + numExecutorsRunning.get, + numExecutorsStarting.get, allocateResponse.getAvailableResources)) handleAllocatedContainers(allocatedContainers.asScala) @@ -281,7 +286,7 @@ private[yarn] class YarnAllocator( logDebug("Completed %d containers".format(completedContainers.size)) processCompletedContainers(completedContainers.asScala) logDebug("Finished processing %d completed containers. Current running executor count: %d." - .format(completedContainers.size, numExecutorsRunning)) + .format(completedContainers.size, numExecutorsRunning.get)) } } @@ -294,7 +299,11 @@ private[yarn] class YarnAllocator( def updateResourceRequests(): Unit = { val pendingAllocate = getPendingAllocate val numPendingAllocate = pendingAllocate.size - val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning + val missing = targetNumExecutors - numPendingAllocate - + numExecutorsStarting.get - numExecutorsRunning.get + logDebug(s"Updating resource requests, target: $targetNumExecutors, " + + s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " + + s"executorsStarting: ${numExecutorsStarting.get}") if (missing > 0) { logInfo(s"Will request $missing executor container(s), each with " + @@ -495,7 +504,8 @@ private[yarn] class YarnAllocator( logInfo(s"Launching container $containerId on host $executorHostname") def updateInternalState(): Unit = synchronized { - numExecutorsRunning += 1 + numExecutorsRunning.incrementAndGet() + numExecutorsStarting.decrementAndGet() executorIdToContainer(executorId) = container containerIdToExecutorId(container.getId) = executorId @@ -505,7 +515,8 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.put(containerId, executorHostname) } - if (numExecutorsRunning < targetNumExecutors) { + if (numExecutorsRunning.get < targetNumExecutors) { + numExecutorsStarting.incrementAndGet() if (launchContainers) { launcherPool.execute(new Runnable { override def run(): Unit = { @@ -525,11 +536,16 @@ private[yarn] class YarnAllocator( ).run() updateInternalState() } catch { - case NonFatal(e) => - logError(s"Failed to launch executor $executorId on container $containerId", e) - // Assigned container should be released immediately to avoid unnecessary resource - // occupation. - amClient.releaseAssignedContainer(containerId) + case e: Throwable => + numExecutorsStarting.decrementAndGet() + if (NonFatal(e)) { + logError(s"Failed to launch executor $executorId on container $containerId", e) + // Assigned container should be released immediately + // to avoid unnecessary resource occupation. + amClient.releaseAssignedContainer(containerId) + } else { + throw e + } } } }) @@ -539,7 +555,8 @@ private[yarn] class YarnAllocator( } } else { logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " + - "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors)) + "reached target Executors count: %d.").format( + numExecutorsRunning.get, targetNumExecutors)) } } } @@ -554,7 +571,7 @@ private[yarn] class YarnAllocator( val exitReason = if (!alreadyReleased) { // Decrement the number of executors running. The next iteration of // the ApplicationMaster's reporting thread will take care of allocating. - numExecutorsRunning -= 1 + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s%s (state: %s, exit status: %s)".format( containerId, onHostStr,