Skip to content
Closed
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.mutable
Expand All @@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
Expand Down Expand Up @@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])

@volatile private var numExecutorsRunning = 0
private val numExecutorsRunning = new AtomicInteger(0)

private val numExecutorsStarting = new AtomicInteger(0)

/**
* Used to generate a unique ID per executor
Expand Down Expand Up @@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
clock = newClock
}

def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsRunning: Int = numExecutorsRunning.get()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
Expand Down Expand Up @@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
Copy link
Contributor

@vanzin vanzin Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be an atomic integer because this method is synchronized already - and so is the other method where it's modified.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was just trying to keep it consistent with numExecutorsStarting.

numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
Expand Down Expand Up @@ -270,7 +272,7 @@ private[yarn] class YarnAllocator(
logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
.format(
allocatedContainers.size,
numExecutorsRunning,
numExecutorsRunning.get,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to print numExecutorsStarting here as well.

allocateResponse.getAvailableResources))

handleAllocatedContainers(allocatedContainers.asScala)
Expand All @@ -281,7 +283,7 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
.format(completedContainers.size, numExecutorsRunning.get))
}
}

Expand All @@ -294,7 +296,8 @@ private[yarn] class YarnAllocator(
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a debug message here, something like the one below? I found this very useful when debugging this issue and think it would be useful for debugging other allocation issues in the future.

logDebug(s"Updating resource requests, target: $targetNumExecutors, pending: " +
  s"$numPendingAllocate, running: $numExecutorsRunning, executorsStarting $numExecutorsPendingStart")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice! I have added the debug info.

if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
Expand Down Expand Up @@ -493,7 +496,8 @@ private[yarn] class YarnAllocator(
s"for executor with ID $executorId")

def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
numExecutorsRunning.incrementAndGet()
numExecutorsStarting.decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId

Expand All @@ -503,7 +507,8 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}

if (numExecutorsRunning < targetNumExecutors) {
if (numExecutorsRunning.get < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
Expand All @@ -525,8 +530,9 @@ private[yarn] class YarnAllocator(
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
numExecutorsStarting.decrementAndGet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safer to put this in a finally, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is more robust. I have updated the code.

amClient.releaseAssignedContainer(containerId)
}
}
Expand All @@ -537,7 +543,8 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
}
Expand All @@ -552,7 +559,7 @@ private[yarn] class YarnAllocator(
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
Expand Down