Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.mutable
Expand All @@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
Expand Down Expand Up @@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])

@volatile private var numExecutorsRunning = 0
private val numExecutorsRunning = new AtomicInteger(0)

private val numExecutorsStarting = new AtomicInteger(0)

/**
* Used to generate a unique ID per executor
Expand Down Expand Up @@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
clock = newClock
}

def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsRunning: Int = numExecutorsRunning.get()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
Expand Down Expand Up @@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
Copy link
Contributor

@vanzin vanzin Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be an atomic integer because this method is synchronized already - and so is the other method where it's modified.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was just trying to keep it consistent with numExecutorsStarting.

numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
Expand All @@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
val allocatedContainers = allocateResponse.getAllocatedContainers()

if (allocatedContainers.size > 0) {
logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))

handleAllocatedContainers(allocatedContainers.asScala)
Expand All @@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
.format(completedContainers.size, numExecutorsRunning.get))
}
}

Expand All @@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
Expand Down Expand Up @@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
s"for executor with ID $executorId")

def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
numExecutorsRunning.incrementAndGet()
numExecutorsStarting.decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId

Expand All @@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}

if (numExecutorsRunning < targetNumExecutors) {
if (numExecutorsRunning.get < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
Expand All @@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
).run()
updateInternalState()
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
amClient.releaseAssignedContainer(containerId)
case e: Throwable =>
numExecutorsStarting.decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
amClient.releaseAssignedContainer(containerId)
} else {
throw e
}
}
}
})
Expand All @@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
}
Expand All @@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
Expand Down