From 28f14915a5db8e022146ae17540a4a66c24a2d5b Mon Sep 17 00:00:00 2001 From: AtkinsChang Date: Tue, 9 Feb 2016 10:37:20 +0800 Subject: [PATCH] [SPARK-13232][YARN] Fix executor node label --- .../spark/deploy/yarn/YarnAllocator.scala | 8 ++++++- .../deploy/yarn/YarnAllocatorSuite.scala | 23 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 11426eb07c7e..bd0c7803b828 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -307,8 +307,14 @@ private[yarn] class YarnAllocator( nodes: Array[String], racks: Array[String]): ContainerRequest = { nodeLabelConstructor.map { constructor => + val labelExp = if ((racks != null && (!racks.isEmpty)) + || (nodes != null && (!nodes.isEmpty))) { + null + } else { + labelExpression.orNull + } constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean, - labelExpression.orNull) + labelExp) }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 1dd2f93bb708..3fb736190da0 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -87,7 +87,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter override def equals(other: Any): Boolean = false } - def createAllocator(maxExecutors: Int = 5): YarnAllocator = { + def createAllocator( + maxExecutors: Int = 5, + executorNodeLabel: Option[String] = None): YarnAllocator = { val args = Array( "--executor-cores", "5", "--executor-memory", "2048", @@ -95,6 +97,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter "--class", "SomeClass") val sparkConfClone = sparkConf.clone() sparkConfClone.set("spark.executor.instances", maxExecutors.toString) + executorNodeLabel.foreach(sparkConfClone.set("spark.yarn.executor.nodeLabelExpression", _)) new YarnAllocator( "not used", mock(classOf[RpcEndpointRef]), @@ -272,4 +275,22 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) } + + test("request executors with locality") { + val handler = createAllocator(1, Some("label")) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getPendingAllocate.size should be (1) + + handler.requestTotalExecutorsWithPreferredLocalities(3, 20, Map(("host1", 10), ("host2", 20))) + handler.updateResourceRequests() + handler.getPendingAllocate.size should be (3) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + + handler.getNumExecutorsRunning should be (1) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) + } }