From 81a00e8100870531b3f9942107912d6af24cc37b Mon Sep 17 00:00:00 2001
From: quanfuw
Date: Tue, 18 Oct 2016 17:17:11 +0800
Subject: [PATCH 1/2] add numa aware support

---
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala  |  9 +++++++--
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 10 +++++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8e0533f39ae5..89c893948bb2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -54,7 +54,8 @@ private[yarn] class ExecutorRunnable(
     executorCores: Int,
     appId: String,
     securityMgr: SecurityManager,
-    localResources: Map[String, LocalResource]) extends Logging {
+    localResources: Map[String, LocalResource],
+    allocationMeter: Int) extends Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
@@ -207,8 +208,12 @@ private[yarn] class ExecutorRunnable(
     }.toSeq
 
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
+
+    // TODO: read the node count from sparkConf (e.g. NUMA_NODE) instead of assuming two NUMA nodes.
+    val numaNode = allocationMeter % 2
+
     val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
+      s" numactl --cpunodebind=$node --preferred=$numaNode " + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
       "-server") ++ javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b66d1cf08ea..cd9906a7a21a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -130,6 +130,9 @@ private[yarn] class YarnAllocator(
   private var numUnexpectedContainerRelease = 0L
   private val containerIdToExecutorId = new HashMap[ContainerId, String]
 
+  // Keeps track of how many executors have already been allocated on each host.
+  val hostToAllocationMeterMap = new HashMap[String, Int]
+
   // Executor memory in MB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
   // Additional memory overhead.
@@ -507,6 +510,10 @@ private[yarn] class YarnAllocator(
 
       if (numExecutorsRunning < targetNumExecutors) {
         if (launchContainers) {
+
+          val allocationMeter = hostToAllocationMeterMap.getOrElseUpdate(executorHostname, 0)
+          hostToAllocationMeterMap.put(executorHostname, allocationMeter + 1)
+
           launcherPool.execute(new Runnable {
             override def run(): Unit = {
               try {
@@ -521,7 +528,8 @@ private[yarn] class YarnAllocator(
                   executorCores,
                   appAttemptId.getApplicationId.toString,
                   securityMgr,
-                  localResources
+                  localResources,
+                  allocationMeter
                 ).run()
                 updateInternalState()
               } catch {
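What the first patch does, taken on its own: YarnAllocator counts how many executors it has already launched on each host, and ExecutorRunnable uses that count to pick a NUMA node, prefixing the executor's JVM command with numactl so that both CPU scheduling (--cpunodebind) and memory allocation (--preferred) stay on the chosen node. The sketch below is a minimal, self-contained restatement of that scheme, not code from the patch; it assumes exactly two NUMA nodes per host (as the modulo-2 in the patch does), and the names NumaBindingSketch, numaPrefix, and launchCommand are illustrative only. Note that patch 1 interpolates an undefined $node into --cpunodebind; patch 2/2 below corrects it to $numaNode, which is what the sketch uses.

    object NumaBindingSketch {
      // allocationMeter = number of executors already launched on this host.
      // Taking it modulo 2 alternates successive executors across the two nodes.
      def numaPrefix(allocationMeter: Int): String = {
        val numaNode = allocationMeter % 2
        s"numactl --cpunodebind=$numaNode --preferred=$numaNode"
      }

      // Mirrors the command construction in ExecutorRunnable: the numactl
      // prefix goes in front of the java binary, followed by "-server".
      def launchCommand(allocationMeter: Int, javaHome: String): Seq[String] =
        Seq(s"${numaPrefix(allocationMeter)} $javaHome/bin/java", "-server")

      def main(args: Array[String]): Unit = {
        // Executors 0 and 2 on a host bind to node 0; executors 1 and 3 to node 1.
        (0 to 3).foreach(i => println(s"executor $i -> ${numaPrefix(i)}"))
      }
    }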
From 76c47af7dfbd77cc81f88355e8ceb9832fed2328 Mon Sep 17 00:00:00 2001
From: quanfuw
Date: Tue, 18 Oct 2016 17:43:52 +0800
Subject: [PATCH 2/2] add numa aware support

---
 .../scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 89c893948bb2..7be3ed4a6e65 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -213,7 +213,7 @@ private[yarn] class ExecutorRunnable(
     val numaNode = allocationMeter % 2
 
     val commands = prefixEnv ++ Seq(
-      s" numactl --cpunodebind=$node --preferred=$numaNode " + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
+      s" numactl --cpunodebind=$numaNode --preferred=$numaNode " + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
       "-server") ++ javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
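Behavior of the per-host meter, end to end: getOrElseUpdate seeds the counter at zero the first time a host appears, the stored value is handed to the new ExecutorRunnable, and the map entry is then bumped so the next executor on that host lands on the other node. The counter is per-host and only ever increments, so it balances launch order rather than currently live executors (an executor that dies does not decrement it). The following self-contained sketch replays that bookkeeping outside YARN; nextMeter is an illustrative name, not a method in the patch.

    import scala.collection.mutable.HashMap

    object AllocationMeterSketch {
      // Same structure as YarnAllocator's hostToAllocationMeterMap.
      val hostToAllocationMeterMap = new HashMap[String, Int]

      // Returns the meter value passed to the new executor and increments the
      // per-host counter, mirroring the allocation path in patch 1.
      def nextMeter(host: String): Int = {
        val allocationMeter = hostToAllocationMeterMap.getOrElseUpdate(host, 0)
        hostToAllocationMeterMap.put(host, allocationMeter + 1)
        allocationMeter
      }

      def main(args: Array[String]): Unit = {
        println(nextMeter("host-a") % 2) // node 0
        println(nextMeter("host-a") % 2) // node 1
        println(nextMeter("host-b") % 2) // node 0: the meter is per-host
      }
    }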