@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala (733 changes: 232 additions & 501 deletions)

Large diffs are not rendered by default.

@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
preferredNodeLocations, securityMgr)
new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
}

/**
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration

@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {

// All RM requests are issued with the same priority: we do not (yet) have any distinction between
// request types (like map/reduce in Hadoop, for example)
val RM_REQUEST_PRIORITY = 1

// Host to rack map - saved from allocation requests. We are expecting this not to change.
// Note that it is possible for this to change : and ResourceManager will indicate that to us via
// update response to allocate. But we are punting on handling that for now.
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
val RM_REQUEST_PRIORITY = Priority.newInstance(1)

/**
* Add a path variable to the given environment map.
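As an aside, switching the priority constant from a bare Int to a Priority record lets it be passed straight into the YARN client API. A minimal sketch of how such a constant can be used when building a container request; the resource numbers and variable names here are illustrative, not taken from this patch:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    // Every request uses the same priority, mirroring RM_REQUEST_PRIORITY above.
    val priority = Priority.newInstance(1)

    // Illustrative ask: 2 GB of memory and 5 cores per container.
    val capability = Resource.newInstance(2048, 5)

    // nodes and racks are left null, i.e. no locality constraint on this request.
    val request = new ContainerRequest(capability, null, null, priority)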
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
}
}

def lookupRack(conf: Configuration, host: String): String = {
if (!hostToRack.contains(host)) {
populateRackInfo(conf, host)
}
hostToRack.get(host)
}

def populateRackInfo(conf: Configuration, hostname: String) {
Utils.checkHost(hostname)

if (!hostToRack.containsKey(hostname)) {
// If there are repeated failures to resolve, add to an ignore list.
val rackInfo = RackResolver.resolve(conf, hostname)
if (rackInfo != null && rackInfo.getNetworkLocation != null) {
val rack = rackInfo.getNetworkLocation
hostToRack.put(hostname, rack)
if (! rackToHostSet.containsKey(rack)) {
rackToHostSet.putIfAbsent(rack,
Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
}
rackToHostSet.get(rack).add(hostname)

// TODO(harvey): Figure out what this comment means...
// Since RackResolver caches, we are disabling this for now ...
} /* else {
// right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
hostToRack.put(hostname, null)
} */
}
}

def getApplicationAclsForYarn(securityMgr: SecurityManager)
: Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
@@ -17,8 +17,9 @@

package org.apache.spark.scheduler.cluster

import org.apache.hadoop.yarn.util.RackResolver

import org.apache.spark._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}
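A side note on why the hand-rolled host-to-rack cache can be dropped: RackResolver.resolve delegates to whatever DNSToSwitchMapping is configured via net.topology.node.switch.mapping.impl, and the common mappings cache their results, so calling it per lookup is cheap. A minimal standalone sketch, with an illustrative host name; with no topology script configured, every host resolves to "/default-rack":

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.util.RackResolver

    val hadoopConf = new Configuration()
    // resolve returns a Node whose network location is the rack path, e.g. "/default-rack".
    val rack = RackResolver.resolve(hadoopConf, "host1").getNetworkLocation
    println(rack)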
@@ -17,8 +17,10 @@

package org.apache.spark.scheduler.cluster

import org.apache.hadoop.yarn.util.RackResolver

import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}

override def postStartHook() {
@@ -17,18 +17,160 @@

package org.apache.spark.deploy.yarn

import java.util.{Arrays, List => JList}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.scalatest.FunSuite
import org.apache.spark.scheduler.SplitInfo

import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}

class MockResolver extends DNSToSwitchMapping {

override def resolve(names: JList[String]): JList[String] = {
if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
else Arrays.asList("/rack1")
}

override def reloadCachedMappings() {}

def reloadCachedMappings(names: JList[String]) {}
}

class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
val conf = new Configuration()
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
classOf[MockResolver], classOf[DNSToSwitchMapping])

val sparkConf = new SparkConf()
sparkConf.set("spark.driver.host", "localhost")
sparkConf.set("spark.driver.port", "4040")
sparkConf.set("spark.yarn.jar", "notarealjar.jar")
sparkConf.set("spark.yarn.launchContainers", "false")

val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)

// Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores
// instead of the 5 requested and 3 GB instead of the 2 requested.
val containerResource = Resource.newInstance(3072, 6)

var rmClient: AMRMClient[ContainerRequest] = _

var containerNum = 0

override def beforeEach() {
rmClient = AMRMClient.createAMRMClient()
rmClient.init(conf)
rmClient.start()
}

override def afterEach() {
rmClient.stop()
}

class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
override def equals(other: Any) = false
}

def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
"--num-executors", s"$maxExecutors",
"--executor-cores", "5",
"--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
new YarnAllocator(
conf,
sparkConf,
rmClient,
appAttemptId,
new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}

def createContainer(host: String): Container = {
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
}

test("single container allocated") {
// request a single container and receive it
val handler = createAllocator()
handler.addResourceRequests(1)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (1)

val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
}

test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator()
handler.addResourceRequests(4)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

val container1 = createContainer("host1")
val container2 = createContainer("host1")
val container3 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2, container3))

handler.getNumExecutorsRunning should be (3)
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
}

test("receive more containers than requested") {
val handler = createAllocator(2)
handler.addResourceRequests(2)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
val container3 = createContainer("host4")
handler.handleAllocatedContainers(Array(container1, container2, container3))

handler.getNumExecutorsRunning should be (2)
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
handler.allocatedHostToContainersMap.contains("host4") should be (false)
}

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}

}
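For context, the VMEM_EXCEEDED_PATTERN and PMEM_EXCEEDED_PATTERN constants and the memLimitExceededLogMessage helper exercised above live in YarnAllocator.scala, whose diff is not rendered here. A rough sketch of regexes and a helper of that shape, purely to illustrate what the assertions check; the real definitions may differ:

    import java.util.regex.Pattern

    // "<size> of <size> ... memory used", e.g. "5.8 GB of 4.2 GB virtual memory used"
    val memRegex = "[0-9.]+ [KMG]B"
    val pmemPattern = Pattern.compile(s"$memRegex of $memRegex physical memory used")
    val vmemPattern = Pattern.compile(s"$memRegex of $memRegex virtual memory used")

    // Pull the matching fragment out of the container diagnostics, if present.
    def memLimitMessage(diagnostics: String, pattern: Pattern): String = {
      val matcher = pattern.matcher(diagnostics)
      if (matcher.find()) matcher.group() + "." else ""
    }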