This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Changes to support executor recovery behavior during static allocation. #244

Conversation

varunkatta
Member

@varunkatta varunkatta commented Apr 26, 2017

What changes were proposed in this pull request?

Added initial support for the driver to ask for more executors in case of framework faults.

Reviewer notes:
This is WIP and currently being tested. It seems to work for simple smoke tests. Looking for feedback on:

  • Any major blindspots in logic or functionality
  • General flow. Potential issues with control/data flows.
  • Whether style guidelines are followed.

Potential issues/Todos:

  • Verify that no deadlocks are possible.
  • Maybe explore message passing between threads instead of using synchronization.
  • Any uncovered issues in further testing

Reviewer notes

Main business logic is in
removeFailedAndRequestNewExecutors()

Overall executor recovery logic at a high-level:

  • On executor disconnect, we immediately disable the executor.
  • Delete/Error Watcher actions will trigger a capture of executor loss reasons. This happens on a separate thread.
  • There is another dedicated recovery thread, which looks at all previously disconnected executors and their loss reasons. It removes executors whose loss reasons have been discovered, or keeps retrying until the reasons are discovered. If the loss reason of a lost executor is not discovered within a sufficient time window, we give up and still remove the executor. For all removed executors, we request new executors on this recovery thread (see the sketch below).
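
For orientation, a condensed sketch of that recovery pass. The names disconnectedExecutorIds, lossReasons, reasonChecks, and executorsToRecover are placeholders standing in for the PR's bookkeeping structures; removeExecutor, requestExecutors, and SlaveLost are the scheduler-backend calls discussed in the review below. This is an outline, not the exact implementation:

import scala.collection.mutable

private def removeFailedAndRequestNewExecutors(): Unit = {
  val executorsToRecover = mutable.HashSet.empty[String]
  disconnectedExecutorIds.foreach { executorId =>
    lossReasons.get(executorId) match {
      case Some(exited) =>
        // The watcher thread already captured a loss reason: remove the executor,
        // and mark it for recovery if the loss was not the application's fault.
        removeExecutor(executorId, exited)
        if (!exited.exitCausedByApp) {
          executorsToRecover += executorId
        }
      case None =>
        val checks = reasonChecks.getOrElse(executorId, 0)
        if (checks >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
          // Give up waiting for a reason and still remove (and recover) the executor.
          removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
          executorsToRecover += executorId
        } else {
          reasonChecks(executorId) = checks + 1
        }
    }
  }
  if (executorsToRecover.nonEmpty) {
    requestExecutors(executorsToRecover.size)
  }
}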

How was this patch tested?

Manually tested that on deleting a pod, new pods were being requested.

import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}

import okhttp3.Dispatcher
Member

I believe spark style favors fully-scoped import identifiers

Member

Is okhttp a dependency we've added? I don't see it showing up elsewhere in the repo.

Member Author

The Java Kubernetes client API has a dependency on okhttp. okhttp is exposed here because the default Dispatcher constructor creates only non-daemon threads in its executor, and we wanted to be able to specify a custom dispatcher.
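
To illustrate the point, a sketch of the kind of wiring involved (the thread factory shown is an assumption, not the PR's exact code): the okhttp Dispatcher can be handed an executor whose threads are daemons, so the HTTP client cannot keep the driver JVM alive.

import java.util.concurrent.{Executors, ThreadFactory}

import okhttp3.Dispatcher

// Build an executor whose threads are marked as daemons, then install it on the Dispatcher.
val daemonThreadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val thread = new Thread(r)
    thread.setDaemon(true)
    thread
  }
}
val dispatcher = new Dispatcher(Executors.newCachedThreadPool(daemonThreadFactory))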

Member

👍

runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
runningExecutorsToPods.remove(executor) match {
case Some(pod) =>
Member

There is some Scala community debate about whether case Some(...) / case None should be used, or option.fold. I'm unsure if the Spark style guide has an opinion, and I don't have a strong personal opinion.


Strongly prefer not to match on Options anywhere.
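
A minimal sketch of the Option-free form being asked for here, using the names from the diff above:

// foreach runs the body only when the Option is non-empty, replacing the match.
runningExecutorsToPods.remove(executor).foreach { pod =>
  kubernetesClient.pods().delete(pod)
}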

Member Author

Thanks, this is addressed now.

val PMEM_EXCEEDED_EXIT_CODE = -104

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Container killed by YARN for exceeding memory limits.$diagnostics" +
Member

Should this say YARN?

Member Author

Oops, obviously not. :) Fixing it.

.withWebsocketPingInterval(0)
.build()
val httpClient = HttpClientUtils.createHttpClient(config).newBuilder()
.dispatcher(new Dispatcher(threadPoolExecutor))
Member

The latest code in #216 simplifies this code by using ThreadUtils.newDaemonCachedThreadPool. We may want to do this here to be in sync.
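
A sketch of the suggested simplification, assuming Spark's ThreadUtils and the HttpClientUtils builder chain already shown in this diff (the pool name is illustrative):

import okhttp3.Dispatcher
import org.apache.spark.util.ThreadUtils

// newDaemonCachedThreadPool returns a cached pool of daemon threads, replacing the
// hand-rolled ThreadFactory/ThreadPoolExecutor wiring.
private val dispatcherPool = ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")
val httpClient = HttpClientUtils.createHttpClient(config).newBuilder()
  .dispatcher(new Dispatcher(dispatcherPool))
  .build()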

Member Author

Thanks, this is less verbose and easier on the eyes.

@@ -93,8 +102,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.minRegisteredRatio
}

private val executorWatchResource = new AtomicReference[Closeable]
private val executorCleanupScheduler = Executors.newScheduledThreadPool(1)
Member

Do we know if this uses a daemon thread?
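
For reference, one way to guarantee a daemon thread here, assuming Spark's ThreadUtils as used elsewhere in this change (the thread name is illustrative):

import org.apache.spark.util.ThreadUtils

// A single-threaded scheduled executor backed by a daemon thread, so it cannot keep
// the driver JVM alive on its own.
private val executorRecoveryScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("kubernetes-executor-recovery")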

}

def getContainerExitStatus(containerStatus: ContainerStatus): Int = {
containerStatus.getState.getTerminated.getExitCode.intValue()
Member

No need of () in Scala.


def handleErroredPod(pod: Pod): Unit = {
val alreadyReleased = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningPodsToExecutors.contains(pod)
Member

I wonder if this does object pointer equality as opposed to value or pod-name equality. Pods from different event time points can be different objects, so this check might not work for us.

Member Author

This was subtle. Yes, I do string equality of pod names now.
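
A sketch of what a name-based check can look like, assuming the runningPodsToExecutors map from this diff; comparing metadata names avoids relying on object identity of Pod instances delivered by different watch events:

// True when no currently tracked pod has the same name as the pod from the incoming event.
def isPodAlreadyReleased(pod: Pod): Boolean = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
  !runningPodsToExecutors.keys.exists(_.getMetadata.getName == pod.getMetadata.getName)
}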

if (action == Action.ERROR) {
val podName = pod.getMetadata.getName
logDebug(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
getContainerExitStatus(pod)
Member

The return value doesn't seem to be used. Maybe we can remove this line?

Member Author

done

} else {
val containerExitReason = containerExitStatus match {
case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
memLimitExceededLogMessage(pod.getStatus.getReason)
Member

I remember we discussed in the Google doc that "mem exceeded" should be a framework fault. I am confused by line 353 setting exitCausedByApp = true?

Member Author

Only if the pod gets explicitly deleted do we deem it a framework fault. Since Spark actively tries to manage memory, it makes more sense to classify memory-limit kills as an application fault?

}
}

private val executorCleanupRunnable: Runnable = new Runnable {
Member

Maybe change the variable name to indicate this requests new executors? "Cleanup" does not indicate the impact of what this does, IMO. Maybe executorRecoveryRunnable?

Member Author

done


def removeFailedAndRequestNewExecutors(): Unit = {
val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
Member

Does this create a copy?

Member Author

Yes, an immutable copy.

}

private val executorCleanupRunnable: Runnable = new Runnable {
private val removedExecutors = new mutable.HashSet[String]
Member

Maybe rename to executorsToRecover to indicate the significance of action we do for these?

Member Author

Done

logDebug(s"Removing executor $executorId with loss reason "
+ executorExited.message)
if (!executorExited.exitCausedByApp) {
removedExecutors.add(executorId)
Member

Shouldn't we also subject this to some maximum retry?

Member Author

This is addressed in the latest diff

requestExecutors(removedExecutors.size)
if (executorsToRecover.nonEmpty &&
recoveredExecutorCount < MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS) {
requestExecutors(executorsToRecover.size)
Member

Compute numExecutorsToRecover as Math.min(executorsToRecover.size, MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS - recoveredExecutorCount) and pass it to requestExecutors so that we don't go above the max?
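
The suggestion spelled out as a sketch (this code path was later dropped in favor of a simpler approach, per the reply below):

// Cap the request so the running total of recovered executors never exceeds the maximum.
val numExecutorsToRecover = math.min(
  executorsToRecover.size,
  MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS - recoveredExecutorCount)
requestExecutors(numExecutorsToRecover)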

Member Author

Ditto, dropping this code.

private val executorRecoveryRunnable: Runnable = new Runnable {

private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
private val MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS = 100
Member

Hmm, this is global. I was wondering if we can use a max per executor ID.

Member Author

I am dropping the max allowed recovery attempts, as it is not immediately obvious what a good default for this would be. Keeping this simple and similar to the behavior on YARN.

removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
executorsToRecover.add(executorId)
} else {
executorAttempts.put(executorId, reasonCheckCount + 1)
Member

s/executorAttempts/executorReasonChecks/

@varunkatta
Member Author

varunkatta commented May 5, 2017

Logs from testing:

Observed that on killing an executor, it was first disabled with its loss reason pending, later removed once the reason was received from the K8s master, and a new executor was requested because the loss was attributed to a framework fault.

2017-05-04 23:48:47 INFO  KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint:54 - Disabling executor 1.
2017-05-04 23:48:47 INFO  BlockManagerMasterEndpoint:54 - Trying to remove executor 1 from BlockManagerMaster.
2017-05-04 23:48:47 INFO  BlockManagerMasterEndpoint:54 - Removing block manager BlockManagerId(1, 10.42.0.3, 7079, None)
2017-05-04 23:48:47 INFO  KubernetesClusterSchedulerBackend:54 - Received action DELETED for pod  spark-pi-1493941665060-exec-1
2017-05-04 23:48:47 INFO  KubernetesClusterSchedulerBackend:54 - Received delete pod spark-pi-1493941665060-exec-1 event. Reason: null
2017-05-04 23:48:47 INFO  BlockManagerMaster:54 - Removed 1 successfully in removeExecutor
2017-05-04 23:48:47 INFO  DAGScheduler:54 - Shuffle files lost for executor: 1 (epoch 0)
2017-05-04 23:48:52 INFO  KubernetesClusterSchedulerBackend:54 - Requesting 1 additional executor(s) from the cluster manager
2017-05-04 23:48:52 INFO  KubernetesClusterSchedulerBackend:54 - Requesting 1 additional executors, expecting total 4 and currently expected 3
2017-05-04 23:48:52 ERROR TaskSchedulerImpl:70 - Lost executor 1 on 10.42.0.3: Pod spark-pi-1493941665060-exec-1 deleted by K8s master
2017-05-04 23:48:52 INFO  TaskSetManager:54 - Task 250 failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
2017-05-04 23:48:52 INFO  KubernetesClusterSchedulerBackend:54 - Received action ADDED for pod  spark-pi-1493941665060-exec-4

@varunkatta varunkatta changed the title WIP: Changes to support executor recovery behavior during static allocation. Changes to support executor recovery behavior during static allocation. May 5, 2017

private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
private val blockmanagerPort = conf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)

private val kubernetesDriverServiceName = conf
Member

Is this from your change? Maybe from a wrong merge?

Member Author

You are right, a merge fail. Fixed now.

// be the right default since we know the pod was not explicitly deleted by the user.
"Pod exited with following container exit status code " + containerExitStatus
}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
Member

Maybe this else block needs extra indentation? It's hard to realize this is being assigned to exitReason.

Member Author

I see Spark code with and without indentation in this case. I like your suggestion of having indentation. Made this change.

@varunkatta
Member Author

@foxish Can you have a quick look at the change? I want to make sure we don't accidentally run into the issue you uncovered during the dynamic allocation work, where 2x the expected pods seemed to be allocated by K8s, on this change too. Also, I am guessing you might have more pointed input on the general functionality.

Member

@kimoonkim kimoonkim left a comment

LGTM. I was able to follow the code relatively easily with the latest diff. Thanks for writing this PR.

@foxish As @varunkatta suggested, probably it's best for you to take a look at this next? I wonder how your dynamic allocation PR would possibly interact with this PR. Maybe they just complement each other, which would be really great.

+ executorExited.message)
removeExecutor(executorId, executorExited)
if (!executorExited.exitCausedByApp) {
executorsToRecover.add(executorId)
Member

Maybe you want to update the PR description with this code snippet saying this is the main business logic.

Member Author

done

@foxish
Member

foxish commented May 5, 2017 via email

// the driver main thread to shut down upon errors. Otherwise, the driver
// will hang indefinitely.
val config = configBuilder
.withWebsocketPingInterval(0)
Member

Why are we changing the websocket ping interval here?

Member

This is to work around a bug in the web socket ping thread, which is created as a non-daemon thread and lets the driver hang if an exception is thrown in the driver main thread. See the comment at lines 85 - 87. More details are in the PR 216 comment, which has the code snippet of how the web socket ping thread is created.

scheduler: TaskSchedulerImpl,
val sc: SparkContext)
private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerImpl,
val sc: SparkContext)
Member

The argument indentation style doesn't adhere to Scala convention. I'm guessing this is the IDE you're using. We should revert these unintended changes.

Member Author

addressed

if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
} else {
super.minRegisteredRatio
}

private val executorWatchResource = new AtomicReference[Closeable]
Member

This can be of type Watch

private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
private val RUNNING_EXECUTOR_PODS_LOCK = new Object
private val runningExecutorsToPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
private val runningPodsToExecutors = new mutable.HashMap[Pod, String] // Indexed by executor Pods.
Member

Can we do without runningPodsToExecutors? It doesn't seem like it's being used for its index.

Member Author

Seems like I missed commenting on this. We do use it to see if the pod has already exited.

def getContainerExitStatus(pod: Pod): Int = {
  val containerStatuses = pod.getStatus.getContainerStatuses.asScala
  for (containerStatus <- containerStatuses) {
    return getContainerExitStatus(containerStatus)
Member

If we're always returning the first container's exit status, we can avoid the loop here and perhaps just fetch the status directly?

Member Author

addressed.

private val PMEM_EXCEEDED_EXIT_CODE = -104

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Pod/Container killed for exceeding memory limits.$diagnostics" +
Member

nit: space after period.

Member Author

done


private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
private val executorsToRecover = new mutable.HashSet[String]
private val executorReasonChecks = new mutable.HashMap[String, Int]
Member

Can you please add a comment here explaining what executorReasonChecks is?

Member Author

done

}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
FAILED_PODS_LOCK.synchronized {
Member

@mccheah is there a concurrent, lock-free version of the map that we can use that doesn't need locking everywhere? Like scala.collection.concurrent.Map?

Member

+1 for this.

Member Author

Thanks. Made the failedPods map a concurrent hash map. It is not truly lock-free, as there are implicit locks, but explicit locking by the user is not required.
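
For reference, a sketch of one way to obtain a thread-safe Scala map backed by a Java ConcurrentHashMap, assuming ExecutorExited values as used elsewhere in this change:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent

import org.apache.spark.scheduler.ExecutorExited

// asScala on a ConcurrentHashMap yields a scala.collection.concurrent.Map, so callers
// do not need an explicit lock object around individual reads and writes.
private val failedPods: concurrent.Map[String, ExecutorExited] =
  new ConcurrentHashMap[String, ExecutorExited]().asScala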


def handleDeletedPod(pod: Pod): Unit = {
val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false,
"Pod " + pod.getMetadata.getName + " deleted by K8s master")
Member

It need not necessarily be deleted by the master. Maybe we can say: Pod <x> lost/deleted.

Member Author

addressed


private val executorRecoveryRunnable: Runnable = new Runnable {

private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
Member

I'm guessing we want this knob to be something the user controls, via some spark property?

Member Author

This knob need not be controlled by the user. It is a very specific knob tied to the implementation, and users shouldn't have to worry about tuning it, I think.

@foxish
Member

foxish commented May 8, 2017

Thanks @varunkatta.
Sorry about the delay in reviewing. The overall logic looks about right, but there are some comments I've left around cleanup that we can do. I'd prefer having fewer separate threads/watchers responsible for recovery and accounting.

@ash211

ash211 commented May 18, 2017

rerun unit tests please

@foxish
Member

foxish commented May 22, 2017

@varunkatta PR needs rebase.

@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.kubernetes

import java.io.File
import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor}

Unused?

@erikerlandson
Member

@varunkatta PR is out of sync again

val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
}
executorsToRemove.foreach { case (executorId) =>
Member

Is _executorsToRemoveMap guarded by its internal lock when being iterated through? The Java ConcurrentHashMap is not. If not, this foreach needs to be guarded by a lock, so I suggest you keep the original implementation of using an explicit lock object with a normal map.

Member Author

There is no need for the iteration to be guarded. The previous guard was used for thread safety, and ConcurrentHashMap is being used as a thread-safe map here. Since iteration happens only on a single consumer thread, there is no need to lock the entire map. If elements get added to this map by a producer thread during the consumer thread's iteration over it, that is acceptable logic here. Wondering if I am missing something.

Member

OK, if insertion during iteration is accepted logic (which is what I was not sure about), then yes, locking the map is not needed.

Member Author

Cool, thanks

@liyinan926
Member

LGTM.

@liyinan926
Member

BTW: please squash the commits.

@varunkatta
Member Author

rerun unit tests please

@foxish
Member

foxish commented Jul 19, 2017

rerun unit tests please

import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import java.{lang, util}
Member

import ordering


We shouldn't just be importing the packages, but the specific classes in question.


(The exception is for our config and constants where we import all contents of the package object)

Member Author

Agreed, will address.

Member Author

@varunkatta varunkatta Jul 20, 2017

Revisiting this. If we don't import the package here, how do we distinguish between scala.collection.concurrent.Map and java.util.Map? Right now we use the package name only as a qualifier. We are not importing all classes under util, just the Map class with the right qualifier.

@mccheah mccheah Jul 20, 2017

Fully qualify the class name in the usage in the code itself. That being said, we shouldn't need to ever refer to the Java version - work entirely in Scala primitives and invoke asJava whenever we need the Java version.


If it's absolutely necessary then the import can be aliased as follows:

import java.util.{List => JList}

Then whenever one refers to java.util.List the code can just use JList.

Member Author

@varunkatta varunkatta Jul 20, 2017

Agreed that never referring to the Java version is the right advice to follow. I refactored the code a bit so we are not explicitly referring to the Java Map interface. I am using a Java ConcurrentHashMap, as there doesn't seem to be any legitimate native Scala concurrent hash map implementation (or one does exist and I failed to find it). Let me know if this use is acceptable.

def getExecutorExitStatus(pod: Pod): Int = {
  val containerStatuses = pod.getStatus.getContainerStatuses
  if (!containerStatuses.isEmpty) {
    return getExecutorExitStatus(containerStatuses.get(0))
Member

This assumes that the first container is what we want. Safe assumption for now, but may not hold in the future. No need to change it atm, but good to know that we are assuming the first container is the main executor.

Member Author

I will leave a comment here.

removeExecutorOrIncrementLossReasonCheckCount(executorId)
}
}
executorsToRecover.foreach(executorId => {
Member

I'm a bit lost. Where do we recover the executor and assign it the same ID as before?

Member Author

With the new dynamic allocator thread, new executor requests happen on that allocator thread. In the removeFailedExecutors method, the failed pod is removed from the internal bookkeeping map runningExecutorsToPods, and with this a new executor ends up getting created by the allocator.

Member

@foxish foxish Jul 20, 2017

Does the allocation thread reuse old executor IDs? Because I saw that we are maintaining a map of executorID -> count of checks performed. I was wondering if it makes sense to have an (executorID -> count) mapping, because the executors are identical and they potentially get rescheduled on different nodes. So, executor #1 failing twice doesn't necessarily indicate a problem with that executor, but with all executors.

Member

Maybe we can get away with specifying a total number of failures instead?

Member Author

@varunkatta varunkatta Jul 20, 2017

(executorID -> count) is used to track the number of attempts made per executor to learn the actual loss reason before we give up and assume it is a framework fault, as we can't keep trying forever. Since this caused some confusion during the first iteration, I will add a comment to clarify the reason for the map's existence.

Member Author

Also, we don't reuse old executor IDs. All new executors get a new ID, and these IDs are monotonically increasing. That is in line with Spark standalone and YARN.

Member

Ah! Okay, makes sense.

@foxish
Member

foxish commented Jul 20, 2017

LGTM after open comments addressed.
Hoping to merge this today/tomorrow, as the last PR before we cut a new release.

localRunningExecutorsToPods.get(executorId) match {
  case Some(pod) =>
    failedPods.get(pod.getMetadata.getName) match {
      case Some(executorExited: ExecutorExited) =>
@mccheah mccheah Jul 20, 2017

Never match on Options, use the functional API instead

Member Author

done

executorReasonChecks -= executorId
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
  runningExecutorsToPods.remove(executorId) match {
    case Some(pod) =>

Never match on Options, always use the functional API instead

Member Author

done

if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) {
&& pod.getMetadata.getDeletionTimestamp == null) {

Nit: Push this indentation in 4 spaces

Member Author

done

val podIP = pod.getStatus.getPodIP
val clusterNodeName = pod.getSpec.getNodeName
logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs += ((podIP, pod))
}
} else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
action == Action.DELETED || action == Action.ERROR) {
action == Action.DELETED || action == Action.ERROR) {

Nit: Push this indentation in 4 spaces

Member Author

done

logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
handleErroredPod(pod)
}
else if (action == Action.DELETED) {

Move the else up to the same line as the closing brace of the previous if.

Member Author

done

def getExecutorExitStatus(pod: Pod): Int = {
  val containerStatuses = pod.getStatus.getContainerStatuses
  if (!containerStatuses.isEmpty) {
    return getExecutorExitStatus(containerStatuses.get(0))

Avoid using return - can just use

if (!containerStatuses.isEmpty) getExecutorExitStatus(...) else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
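
Assembled into a sketch, assuming the names from the surrounding diff and JavaConverters in scope; headOption also keeps the first-container assumption explicit:

def getExecutorExitStatus(pod: Pod): Int = {
  // Use the first container status if present, otherwise fall back to the default failure code.
  pod.getStatus.getContainerStatuses.asScala.headOption
    .map(status => getExecutorExitStatus(status))
    .getOrElse(DEFAULT_CONTAINER_FAILURE_EXIT_STATUS)
}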

Member Author

done

}

def handleErroredPod(pod: Pod): Unit = {
def isPodAlreadyReleased(pod: Pod): Boolean = {

Avoid inner method definitions - move this outside somewhere

Member Author

done


def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
  containerStatus.getState match {
    case null => UNKNOWN_EXIT_CODE

Don't match on null - use Option(containerStatus.getState).map(...).getOrElse
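
A sketch of that null-free form, assuming the UNKNOWN_EXIT_CODE constant from the diff above:

def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
  // Wrap each potentially null accessor in Option instead of matching on null.
  Option(containerStatus.getState)
    .flatMap(state => Option(state.getTerminated))
    .flatMap(terminated => Option(terminated.getExitCode))
    .map(_.intValue())
    .getOrElse(UNKNOWN_EXIT_CODE)
}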

Member Author

done

@@ -160,7 +173,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}

override val minRegisteredRatio =
override val minRegisteredRatio: Double =

For a val, there is no need to declare the type.

Member Author

done

import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig,
SparkAppConfig}

Imports are not subject to line length restrictions, so move this up a line.

Member Author

done

@foxish
Member

foxish commented Jul 21, 2017

Thanks @varunkatta
@mccheah PTAL.
Squash and merge on green if all style comments are addressed.

@foxish
Member

foxish commented Jul 21, 2017

Merging. Please address any other style items in a future PR.

@foxish foxish merged commit 4dfb184 into apache-spark-on-k8s:branch-2.1-kubernetes Jul 21, 2017
foxish pushed a commit that referenced this pull request Jul 24, 2017
…n. (#244)

* Changes to support executor recovery behavior during static allocation.

* addressed review comments

* Style changes and removed incorrectly merged code

* addressed latest review comments

* changed import order

* Minor changes to avoid exceptions when exit code is missing

* fixed style check

* Addressed review comments from Yinan Li.

* Addressed comments and got rid of an explicit lock object.

* Fixed imports order.

* Addressed review comments from Matt

* Couple of style fixes
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
…n. (apache-spark-on-k8s#244)
