pom.xml (3 changes: 1 addition & 2 deletions)

@@ -1983,13 +1983,12 @@
         <artifactId>maven-source-plugin</artifactId>
         <version>2.4</version>
         <configuration>
-          <attach>false</attach>
+          <attach>true</attach>
         </configuration>
         <executions>
           <execution>
             <id>attach-sources</id>
             <goals>
               <goal>jar-no-fork</goal>
-              <goal>test-jar-no-fork</goal>
             </goals>
           </execution>
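With <attach>true</attach>, maven-source-plugin attaches the sources jar produced by jar-no-fork as a project artifact, so mvn install/deploy publishes it alongside the main jar; the test-jar-no-fork goal, which appears to have built a separate test-sources jar, is dropped.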
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala (20 changes: 1 addition & 19 deletions)

@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    if (isDynamicAllocationEnabled) {
-      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
-      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
-      // If defined, initial executors must be between min and max
-      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
-        throw new IllegalArgumentException(
-          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
-      }
-
-      numExecutors = initialNumExecutors
-    } else {
-      val numExecutorsConf = "spark.executor.instances"
-      numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-    }
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull
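The validation that used to live inline here moves into the shared helper (see YarnSparkHadoopUtil.scala below). A minimal sketch of the helper's behavior under dynamic allocation, with illustrative config values:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")

    // initialExecutors is unset, so the helper falls back to minExecutors.
    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)  // == 2

    // An initial value outside [minExecutors, maxExecutors] fails the
    // helper's require() with an IllegalArgumentException.
    conf.set("spark.dynamicAllocation.initialExecutors", "20")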
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala (6 changes: 1 addition & 5 deletions)

@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
 
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
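Note the inconsistency this removes: YarnAllocator defaulted spark.dynamicAllocation.initialExecutors to 0 rather than to minExecutors, and unlike YarnClusterSchedulerBackend it never consulted SPARK_EXECUTOR_INSTANCES, so the allocator's starting target could disagree with the number the client requested. All three call sites now go through the same helper.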
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala (23 changes: 23 additions & 0 deletions)

@@ -278,5 +278,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /**
+   * Get the initial target number of executors, which depends on whether dynamic allocation
+   * is enabled.
+   */
+  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+        s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }
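In the non-dynamic branch the precedence is: spark.executor.instances if set, otherwise the SPARK_EXECUTOR_INSTANCES environment variable, otherwise DEFAULT_NUMBER_EXECUTORS. A sketch, assuming dynamic allocation is disabled and the environment exports SPARK_EXECUTOR_INSTANCES=4:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    val conf = new SparkConf()
    // Only the environment variable is set, so the helper returns 4.
    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)

    // The SparkConf entry (settable as a system property) overrides the env var.
    conf.set("spark.executor.instances", "8")
    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)  // == 8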

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

@@ -17,21 +17,13 @@

 package org.apache.spark.scheduler.cluster
 
-import java.net.NetworkInterface
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(

   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
   }
 
   override def applicationId(): String =
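With this change, start() no longer hand-parses SPARK_EXECUTOR_INSTANCES with IntParam, and in cluster mode totalExpectedExecutors now respects dynamic-allocation settings instead of always being derived from spark.executor.instances.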