From a8831b75732cce7e489be7ec2d94ac95f43b0202 Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 26 Apr 2017 07:29:54 -0700 Subject: [PATCH 01/12] Changes to support executor recovery behavior during static allocation. --- .../kubernetes/KubernetesClientBuilder.scala | 25 +- .../KubernetesClusterSchedulerBackend.scala | 227 ++++++++++++++++-- 2 files changed, 228 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 554ed17ff25c4..87423313371fc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.kubernetes import java.io.File +import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} import com.google.common.base.Charsets import com.google.common.io.Files +import io.fabric8.kubernetes.client.utils.HttpClientUtils import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} - +import okhttp3.Dispatcher import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -78,6 +80,25 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St } serviceAccountConfigBuilder } - new DefaultKubernetesClient(configBuilder.build) + val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor] + // Set threads to be daemons in order to allow the driver main thread + // to shut down upon errors. Otherwise the driver will hang indefinitely. + threadPoolExecutor.setThreadFactory(new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "spark-on-k8s") + thread.setDaemon(true) + thread + } + }) + // Disable the ping thread that is not daemon, in order to allow + // the driver main thread to shut down upon errors. Otherwise, the driver + // will hang indefinitely. 
+ val config = configBuilder + .withWebsocketPingInterval(0) + .build() + val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() + .dispatcher(new Dispatcher(threadPoolExecutor)) + .build() + new DefaultKubernetesClient(httpClient, config) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index ccb4194336a44..49370777472ed 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,31 +16,40 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import java.io.Closeable +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import java.util.concurrent.{Executors, TimeUnit} -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, - EnvVarSourceBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.{SparkContext, SparkException} +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} + import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.rpc.RpcEndpointAddress -import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.{SparkContext, SparkException} -private[spark] class KubernetesClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - val sc: SparkContext) +private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerImpl, + val sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { import KubernetesClusterSchedulerBackend._ - private val EXECUTOR_MODIFICATION_LOCK = new Object - private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. + private val runningPodsToExecutors = new mutable.HashMap[Pod, String] // Indexed by executor Pods. + private val FAILED_PODS_LOCK = new Object + private val failedPods = new mutable.HashMap[String, ExecutorLossReason] // Indexed by pod names. 
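// Illustrative sketch, not part of the patch: runningExecutorsToPods and runningPodsToExecutors
// are two views of the same executor/pod relationship, so they stay consistent only if every
// mutation updates both maps while holding RUNNING_EXECUTOR_PODS_LOCK. A minimal, self-contained
// version of that bookkeeping pattern (hypothetical names, plain String pod names):
import scala.collection.mutable

object ExecutorPodBookkeeping {
  private val LOCK = new Object
  private val executorIdToPodName = new mutable.HashMap[String, String]
  private val podNameToExecutorId = new mutable.HashMap[String, String]

  def register(executorId: String, podName: String): Unit = LOCK.synchronized {
    executorIdToPodName.put(executorId, podName)
    podNameToExecutorId.put(podName, executorId)
  }

  // Returns the pod name that was released, if the executor was known.
  def release(executorId: String): Option[String] = LOCK.synchronized {
    executorIdToPodName.remove(executorId).map { podName =>
      podNameToExecutorId.remove(podName)
      podName
    }
  }
}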
+ private val EXECUTORS_TO_REMOVE_LOCK = new Object + private val executorsToRemove = new mutable.HashSet[String] private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) @@ -93,8 +102,11 @@ private[spark] class KubernetesClusterSchedulerBackend( super.minRegisteredRatio } + private val executorWatchResource = new AtomicReference[Closeable] + private val executorCleanupScheduler = Executors.newScheduledThreadPool(1) protected var totalExpectedExecutors = new AtomicInteger(0) + private val driverUrl = RpcEndpointAddress( sc.getConf.get("spark.driver.host"), sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), @@ -125,21 +137,28 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() + executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) + .watch(new ExecutorPodsWatcher())) if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) } + executorCleanupScheduler.scheduleWithFixedDelay(executorCleanupRunnable, 0, + TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS) } override def stop(): Unit = { - // send stop message to executors so they shut down cleanly - super.stop() - - // then delete the executor pods // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. // When using Utils.tryLogNonFatalError some of the code fails but without any logs or // indication as to why. try { - runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_)) + runningPodsToExecutors.clear() + } + val resource = executorWatchResource.getAndSet(null) + if (resource != null) { + resource.close() + } } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } @@ -149,10 +168,13 @@ private[spark] class KubernetesClusterSchedulerBackend( case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) } try { + logInfo("Closing kubernetes client") kubernetesClient.close() } catch { case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) } + executorCleanupScheduler.shutdown() + super.stop() } private def allocateNewExecutorPod(): (String, Pod) = { @@ -242,13 +264,17 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { - EXECUTOR_MODIFICATION_LOCK.synchronized { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { if (requestedTotal > totalExpectedExecutors.get) { - logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + logInfo(s"Requesting ${ + requestedTotal - totalExpectedExecutors.get + }" + s" additional executors, expecting total $requestedTotal and currently" + s" expected ${totalExpectedExecutors.get}") for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { - runningExecutorPods += allocateNewExecutorPod() + val (executorId, pod) = allocateNewExecutorPod() + runningExecutorsToPods.put(executorId, pod) + runningPodsToExecutors.put(pod, executorId) } } totalExpectedExecutors.set(requestedTotal) @@ -257,19 +283,176 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { - EXECUTOR_MODIFICATION_LOCK.synchronized { + 
RUNNING_EXECUTOR_PODS_LOCK.synchronized { for (executor <- executorIds) { - runningExecutorPods.remove(executor) match { - case Some(pod) => kubernetesClient.pods().delete(pod) + runningExecutorsToPods.remove(executor) match { + case Some(pod) => + kubernetesClient.pods().delete(pod) + runningPodsToExecutors.remove(pod) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } } } true } + + private class ExecutorPodsWatcher extends Watcher[Pod] { + + private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1 + + override def eventReceived(action: Action, pod: Pod): Unit = { + if (action == Action.ERROR) { + val podName = pod.getMetadata.getName + logDebug(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason) + getContainerExitStatus(pod) + handleErroredPod(pod) + } + else if (action == Action.DELETED) { + val podName = pod.getMetadata.getName + logDebug(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason) + handleDeletedPod(pod) + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("Executor pod watch closed.", cause) + } + + def getContainerExitStatus(pod: Pod): Int = { + val containerStatuses = pod.getStatus.getContainerStatuses.asScala + for (containerStatus <- containerStatuses) { + return getContainerExitStatus(containerStatus) + } + DEFAULT_CONTAINER_FAILURE_EXIT_STATUS + } + + def getContainerExitStatus(containerStatus: ContainerStatus): Int = { + containerStatus.getState.getTerminated.getExitCode.intValue() + } + + def handleErroredPod(pod: Pod): Unit = { + val alreadyReleased = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningPodsToExecutors.contains(pod) + } + + val containerExitStatus = getContainerExitStatus(pod) + // container was probably actively killed by the driver. + val exitReason = if (alreadyReleased) { + ExecutorExited(containerExitStatus, exitCausedByApp = false, + s"Container in pod " + pod.getMetadata.getName + + " exited from explicit termination request.") + } else { + val containerExitReason = containerExitStatus match { + case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE => + memLimitExceededLogMessage(pod.getStatus.getReason) + case _ => + // Here we can't be sure that that exit was caused by the application but this seems to + // be the right default since we know the pod was not explicitly deleted by the user. 
+ "Pod exited with following container exit status code " + containerExitStatus + } + ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) + } + FAILED_PODS_LOCK.synchronized { + failedPods.put(pod.getMetadata.getName, exitReason) + } + } + + def handleDeletedPod(pod: Pod): Unit = { + val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false, + "Pod " + pod.getMetadata.getName + "deleted by K8s master") + FAILED_PODS_LOCK.synchronized { + failedPods.put(pod.getMetadata.getName, exitReason) + } + } + } + + override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) + } + + private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + + override def onDisconnected(rpcAddress: RpcAddress): Unit = { + addressToExecutorId.get(rpcAddress).foreach { executorId => + if (disableExecutor(executorId)) { + EXECUTORS_TO_REMOVE_LOCK.synchronized { + executorsToRemove.add(executorId) + } + } + } + } + } + + private val executorCleanupRunnable: Runnable = new Runnable { + private val removedExecutors = new mutable.HashSet[String] + private val executorAttempts = new mutable.HashMap[String, Int] + + override def run() = removeFailedAndRequestNewExecutors() + + val MAX_ATTEMPTS = 5 + + def removeFailedAndRequestNewExecutors(): Unit = { + val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.toMap + } + val localFailedPods = FAILED_PODS_LOCK.synchronized { + failedPods.toMap + } + val localExecutorsToRemove = EXECUTORS_TO_REMOVE_LOCK.synchronized { + executorsToRemove.toSet + } + localExecutorsToRemove.foreach { case (executorId) => + localRunningExecutorsToPods.get(executorId) match { + case Some(pod) => + localFailedPods.get(pod.getMetadata.getName) match { + case Some(executorExited: ExecutorExited) => + removeExecutor(executorId, executorExited) + logDebug(s"Removing executor $executorId with loss reason " + + executorExited.message) + if (!executorExited.exitCausedByApp) { + removedExecutors.add(executorId) + } + case None => + val checkedAttempts = executorAttempts.getOrElse(executorId, 0) + executorAttempts.put(executorId, checkedAttempts + 1) + } + case None => + val checkedAttempts = executorAttempts.getOrElse(executorId, 0) + executorAttempts.put(executorId, checkedAttempts + 1) + } + } + + for ((executorId, attempts) <- executorAttempts) { + if (attempts >= MAX_ATTEMPTS) { + removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons")) + removedExecutors.add(executorId) + } + } + removedExecutors.foreach(executorId => + EXECUTORS_TO_REMOVE_LOCK.synchronized { + executorsToRemove -= executorId + executorAttempts -= executorId + } + ) + if (removedExecutors.nonEmpty) { + requestExecutors(removedExecutors.size) + } + removedExecutors.clear() + } + } } private object KubernetesClusterSchedulerBackend { private val DEFAULT_STATIC_PORT = 10000 private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + val MEM_REGEX = "[0-9.]+ [KMG]B" + val VMEM_EXCEEDED_EXIT_CODE = -103 + val PMEM_EXCEEDED_EXIT_CODE = -104 + + def memLimitExceededLogMessage(diagnostics: String): String = { + s"Container killed by YARN for exceeding memory limits.$diagnostics" + + " Consider boosting spark.yarn.executor.memoryOverhead." 
+ } } From c4b949fc536651882b90825f05bbfa2480621c05 Mon Sep 17 00:00:00 2001 From: Varun Date: Thu, 4 May 2017 13:06:46 -0700 Subject: [PATCH 02/12] addressed review comments --- .../kubernetes/KubernetesClientBuilder.scala | 16 ++--- .../KubernetesClusterSchedulerBackend.scala | 71 +++++++++++-------- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 87423313371fc..634463f5c7bb7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -21,12 +21,14 @@ import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.utils.HttpClientUtils import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils import okhttp3.Dispatcher + import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.util.ThreadUtils private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) @@ -80,16 +82,6 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St } serviceAccountConfigBuilder } - val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor] - // Set threads to be daemons in order to allow the driver main thread - // to shut down upon errors. Otherwise the driver will hang indefinitely. - threadPoolExecutor.setThreadFactory(new ThreadFactory { - override def newThread(r: Runnable): Thread = { - val thread = new Thread(r, "spark-on-k8s") - thread.setDaemon(true) - thread - } - }) // Disable the ping thread that is not daemon, in order to allow // the driver main thread to shut down upon errors. Otherwise, the driver // will hang indefinitely. 
@@ -97,7 +89,7 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St .withWebsocketPingInterval(0) .build() val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() - .dispatcher(new Dispatcher(threadPoolExecutor)) + .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))) .build() new DefaultKubernetesClient(httpClient, config) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 49370777472ed..83dddac011da6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -103,7 +103,8 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI } private val executorWatchResource = new AtomicReference[Closeable] - private val executorCleanupScheduler = Executors.newScheduledThreadPool(1) + private val executorCleanupScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "executor-recovery-worker") protected var totalExpectedExecutors = new AtomicInteger(0) @@ -142,7 +143,7 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI if (!Utils.isDynamicAllocationEnabled(sc.conf)) { doRequestTotalExecutors(initialExecutors) } - executorCleanupScheduler.scheduleWithFixedDelay(executorCleanupRunnable, 0, + executorCleanupScheduler.scheduleWithFixedDelay(executorRecoveryRunnable, 0, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS) } @@ -265,6 +266,7 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { RUNNING_EXECUTOR_PODS_LOCK.synchronized { + logInfo("$requestedTotal is $requestedTotal") if (requestedTotal > totalExpectedExecutors.get) { logInfo(s"Requesting ${ requestedTotal - totalExpectedExecutors.get @@ -304,7 +306,6 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI if (action == Action.ERROR) { val podName = pod.getMetadata.getName logDebug(s"Received pod $podName exited event. 
Reason: " + pod.getStatus.getReason) - getContainerExitStatus(pod) handleErroredPod(pod) } else if (action == Action.DELETED) { @@ -327,12 +328,17 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI } def getContainerExitStatus(containerStatus: ContainerStatus): Int = { - containerStatus.getState.getTerminated.getExitCode.intValue() + containerStatus.getState.getTerminated.getExitCode.intValue } def handleErroredPod(pod: Pod): Unit = { val alreadyReleased = RUNNING_EXECUTOR_PODS_LOCK.synchronized { - runningPodsToExecutors.contains(pod) + runningPodsToExecutors.keySet.foreach(runningPod => + if (runningPod.getMetadata.getName == pod.getMetadata.getName) { + return false + } + ) + true } val containerExitStatus = getContainerExitStatus(pod) @@ -359,7 +365,7 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI def handleDeletedPod(pod: Pod): Unit = { val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false, - "Pod " + pod.getMetadata.getName + "deleted by K8s master") + "Pod " + pod.getMetadata.getName + " deleted by K8s master") FAILED_PODS_LOCK.synchronized { failedPods.put(pod.getMetadata.getName, exitReason) } @@ -384,14 +390,16 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI } } - private val executorCleanupRunnable: Runnable = new Runnable { - private val removedExecutors = new mutable.HashSet[String] + private val executorRecoveryRunnable: Runnable = new Runnable { + + private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10 + private val MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS = 100 + private val executorsToRecover = new mutable.HashSet[String] private val executorAttempts = new mutable.HashMap[String, Int] + private var recoveredExecutorCount = 0 override def run() = removeFailedAndRequestNewExecutors() - val MAX_ATTEMPTS = 5 - def removeFailedAndRequestNewExecutors(): Unit = { val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.toMap @@ -407,38 +415,42 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI case Some(pod) => localFailedPods.get(pod.getMetadata.getName) match { case Some(executorExited: ExecutorExited) => - removeExecutor(executorId, executorExited) logDebug(s"Removing executor $executorId with loss reason " + executorExited.message) + removeExecutor(executorId, executorExited) if (!executorExited.exitCausedByApp) { - removedExecutors.add(executorId) + executorsToRecover.add(executorId) } case None => - val checkedAttempts = executorAttempts.getOrElse(executorId, 0) - executorAttempts.put(executorId, checkedAttempts + 1) + removeExecutorOrIncrementLossReasonCheckCount(executorId) } case None => - val checkedAttempts = executorAttempts.getOrElse(executorId, 0) - executorAttempts.put(executorId, checkedAttempts + 1) + removeExecutorOrIncrementLossReasonCheckCount(executorId) } } - for ((executorId, attempts) <- executorAttempts) { - if (attempts >= MAX_ATTEMPTS) { - removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons")) - removedExecutors.add(executorId) - } - } - removedExecutors.foreach(executorId => + executorsToRecover.foreach(executorId => EXECUTORS_TO_REMOVE_LOCK.synchronized { executorsToRemove -= executorId executorAttempts -= executorId } ) - if (removedExecutors.nonEmpty) { - requestExecutors(removedExecutors.size) + if (executorsToRecover.nonEmpty && + recoveredExecutorCount < MAX_ALLOWED_EXECUTOR_RECOVERY_ATTEMPTS) { + 
requestExecutors(executorsToRecover.size) + recoveredExecutorCount += executorsToRecover.size + } + executorsToRecover.clear() + } + + def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = { + val reasonCheckCount = executorAttempts.getOrElse(executorId, 0) + if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) { + removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons")) + executorsToRecover.add(executorId) + } else { + executorAttempts.put(executorId, reasonCheckCount + 1) } - removedExecutors.clear() } } } @@ -452,7 +464,8 @@ private object KubernetesClusterSchedulerBackend { val PMEM_EXCEEDED_EXIT_CODE = -104 def memLimitExceededLogMessage(diagnostics: String): String = { - s"Container killed by YARN for exceeding memory limits.$diagnostics" + - " Consider boosting spark.yarn.executor.memoryOverhead." + s"Pod/Container killed for exceeding memory limits.$diagnostics" + + " Consider boosting spark executor memory overhead." } } + From 4dcb1b3364f3a6909d3fb3bb123f0ee8229d37ba Mon Sep 17 00:00:00 2001 From: Varun Date: Fri, 5 May 2017 13:18:05 -0700 Subject: [PATCH 03/12] Style changes and removed inocrrectly merged code --- .../KubernetesClusterSchedulerBackend.scala | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 66e8541ef3558..4017ff8d41fcd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -56,11 +56,6 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI private val blockmanagerPort = conf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val kubernetesDriverServiceName = conf - .get(KUBERNETES_DRIVER_SERVICE_NAME) - .getOrElse( - throw new SparkException("Must specify the service name the driver is running with")) - private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) .getOrElse( @@ -162,11 +157,6 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI } catch { case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) } - try { - kubernetesClient.services().withName(kubernetesDriverServiceName).delete() - } catch { - case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) - } try { logInfo("Closing kubernetes client") kubernetesClient.close() @@ -344,20 +334,21 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI val containerExitStatus = getContainerExitStatus(pod) // container was probably actively killed by the driver. 
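// Clarifying the branches below: a pod the driver already released is treated as an explicit
// termination request (exitCausedByApp = false); otherwise the exit is attributed to the
// application, with the memory-limit message substituted when the container exit code matches
// VMEM_EXCEEDED_EXIT_CODE or PMEM_EXCEEDED_EXIT_CODE.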
val exitReason = if (alreadyReleased) { - ExecutorExited(containerExitStatus, exitCausedByApp = false, - s"Container in pod " + pod.getMetadata.getName + - " exited from explicit termination request.") - } else { - val containerExitReason = containerExitStatus match { - case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE => - memLimitExceededLogMessage(pod.getStatus.getReason) - case _ => - // Here we can't be sure that that exit was caused by the application but this seems to - // be the right default since we know the pod was not explicitly deleted by the user. - "Pod exited with following container exit status code " + containerExitStatus + ExecutorExited(containerExitStatus, exitCausedByApp = false, + s"Container in pod " + pod.getMetadata.getName + + " exited from explicit termination request.") + } else { + val containerExitReason = containerExitStatus match { + case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE => + memLimitExceededLogMessage(pod.getStatus.getReason) + case _ => + // Here we can't be sure that that exit was caused by the application but this seems + // to be the right default since we know the pod was not explicitly deleted by + // the user. + "Pod exited with following container exit status code " + containerExitStatus + } + ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) } - ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) - } FAILED_PODS_LOCK.synchronized { failedPods.put(pod.getMetadata.getName, exitReason) } From 01e8ec709c58739a8bb1e47ffa8be8135491a77a Mon Sep 17 00:00:00 2001 From: Varun Date: Fri, 12 May 2017 16:14:23 -0700 Subject: [PATCH 04/12] addressed latest review comments --- .../KubernetesClusterSchedulerBackend.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index aa2f0f601eeb2..99b9ba8d18db5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -20,15 +20,15 @@ import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action - import org.apache.spark.{SparkContext, SparkException} + import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} @@ -36,8 +36,9 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} -private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerImpl, - val sc: SparkContext) +private[spark] class KubernetesClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + val sc: 
SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { import KubernetesClusterSchedulerBackend._ @@ -337,9 +338,9 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI } def getContainerExitStatus(pod: Pod): Int = { - val containerStatuses = pod.getStatus.getContainerStatuses.asScala - for (containerStatus <- containerStatuses) { - return getContainerExitStatus(containerStatus) + val containerStatuses = pod.getStatus.getContainerStatuses + if (!containerStatuses.isEmpty) { + return getContainerExitStatus(containerStatuses.get(0)) } DEFAULT_CONTAINER_FAILURE_EXIT_STATUS } @@ -385,7 +386,7 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI def handleDeletedPod(pod: Pod): Unit = { val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false, - "Pod " + pod.getMetadata.getName + " deleted by K8s master") + "Pod " + pod.getMetadata.getName + " deleted or lost.") FAILED_PODS_LOCK.synchronized { failedPods.put(pod.getMetadata.getName, exitReason) } @@ -414,6 +415,8 @@ private[spark] class KubernetesClusterSchedulerBackend(scheduler: TaskSchedulerI private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10 private val executorsToRecover = new mutable.HashSet[String] + // Maintains a map of executor id to count of checks performed to learn the loss reason + // for an executor. private val executorReasonChecks = new mutable.HashMap[String, Int] override def run(): Unit = removeFailedAndRequestNewExecutors() @@ -479,7 +482,7 @@ private object KubernetesClusterSchedulerBackend { private val PMEM_EXCEEDED_EXIT_CODE = -104 def memLimitExceededLogMessage(diagnostics: String): String = { - s"Pod/Container killed for exceeding memory limits.$diagnostics" + + s"Pod/Container killed for exceeding memory limits. $diagnostics" + " Consider boosting spark executor memory overhead." 
} } From 5e1a1436a51c79ca54b549c22fd4312ce296d376 Mon Sep 17 00:00:00 2001 From: Varun Date: Mon, 15 May 2017 11:13:38 -0700 Subject: [PATCH 05/12] changed import order --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 99b9ba8d18db5..b132fe8f666d7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -27,8 +27,8 @@ import scala.concurrent.{ExecutionContext, Future} import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} From 608b08be2af75969e944ee4866ea4340512fb2b8 Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 21 Jun 2017 06:22:54 -0700 Subject: [PATCH 06/12] Minor changes to avoid exceptions when exit code is missing --- .../KubernetesClusterSchedulerBackend.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 4d2e37052e096..47177ea42cb10 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, ContainerStatus, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils @@ -32,6 +32,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} + import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -212,7 +213,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } def removeFailedExecutors(): Unit = { - removeFailedExecutors() val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { 
runningExecutorsToPods.toMap } @@ -626,7 +626,13 @@ private[spark] class KubernetesClusterSchedulerBackend( } def getContainerExitStatus(containerStatus: ContainerStatus): Int = { - containerStatus.getState.getTerminated.getExitCode.intValue + containerStatus.getState match { + case null => UNKNOWN_EXIT_CODE + case _ => containerStatus.getState.getTerminated match { + case null => UNKNOWN_EXIT_CODE + case _ => containerStatus.getState.getTerminated.getExitCode.intValue() + } + } } def handleErroredPod(pod: Pod): Unit = { @@ -744,6 +750,7 @@ private object KubernetesClusterSchedulerBackend { private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) private val VMEM_EXCEEDED_EXIT_CODE = -103 private val PMEM_EXCEEDED_EXIT_CODE = -104 + private val UNKNOWN_EXIT_CODE = -111 def memLimitExceededLogMessage(diagnostics: String): String = { s"Pod/Container killed for exceeding memory limits. $diagnostics" + From 99b338dd1ac3727569fcc13915a2359d0861c48c Mon Sep 17 00:00:00 2001 From: Varun Date: Wed, 21 Jun 2017 10:00:28 -0700 Subject: [PATCH 07/12] fixed style check --- .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 3180cd9cbbcaa..d003402da74cd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -32,7 +32,6 @@ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} - import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ From 2bc0ff4d2b571b8cf32a54d7b43ce7270b866435 Mon Sep 17 00:00:00 2001 From: Varun Date: Mon, 17 Jul 2017 18:55:38 -0700 Subject: [PATCH 08/12] Addressed review comments from Yinan LiAddressed review comments from Yinan Li.. 
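For context on one of the changes below: the lock-guarded HashMap of failed pods is replaced by a
concurrent map, so the pod watcher thread and the allocator/cleanup thread can touch it without an
explicit lock object. A minimal sketch of that pattern, assuming only the JDK and
scala.collection.JavaConverters (the field and key names here are illustrative):

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._
    import scala.collection.concurrent

    // A Scala view over a ConcurrentHashMap: put/get/remove are thread-safe without
    // synchronizing on a separate lock object.
    val failedPodReasons: concurrent.Map[String, String] =
      new ConcurrentHashMap[String, String]().asScala

    failedPodReasons.put("executor-pod-1", "OOMKilled")
    failedPodReasons.get("executor-pod-1").foreach(println)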
--- .../KubernetesClusterSchedulerBackend.scala | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 565311438d429..b81c6d6980ea8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -18,18 +18,19 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable import java.net.InetAddress -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils -import scala.collection.mutable -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} @@ -53,12 +54,17 @@ private[spark] class KubernetesClusterSchedulerBackend( import KubernetesClusterSchedulerBackend._ private val RUNNING_EXECUTOR_PODS_LOCK = new Object - private val runningExecutorsToPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. - private val runningPodsToExecutors = new mutable.HashMap[Pod, String] // Indexed by executor Pods. + // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningPodsToExecutors = new mutable.HashMap[String, String] + // TODO(varun): Get rid of this lock object by my making the underlying map a concurrent hash map. private val EXECUTOR_PODS_BY_IPS_LOCK = new Object - private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. - private val FAILED_PODS_LOCK = new Object - private val failedPods = new mutable.HashMap[String, ExecutorLossReason] // Indexed by pod names. + // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK + private val executorPodsByIPs = new mutable.HashMap[String, Pod] + // Indexed by pod names and guarded by FAILED_PODS_LOCK. 
+ private val failedPods: concurrent.Map[String, ExecutorLossReason] = new + ConcurrentHashMap[String, ExecutorLossReason]().asScala private val EXECUTORS_TO_REMOVE_LOCK = new Object private val executorsToRemove = new mutable.HashSet[String] @@ -198,16 +204,18 @@ private[spark] class KubernetesClusterSchedulerBackend( override def run(): Unit = { removeFailedExecutors() - if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) { - logDebug("Waiting for pending executors before scaling") - } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) { - logDebug("Maximum allowed executor limit reached. Not scaling up further.") - } else { - val nodeToLocalTaskCount = getNodesWithLocalTaskCounts - RUNNING_EXECUTOR_PODS_LOCK.synchronized { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) { + logDebug("Waiting for pending executors before scaling") + } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) { + logDebug("Maximum allowed executor limit reached. Not scaling up further.") + } else { + val nodeToLocalTaskCount = getNodesWithLocalTaskCounts for (i <- 0 until math.min( totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) { - runningExecutorsToPods += allocateNewExecutorPod(nodeToLocalTaskCount) + val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount) + runningExecutorsToPods.put(executorId, pod) + runningPodsToExecutors.put(pod.getMetadata.getName, executorId) logInfo( s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") } @@ -219,16 +227,13 @@ private[spark] class KubernetesClusterSchedulerBackend( val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.toMap } - val localFailedPods = FAILED_PODS_LOCK.synchronized { - failedPods.toMap - } val localExecutorsToRemove = EXECUTORS_TO_REMOVE_LOCK.synchronized { executorsToRemove.toSet } localExecutorsToRemove.foreach { case (executorId) => localRunningExecutorsToPods.get(executorId) match { case Some(pod) => - localFailedPods.get(pod.getMetadata.getName) match { + failedPods.get(pod.getMetadata.getName) match { case Some(executorExited: ExecutorExited) => logDebug(s"Removing executor $executorId with loss reason " + executorExited.message) @@ -246,13 +251,13 @@ private[spark] class KubernetesClusterSchedulerBackend( executorsToRecover.foreach(executorId => { EXECUTORS_TO_REMOVE_LOCK.synchronized { executorsToRemove -= executorId - executorReasonChecks -= executorId } + executorReasonChecks -= executorId RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.remove(executorId) match { case Some(pod) => kubernetesClient.pods().delete(pod) - runningPodsToExecutors.remove(pod) + runningPodsToExecutors.remove(pod.getMetadata.getName) case None => logWarning(s"Unable to remove pod for unknown executor $executorId") } } @@ -342,6 +347,7 @@ private[spark] class KubernetesClusterSchedulerBackend( try { RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_)) + runningExecutorsToPods.clear() runningPodsToExecutors.clear() } EXECUTOR_PODS_BY_IPS_LOCK.synchronized { @@ -360,7 +366,6 @@ private[spark] class KubernetesClusterSchedulerBackend( } catch { case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) } - super.stop() } /** @@ -582,7 +587,7 @@ private[spark] class KubernetesClusterSchedulerBackend( runningExecutorsToPods.remove(executor) match 
{ case Some(pod) => kubernetesClient.pods().delete(pod) - runningPodsToExecutors.remove(pod) + runningPodsToExecutors.remove(pod.getMetadata.getName) case None => logWarning(s"Unable to remove pod for unknown executor $executor") } } @@ -635,15 +640,15 @@ private[spark] class KubernetesClusterSchedulerBackend( logDebug("Executor pod watch closed.", cause) } - def getContainerExitStatus(pod: Pod): Int = { + def getExecutorExitStatus(pod: Pod): Int = { val containerStatuses = pod.getStatus.getContainerStatuses if (!containerStatuses.isEmpty) { - return getContainerExitStatus(containerStatuses.get(0)) + return getExecutorExitStatus(containerStatuses.get(0)) } DEFAULT_CONTAINER_FAILURE_EXIT_STATUS } - def getContainerExitStatus(containerStatus: ContainerStatus): Int = { + def getExecutorExitStatus(containerStatus: ContainerStatus): Int = { containerStatus.getState match { case null => UNKNOWN_EXIT_CODE case _ => containerStatus.getState.getTerminated match { @@ -657,7 +662,7 @@ private[spark] class KubernetesClusterSchedulerBackend( def isPodAlreadyReleased(pod: Pod): Boolean = { RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningPodsToExecutors.keySet.foreach(runningPod => - if (runningPod.getMetadata.getName == pod.getMetadata.getName) { + if (runningPod == pod.getMetadata.getName) { return false } ) @@ -665,7 +670,7 @@ private[spark] class KubernetesClusterSchedulerBackend( true } val alreadyReleased = isPodAlreadyReleased(pod) - val containerExitStatus = getContainerExitStatus(pod) + val containerExitStatus = getExecutorExitStatus(pod) // container was probably actively killed by the driver. val exitReason = if (alreadyReleased) { ExecutorExited(containerExitStatus, exitCausedByApp = false, @@ -683,17 +688,13 @@ private[spark] class KubernetesClusterSchedulerBackend( } ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) } - FAILED_PODS_LOCK.synchronized { failedPods.put(pod.getMetadata.getName, exitReason) - } } def handleDeletedPod(pod: Pod): Unit = { - val exitReason = ExecutorExited(getContainerExitStatus(pod), exitCausedByApp = false, + val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, "Pod " + pod.getMetadata.getName + " deleted or lost.") - FAILED_PODS_LOCK.synchronized { failedPods.put(pod.getMetadata.getName, exitReason) - } } } From b5bd8d1c6ca63f31f2b9573f1f167b1a0cf59b31 Mon Sep 17 00:00:00 2001 From: Varun Date: Tue, 18 Jul 2017 06:31:49 -0700 Subject: [PATCH 09/12] Addressed comments and got rid of an explicit lock object. 
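The explicit lock object removed below only guarded a HashSet of executor ids; the patch swaps it
for a concurrent set built from a ConcurrentHashMap. A small sketch of that construction, under
the assumption that only JDK collections and scala.collection.JavaConverters are involved (the
variable name is illustrative):

    import java.util.Collections
    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._

    // A thread-safe mutable Set[String] backed by a ConcurrentHashMap, so callers can add and
    // remove executor ids without a separate lock object.
    val executorsPendingRemoval: scala.collection.mutable.Set[String] =
      Collections.newSetFromMap[String](new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

    executorsPendingRemoval.add("exec-1")
    executorsPendingRemoval -= "exec-1"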
--- .../KubernetesClusterSchedulerBackend.scala | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 0f6ced70b9a10..57b72dc953e7b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable import java.net.InetAddress +import java.util.Collections import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} @@ -27,12 +28,13 @@ import scala.concurrent.{ExecutionContext, Future} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, ContainerStatus, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import java.{lang, util} import org.apache.commons.io.FilenameUtils - import org.apache.spark.{SparkContext, SparkEnv, SparkException} + import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -41,8 +43,9 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, +SparkAppConfig} import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( @@ -64,11 +67,11 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_PODS_BY_IPS_LOCK = new Object // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK private val executorPodsByIPs = new mutable.HashMap[String, Pod] - // Indexed by pod names and guarded by FAILED_PODS_LOCK. 
private val failedPods: concurrent.Map[String, ExecutorLossReason] = new ConcurrentHashMap[String, ExecutorLossReason]().asScala - private val EXECUTORS_TO_REMOVE_LOCK = new Object - private val executorsToRemove = new mutable.HashSet[String] + private val _executorsToRemoveMap: util.Map[String, lang.Boolean] = + new ConcurrentHashMap[String, lang.Boolean]() + private val executorsToRemove = Collections.newSetFromMap[String](_executorsToRemoveMap).asScala private val executorExtraClasspath = conf.get( org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) @@ -140,7 +143,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs( conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key, "shuffle-labels") - if (parsedShuffleLabels.size == 0) { + if (parsedShuffleLabels.isEmpty) { throw new SparkException(s"Dynamic allocation enabled " + s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") } @@ -229,10 +232,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.toMap } - val localExecutorsToRemove = EXECUTORS_TO_REMOVE_LOCK.synchronized { - executorsToRemove.toSet - } - localExecutorsToRemove.foreach { case (executorId) => + executorsToRemove.foreach { case (executorId) => localRunningExecutorsToPods.get(executorId) match { case Some(pod) => failedPods.get(pod.getMetadata.getName) match { @@ -251,9 +251,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } } executorsToRecover.foreach(executorId => { - EXECUTORS_TO_REMOVE_LOCK.synchronized { - executorsToRemove -= executorId - } + executorsToRemove -= executorId executorReasonChecks -= executorId RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorsToPods.remove(executorId) match { @@ -686,13 +684,8 @@ private[spark] class KubernetesClusterSchedulerBackend( def handleErroredPod(pod: Pod): Unit = { def isPodAlreadyReleased(pod: Pod): Boolean = { RUNNING_EXECUTOR_PODS_LOCK.synchronized { - runningPodsToExecutors.keySet.foreach(runningPod => - if (runningPod == pod.getMetadata.getName) { - return false - } - ) + !runningPodsToExecutors.contains(pod.getMetadata.getName) } - true } val alreadyReleased = isPodAlreadyReleased(pod) val containerExitStatus = getExecutorExitStatus(pod) @@ -736,9 +729,7 @@ private[spark] class KubernetesClusterSchedulerBackend( override def onDisconnected(rpcAddress: RpcAddress): Unit = { addressToExecutorId.get(rpcAddress).foreach { executorId => if (disableExecutor(executorId)) { - EXECUTORS_TO_REMOVE_LOCK.synchronized { executorsToRemove.add(executorId) - } } } } From 1e2e49f60c3f64146c7df90dd0793c907bf96ad9 Mon Sep 17 00:00:00 2001 From: Varun Date: Tue, 18 Jul 2017 06:36:51 -0700 Subject: [PATCH 10/12] Fixed imports order. 
--- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 57b72dc953e7b..2dd2753a29bb9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -33,8 +33,8 @@ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException import io.fabric8.kubernetes.client.Watcher.Action import java.{lang, util} import org.apache.commons.io.FilenameUtils -import org.apache.spark.{SparkContext, SparkEnv, SparkException} +import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -43,9 +43,9 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( From 4e7549177819201381f956cc6b5a972bff46d50f Mon Sep 17 00:00:00 2001 From: Varun Date: Fri, 21 Jul 2017 10:39:23 -0700 Subject: [PATCH 11/12] Addressed review comments from Matt --- .../KubernetesClusterSchedulerBackend.scala | 96 ++++++++----------- 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 7bb97d1b82e06..777c7834bdefb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -31,7 +31,6 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action -import java.{lang, util} import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkContext, SparkEnv, SparkException} @@ -42,9 +41,8 @@ import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil import org.apache.spark.network.netty.SparkTransportConf import 
org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} -import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, -SparkAppConfig} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{ThreadUtils, Utils} @@ -67,11 +65,10 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_PODS_BY_IPS_LOCK = new Object // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK private val executorPodsByIPs = new mutable.HashMap[String, Pod] - private val failedPods: concurrent.Map[String, ExecutorLossReason] = new - ConcurrentHashMap[String, ExecutorLossReason]().asScala - private val _executorsToRemoveMap: util.Map[String, lang.Boolean] = - new ConcurrentHashMap[String, lang.Boolean]() - private val executorsToRemove = Collections.newSetFromMap[String](_executorsToRemoveMap).asScala + private val failedPods: concurrent.Map[String, ExecutorExited] = new + ConcurrentHashMap[String, ExecutorExited]().asScala + private val executorsToRemove = Collections.newSetFromMap[String]( + new ConcurrentHashMap[String, java.lang.Boolean]()).asScala private val executorExtraClasspath = conf.get( org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) @@ -173,7 +170,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - override val minRegisteredRatio: Double = + override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 } else { @@ -206,6 +203,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val allocatorRunnable: Runnable = new Runnable { + // Number of times we are allowed check for the loss reason for an executor before we give up + // and assume the executor failed for good, and attribute it to a framework fault. 
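// Concretely: each allocator pass that still finds no loss reason for a disconnected executor
// increments a per-executor counter, and once that counter exceeds the limit below the executor
// is removed with SlaveLost("Executor lost for unknown reasons").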
@@ -238,36 +237,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
           runningExecutorsToPods.toMap
         }
         executorsToRemove.foreach { case (executorId) =>
-          localRunningExecutorsToPods.get(executorId) match {
-            case Some(pod) =>
-              failedPods.get(pod.getMetadata.getName) match {
-                case Some(executorExited: ExecutorExited) =>
-                  logDebug(s"Removing executor $executorId with loss reason "
-                    + executorExited.message)
-                  removeExecutor(executorId, executorExited)
-                  if (!executorExited.exitCausedByApp) {
-                    executorsToRecover.add(executorId)
-                  }
-                case None =>
-                  removeExecutorOrIncrementLossReasonCheckCount(executorId)
+          localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
+            failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
+              logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+              removeExecutor(executorId, executorExited)
+              if (!executorExited.exitCausedByApp) {
+                executorsToRecover.add(executorId)
              }
-            case None =>
-              removeExecutorOrIncrementLossReasonCheckCount(executorId)
-          }
-        }
-        executorsToRecover.foreach(executorId => {
-          executorsToRemove -= executorId
-          executorReasonChecks -= executorId
-          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-            runningExecutorsToPods.remove(executorId) match {
-              case Some(pod) =>
+            }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+
+          executorsToRecover.foreach(executorId => {
+            executorsToRemove -= executorId
+            executorReasonChecks -= executorId
+            RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+              runningExecutorsToPods.remove(executorId).map { pod: Pod =>
                 kubernetesClient.pods().delete(pod)
                 runningPodsToExecutors.remove(pod.getMetadata.getName)
-              case None => logWarning(s"Unable to remove pod for unknown executor $executorId")
+              }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
             }
-          }
-        })
-        executorsToRecover.clear()
+          })
+          executorsToRecover.clear()
+        }
       }
 
     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
@@ -635,7 +626,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
 
     override def eventReceived(action: Action, pod: Pod): Unit = {
-
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
         && pod.getMetadata.getDeletionTimestamp == null) {
         val podIP = pod.getStatus.getPodIP
@@ -657,8 +647,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       if (action == Action.ERROR) {
         logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
         handleErroredPod(pod)
-      }
-      else if (action == Action.DELETED) {
+      } else if (action == Action.DELETED) {
         logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
         handleDeletedPod(pod)
       }
@@ -672,27 +661,27 @@ private[spark] class KubernetesClusterSchedulerBackend(
     def getExecutorExitStatus(pod: Pod): Int = {
       val containerStatuses = pod.getStatus.getContainerStatuses
       if (!containerStatuses.isEmpty) {
-        return getExecutorExitStatus(containerStatuses.get(0))
-      }
-      DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+        // we assume the first container represents the pod status. This assumption may not hold
+        // true in the future. Revisit this if side-car containers start running inside executor
+        // pods.
+        getExecutorExitStatus(containerStatuses.get(0))
+      } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
     }
 
     def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
-      containerStatus.getState match {
-        case null => UNKNOWN_EXIT_CODE
-        case _ => containerStatus.getState.getTerminated match {
-          case null => UNKNOWN_EXIT_CODE
-          case _ => containerStatus.getState.getTerminated.getExitCode.intValue()
-        }
+      Option(containerStatus.getState).map(containerState =>
+        Option(containerState.getTerminated).map(containerStateTerminated =>
+          containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
+      ).getOrElse(UNKNOWN_EXIT_CODE)
+    }
+
+    def isPodAlreadyReleased(pod: Pod): Boolean = {
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        !runningPodsToExecutors.contains(pod.getMetadata.getName)
       }
     }
 
     def handleErroredPod(pod: Pod): Unit = {
-      def isPodAlreadyReleased(pod: Pod): Boolean = {
-        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-          !runningPodsToExecutors.contains(pod.getMetadata.getName)
-        }
-      }
       val alreadyReleased = isPodAlreadyReleased(pod)
       val containerExitStatus = getExecutorExitStatus(pod)
       // container was probably actively killed by the driver.
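The rewritten getExecutorExitStatus above replaces nested null pattern matches with Option chaining over the fabric8 status getters. A self-contained sketch of that null-safe pattern, using throwaway stand-in classes instead of the real ContainerStatus beans and an arbitrary UNKNOWN_EXIT_CODE value (the real constant's value is not shown in this hunk); it is written with flatMap, which is equivalent to the patch's nested map/getOrElse:

    // Throwaway stand-ins for the fabric8 ContainerStatus/ContainerState/ContainerStateTerminated
    // beans, so the sketch runs without the Kubernetes client on the classpath.
    class Terminated(exitCode: java.lang.Integer) { def getExitCode: java.lang.Integer = exitCode }
    class State(terminated: Terminated) { def getTerminated: Terminated = terminated }
    class Status(state: State) { def getState: State = state }

    object ExitCodeSketch {
      private val UNKNOWN_EXIT_CODE = -111 // arbitrary sentinel for the sketch

      def exitStatus(containerStatus: Status): Int =
        Option(containerStatus.getState)                  // None if the container reported no state
          .flatMap(state => Option(state.getTerminated))  // None if it is still running
          .map(_.getExitCode.intValue())
          .getOrElse(UNKNOWN_EXIT_CODE)

      def main(args: Array[String]): Unit = {
        println(exitStatus(new Status(null)))                           // -111
        println(exitStatus(new Status(new State(null))))                // -111
        println(exitStatus(new Status(new State(new Terminated(137))))) // 137
      }
    }

Unlike the deleted match-based version, the chained form never re-evaluates containerStatus.getState.getTerminated, and it reads in the same order the data is nested.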
@@ -780,7 +769,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
         }.orElse(super.receiveAndReply(context))
       }
     }
-  }
 
 case class ShuffleServiceConfig(
     shuffleNamespace: String,

From 8acefef41f1b6f7884e0a03bf6b33ed1a1f83799 Mon Sep 17 00:00:00 2001
From: Varun
Date: Fri, 21 Jul 2017 11:19:19 -0700
Subject: [PATCH 12/12] Couple of style fixes

---
 .../kubernetes/KubernetesClusterSchedulerBackend.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 777c7834bdefb..990a8a6ef093b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -627,7 +627,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
-        && pod.getMetadata.getDeletionTimestamp == null) {
+          && pod.getMetadata.getDeletionTimestamp == null) {
         val podIP = pod.getStatus.getPodIP
         val clusterNodeName = pod.getSpec.getNodeName
         logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
@@ -635,7 +635,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           executorPodsByIPs += ((podIP, pod))
         }
       } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
-        action == Action.DELETED || action == Action.ERROR) {
+          action == Action.DELETED || action == Action.ERROR) {
         val podName = pod.getMetadata.getName
         val podIP = pod.getStatus.getPodIP
         logDebug(s"Executor pod $podName at IP $podIP was at $action.")