From 818c9126959e8576861478e18389e6ed8fdbeac4 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Mon, 17 Jul 2017 15:54:09 +0800
Subject: [PATCH 1/7] [Core] Fix the YarnAllocator allocating more resources

While NodeManagers are still launching executors, the computed `missing`
count exceeds its real value, which can lead YARN to allocate more
resources than needed.
---
 .../spark/deploy/yarn/YarnAllocator.scala     | 60 +++++++++++--------
 1 file changed, 34 insertions(+), 26 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e4a1c7..35f27b82bd42 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -82,6 +82,8 @@ private[yarn] class YarnAllocator(

   @volatile private var numExecutorsRunning = 0

+  @volatile private var numExecutorToBeLaunched = 0
+
   /**
    * Used to generate a unique ID per executor
    *
@@ -294,7 +296,8 @@ private[yarn] class YarnAllocator(
   def updateResourceRequests(): Unit = {
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+    val missing = targetNumExecutors - numPendingAllocate -
+      numExecutorToBeLaunched - numExecutorsRunning

     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
@@ -505,32 +508,37 @@ private[yarn] class YarnAllocator(

     if (numExecutorsRunning < targetNumExecutors) {
       if (launchContainers) {
-        launcherPool.execute(new Runnable {
-          override def run(): Unit = {
-            try {
-              new ExecutorRunnable(
-                Some(container),
-                conf,
-                sparkConf,
-                driverUrl,
-                executorId,
-                executorHostname,
-                executorMemory,
-                executorCores,
-                appAttemptId.getApplicationId.toString,
-                securityMgr,
-                localResources
-              ).run()
-              updateInternalState()
-            } catch {
-              case NonFatal(e) =>
-                logError(s"Failed to launch executor $executorId on container $containerId", e)
-                // Assigned container should be released immediately to avoid unnecessary resource
-                // occupation.
-                amClient.releaseAssignedContainer(containerId)
+        try {
+          numExecutorToBeLaunched += 1
+          launcherPool.execute(new Runnable {
+            override def run(): Unit = {
+              try {
+                new ExecutorRunnable(
+                  Some(container),
+                  conf,
+                  sparkConf,
+                  driverUrl,
+                  executorId,
+                  executorHostname,
+                  executorMemory,
+                  executorCores,
+                  appAttemptId.getApplicationId.toString,
+                  securityMgr,
+                  localResources
+                ).run()
+                updateInternalState()
+              } catch {
+                case NonFatal(e) =>
+                  logError(s"Failed to launch executor $executorId on container $containerId", e)
+                  // Assigned container should be released immediately
+                  // to avoid unnecessary resource occupation.
+                  amClient.releaseAssignedContainer(containerId)
+              }
             }
-            }
-          })
+          })
+        } finally {
+          numExecutorToBeLaunched -= 1
+        }
       } else {
         // For test only
         updateInternalState()
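The heart of the fix: a container that has already been allocated, but whose executor is still being brought up by the NodeManager, is counted neither in `numPendingAllocate` nor in `numExecutorsRunning`, so `missing` overshoots and extra containers get requested. A standalone sketch of the arithmetic (illustrative numbers, not Spark code):

    // Toy model of updateResourceRequests' subtraction. Names mirror the
    // patch; the values are made up for illustration.
    object MissingCount {
      def missing(target: Int, pending: Int, starting: Int, running: Int): Int =
        target - pending - starting - running

      def main(args: Array[String]): Unit = {
        // 10 executors wanted; 4 containers allocated whose executors are
        // still starting (no longer pending, not yet running); 2 running.
        println(missing(target = 10, pending = 0, starting = 4, running = 2)) // 4: correct
        println(missing(target = 10, pending = 0, starting = 0, running = 2)) // 8: over-requests by 4
      }
    }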
From 0980a6a7ac58b5b92f64f6ab1505ded578320be3 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Wed, 19 Jul 2017 12:57:38 +0800
Subject: [PATCH 3/7] Fix the concurrency problem

---
 .../spark/deploy/yarn/YarnAllocator.scala     | 32 +++++++++++++++-----------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 35f27b82bd42..3115cb1168a4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

 import java.util.Collections
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern

 import scala.collection.mutable
@@ -30,7 +31,6 @@
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.{Level, Logger}

 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -80,9 +80,9 @@ private[yarn] class YarnAllocator(
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])

-  @volatile private var numExecutorsRunning = 0
+  private val numExecutorsRunning = new AtomicInteger(0)

-  @volatile private var numExecutorToBeLaunched = 0
+  private val numExecutorsStarting = new AtomicInteger(0)

   /**
    * Used to generate a unique ID per executor
@@ -165,7 +165,7 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }

-  def getNumExecutorsRunning: Int = numExecutorsRunning
+  def getNumExecutorsRunning: Int = numExecutorsRunning.get()

   def getNumExecutorsFailed: Int = synchronized {
     val endTime = clock.getTimeMillis()
@@ -244,7 +244,7 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
-      numExecutorsRunning -= 1
+      numExecutorsRunning.decrementAndGet()
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -272,7 +272,7 @@ private[yarn] class YarnAllocator(
       logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
         .format(
           allocatedContainers.size,
-          numExecutorsRunning,
+          numExecutorsRunning.get(),
           allocateResponse.getAvailableResources))

       handleAllocatedContainers(allocatedContainers.asScala)
@@ -283,7 +283,7 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
       processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning))
+        .format(completedContainers.size, numExecutorsRunning.get()))
     }
   }
@@ -297,7 +297,7 @@ private[yarn] class YarnAllocator(
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
     val missing = targetNumExecutors - numPendingAllocate -
-      numExecutorToBeLaunched - numExecutorsRunning
+      numExecutorsStarting.get - numExecutorsRunning.get()

     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
@@ -496,7 +496,8 @@ private[yarn] class YarnAllocator(
       s"for executor with ID $executorId")

     def updateInternalState(): Unit = synchronized {
-      numExecutorsRunning += 1
+      numExecutorsRunning.incrementAndGet()
+      numExecutorsStarting.decrementAndGet()
       executorIdToContainer(executorId) = container
       containerIdToExecutorId(container.getId) = executorId

@@ -506,10 +507,9 @@ private[yarn] class YarnAllocator(
       allocatedContainerToHostMap.put(containerId, executorHostname)
     }

-    if (numExecutorsRunning < targetNumExecutors) {
+    if (numExecutorsRunning.get() < targetNumExecutors) {
+      numExecutorsStarting.incrementAndGet()
       if (launchContainers) {
-        try {
-          numExecutorToBeLaunched += 1
         launcherPool.execute(new Runnable {
           override def run(): Unit = {
             try {
@@ -532,20 +532,18 @@ private[yarn] class YarnAllocator(
                 logError(s"Failed to launch executor $executorId on container $containerId", e)
                 // Assigned container should be released immediately
                 // to avoid unnecessary resource occupation.
+                numExecutorsStarting.decrementAndGet()
                 amClient.releaseAssignedContainer(containerId)
             }
           }
         })
-        } finally {
-          numExecutorToBeLaunched -= 1
-        }
       } else {
         // For test only
         updateInternalState()
       }
     } else {
       logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
-        "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
+        "reached target Executors count: %d.").format(numExecutorsRunning.get(), targetNumExecutors))
     }
   }
 }
@@ -560,7 +558,7 @@ private[yarn] class YarnAllocator(
     val exitReason = if (!alreadyReleased) {
       // Decrement the number of executors running. The next iteration of
       // the ApplicationMaster's reporting thread will take care of allocating.
-      numExecutorsRunning -= 1
+      numExecutorsRunning.decrementAndGet()
       logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
         containerId,
         onHostStr,
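Patch 3 swaps the `@volatile var` counters for `AtomicInteger` because `x += 1` on a volatile is a non-atomic read-modify-write: two launcher-pool threads can read the same value and one increment gets lost. A small demonstration of the hazard (assumed demo code, not Spark code; Scala 2.12+ so the lambda converts to a Runnable):

    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object LostUpdates {
      @volatile var volatileCount = 0          // visible to all threads, but increments can interleave
      val atomicCount = new AtomicInteger(0)   // increments are single atomic operations

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(8)
        (1 to 100000).foreach { _ =>
          pool.execute { () =>
            volatileCount += 1                 // read, add, write: racy
            atomicCount.incrementAndGet()      // atomic compare-and-swap
          }
        }
        pool.shutdown()
        pool.awaitTermination(30, TimeUnit.SECONDS)
        // Typically prints something like "volatile: 99987, atomic: 100000".
        println(s"volatile: $volatileCount, atomic: ${atomicCount.get}")
      }
    }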
From bf7f78c2ed43d7fc858a03e314c221fb834c020d Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Wed, 19 Jul 2017 15:42:22 +0800
Subject: [PATCH 4/7] Remove unused code

---
 .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fc7c0ea1e2ec..8d6ad3414c9e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -84,8 +84,6 @@ private[yarn] class YarnAllocator(

   private val numExecutorsStarting = new AtomicInteger(0)

-  @volatile private var numExecutorToBeLaunched = 0
-
   /**
    * Used to generate a unique ID per executor
    *

From cc73e5bd774d9de2d27781afbf77e90413438711 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Wed, 19 Jul 2017 23:55:13 +0800
Subject: [PATCH 5/7] Add debug logging

---
 .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8d6ad3414c9e..68a5f14dbeb8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -269,10 +269,12 @@ private[yarn] class YarnAllocator(
     val allocatedContainers = allocateResponse.getAllocatedContainers()

     if (allocatedContainers.size > 0) {
-      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+      logDebug(("Allocated containers: %d. Current executor count: %d. " +
+        "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
           numExecutorsRunning.get,
+          numExecutorsStarting.get,
           allocateResponse.getAvailableResources))

       handleAllocatedContainers(allocatedContainers.asScala)
@@ -298,6 +300,9 @@ private[yarn] class YarnAllocator(
     val numPendingAllocate = pendingAllocate.size
     val missing = targetNumExecutors - numPendingAllocate -
       numExecutorsStarting.get - numExecutorsRunning.get
+    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
+      s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+      s"executorsStarting: ${numExecutorsStarting.get}")

     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
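The two lines patch 5 adds are emitted at DEBUG, so they stay invisible under Spark's default INFO logging. Spark of this vintage logs through log4j 1.x, so one way to surface just this class's accounting output is a logger override in conf/log4j.properties (standard log4j syntax, shown as an assumed usage example, not part of the patch):

    # Surface YarnAllocator's DEBUG accounting lines without making all logs verbose
    log4j.logger.org.apache.spark.deploy.yarn.YarnAllocator=DEBUG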
From 582f4241e2592e63f58e9333b1e98cde6bc4d6f8 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Thu, 20 Jul 2017 14:30:42 +0800
Subject: [PATCH 6/7] Make the executor accounting more robust

---
 .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 68a5f14dbeb8..2e848d909020 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -502,7 +502,6 @@ private[yarn] class YarnAllocator(

     def updateInternalState(): Unit = synchronized {
       numExecutorsRunning.incrementAndGet()
-      numExecutorsStarting.decrementAndGet()
       executorIdToContainer(executorId) = container
       containerIdToExecutorId(container.getId) = executorId

@@ -513,8 +512,8 @@ private[yarn] class YarnAllocator(
     }

     if (numExecutorsRunning.get < targetNumExecutors) {
-      numExecutorsStarting.incrementAndGet()
       if (launchContainers) {
+        numExecutorsStarting.incrementAndGet()
         launcherPool.execute(new Runnable {
           override def run(): Unit = {
             try {
@@ -537,8 +536,9 @@ private[yarn] class YarnAllocator(
                 logError(s"Failed to launch executor $executorId on container $containerId", e)
                 // Assigned container should be released immediately
                 // to avoid unnecessary resource occupation.
-                numExecutorsStarting.decrementAndGet()
                 amClient.releaseAssignedContainer(containerId)
+            } finally {
+              numExecutorsStarting.decrementAndGet()
             }
           }
         })
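Patch 6 opens the window that patch 7 closes: after `updateInternalState()` bumps `numExecutorsRunning` inside the task, the executor stays counted in `numExecutorsStarting` until the enclosing `finally` runs, so a concurrent `updateResourceRequests()` can briefly see the same executor in both counters and compute too small a `missing`. A minimal sketch of that transient double count (assumed demo names, not Spark code):

    import java.util.concurrent.atomic.AtomicInteger

    object DoubleCountWindow {
      val running  = new AtomicInteger(0)
      val starting = new AtomicInteger(1)  // one executor mid-launch

      def main(args: Array[String]): Unit = {
        running.incrementAndGet()          // the launch just succeeded: running = 1
        // A reader at this point sees starting = 1 AND running = 1 for the
        // same executor, so target - starting - running undercounts by one.
        println(s"observed ${starting.get + running.get} executors for 1 launch")
        starting.decrementAndGet()         // the finally block only runs now
      }
    }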
From 77034944610c5973325bb3fd71ac9f153f59d32b Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Tue, 25 Jul 2017 17:43:08 +0800
Subject: [PATCH 7/7] Fix the double-count bug

---
 .../spark/deploy/yarn/YarnAllocator.scala     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 2e848d909020..cc571c330f8d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -502,6 +502,7 @@ private[yarn] class YarnAllocator(

     def updateInternalState(): Unit = synchronized {
       numExecutorsRunning.incrementAndGet()
+      numExecutorsStarting.decrementAndGet()
       executorIdToContainer(executorId) = container
       containerIdToExecutorId(container.getId) = executorId

@@ -512,8 +513,8 @@ private[yarn] class YarnAllocator(
     }

     if (numExecutorsRunning.get < targetNumExecutors) {
+      numExecutorsStarting.incrementAndGet()
       if (launchContainers) {
-        numExecutorsStarting.incrementAndGet()
         launcherPool.execute(new Runnable {
           override def run(): Unit = {
             try {
@@ -532,13 +533,16 @@ private[yarn] class YarnAllocator(
               ).run()
               updateInternalState()
             } catch {
-              case NonFatal(e) =>
-                logError(s"Failed to launch executor $executorId on container $containerId", e)
-                // Assigned container should be released immediately
-                // to avoid unnecessary resource occupation.
-                amClient.releaseAssignedContainer(containerId)
-            } finally {
-              numExecutorsStarting.decrementAndGet()
+              case e: Throwable =>
+                numExecutorsStarting.decrementAndGet()
+                if (NonFatal(e)) {
+                  logError(s"Failed to launch executor $executorId on container $containerId", e)
+                  // Assigned container should be released immediately
+                  // to avoid unnecessary resource occupation.
+                  amClient.releaseAssignedContainer(containerId)
+                } else {
+                  throw e
+                }
             }
           }
         })
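Taken together, the series settles on one invariant: `numExecutorsStarting` is incremented before the launch task is queued and is released exactly once, either together with the `numExecutorsRunning` increment inside the synchronized `updateInternalState()`, or on the failure path before the container is returned to YARN. A self-contained toy model of that protocol (assumed names and simplified locking, not the Spark code; Scala 2.12+):

    import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger
    import scala.util.control.NonFatal

    object AllocatorProtocol {
      val target   = 4
      val running  = new AtomicInteger(0)
      val starting = new AtomicInteger(0)

      // Pending requests are omitted; the real allocator subtracts them too.
      def missing: Int = target - starting.get - running.get

      def launch(pool: ExecutorService)(body: => Unit): Unit = {
        starting.incrementAndGet()        // counted before the task is queued
        pool.execute { () =>
          try {
            body                          // stands in for ExecutorRunnable.run()
            running.incrementAndGet()     // in Spark both updates happen under
            starting.decrementAndGet()    // a lock in updateInternalState()
          } catch {
            case e: Throwable =>
              starting.decrementAndGet()  // failure path releases the slot once
              if (!NonFatal(e)) throw e   // fatal errors still propagate
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(4)
        launch(pool) { /* launch ok */ }
        launch(pool) { /* launch ok */ }
        launch(pool) { throw new RuntimeException("NodeManager launch failed") }
        pool.shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS)
        // Prints "running=2 starting=0 missing=2": the failed launch is
        // neither leaked in `starting` nor double-counted in `running`.
        println(s"running=${running.get} starting=${starting.get} missing=$missing")
      }
    }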