diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala b/core/src/main/scala/org/apache/spark/LogUrlHandler.scala
similarity index 90%
rename from core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala
rename to core/src/main/scala/org/apache/spark/LogUrlHandler.scala
index 2202489509fc4..9954405948c41 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala
+++ b/core/src/main/scala/org/apache/spark/LogUrlHandler.scala
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.spark.executor
+package org.apache.spark
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.matching.Regex
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
-private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extends Logging {
- import ExecutorLogUrlHandler._
+
+private[spark] class LogUrlHandler(logUrlPattern: Option[String]) extends Logging {
+ import LogUrlHandler._
private val informedForMissingAttributes = new AtomicBoolean(false)
@@ -83,7 +83,7 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
allPatterns: Set[String],
allAttributes: Set[String]): Unit = {
if (informedForMissingAttributes.compareAndSet(false, true)) {
- logInfo(log"Fail to renew executor log urls: ${MDC(LogKeys.REASON, reason)}." +
+ logInfo(log"Fail to renew log urls: ${MDC(LogKeys.REASON, reason)}." +
log" Required: ${MDC(LogKeys.REGEX, allPatterns)} / " +
log"available: ${MDC(LogKeys.ATTRIBUTE_MAP, allAttributes)}." +
log" Falling back to show app's original log urls.")
@@ -91,6 +91,6 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
}
}
-private[spark] object ExecutorLogUrlHandler {
+private[spark] object LogUrlHandler {
val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
}
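(Reviewer note: for orientation, a minimal standalone sketch of the substitution this handler performs. This is not the class above; `applyPattern`'s shape is inferred from its call sites later in this patch, and all values are illustrative.)
```scala
import scala.util.matching.Regex

// Same pattern as LogUrlHandler.CUSTOM_URL_PATTERN_REGEX: matches {{VAR}} placeholders.
val pattern: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
val attributes = Map("APP_ID" -> "app-1", "EXECUTOR_ID" -> "7")
val template = "https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}"
// Each placeholder is replaced with the matching attribute value.
val url = pattern.replaceAllIn(template, m => attributes(m.group(1)))
// url == "https://log-server/log?appId=app-1&execId=7"
```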
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
index d86243df7163f..2a175933922ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.history
-import org.apache.spark.SparkConf
-import org.apache.spark.executor.ExecutorLogUrlHandler
+import org.apache.spark.{LogUrlHandler, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History._
import org.apache.spark.status.AppStatusStore
@@ -40,7 +39,7 @@ private[spark] class HistoryAppStatusStore(
}
}
- private val logUrlHandler = new ExecutorLogUrlHandler(logUrlPattern)
+ private val logUrlHandler = new LogUrlHandler(logUrlPattern)
override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
val execList = super.executorList(activeOnly)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index fe5a52debdafc..855887347375b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -220,6 +220,18 @@ private[spark] object UI {
.stringConf
.createOptional
+ val CUSTOM_DRIVER_LOG_URL = ConfigBuilder("spark.ui.custom.driver.log.url")
+ .doc("Specifies custom Spark driver log url for supporting external log service instead of " +
+ "using cluster managers' application log urls in the Spark UI. Spark will support " +
+ "some path variables via patterns which can vary on cluster manager. Please check the " +
+ "documentation for your cluster manager to see which patterns are supported, if any. " +
+ "This configuration replaces original log urls in event log, which will be also effective " +
+ "when accessing the application on history server. The new log urls must be permanent, " +
+ "otherwise you might have dead link for executor log urls.")
+ .version("4.1.0")
+ .stringConf
+ .createOptional
+
val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
.doc("Specifies custom spark executor log url for supporting external log service instead of " +
"using cluster managers' application log urls in the Spark UI. Spark will support " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4b3a16b4d3f60..6625ce51108ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,11 +27,10 @@ import scala.concurrent.Future
import com.google.common.cache.CacheBuilder
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{ExecutorAllocationClient, SparkEnv, TaskState}
+import org.apache.spark.{ExecutorAllocationClient, LogUrlHandler, SparkEnv, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
@@ -156,7 +155,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { case (k, _) => k.startsWith("spark.") }
.toImmutableArraySeq
- private val logUrlHandler: ExecutorLogUrlHandler = new ExecutorLogUrlHandler(
+ private val logUrlHandler: LogUrlHandler = new LogUrlHandler(
conf.get(UI.CUSTOM_EXECUTOR_LOG_URL))
override def onStart(): Unit = {
diff --git a/docs/configuration.md b/docs/configuration.md
index 9ee7ea2c9317b..4f805b502f9c5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1675,6 +1675,22 @@ Apart from these, the following properties are also available, and may be useful
   <td>2.1.0</td>
 </tr>
+<tr>
+  <td><code>spark.ui.custom.driver.log.url</code></td>
+  <td>(none)</td>
+  <td>
+    Specifies custom Spark driver log URL for supporting external log service instead of using cluster
+    managers' application log URLs in Spark UI. Spark will support some path variables via patterns
+    which can vary on cluster manager. Please check the documentation for your cluster manager to
+    see which patterns are supported, if any.
+    Please note that this configuration also replaces the original log urls in the event log,
+    which will also be effective when accessing the application on the history server. The new log urls
+    must be permanent, otherwise you might have dead links for driver log urls.
+    <p/>
+    For now, only the K8s cluster manager supports this configuration.
+  </td>
+  <td>4.1.0</td>
+</tr>
 <tr>
   <td><code>spark.ui.custom.executor.log.url</code></td>
   <td>(none)</td>
@@ -1687,7 +1703,7 @@ Apart from these, the following properties are also available, and may be useful
     which will be also effective when accessing the application on history server. The new log urls must be
     permanent, otherwise you might have dead link for executor log urls.
-    For now, only YARN and K8s cluster manager supports this configuration
+    For now, only the YARN and K8s cluster managers support this configuration.
   </td>
   <td>3.0.0</td>
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 9b06524cd3664..3a6ea19559e90 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -433,14 +433,18 @@ the cluster.
When a log collection system exists, you can expose it in the Spark Driver `Executors` tab UI. For example,
```
-spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}'
+spark.ui.custom.driver.log.url='https://log-server/driverLog?appId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}&podName={{KUBERNETES_POD_NAME}}'
+spark.ui.custom.executor.log.url='https://log-server/executorLog?appId={{APP_ID}}&execId={{EXECUTOR_ID}}'
```
-You can add additional custom variables to this url template, populated with the values of existing executor environment variables like
+You can add additional custom variables to these URL templates, populated with the values of existing driver and executor environment variables, like
```
+spark.kubernetes.driverEnv.SPARK_DRIVER_ATTRIBUTE_YOUR_VAR='$(EXISTING_DRIVER_ENV_VAR)'
spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_YOUR_VAR='$(EXISTING_EXECUTOR_ENV_VAR)'
-spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}&your_var={{YOUR_VAR}}'
+
+spark.ui.custom.driver.log.url='https://log-server/driverLog?appId={{APP_ID}}&podName={{KUBERNETES_POD_NAME}}&your_var={{YOUR_VAR}}'
+spark.ui.custom.executor.log.url='https://log-server/executorLog?appId={{APP_ID}}&execId={{EXECUTOR_ID}}&your_var={{YOUR_VAR}}'
```
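+
+The driver patterns above are backed by environment variables that Spark injects into the driver
+pod when `spark.ui.custom.driver.log.url` is set (`SPARK_DRIVER_ATTRIBUTE_APP_ID`,
+`SPARK_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE` and `SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME`).
+For a hypothetical driver pod `spark-pi-driver` in namespace `default`, the first template above
+would render as, for example,
+```
+https://log-server/driverLog?appId=spark-app-123&namespace=default&podName=spark-pi-driver
+```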
### Accessing Driver UI
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index aee07c096fe58..c70249a8ecab4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -63,6 +63,9 @@ object Constants {
val UI_PORT_NAME = "spark-ui"
// Environment Variables
+ val ENV_DRIVER_ATTRIBUTE_APP_ID = "SPARK_DRIVER_ATTRIBUTE_APP_ID"
+ val ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE = "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE"
+ val ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME = "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME"
val ENV_DRIVER_POD_IP = "SPARK_DRIVER_POD_IP"
val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index bd198deed3d5b..b825298269428 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -82,8 +82,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
override def configurePod(pod: SparkPod): SparkPod = {
+ val driverAttributes = conf.get(UI.CUSTOM_DRIVER_LOG_URL).map { _ =>
+ Map(
+ ENV_DRIVER_ATTRIBUTE_APP_ID -> conf.appId,
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> conf.get(KUBERNETES_NAMESPACE),
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> driverPodName
+ )
+ }.getOrElse(Map.empty[String, String])
val driverCustomEnvs = KubernetesUtils.buildEnvVars(
- Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment)
+ Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment ++ driverAttributes)
val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 09faa2a7fb1b3..2869900e116d1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.scheduler.cluster.k8s
+import java.util.Locale
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@@ -26,7 +27,7 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.spark.SparkContext
+import org.apache.spark.{LogUrlHandler, SparkContext}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
@@ -34,11 +35,10 @@ import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.LogKeys.{COUNT, TOTAL}
import org.apache.spark.internal.MDC
-import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+import org.apache.spark.internal.config.{SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, UI}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
-import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason,
- TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -104,6 +104,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
conf.getOption("spark.app.id").getOrElse(appId)
}
+ def extractAttributes: Map[String, String] = {
+ val prefix = "SPARK_DRIVER_ATTRIBUTE_"
+ sys.env.filter { case (k, _) => k.startsWith(prefix) }
+ .map { case (k, v) => k.substring(prefix.length).toUpperCase(Locale.ROOT) -> v }
+ }
+
+ override def getDriverAttributes: Option[Map[String, String]] =
+ Some(Map("LOG_FILES" -> "log") ++ extractAttributes)
+
+ override def getDriverLogUrls: Option[Map[String, String]] = {
+ val logUrlHandler = new LogUrlHandler(conf.get(UI.CUSTOM_DRIVER_LOG_URL))
+ getDriverAttributes.map(attr => logUrlHandler.applyPattern(Map.empty, attr)).filter(_.nonEmpty)
+ }
+
override def start(): Unit = {
super.start()
// Must be called before setting the executors
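(Reviewer note: a self-contained sketch of the data flow above, with hypothetical env values; it mirrors `extractAttributes` and `getDriverLogUrls` rather than invoking them.)
```scala
import java.util.Locale

// Env vars as injected into the driver pod by BasicDriverFeatureStep in this patch.
val env = Map(
  "SPARK_DRIVER_ATTRIBUTE_APP_ID" -> "spark-app-1",
  "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE" -> "default",
  "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME" -> "driver-0")

// Same transformation as extractAttributes: strip the prefix, upper-case the key.
val prefix = "SPARK_DRIVER_ATTRIBUTE_"
val attributes = Map("LOG_FILES" -> "log") ++ env.collect {
  case (k, v) if k.startsWith(prefix) =>
    k.substring(prefix.length).toUpperCase(Locale.ROOT) -> v
}

// LogUrlHandler then fills the {{VAR}} template; LOG_FILES supplies the result key,
// matching the "log" -> url expectation in the scheduler backend test below.
val template = "https://my-custom.url/api/logs?applicationId={{APP_ID}}" +
  "&namespace={{KUBERNETES_NAMESPACE}}&pod_name={{KUBERNETES_POD_NAME}}"
val url = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
  .replaceAllIn(template, m => attributes(m.group(1)))
// url == "https://my-custom.url/api/logs?applicationId=spark-app-1&namespace=default&pod_name=driver-0"
```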
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index bf022ac630158..14c4931c4d2ee 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -449,6 +449,35 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(amountAndFormat(limits("memory")) === "5500Mi")
}
+ test("SPARK-47573: Add some SPARK_DRIVER_ATTRIBUTE_* if CUSTOM_DRIVER_LOG_URL" +
+ " is defined") {
+ val sparkConf = new SparkConf()
+ .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+ .set(CONTAINER_IMAGE, "spark-driver:latest")
+ .set(CUSTOM_DRIVER_LOG_URL, "url-pattern")
+ val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
+ sparkConf = sparkConf,
+ labels = CUSTOM_DRIVER_LABELS,
+ environment = DRIVER_ENVS,
+ annotations = DRIVER_ANNOTATIONS)
+
+ val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+ val basePod = SparkPod.initialPod()
+ val configuredPod = featureStep.configurePod(basePod)
+
+ val envs = configuredPod.container
+ .getEnv
+ .asScala
+ .map { env => (env.getName, env.getValue) }
+ .toMap
+ Map(
+ ENV_DRIVER_ATTRIBUTE_APP_ID -> "appId",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "spark-driver-pod"
+ ).foreach { case (k, v) =>
+ assert(envs(k) === v)
+ }
+ }
def containerPort(name: String, portNumber: Int): ContainerPort =
new ContainerPortBuilder()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2e4a7182a774..4c073ba6deb98 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.internal.config.UI
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
@@ -47,6 +48,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
.set("spark.app.id", TEST_SPARK_APP_ID)
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong")
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld")
+ .set(
+ UI.CUSTOM_DRIVER_LOG_URL.key,
+ "https://my-custom.url/api/logs?applicationId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}" +
+ "&pod_name={{KUBERNETES_POD_NAME}}"
+ )
@Mock
private var sc: SparkContext = _
@@ -259,4 +265,55 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger"))
verify(context).reply("1")
}
+
+ test("SPARK-47573: Driver attributes") {
+ assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
+ "LOG_FILES" -> "log"
+ )))
+
+ withEnvs(
+ ENV_DRIVER_ATTRIBUTE_APP_ID -> "spark-app-id",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "pod.name"
+ ) {
+ assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
+ "LOG_FILES" -> "log",
+ "APP_ID" -> "spark-app-id",
+ "KUBERNETES_NAMESPACE" -> "default",
+ "KUBERNETES_POD_NAME" -> "pod.name"
+ )))
+ }
+ }
+
+ test("SPARK-47573: Driver log urls") {
+ assert(schedulerBackendUnderTest.getDriverLogUrls === None)
+ withEnvs(
+ ENV_DRIVER_ATTRIBUTE_APP_ID -> "spark-app-id",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
+ ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "pod.name"
+ ) {
+ assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map(
+ "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" +
+ "&pod_name=pod.name")
+ )))
+ }
+ }
+
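+ // System.getenv() returns an unmodifiable view; its private field "m" holds the
+ // underlying mutable map, which this helper temporarily mutates via reflection.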
+ private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+ val readonlyEnv = System.getenv()
+ val field = readonlyEnv.getClass.getDeclaredField("m")
+ field.setAccessible(true)
+ val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+ try {
+ for ((k, v) <- pairs) {
+ assert(!modifiableEnv.containsKey(k))
+ modifiableEnv.put(k, v)
+ }
+ f
+ } finally {
+ for ((k, _) <- pairs) {
+ modifiableEnv.remove(k)
+ }
+ }
+ }
}