core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config.{CPUS_PER_TASK, DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -114,8 +114,8 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
-    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+  private val tasksPerExecutor = Utils.getTasksPerExecutor(
+    conf.getDouble("spark.executor.cores", 1), conf.get(CPUS_PER_TASK))
 
   validateSettings()
 
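For intuition, a minimal sketch of what this change buys (values are hypothetical; this mirrors, rather than calls, the `Utils.getTasksPerExecutor` helper added later in this diff):

```scala
import java.math.RoundingMode
import com.google.common.math.DoubleMath

// Hypothetical fractional settings, e.g. on Kubernetes.
val executorCores = 0.5 // spark.executor.cores = 500 millicpus
val taskCpus = 0.1      // spark.task.cpus = 100 millicpus

// Old code: conf.getInt would fail to parse "0.5" at all, and Int division
// could only express whole-core ratios. New code: floor of the Double ratio.
val tasksPerExecutor = DoubleMath.roundToInt(executorCores / taskCpus, RoundingMode.FLOOR)
// tasksPerExecutor == 5
```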
core/src/main/scala/org/apache/spark/SparkConf.scala (5 additions, 6 deletions)
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
+import com.google.common.math.DoubleMath
 import org.apache.avro.{Schema, SchemaNormalization}
 
 import org.apache.spark.deploy.history.config._
@@ -552,12 +553,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
     if (contains("spark.cores.max") && contains("spark.executor.cores")) {
       val totalCores = getInt("spark.cores.max", 1)
-      val executorCores = getInt("spark.executor.cores", 1)
-      val leftCores = totalCores % executorCores
-      if (leftCores != 0) {
-        logWarning(s"Total executor cores: ${totalCores} is not " +
-          s"divisible by cores per executor: ${executorCores}, " +
-          s"the left cores: ${leftCores} will not be allocated")
+      val executorCores = getDouble("spark.executor.cores", 1d)
+      if (!DoubleMath.isMathematicalInteger(totalCores / executorCores)) {
+        logWarning(s"Total executor cores: $totalCores is not " +
+          s"divisible by cores per executor: $executorCores")
       }
     }
 
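A quick check of how the `DoubleMath`-based divisibility test behaves (hypothetical values, for illustration only):

```scala
import com.google.common.math.DoubleMath

// spark.cores.max = 3, spark.executor.cores = 0.4: ratio is 7.5, warning fires.
DoubleMath.isMathematicalInteger(3 / 0.4) // false
// spark.cores.max = 4, spark.executor.cores = 0.5: ratio is exactly 8.0, no warning.
DoubleMath.isMathematicalInteger(4 / 0.5) // true
```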
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

@@ -267,7 +267,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
       SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
     }
-    if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
+    if (executorCores != null && Try(executorCores.toDouble).getOrElse(-1d) <= 0d) {

    [Review comment, Member]: Although I think "0.0" is more readable than "0d", don't bother changing it here. Just an aside.

       SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
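A sketch of what the relaxed parsing accepts and rejects (hypothetical inputs; the `Try(...).getOrElse` pattern is the same one used above):

```scala
import scala.util.Try

// Hypothetical command-line values, for illustration only.
Try("0.5".toDouble).getOrElse(-1d) <= 0d // false: fractional cores now accepted
Try("0".toDouble).getOrElse(-1d) <= 0d   // true: zero is still rejected
Try("abc".toDouble).getOrElse(-1d) <= 0d // true: garbage is still rejected
// Under the old Int parsing, "0.5".toInt would throw, so "0.5" was rejected too.
```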
core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -114,7 +114,8 @@ package object config {
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
-  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").doubleConf
+    .createWithDefault(1d)
 
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
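With the entry typed as a double, `conf.get(CPUS_PER_TASK)` yields a `Double`, and call sites that need whole cores must convert explicitly, hence the `.toInt` conversions later in this diff. A minimal sketch (Spark-internal only, since typed `SparkConf.get(entry)` is `private[spark]`; value is hypothetical):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.CPUS_PER_TASK

val conf = new SparkConf().set("spark.task.cpus", "0.1")
val cpusPerTask: Double = conf.get(CPUS_PER_TASK) // 0.1
val truncated: Int = cpusPerTask.toInt            // 0: whole-core call sites lose the fraction
```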
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -83,7 +83,7 @@ private[spark] class TaskSchedulerImpl(
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+  val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -344,7 +344,9 @@
 
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
+    val tasks = shuffledOffers.map { o =>
+      new ArrayBuffer[TaskDescription](Utils.getTasksPerExecutor(o.cores, CPUS_PER_TASK))
+    }
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
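For intuition about the pre-sizing (hypothetical numbers): an 8-core offer with `spark.task.cpus = 0.5` reserves capacity for floor(8 / 0.5) = 16 task descriptions, a ratio the old `Int` division could not express:

```scala
import org.apache.spark.util.Utils // Spark-internal helper, sketch only

val offerCores = 8    // o.cores is an Int, widened to Double here
val cpusPerTask = 0.5 // CPUS_PER_TASK is now a Double
Utils.getTasksPerExecutor(offerCores, cpusPerTask) // 16 == floor(8 / 0.5)
```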
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

@@ -123,7 +123,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       if (TaskState.isFinished(state)) {
         executorDataMap.get(executorId) match {
           case Some(executorInfo) =>
-            executorInfo.freeCores += scheduler.CPUS_PER_TASK
+            executorInfo.freeCores += scheduler.CPUS_PER_TASK.toInt
             makeOffers(executorId)
           case None =>
             // Ignoring the update since we don't know about the executor.
@@ -302,7 +302,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         else {
           val executorData = executorDataMap(task.executorId)
-          executorData.freeCores -= scheduler.CPUS_PER_TASK
+          executorData.freeCores -= scheduler.CPUS_PER_TASK.toInt
 
           logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
             s"${executorData.executorHost}.")
core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala

@@ -66,7 +66,7 @@ private[spark] class LocalEndpoint(
     case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
-       freeCores += scheduler.CPUS_PER_TASK
+       freeCores += scheduler.CPUS_PER_TASK.toInt
        reviveOffers()
      }
 
@@ -83,7 +83,7 @@ private[spark] class LocalEndpoint(
   def reviveOffers() {
     val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= scheduler.CPUS_PER_TASK
+      freeCores -= scheduler.CPUS_PER_TASK.toInt
       executor.launchTask(executorBackend, task)
     }
   }
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

@@ -27,11 +27,13 @@ import scala.collection.mutable.HashMap
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.scope._
+import org.apache.spark.util.Utils
 
 /**
  * A Spark listener that writes application information to a data store. The types written to the
@@ -51,7 +53,7 @@ private[spark] class AppStatusListener(
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
-  private var coresPerTask: Int = 1
+  private var coresPerTask: Double = 1d
 
   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen. For live applications, this avoids a few
@@ -147,7 +149,7 @@
       details.getOrElse("System Properties", Nil),
       details.getOrElse("Classpath Entries", Nil))
 
-    coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+    coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toDouble)
       .getOrElse(coresPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
@@ -183,7 +185,7 @@
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
-    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    exec.maxTasks = Utils.getTasksPerExecutor(event.executorInfo.totalCores, coresPerTask)
     exec.executorLogs = event.executorInfo.logUrlMap
     liveUpdate(exec, System.nanoTime())
   }
core/src/main/scala/org/apache/spark/util/Utils.scala (9 additions, 0 deletions)
@@ -45,6 +45,7 @@ import scala.util.matching.Regex
 import _root_.io.netty.channel.unix.Errors.NativeIoException
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
+import com.google.common.math.DoubleMath
 import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -2805,6 +2806,14 @@
 
     s"k8s://$resolvedURL"
   }
+
+  /**
+   * Get the number of tasks an executor can take given the number of CPU cores allocated to the
+   * executor and the number of CPU cores per task.
+   */
+  def getTasksPerExecutor(cores: Double, taskCpus: Double): Int = {
+    DoubleMath.roundToInt(cores / taskCpus, RoundingMode.FLOOR)
+  }
 }
 
 private[util] object CallerContext extends Logging {
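The helper floors the ratio, so capacity is never over-committed; a sketch of its behavior with hypothetical inputs (the same Guava call as in the method body):

```scala
import java.math.RoundingMode
import com.google.common.math.DoubleMath

DoubleMath.roundToInt(2.0 / 0.5, RoundingMode.FLOOR)  // 4: divides exactly
DoubleMath.roundToInt(1.0 / 0.75, RoundingMode.FLOOR) // 1: the 0.25 remainder is unused
DoubleMath.roundToInt(0.5 / 0.6, RoundingMode.FLOOR)  // 0: a single task cannot fit
```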
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

@@ -150,7 +150,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -180,7 +180,7 @@
 
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (7 additions, 0 deletions)
@@ -1167,6 +1167,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port")
     }
   }
+
+  test("get tasks per executor") {
+    assert(Utils.getTasksPerExecutor(1d, 1d) == 1)
+    assert(Utils.getTasksPerExecutor(2d, 0.5d) == 4)
+    assert(Utils.getTasksPerExecutor(0.1d, 0.05d) == 2)
+    assert(Utils.getTasksPerExecutor(0.5d, 0.6d) == 0)
+  }
 }
 
 private class SimpleExtension
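A note on the third assertion: it holds because `0.1d / 0.05d` is exactly `2.0` in IEEE-754 doubles (doubling a double is exact). Ratios that divide less cleanly can floor to one below the mathematical answer, which is worth keeping in mind when choosing fractional settings:

```scala
import java.math.RoundingMode
import com.google.common.math.DoubleMath

0.1 / 0.05 // exactly 2.0 in double arithmetic
0.3 / 0.1  // 2.9999999999999996, not 3.0
DoubleMath.roundToInt(0.3 / 0.1, RoundingMode.FLOOR) // 2, one below the mathematical 3
```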
docs/configuration.md (4 additions, 3 deletions)
@@ -1220,14 +1220,15 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td><code>spark.executor.cores</code></td>
   <td>
-    1 in YARN mode, all the available cores on the worker in
+    1 in YARN and Kubernetes modes, all the available cores on the worker in
     standalone and Mesos coarse-grained modes.
   </td>
   <td>
     The number of cores to use on each executor.
 
     In standalone and Mesos coarse-grained modes, for more detail, see
-    <a href="spark-standalone.html#Executors Scheduling">this description</a>.
+    <a href="spark-standalone.html#Executors Scheduling">this description</a>. In Kubernetes mode,
+    a fractional value can be used, e.g., 0.1 (100 millicpus).
   </td>
 </tr>
 <tr>
@@ -1669,7 +1670,7 @@
   <td><code>spark.task.cpus</code></td>
   <td>1</td>
   <td>
-    Number of cores to allocate for each task.
+    Number of cores to allocate for each task. Can have fractional values, e.g. 0.1 (100 millicpus), in Kubernetes mode.
   </td>
 </tr>
 <tr>
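A hypothetical application-side configuration exercising both documented settings together (values illustrative; fractional cores are only meaningful on Kubernetes per the rows above):

```scala
import org.apache.spark.SparkConf

// Hypothetical fractional-core setup for a Kubernetes deployment.
val conf = new SparkConf()
  .setAppName("fractional-cores-demo")
  .set("spark.executor.cores", "0.5") // 500 millicpus per executor
  .set("spark.task.cpus", "0.1")      // 100 millicpus per task
// Each executor then runs up to floor(0.5 / 0.1) = 5 tasks concurrently.
```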
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala

@@ -157,7 +157,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
    */
   private def numExecutorsPending(numTasksPending: Int): Int = {
     val coresPerExecutor = resource.getVirtualCores
-    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
+    (numTasksPending * sparkConf.get(CPUS_PER_TASK).toInt + coresPerExecutor - 1) / coresPerExecutor
   }
 
   /**
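This is the standard integer ceiling-division idiom; a worked example with hypothetical values (note that `.toInt` truncates, so a fractional `spark.task.cpus` below 1 would contribute 0 cores here, since YARN only allocates whole virtual cores):

```scala
// Hypothetical values illustrating ceil(numTasksPending * cpusPerTask / coresPerExecutor).
val numTasksPending = 10
val cpusPerTask = 2      // sparkConf.get(CPUS_PER_TASK).toInt
val coresPerExecutor = 3 // resource.getVirtualCores
(numTasksPending * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor
// == (20 + 2) / 3 == 7 == ceil(20.0 / 3)
```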