Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
s"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
}
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
executorLastSeen.remove(executorId)
}
}
Expand Down
27 changes: 4 additions & 23 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
Expand Down Expand Up @@ -1361,17 +1359,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}

/**
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN
* and Mesos coarse-grained mode.
*/
private[spark] def supportDynamicAllocation: Boolean = {
(master.contains("yarn")
|| master.contains("mesos")
|| _conf.getBoolean("spark.dynamicAllocation.testing", false))
}

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
Expand Down Expand Up @@ -1400,8 +1387,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
Expand All @@ -1414,12 +1399,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
Expand All @@ -1438,12 +1421,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with new ones, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated comments...BTW, how to comment line without the changes?

assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
Expand All @@ -1462,7 +1443,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with a new one, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
Expand All @@ -1479,7 +1460,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* can steal the window of opportunity and acquire this application's resources in the
* mean time.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ private[deploy] object DeployMessages {

case class MasterChangeAcknowledged(appId: String)

case class RequestExecutors(appId: String, requestedTotal: Int)

case class KillExecutors(appId: String, executorIds: Seq[String])

// Master to AppClient

case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server = transportContext.createServer(port, bootstraps)
}

/** Clean up all shuffle files associated with an application that has exited. */
def applicationRemoved(appId: String): Unit = {
blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
}

def stop() {
if (server != null) {
server.close()
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ private[spark] class AppClient(
sendToMaster(UnregisterApplication(appId))
context.reply(true)
stop()

case r: RequestExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](r))
case None =>
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
}

case k: KillExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](k))
case None =>
logWarning("Attempted to kill executors before registering with Master.")
context.reply(false)
}
}

override def onDisconnected(address: RpcAddress): Unit = {
Expand Down Expand Up @@ -257,4 +273,33 @@ private[spark] class AppClient(
endpoint = null
}
}

/**
* Request executors from the Master by specifying the total number desired,
* including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to validate the value of requestedTotal, like >= 0? though negative numbers does not bring any impact on the correctness of the program (if I understand code correctly)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already done in CoarseGrainedSchedulerBackend. We don't need to duplicate that check.

if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
}
}

/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.Date
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
Expand All @@ -43,6 +42,11 @@ private[spark] class ApplicationInfo(
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _

// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
// application will this be set to a finite value. This is used for dynamic allocation.
@transient private[master] var executorLimit: Int = _

@transient private var nextExecutorId: Int = _

init()
Expand All @@ -60,6 +64,7 @@ private[spark] class ApplicationInfo(
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = Integer.MAX_VALUE
}

private def newExecutorId(useID: Option[Int] = None): Int = {
Expand Down Expand Up @@ -116,6 +121,12 @@ private[spark] class ApplicationInfo(
state != ApplicationState.WAITING && state != ApplicationState.RUNNING
}

/**
* Return the limit on the number of executors this application can have.
* For testing only.
*/
private[deploy] def getExecutorLimit: Int = executorLimit

def duration: Long = {
if (endTime != -1) {
endTime - startTime
Expand Down
Loading