Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,6 @@ the cluster.
If a log collection system is available, you can expose it in the `Executors` tab of the Spark Driver UI. For example,

```
spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_APP_ID='$(SPARK_APPLICATION_ID)'
spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID='$(SPARK_EXECUTOR_ID)'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this starting from Apache Spark 4.0.0-preview2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to still mention here that spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_* can be populated from any Kubernetes environment variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is out of scope for this PR, which specifically targets "Add SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID if CUSTOM_EXECUTOR_LOG_URL is defined".

However, of course, you can make a documentation contribution if you want, @EnricoMi . We will review it separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in #47801

spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}'
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ object Constants {
val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME"
val ENV_EXECUTOR_ATTRIBUTE_APP_ID = "SPARK_EXECUTOR_ATTRIBUTE_APP_ID"
val ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID = "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ private[spark] class BasicExecutorFeatureStep(
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap

// Executor attribute environment variables carrying the application ID and executor ID.
// They are only produced when a custom executor log URL is configured; otherwise this
// map is empty and contributes nothing to the executor environment.
val attributes = kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL) match {
  case Some(_) =>
    Map(
      ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId,
      ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId)
  case None =>
    Map.empty[String, String]
}

KubernetesUtils.buildEnvVars(
Seq(
ENV_DRIVER_URL -> driverUrl,
Expand All @@ -153,6 +161,7 @@ private[spark] class BasicExecutorFeatureStep(
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_ID -> kubernetesConf.executorId,
ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
++ attributes
++ kubernetesConf.environment
++ sparkAuthSecret
++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
s"/p1/${KubernetesTestConf.APP_ID}/1,/p2/${KubernetesTestConf.APP_ID}/1"))
}

test("SPARK-49190: Add SPARK_EXECUTOR_ATTRIBUTE_(APP|EXECUTOR)_ID if CUSTOM_EXECUTOR_LOG_URL" +
  " is defined") {
  // Attribute env vars expected on the executor once a custom log URL is configured.
  val expectedAttributes = Map(
    ENV_EXECUTOR_ATTRIBUTE_APP_ID -> KubernetesTestConf.APP_ID,
    ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID)

  // Work on a copy of the shared base configuration with the custom log URL set.
  val sparkConf = baseConf.clone()
    .set(UI.CUSTOM_EXECUTOR_LOG_URL, "https://custom-executor-log-server/")
  val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
  val featureStep = new BasicExecutorFeatureStep(
    executorConf,
    new SecurityManager(sparkConf),
    defaultProfile)

  val configuredPod = featureStep.configurePod(SparkPod.initialPod())
  checkEnv(configuredPod, sparkConf, expectedAttributes)
}

test("test executor pyspark memory") {
baseConf.set("spark.kubernetes.resource.type", "python")
baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
Expand Down