16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
s"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
}
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
executorLastSeen.remove(executorId)
}
}
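The killExecutorThread used above is declared elsewhere in HeartbeatReceiver and is not shown in this hunk. A minimal sketch of such a dedicated single-thread executor, assuming plain java.util.concurrent (the actual declaration may differ):

import java.util.concurrent.{ExecutorService, Executors}

// A dedicated single-thread pool so that killing and replacing a timed-out executor
// never blocks the thread that detected the heartbeat timeout.
private val killExecutorThread: ExecutorService = Executors.newSingleThreadExecutor()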
19 changes: 0 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
@@ -1361,17 +1359,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}

/**
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN
* and Mesos coarse-grained mode.
*/
private[spark] def supportDynamicAllocation: Boolean = {
(master.contains("yarn")
|| master.contains("mesos")
|| _conf.getBoolean("spark.dynamicAllocation.testing", false))
}

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
@@ -1387,8 +1374,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* This is currently only supported in YARN mode. Return whether the request is received.
Contributor: outdated comments?

Contributor Author: yup, thanks

*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
@@ -1405,8 +1390,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1429,8 +1412,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
Contributor: outdated comments...BTW, how to comment line without the changes?

assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
@@ -107,6 +107,10 @@ private[deploy] object DeployMessages {

case class MasterChangeAcknowledged(appId: String)

case class RequestExecutors(appId: String, requestedTotal: Int)

case class KillExecutors(appId: String, executorIds: Seq[String])

// Master to AppClient

case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
@@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server = transportContext.createServer(port, bootstraps)
}

/** Clean up all shuffle files associated with an application that has exited. */
def applicationRemoved(appId: String): Unit = {
blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
}

def stop() {
if (server != null) {
server.close()
45 changes: 45 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -197,6 +197,22 @@ private[spark] class AppClient(
sendToMaster(UnregisterApplication(appId))
context.reply(true)
stop()

case r: RequestExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](r))
case None =>
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
}

case k: KillExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](k))
case None =>
logWarning("Attempted to kill executors before registering with Master.")
context.reply(false)
}
}

override def onDisconnected(address: RpcAddress): Unit = {
@@ -256,4 +272,33 @@ private[spark] class AppClient(
endpoint = null
}
}

/**
* Request executors from the Master by specifying the total number desired,
* including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
Contributor: is it necessary to validate the value of requestedTotal, like >= 0? Though negative numbers do not affect the correctness of the program (if I understand the code correctly).

Contributor Author: this is already done in CoarseGrainedSchedulerBackend. We don't need to duplicate that check.

if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
}
}
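Regarding the reviewer question above about validating requestedTotal: the author notes the check already lives in CoarseGrainedSchedulerBackend. A hypothetical sketch of that kind of guard, for illustration only (not the actual Spark code; validateRequestedTotal is an invented name):

// Hypothetical helper showing the bounds check discussed above.
def validateRequestedTotal(requestedTotal: Int): Unit = {
  require(requestedTotal >= 0,
    s"Attempted to request a negative number of executor(s): $requestedTotal")
}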

/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
}
}

}
@@ -22,7 +22,6 @@ import java.util.Date
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@@ -43,6 +42,18 @@ private[spark] class ApplicationInfo(
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _

// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued
// by the application will this be set to a finite value.
@transient var executorLimit: Int = _
Contributor: You could initialize it here to Int.MaxValue. _ is 0, and the comment is misleading, since there is no such thing as infinite values for integers.

Contributor Author: Because this is transient, we need to reinitialize it when we deserialize it (see readObject). I wanted to avoid duplicating the value of Integer.MAX_VALUE, so I left this one blank here. Note that this value is not actually read on the driver side, so it makes sense to initialize it after deserialization. This is the same pattern used in other variables in this class.

Contributor Author: (see my comment below)

Contributor: Oh, I didn't see the readObject method. In that case, unfortunately, I see no alternative.
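For context, a minimal sketch of the deserialization hook the author describes (the readObject method itself is not part of this diff, so this is an approximation): after default deserialization every @transient field holds its type default, and init() restores the real values.

private def readObject(in: java.io.ObjectInputStream): Unit = {
  in.defaultReadObject()
  // Restore @transient fields (executorLimit, blacklistedWorkers, ...) after deserialization
  init()
}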


// A set of workers on which this application cannot launch executors.
// This is used to handle kill requests when `spark.executor.cores` is NOT set. In this mode,
// at most one executor from this application can be run on each worker. When an executor is
// killed, its worker is added to the blacklist to avoid having the master immediately schedule
// a new executor on the worker.
@transient private var blacklistedWorkers: mutable.HashSet[String] = _
Contributor: I still think it doesn't need to be both a var and a mutable collection. You could (with minimal changes) make this an immutable.HashSet. My 2c.
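A sketch of the reviewer's alternative, purely illustrative and not the code in this PR: a var holding an immutable set still supports +=, which desugars to reassignment of the var.

// Assumes: import scala.collection.immutable
@transient private var blacklistedWorkers: immutable.HashSet[String] = immutable.HashSet.empty[String]

private[master] def blacklistWorker(workerId: String): Unit = {
  // Desugars to: blacklistedWorkers = blacklistedWorkers + workerId
  blacklistedWorkers += workerId
}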


@transient private var nextExecutorId: Int = _

init()
@@ -60,6 +71,8 @@ private[spark] class ApplicationInfo(
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = Integer.MAX_VALUE
blacklistedWorkers = new mutable.HashSet[String]
}

private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -96,6 +109,47 @@ private[spark] class ApplicationInfo(

private[master] def coresLeft: Int = requestedCores - coresGranted

/**
* Return the number of executors waiting to be scheduled once space frees up.
*
* This is only defined if the application explicitly set the executor limit. For instance,
* if an application asks for 8 executors but there is only space for 5, then there will be
* 3 waiting executors.
*/
private[master] def numWaitingExecutors: Int = {
if (executorLimit != Integer.MAX_VALUE) {
math.max(0, executorLimit - executors.size)
} else {
0
}
}

/**
* Add a worker to the blacklist, called when the executor running on the worker is killed.
* This is used only if cores per executor is not set.
*/
private[master] def blacklistWorker(workerId: String): Unit = {
blacklistedWorkers += workerId
}

/**
* Remove workers from the blacklist, called when the application requests new executors.
* This is used only if cores per executor is not set.
*/
private[master] def removeFromBlacklist(numWorkers: Int): Unit = {
blacklistedWorkers.take(numWorkers).foreach { workerId =>
Contributor: Why not simply blacklistedWorkers.drop(numWorkers)?

Contributor Author: drop returns a copy

Contributor: Still, there must be a more direct way. Doing a foreach for this seems really an anti-pattern. This looks clearer: blacklistedWorkers --= blacklistedWorkers.take(numWorkers)

Contributor Author: I removed the blacklist altogether. Never liked it anyway

blacklistedWorkers.remove(workerId)
}
}
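To clarify the exchange above: take and drop on a mutable set return new collections rather than mutating the receiver, which is why either the foreach in this method or the reviewer's --= form is needed to actually shrink the set. A small standalone illustration with hypothetical values:

import scala.collection.mutable

val workers = mutable.HashSet("worker-1", "worker-2", "worker-3")
workers.drop(2)               // returns a new one-element set; `workers` still has three entries
workers --= workers.take(2)   // take copies two elements, --= then removes them in place
println(workers.size)         // prints 1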

/**
* Return whether the specified worker is blacklisted.
* This is used only if cores per executor is not set.
*/
private[master] def isBlacklisted(workerId: String): Boolean = {
blacklistedWorkers.contains(workerId)
}

Contributor: Is it better off being declared as a local variable in scheduleExecutorsOnWorkers? The decision of translating coresPerExecutor.isEmpty to oneExecutorPerWorker seems to belong to the scheduler, in case we decide to change default behavior in the future.

Contributor Author: ok, sg

private var _retryCount = 0

private[master] def retryCount = _retryCount
Expand Down