This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Changes to support executor recovery behavior during static allocation. #244

Merged
Changes from all commits
Commits
22 commits
a8831b7
Changes to support executor recovery behavior during static allocation.
varunkatta Apr 26, 2017
c4b949f
addressed review comments
varunkatta May 4, 2017
d87d393
Addressed latest comments and merged upstream changes
varunkatta May 5, 2017
4dcb1b3
Style changes and removed incorrectly merged code
varunkatta May 5, 2017
5a064ba
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
foxish May 8, 2017
fbe4b18
Merged with head on 2.1 kubernetes branch
varunkatta May 11, 2017
4d60c3d
Merge branch 'executor-recovery-static-allocation' of github.com:varu…
varunkatta May 11, 2017
01e8ec7
addressed latest review comments
varunkatta May 12, 2017
5e1a143
changed import order
varunkatta May 15, 2017
1a579ce
merged with 2.1 branch head
varunkatta Jun 14, 2017
608b08b
Minor changes to avoid exceptions when exit code is missing
varunkatta Jun 21, 2017
5cbea23
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jun 21, 2017
99b338d
fixed style check
varunkatta Jun 21, 2017
57bb38b
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jul 12, 2017
2bc0ff4
Addressed review comments from Yinan Li
varunkatta Jul 18, 2017
e6bb8c2
Merge branch 'branch-2.1-kubernetes' into executor-recovery-static-al…
varunkatta Jul 18, 2017
b5bd8d1
Addressed comments and got rid of an explicit lock object.
varunkatta Jul 18, 2017
1e2e49f
Fixed imports order.
varunkatta Jul 18, 2017
1131d2c
Merge k8s branch 'branch-2.1-kubernetes' into executor-recovery-stati…
varunkatta Jul 18, 2017
4e75491
Addressed review comments from Matt
varunkatta Jul 21, 2017
8acefef
Couple of style fixes
varunkatta Jul 21, 2017
382278a
Merge k8s branch 'branch-2.1-kubernetes' into executor-recovery-stati…
varunkatta Jul 21, 2017
@@ -18,18 +18,20 @@ package org.apache.spark.scheduler.cluster.kubernetes

import java.io.Closeable
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.FilenameUtils
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
@@ -38,8 +40,8 @@ import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -55,10 +57,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
import KubernetesClusterSchedulerBackend._

private val RUNNING_EXECUTOR_PODS_LOCK = new Object
private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.

// Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
// Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningPodsToExecutors = new mutable.HashMap[String, String]
  // TODO(varun): Get rid of this lock object by making the underlying map a concurrent hash map.
private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
// Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
private val executorPodsByIPs = new mutable.HashMap[String, Pod]
private val failedPods: concurrent.Map[String, ExecutorExited] = new
ConcurrentHashMap[String, ExecutorExited]().asScala
private val executorsToRemove = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

private val executorExtraClasspath = conf.get(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
@@ -135,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs(
conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key,
"shuffle-labels")
if (parsedShuffleLabels.size == 0) {
if (parsedShuffleLabels.isEmpty) {
throw new SparkException(s"Dynamic allocation enabled " +
s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
}
@@ -170,12 +180,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorWatchResource = new AtomicReference[Closeable]
protected var totalExpectedExecutors = new AtomicInteger(0)


private val driverUrl = RpcEndpointAddress(
sc.getConf.get("spark.driver.host"),
sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

private val initialExecutors = getInitialTargetExecutorNumber(1)
private val initialExecutors = getInitialTargetExecutorNumber()

private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -192,23 +203,74 @@

private val allocatorRunnable: Runnable = new Runnable {

    // Number of times we are allowed to check for the loss reason for an executor before we give up
    // and assume the executor failed for good, and attribute it to a framework fault.
private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
private val executorsToRecover = new mutable.HashSet[String]
// Maintains a map of executor id to count of checks performed to learn the loss reason
// for an executor.
private val executorReasonChecks = new mutable.HashMap[String, Int]

override def run(): Unit = {
if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
logDebug("Waiting for pending executors before scaling")
} else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
removeFailedExecutors()
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
logDebug("Waiting for pending executors before scaling")
} else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
logInfo(
s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
}
}
}
}

def removeFailedExecutors(): Unit = {
val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
}
executorsToRemove.foreach { case (executorId) =>
Member
Is _executorsToRemoveMap guarded by its internal lock when being iterated through? The Java ConcurrentHashMap is not. If not, this foreach needs to be guarded by a lock, so I suggest keeping the original implementation of an explicit lock object with a normal map.

Member Author
There is no need for the iteration to be guarded. The previous guard was used for thread safety, and ConcurrentHashMap is being used as a thread-safe map here. Since iteration happens only on a single consumer thread, there is no need to lock the entire map. If elements are added to this map by a producer thread during the consumer thread's iteration, that is acceptable logic here. Wondering if I am missing something.

Member
OK, if insertion during iteration is acceptable logic (which is what I was not sure about), then locking the map is not needed.

Member Author
Cool, thanks.

localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
removeExecutor(executorId, executorExited)
if (!executorExited.exitCausedByApp) {
executorsToRecover.add(executorId)
}
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))

executorsToRecover.foreach(executorId => {
executorsToRemove -= executorId
executorReasonChecks -= executorId
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).map { pod: Pod =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
}.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
}
})
executorsToRecover.clear()
}
}

def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
executorsToRecover.add(executorId)
executorReasonChecks -= executorId
} else {
executorReasonChecks.put(executorId, reasonCheckCount + 1)
}
}
}
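
The review thread above hinges on the iteration guarantees of a set backed by ConcurrentHashMap. The following is a minimal, standalone sketch (not part of this PR) of that pattern: one producer thread adds executor ids while a single consumer iterates with no external lock. The backing ConcurrentHashMap's iterator is weakly consistent, so entries added mid-iteration may or may not be observed, which is the behavior the author describes as acceptable. The thread names and ids here are purely illustrative.

import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object ConcurrentSetIterationSketch {
  def main(args: Array[String]): Unit = {
    // Thread-safe set, constructed the same way as executorsToRemove in the diff above.
    val executorsToRemove = Collections.newSetFromMap[String](
      new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

    // Producer: stands in for the driver endpoint queueing disconnected executors.
    val producer = new Thread(new Runnable {
      override def run(): Unit =
        (1 to 100).foreach(i => executorsToRemove.add(s"exec-$i"))
    })

    // Consumer: stands in for the allocator thread; iterates without locking the set.
    // The weakly consistent iterator never throws on concurrent inserts; new entries
    // are simply seen on this pass or picked up on a later one.
    val consumer = new Thread(new Runnable {
      override def run(): Unit =
        executorsToRemove.foreach(id => println(s"checking loss reason for $id"))
    })

    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
  }
}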

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
@@ -280,8 +342,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
// indication as to why.
try {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
runningExecutorPods.clear()
runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
Member
Should add runningExecutorsToPods.clear() after this line.

Member Author
Good point.

runningExecutorsToPods.clear()
runningPodsToExecutors.clear()
}
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs.clear()
@@ -534,11 +597,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}

override def createDriverEndpoint(
properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
totalExpectedExecutors.set(requestedTotal)
true
@@ -547,8 +605,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
runningExecutorsToPods.remove(executor) match {
case Some(pod) =>
Member
There is some Scala community debate about whether case Some(...) / case None should be used, or option.fold. I'm unsure if the Spark style guide has an opinion, and I don't have a strong personal opinion.

Strongly prefer not to match on Options anywhere.

Member Author
Thanks, this is addressed now.

kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
case None => logWarning(s"Unable to remove pod for unknown executor $executor")
}
}
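
As a footnote to the style exchange above, here is a small sketch (not from the PR) contrasting the two ways of consuming the Option returned by remove: the pattern match used in this hunk and a map/getOrElse form that avoids matching on Option. The map contents and the deletePod helper are hypothetical stand-ins for runningExecutorsToPods and kubernetesClient.pods().delete(pod).

import scala.collection.mutable

object OptionHandlingSketch {
  // Hypothetical stand-in for runningExecutorsToPods; values are pod names here.
  private val runningExecutorsToPods = mutable.Map("1" -> "exec-pod-1")

  // Hypothetical stand-in for kubernetesClient.pods().delete(pod).
  private def deletePod(podName: String): Unit = println(s"deleting $podName")

  // Style used in the hunk above: pattern matching on the Option.
  def killWithMatch(executorId: String): Unit =
    runningExecutorsToPods.remove(executorId) match {
      case Some(podName) => deletePod(podName)
      case None => println(s"Unable to remove pod for unknown executor $executorId")
    }

  // Alternative the reviewers lean towards: no match on Option.
  def killWithoutMatch(executorId: String): Unit =
    runningExecutorsToPods.remove(executorId)
      .map(deletePod)
      .getOrElse(println(s"Unable to remove pod for unknown executor $executorId"))
}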
@@ -564,6 +624,8 @@

private class ExecutorPodsWatcher extends Watcher[Pod] {

private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1

override def eventReceived(action: Action, pod: Pod): Unit = {
if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) {
@@ -583,12 +645,75 @@
executorPodsByIPs -= podIP
}
}
if (action == Action.ERROR) {
logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
handleErroredPod(pod)
} else if (action == Action.DELETED) {
logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
handleDeletedPod(pod)
}
}
}

override def onClose(cause: KubernetesClientException): Unit = {
logDebug("Executor pod watch closed.", cause)
}

def getExecutorExitStatus(pod: Pod): Int = {
val containerStatuses = pod.getStatus.getContainerStatuses
if (!containerStatuses.isEmpty) {
// we assume the first container represents the pod status. This assumption may not hold
// true in the future. Revisit this if side-car containers start running inside executor
// pods.
getExecutorExitStatus(containerStatuses.get(0))
} else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
}

def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
Option(containerStatus.getState).map(containerState =>
Option(containerState.getTerminated).map(containerStateTerminated =>
containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
).getOrElse(UNKNOWN_EXIT_CODE)
}

def isPodAlreadyReleased(pod: Pod): Boolean = {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
!runningPodsToExecutors.contains(pod.getMetadata.getName)
}
}

def handleErroredPod(pod: Pod): Unit = {
val alreadyReleased = isPodAlreadyReleased(pod)
val containerExitStatus = getExecutorExitStatus(pod)
// container was probably actively killed by the driver.
val exitReason = if (alreadyReleased) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod " + pod.getMetadata.getName +
" exited from explicit termination request.")
} else {
val containerExitReason = containerExitStatus match {
case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
memLimitExceededLogMessage(pod.getStatus.getReason)
case _ =>
            // Here we can't be sure that the exit was caused by the application but this seems
// to be the right default since we know the pod was not explicitly deleted by
// the user.
"Pod exited with following container exit status code " + containerExitStatus
}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
failedPods.put(pod.getMetadata.getName, exitReason)
}

def handleDeletedPod(pod: Pod): Unit = {
val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
"Pod " + pod.getMetadata.getName + " deleted or lost.")
failedPods.put(pod.getMetadata.getName, exitReason)
}
}
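
The Option(...) chaining in getExecutorExitStatus above is the guard referred to by the earlier commit "Minor changes to avoid exceptions when exit code is missing". Below is a minimal standalone sketch of the same idea, with plain case classes standing in for the fabric8 container state model types (Terminated and ContainerState are illustrative names, not the real API): a missing terminated state or a missing exit code falls back to UNKNOWN_EXIT_CODE instead of throwing a NullPointerException.

object ExitCodeExtractionSketch {
  private val UNKNOWN_EXIT_CODE = -111

  // Stand-ins for the fabric8 model classes; fields are nullable, as in the Java API.
  final case class Terminated(exitCode: java.lang.Integer)
  final case class ContainerState(terminated: Terminated)

  // Wrap every nullable getter in Option so missing data degrades to a sentinel value.
  def exitStatus(state: ContainerState): Int =
    Option(state)
      .flatMap(s => Option(s.terminated))
      .flatMap(t => Option(t.exitCode))
      .map(_.intValue())
      .getOrElse(UNKNOWN_EXIT_CODE)

  def main(args: Array[String]): Unit = {
    println(exitStatus(ContainerState(Terminated(137))))  // 137
    println(exitStatus(ContainerState(null)))              // -111: container still running
    println(exitStatus(ContainerState(Terminated(null))))  // -111: exit code missing
  }
}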

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}

private class KubernetesDriverEndpoint(
@@ -597,6 +722,14 @@
extends DriverEndpoint(rpcEnv, sparkProperties) {
private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
if (disableExecutor(executorId)) {
executorsToRemove.add(executorId)
}
}
}

override def receiveAndReply(
context: RpcCallContext): PartialFunction[Any, Unit] = {
new PartialFunction[Any, Unit]() {
Expand All @@ -615,7 +748,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
var resolvedProperties = sparkProperties
val runningExecutorPod = kubernetesClient
.pods()
.withName(runningExecutorPods(executorId).getMetadata.getName)
.withName(runningExecutorsToPods(executorId).getMetadata.getName)
.get()
val nodeName = runningExecutorPod.getSpec.getNodeName
val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
Expand All @@ -637,7 +770,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
}.orElse(super.receiveAndReply(context))
}
}

}
case class ShuffleServiceConfig(
shuffleNamespace: String,
@@ -647,6 +779,14 @@
private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val VMEM_EXCEEDED_EXIT_CODE = -103
private val PMEM_EXCEEDED_EXIT_CODE = -104
private val UNKNOWN_EXIT_CODE = -111

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Pod/Container killed for exceeding memory limits. $diagnostics" +
" Consider boosting spark executor memory overhead."
}
}

/**