This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Changes to support executor recovery behavior during static allocation. #244

Merged
Commits (22)
a8831b7
Changes to support executor recovery behavior during static allocation.
varunkatta Apr 26, 2017
c4b949f
addressed review comments
varunkatta May 4, 2017
d87d393
Addressed latest comments and merged upstream changes
varunkatta May 5, 2017
4dcb1b3
Style changes and removed incorrectly merged code
varunkatta May 5, 2017
5a064ba
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
foxish May 8, 2017
fbe4b18
Merged with head on 2.1 kubernetes branch
varunkatta May 11, 2017
4d60c3d
Merge branch 'executor-recovery-static-allocation' of github.com:varu…
varunkatta May 11, 2017
01e8ec7
addressed latest review comments
varunkatta May 12, 2017
5e1a143
changed import order
varunkatta May 15, 2017
1a579ce
merged with 2.1 branch head
varunkatta Jun 14, 2017
608b08b
Minor changes to avoid exceptions when exit code is missing
varunkatta Jun 21, 2017
5cbea23
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jun 21, 2017
99b338d
fixed style check
varunkatta Jun 21, 2017
57bb38b
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jul 12, 2017
2bc0ff4
Addressed review comments from Yinan Li
varunkatta Jul 18, 2017
e6bb8c2
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jul 18, 2017
b5bd8d1
Addressed comments and got rid of an explicit lock object.
varunkatta Jul 18, 2017
1e2e49f
Fixed imports order.
varunkatta Jul 18, 2017
1131d2c
Merge k8s branch 'branch-2.1-kubernetes' into executor-recovery-stati…
varunkatta Jul 18, 2017
4e75491
Addressed review comments from Matt
varunkatta Jul 21, 2017
8acefef
Couple of style fixes
varunkatta Jul 21, 2017
382278a
Merge k8s branch 'branch-2.1-kubernetes' into executor-recovery-stati…
varunkatta Jul 21, 2017
@@ -17,14 +17,18 @@
package org.apache.spark.scheduler.cluster.kubernetes

import java.io.File
import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor}
Review comment: Unused?

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.util.ThreadUtils

private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) {
private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)
@@ -78,6 +82,15 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St
}
serviceAccountConfigBuilder
}
new DefaultKubernetesClient(configBuilder.build)
// Disable the ping thread that is not daemon, in order to allow
// the driver main thread to shut down upon errors. Otherwise, the driver
// will hang indefinitely.
val config = configBuilder
.withWebsocketPingInterval(0)
Review comment (Member): Why are we changing the websocket ping interval here?

Reply (Member): This works around a bug in the websocket ping thread, which is created as a non-daemon thread and leaves the driver hanging if an exception is thrown in the driver main thread. See the comment at lines 85-87. More details are in the PR 216 comment, which includes a code snippet of how the websocket ping thread is created.

.build()
val httpClient = HttpClientUtils.createHttpClient(config).newBuilder()
.dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s")))
.build()
new DefaultKubernetesClient(httpClient, config)
}
}
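
The websocket ping discussion above comes down to JVM shutdown semantics: the JVM does not exit while any non-daemon thread is still running, so a stray non-daemon ping thread keeps a failed driver alive. The following is a minimal, self-contained illustration of that behavior, not code from this PR; the sleeping thread merely stands in for OkHttp's ping thread.

object NonDaemonThreadDemo {
  def main(args: Array[String]): Unit = {
    // A non-daemon thread, playing the role of the websocket ping thread.
    val keepAlive = new Thread(new Runnable {
      override def run(): Unit = Thread.sleep(60 * 1000)
    })
    keepAlive.setDaemon(false) // the default, shown explicitly
    keepAlive.start()
    // Even though the main thread dies here, the JVM stays up until keepAlive finishes.
    throw new RuntimeException("simulated driver failure")
  }
}

Setting withWebsocketPingInterval(0) and backing the OkHttp Dispatcher with a daemon thread pool, as the new code above does, avoids leaving such threads behind.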
@@ -16,30 +16,39 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.io.Closeable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
EnvVarSourceBuilder, Pod, QuantityBuilder}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
val sc: SparkContext)
private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerImpl,
val sc: SparkContext)
Review comment (Member): The argument indentation style doesn't adhere to Scala convention. I'm guessing this is the IDE you're using. We should revert these unintended changes.

Reply (Member Author): Addressed.

extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

import KubernetesClusterSchedulerBackend._

private val EXECUTOR_MODIFICATION_LOCK = new Object
private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
private val runningExecutorsToPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
private val runningPodsToExecutors = new mutable.HashMap[Pod, String] // Indexed by executor Pods.
Review comment (Member): Can we do without runningPodsToExecutors? It doesn't seem like it's being used for its index.

Reply (Member Author): Seems like I missed commenting on this. We do use it to check whether the pod has already exited.

Review comment (Member): It seems the keys can be pod names instead of pod objects, since the only places the keys are used refer to the pod names anyway.

Reply (Member Author): Did this change.

Review comment (Member): It's worth commenting explicitly that both runningExecutorsToPods and runningPodsToExecutors are guarded by RUNNING_EXECUTOR_PODS_LOCK.

Reply (Member Author): Done.

private val FAILED_PODS_LOCK = new Object
private val failedPods = new mutable.HashMap[String, ExecutorLossReason] // Indexed by pod names.
private val EXECUTORS_TO_REMOVE_LOCK = new Object
private val executorsToRemove = new mutable.HashSet[String]

private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
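
Taken together, the suggestions in the thread above (key runningPodsToExecutors by pod name, state explicitly that both maps are guarded by RUNNING_EXECUTOR_PODS_LOCK, and turn the released-pod loop into a key-existence check) might look roughly like the sketch below. The class name and the String-keyed shape are illustrative, not the exact code that was merged.

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod

// Hypothetical holder for the executor-pod bookkeeping state.
class ExecutorPodBookkeeping {
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  // Both maps below are guarded by RUNNING_EXECUTOR_PODS_LOCK.
  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]    // executor id -> pod
  private val runningPodsToExecutors = new mutable.HashMap[String, String] // pod name -> executor id

  // With pod names as keys, "has this pod already been released?" becomes a contains check.
  def isPodAlreadyReleased(pod: Pod): Boolean =
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      !runningPodsToExecutors.contains(pod.getMetadata.getName)
    }
}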
@@ -68,8 +77,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))

private val kubernetesClient = new DriverPodKubernetesClientProvider(conf, kubernetesNamespace)
.get
private val kubernetesClient = new KubernetesClientBuilder(conf, kubernetesNamespace)
.buildFromWithinPod()

private val driverPod = try {
kubernetesClient.pods().inNamespace(kubernetesNamespace).
@@ -80,21 +89,25 @@
throw new SparkException(s"Executor cannot find driver pod", throwable)
}

override val minRegisteredRatio =
override val minRegisteredRatio: Double =
Review comment: For a val there is no need to declare the type.

Reply (Member Author): Done.

if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
} else {
super.minRegisteredRatio
}

private val executorWatchResource = new AtomicReference[Closeable]
Review comment (Member): This can be of type Watch.

private val executorCleanupScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"executor-recovery-worker")
protected var totalExpectedExecutors = new AtomicInteger(0)


private val driverUrl = RpcEndpointAddress(
sc.getConf.get("spark.driver.host"),
sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

private val initialExecutors = getInitialTargetExecutorNumber(1)
private val initialExecutors = getInitialTargetExecutorNumber()

private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
@@ -119,29 +132,39 @@

override def start(): Unit = {
super.start()
executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
.watch(new ExecutorPodsWatcher()))
if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
doRequestTotalExecutors(initialExecutors)
}
executorCleanupScheduler.scheduleWithFixedDelay(executorRecoveryRunnable, 0,
TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS)
}

override def stop(): Unit = {
// send stop message to executors so they shut down cleanly
super.stop()

// then delete the executor pods
// TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
// When using Utils.tryLogNonFatalError some of the code fails but without any logs or
// indication as to why.
try {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
Review comment (Member): Should add runningExecutorsToPods.clear() after this line.

Reply (Member Author): Good point.

runningPodsToExecutors.clear()
}
val resource = executorWatchResource.getAndSet(null)
if (resource != null) {
resource.close()
}
} catch {
case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
}
try {
logInfo("Closing kubernetes client")
kubernetesClient.close()
} catch {
case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
}
executorCleanupScheduler.shutdown()
super.stop()
Review comment (Member): This should be called in a finally clause?

Reply (Member Author): Just realized we don't need this; it's redundant.

}

private def allocateNewExecutorPod(): (String, Pod) = {
@@ -231,13 +254,17 @@
}

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
EXECUTOR_MODIFICATION_LOCK.synchronized {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (requestedTotal > totalExpectedExecutors.get) {
logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}"
logInfo(s"Requesting ${
requestedTotal - totalExpectedExecutors.get
}"
+ s" additional executors, expecting total $requestedTotal and currently" +
s" expected ${totalExpectedExecutors.get}")
for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) {
runningExecutorPods += allocateNewExecutorPod()
val (executorId, pod) = allocateNewExecutorPod()
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod, executorId)
}
}
totalExpectedExecutors.set(requestedTotal)
@@ -246,19 +273,185 @@
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
EXECUTOR_MODIFICATION_LOCK.synchronized {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
runningExecutorsToPods.remove(executor) match {
case Some(pod) =>
Review comment (Member): There is some Scala community debate about whether case Some(...) / case None should be used, or Option.fold. I'm not sure whether the Spark style guide has an opinion, and I don't have a strong personal opinion.

Review comment: Strongly prefer not to match on Options anywhere.

Reply (Member Author): Thanks, this is addressed now.

kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod)
case None => logWarning(s"Unable to remove pod for unknown executor $executor")
}
}
}
true
}
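
The Option-handling preference recorded in the thread above (avoid matching on Some/None) can be expressed with Option.fold. Below is a small self-contained sketch, with println standing in for logWarning and for the Kubernetes client's delete call.

import scala.collection.mutable

object OptionFoldSketch {
  // Simplified stand-in for runningExecutorsToPods: executor id -> pod name.
  private val runningExecutorsToPods = new mutable.HashMap[String, String]

  def releaseExecutor(executorId: String): Unit =
    runningExecutorsToPods.remove(executorId).fold {
      println(s"Unable to remove pod for unknown executor $executorId")
    } { podName =>
      println(s"Deleting pod $podName") // would call kubernetesClient.pods().delete(pod)
    }
}

Whether fold reads better than a match here is a style call; the thread above records both positions.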

private class ExecutorPodsWatcher extends Watcher[Pod] {

private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1

override def eventReceived(action: Action, pod: Pod): Unit = {
if (action == Action.ERROR) {
val podName = pod.getMetadata.getName
Review comment (Member): Nit: instead of creating this val, we can directly write:

logInfo(s"Received pod ${pod.getMetadata.getName} exited event. Reason: " + ...)

logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
handleErroredPod(pod)
}
else if (action == Action.DELETED) {
val podName = pod.getMetadata.getName
logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
handleDeletedPod(pod)
}
}

override def onClose(cause: KubernetesClientException): Unit = {
logDebug("Executor pod watch closed.", cause)
}

def getContainerExitStatus(pod: Pod): Int = {
Review comment (Member): Can this be renamed to getExecutorExitStatus?

Reply (Member Author): Done.

val containerStatuses = pod.getStatus.getContainerStatuses.asScala
for (containerStatus <- containerStatuses) {
return getContainerExitStatus(containerStatus)
Review comment (Member): If we're always returning the first container's exit status, we can avoid the loop here and perhaps just fetch the status directly?

Reply (Member Author): Addressed.

}
DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
}

def getContainerExitStatus(containerStatus: ContainerStatus): Int = {
containerStatus.getState.getTerminated.getExitCode.intValue
}
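
Following the suggestion to drop the loop, and anticipating the later commit that avoids exceptions when the exit code is missing, the lookup could be written as below. This is a sketch that reuses Pod, the JavaConverters import, and DEFAULT_CONTAINER_FAILURE_EXIT_STATUS already present in this file, not necessarily the exact code that was merged.

def getExecutorExitStatus(pod: Pod): Int =
  pod.getStatus.getContainerStatuses.asScala.headOption
    .flatMap(status => Option(status.getState).flatMap(state => Option(state.getTerminated)))
    .flatMap(terminated => Option(terminated.getExitCode))
    .map(_.intValue)
    .getOrElse(DEFAULT_CONTAINER_FAILURE_EXIT_STATUS)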

def handleErroredPod(pod: Pod): Unit = {
def isPodAlreadyReleased(pod: Pod): Boolean = {
Review comment: Avoid inner method definitions; move this outside somewhere.

Reply (Member Author): Done.

RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningPodsToExecutors.keySet.foreach(runningPod =>
Review comment (Member): This can be simplified to a key-existence check if the keys are pod names.

Reply (Member Author): Indeed; done.

Review comment (Member): You don't need the for loop here. This can be as simple as runningPodsToExecutors.contains(pod.getMetadata.getName).

Reply (Member Author): Got rid of the loop.

if (runningPod.getMetadata.getName == pod.getMetadata.getName) {
return false
}
)
}
true
}
val alreadyReleased = isPodAlreadyReleased(pod)
val containerExitStatus = getContainerExitStatus(pod)
// container was probably actively killed by the driver.
val exitReason = if (alreadyReleased) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod " + pod.getMetadata.getName +
" exited from explicit termination request.")
} else {
val containerExitReason = containerExitStatus match {
case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
memLimitExceededLogMessage(pod.getStatus.getReason)
case _ =>
// Here we can't be sure that the exit was caused by the application but this seems
// to be the right default since we know the pod was not explicitly deleted by
// the user.
"Pod exited with following container exit status code " + containerExitStatus
}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
FAILED_PODS_LOCK.synchronized {
Review comment (Member): @mccheah is there a concurrent, lock-free version of the map that we can use so that we don't need locking everywhere? Something like scala.collection.concurrent.Map?

Review comment (Member): +1 for this.

Reply (Member Author): Thanks; made the failedPods map a concurrent hash map. It is not truly lock-free, as there are implicit locks, but explicit locking by the user is not required.

failedPods.put(pod.getMetadata.getName, exitReason)
}
}
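
One way to realize the concurrent-map suggestion from the thread above, so that failedPods no longer needs FAILED_PODS_LOCK, is scala.collection.concurrent.TrieMap. This is a self-contained sketch with String standing in for ExecutorLossReason, not the exact change that landed.

import scala.collection.concurrent.TrieMap

object FailedPodsSketch {
  // pod name -> loss reason; safe for concurrent access without explicit synchronized blocks.
  private val failedPods = TrieMap.empty[String, String]

  def recordFailure(podName: String, reason: String): Unit =
    failedPods.put(podName, reason)

  // The recovery pass can take a consistent read-only view in one call.
  def snapshot(): scala.collection.Map[String, String] =
    failedPods.readOnlySnapshot()
}

A java.util.concurrent.ConcurrentHashMap wrapped with asScala would also work; as the author notes, neither is truly lock-free internally, but callers no longer manage a lock themselves.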

def handleDeletedPod(pod: Pod): Unit = {
val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false,
"Pod " + pod.getMetadata.getName + " deleted by K8s master")
Review comment (Member): It was not necessarily deleted by the master. Maybe we can say "Pod <x> lost/deleted".

Reply (Member Author): Addressed.

FAILED_PODS_LOCK.synchronized {
failedPods.put(pod.getMetadata.getName, exitReason)
}
}
}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
if (disableExecutor(executorId)) {
EXECUTORS_TO_REMOVE_LOCK.synchronized {
executorsToRemove.add(executorId)
}
}
}
}
}

private val executorRecoveryRunnable: Runnable = new Runnable {

private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
Review comment (Member): I'm guessing we want this knob to be something the user controls, via some Spark property?

Reply (Member Author): This knob need not be controlled by the user. It is a very specific knob tied to the implementation, and users shouldn't have to worry about tuning it, I think.

private val executorsToRecover = new mutable.HashSet[String]
private val executorReasonChecks = new mutable.HashMap[String, Int]
Review comment (Member): Can you please add a comment here explaining what executorReasonChecks is?

Reply (Member Author): Done.
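
A possible form of the requested comment, with wording that is illustrative rather than taken from the merged code:

// Number of times we have checked for a loss reason for a disconnected executor
// without finding one in failedPods; once this exceeds MAX_EXECUTOR_LOST_REASON_CHECKS,
// the executor is removed with SlaveLost (see removeExecutorOrIncrementLossReasonCheckCount).
private val executorReasonChecks = new mutable.HashMap[String, Int]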


override def run(): Unit = removeFailedAndRequestNewExecutors()

def removeFailedAndRequestNewExecutors(): Unit = {
val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
Review comment (Member): Does this create a copy?

Reply (Member Author): Yes, an immutable copy.

}
val localFailedPods = FAILED_PODS_LOCK.synchronized {
failedPods.toMap
}
val localExecutorsToRemove = EXECUTORS_TO_REMOVE_LOCK.synchronized {
executorsToRemove.toSet
}
localExecutorsToRemove.foreach { case (executorId) =>
localRunningExecutorsToPods.get(executorId) match {
case Some(pod) =>
localFailedPods.get(pod.getMetadata.getName) match {
case Some(executorExited: ExecutorExited) =>
logDebug(s"Removing executor $executorId with loss reason "
+ executorExited.message)
removeExecutor(executorId, executorExited)
if (!executorExited.exitCausedByApp) {
executorsToRecover.add(executorId)
Review comment (Member): Maybe you want to update the PR description with this code snippet, noting that this is the main business logic.

Reply (Member Author): Done.

}
case None =>
removeExecutorOrIncrementLossReasonCheckCount(executorId)
}
case None =>
removeExecutorOrIncrementLossReasonCheckCount(executorId)
}
}
executorsToRecover.foreach(executorId =>
EXECUTORS_TO_REMOVE_LOCK.synchronized {
executorsToRemove -= executorId
executorReasonChecks -= executorId
}
)
if (executorsToRecover.nonEmpty) {
requestExecutors(executorsToRecover.size)
Review comment (Member): Compute numExecutorsToRecover as Math.min(executorsToRecover.size, MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS - recoveredExecutorCount) and pass it to requestExecutors, so that we don't go above the max?

Reply (Member Author): Ditto; dropping this code.

}
executorsToRecover.clear()
}


def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
executorsToRecover.add(executorId)
executorReasonChecks -= executorId
} else {
executorReasonChecks.put(executorId, reasonCheckCount + 1)
}
}
}
}

private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val VMEM_EXCEEDED_EXIT_CODE = -103
private val PMEM_EXCEEDED_EXIT_CODE = -104

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Pod/Container killed for exceeding memory limits.$diagnostics" +
Review comment (Member): Nit: space after the period.

Reply (Member Author): Done.

" Consider boosting spark executor memory overhead."
}
}