From 034243435cf4dda1ba1493fcfe8c7d04fe4f877e Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Thu, 8 Feb 2018 14:22:46 -0800 Subject: [PATCH 1/5] [SPARK-23285][K8S] Add a config property for specifying physical executor cores As discussed in SPARK-23285, this PR introduces a new configuration property `spark.kubernetes.executor.cores` for specifying the physical CPU cores requested for each executor pod. This avoids changing the semantics of `spark.executor.cores` and `spark.task.cpus` and their role in task scheduling, task parallelism, dynamic resource allocation, etc. The new configuration property only determines the physical CPU cores available to an executor. An executor can still run multiple tasks simultaneously by using appropriate values for `spark.executor.cores` and `spark.task.cpus` (a brief usage sketch follows the docs diff below). --- docs/running-on-kubernetes.md | 11 ++++++-- .../org/apache/spark/deploy/k8s/Config.scala | 6 +++++ .../cluster/k8s/ExecutorPodFactory.scala | 11 +++++--- .../cluster/k8s/ExecutorPodFactorySuite.scala | 27 +++++++++++++++++++ 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 975b28de47e2..40dd7c6b6f68 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -549,14 +549,21 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.limit.cores (none) - Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. + Specify a hard [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) on the amount of CPU cores for the driver pod. + + + + spark.kubernetes.executor.cores + (none) + + Specify the amount of CPU cores to request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). spark.kubernetes.executor.limit.cores (none) - Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application. + Specify a hard [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) on the amount of CPU cores for each executor pod launched for the Spark Application.
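To make the intended split concrete, here is a minimal usage sketch, not part of the patch itself; the property name follows this patch (it is later renamed to spark.kubernetes.executor.request.cores in PATCH 4/5) and the concrete values are hypothetical:

    import org.apache.spark.SparkConf

    // Request half a physical CPU per executor pod while still giving each
    // executor two task slots: pod sizing and task parallelism are controlled
    // independently.
    val conf = new SparkConf()
      .set("spark.kubernetes.executor.cores", "500m") // pod cpu request (this PR)
      .set("spark.executor.cores", "2")               // task slots, semantics unchanged
      .set("spark.task.cpus", "1")                    // cpus reserved per task, unchanged

With these settings the executor pod requests 500m of cpu from Kubernetes, yet each executor still runs up to two tasks concurrently.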
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index da34a7e06238..20cf95077696 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -91,6 +91,12 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_CORES = + ConfigBuilder("spark.kubernetes.executor.cores") + .doc("Specify the CPU core request for each executor pod") + .stringConf + .createOptional + val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name") .doc("Name of the driver pod.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 98cbd5607da0..bba63d2aca25 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -83,7 +83,12 @@ private[spark] class ExecutorPodFactory( MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) + private val executorCores = sparkConf.getInt("spark.executor.cores", 1) + private val kubernetesExecutorCores = if (sparkConf.contains(KUBERNETES_EXECUTOR_CORES)) { + sparkConf.get(KUBERNETES_EXECUTOR_CORES).get + } else { + executorCores.toString + } private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) /** @@ -114,7 +119,7 @@ private[spark] class ExecutorPodFactory( .withAmount(s"${executorMemoryWithOverhead}Mi") .build() val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores.toString) + .withAmount(kubernetesExecutorCores) .build() val executorExtraClasspathEnv = executorExtraClasspath.map { cp => new EnvVarBuilder() @@ -134,7 +139,7 @@ private[spark] class ExecutorPodFactory( val executorEnv = (Seq( (ENV_DRIVER_URL, driverUrl), // Executor backend expects integral value for executor cores, so round it up to an int. 
- (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), + (ENV_EXECUTOR_CORES, executorCores.toString), (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId), // This is to set the SPARK_CONF_DIR to be /opt/spark/conf diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 7755b9383504..1dc46d14deff 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -83,6 +83,33 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) } + test("executor core request specification") { + var factory = new ExecutorPodFactory(baseConf, None, None, None) + var executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount + === "1") + + val conf = baseConf.clone() + + conf.set(KUBERNETES_EXECUTOR_CORES, "0.1") + factory = new ExecutorPodFactory(conf, None, None, None) + executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount + === "0.1") + + conf.set(KUBERNETES_EXECUTOR_CORES, "100m") + factory = new ExecutorPodFactory(conf, None, None, None) + conf.set(KUBERNETES_EXECUTOR_CORES, "100m") + executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount + === "100m") + } + test("executor pod hostnames get truncated to 63 characters") { val conf = baseConf.clone() conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, From d4b59c6040766d36764cfc7b9c6f5082cab5a9fa Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Sun, 18 Feb 2018 16:23:34 -0800 Subject: [PATCH 2/5] Fixed config property description --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 20cf95077696..fe85b0a20497 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -93,7 +93,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_CORES = ConfigBuilder("spark.kubernetes.executor.cores") - .doc("Specify the CPU core request for each executor pod") + .doc("Specify the cpu request for each executor pod") .stringConf .createOptional From 8ac5d5eeff6a7d709daa7aebcfd58daa9fcba32d Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Thu, 22 Feb 2018 21:18:23 -0800 Subject: [PATCH 3/5] Addressed comments --- docs/running-on-kubernetes.md | 8 ++++---- 
.../spark/scheduler/cluster/k8s/ExecutorPodFactory.scala | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 40dd7c6b6f68..d274b075a932 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -549,21 +549,21 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.limit.cores (none) - Specify a hard [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) on the amount of CPU cores for the driver pod. + Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. spark.kubernetes.executor.cores (none) - Specify the amount of CPU cores to request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). + Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). Takes precedence over spark.executor.cores if set. spark.kubernetes.executor.limit.cores (none) - Specify a hard [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) on the amount of CPU cores for each executor pod launched for the Spark Application. + Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application. @@ -600,4 +600,4 @@ specific to Spark on Kubernetes. spark.kubernetes.executor.secrets.spark-secret=/etc/secrets. - \ No newline at end of file + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index bba63d2aca25..61135e38004b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -138,7 +138,6 @@ private[spark] class ExecutorPodFactory( }.getOrElse(Seq.empty[EnvVar]) val executorEnv = (Seq( (ENV_DRIVER_URL, driverUrl), - // Executor backend expects integral value for executor cores, so round it up to an int. (ENV_EXECUTOR_CORES, executorCores.toString), (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId), From 761fc5378a8d9cb5eccd11eb26849b6a76479ef7 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Fri, 30 Mar 2018 13:39:03 -0700 Subject: [PATCH 4/5] Renamed the new configuration property --- docs/running-on-kubernetes.md | 6 ++++-- .../scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- .../scheduler/cluster/k8s/ExecutorPodFactory.scala | 6 +++--- .../cluster/k8s/ExecutorPodFactorySuite.scala | 12 ++++++------ 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index d274b075a932..7d3c0f7e1dd8 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -553,10 +553,12 @@ specific to Spark on Kubernetes.
- spark.kubernetes.executor.cores + spark.kubernetes.executor.request.cores (none) - Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). Takes precedence over spark.executor.cores if set. + Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). + This is distinct from spark.executor.cores and is only used for specifying executor pod cpu request if set. Task parallelism, e.g., number of tasks an executor can + run concurrently is not affected by this. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index fe85b0a20497..405ea476351b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -91,8 +91,8 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_EXECUTOR_CORES = - ConfigBuilder("spark.kubernetes.executor.cores") + val KUBERNETES_EXECUTOR_REQUEST_CORES = + ConfigBuilder("spark.kubernetes.executor.request.cores") .doc("Specify the cpu request for each executor pod") .stringConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 61135e38004b..44d4d4d243f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -84,8 +84,8 @@ private[spark] class ExecutorPodFactory( private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB private val executorCores = sparkConf.getInt("spark.executor.cores", 1) - private val kubernetesExecutorCores = if (sparkConf.contains(KUBERNETES_EXECUTOR_CORES)) { - sparkConf.get(KUBERNETES_EXECUTOR_CORES).get + private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) { + sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get } else { executorCores.toString } @@ -119,7 +119,7 @@ private[spark] class ExecutorPodFactory( .withAmount(s"${executorMemoryWithOverhead}Mi") .build() val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(kubernetesExecutorCores) + .withAmount(executorCoresRequest) .build() val executorExtraClasspathEnv = executorExtraClasspath.map { cp => new EnvVarBuilder() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 1dc46d14deff..4922baff51dd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -84,7 +84,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with
BeforeAndAfter with Bef } test("executor core request specification") { - var factory = new ExecutorPodFactory(baseConf, None, None, None) + var factory = new ExecutorPodFactory(baseConf, None) var executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getContainers.size() === 1) @@ -93,17 +93,17 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef val conf = baseConf.clone() - conf.set(KUBERNETES_EXECUTOR_CORES, "0.1") - factory = new ExecutorPodFactory(conf, None, None, None) + conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1") + factory = new ExecutorPodFactory(conf, None) executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount === "0.1") - conf.set(KUBERNETES_EXECUTOR_CORES, "100m") - factory = new ExecutorPodFactory(conf, None, None, None) - conf.set(KUBERNETES_EXECUTOR_CORES, "100m") + conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m") + factory = new ExecutorPodFactory(conf, None) + conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m") executor = factory.createExecutorPod( From a9db32369bcac7e9316c9a37d98927ebb567c9d7 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Mon, 2 Apr 2018 11:27:16 -0700 Subject: [PATCH 5/5] Updated documentation of the new property --- docs/running-on-kubernetes.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 7d3c0f7e1dd8..9c4644947c91 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -557,9 +557,9 @@ specific to Spark on Kubernetes. (none) Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). - This is distinct from spark.executor.cores and is only used for specifying executor pod cpu request if set. Task parallelism, e.g., number of tasks an executor can - run concurrently is not affected by this. - + Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units). + This is distinct from spark.executor.cores: if set, it takes precedence over spark.executor.cores for specifying the executor pod cpu request. Task + parallelism, i.e., the number of tasks an executor can run concurrently, is not affected by it. spark.kubernetes.executor.limit.cores
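The rule the series converges on can be summarized by the following sketch; it is a simplified, editor-written equivalent of the executorCoresRequest logic in ExecutorPodFactory above, not the literal patched code:

    import org.apache.spark.SparkConf

    // The Kubernetes-specific request takes precedence when set; otherwise the
    // pod cpu request falls back to spark.executor.cores (default 1).
    def executorCpuRequest(conf: SparkConf): String =
      conf.getOption("spark.kubernetes.executor.request.cores")
        .getOrElse(conf.getInt("spark.executor.cores", 1).toString)

For example, with spark.executor.cores=4 and spark.kubernetes.executor.request.cores=100m, the pod requests 100m of cpu from Kubernetes while ENV_EXECUTOR_CORES still reports 4 to the executor backend, leaving task parallelism unchanged.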