diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 3fb963ea1eefb..043c08bee064d 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -433,8 +433,6 @@ the cluster. When there exists a log collection system, you can expose it at Spark Driver `Executors` tab UI. For example, ``` -spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_APP_ID='$(SPARK_APPLICATION_ID)' -spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID='$(SPARK_EXECUTOR_ID)' spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}' ``` diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index ead3188aa6494..aee07c096fe58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -72,6 +72,8 @@ object Constants { val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME" + val ENV_EXECUTOR_ATTRIBUTE_APP_ID = "SPARK_EXECUTOR_ATTRIBUTE_APP_ID" + val ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID = "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" val ENV_CLASSPATH = "SPARK_CLASSPATH" val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index fa4a6f43215c3..20050de69f89c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -143,6 +143,14 @@ private[spark] class BasicExecutorFeatureStep( (s"$ENV_JAVA_OPT_PREFIX$index", opt) }.toMap + val attributes = if (kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL).isDefined) { + Map( + ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId, + ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId) + } else { + Map.empty[String, String] + } + KubernetesUtils.buildEnvVars( Seq( ENV_DRIVER_URL -> driverUrl, @@ -153,6 +161,7 @@ private[spark] class BasicExecutorFeatureStep( ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, ENV_EXECUTOR_ID -> kubernetesConf.executorId, ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString) + ++ attributes ++ kubernetesConf.environment ++ sparkAuthSecret ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index eaf39dd816dca..3ed6d50f689da 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -252,6 +252,18 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { s"/p1/${KubernetesTestConf.APP_ID}/1,/p2/${KubernetesTestConf.APP_ID}/1")) } + test("SPARK-49190: Add SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID if CUSTOM_EXECUTOR_LOG_URL" + + " is defined") { + val conf = baseConf.clone() + .set(UI.CUSTOM_EXECUTOR_LOG_URL, "https://custom-executor-log-server/") + val kconf = KubernetesTestConf.createExecutorConf(sparkConf = conf) + val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(conf), defaultProfile) + val executor = step.configurePod(SparkPod.initialPod()) + checkEnv(executor, conf, Map( + ENV_EXECUTOR_ATTRIBUTE_APP_ID -> KubernetesTestConf.APP_ID, + ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID)) + } + test("test executor pyspark memory") { baseConf.set("spark.kubernetes.resource.type", "python") baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)