Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.logging.log4j.Level
import org.apache.spark.annotation.{DeveloperApi, Experimental, Private}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.executor.{Executor, ExecutorLogUrlHandler, ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -2629,9 +2629,13 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart(): Unit = {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
val logUrls = schedulerBackend.getDriverLogUrls
val attributes = schedulerBackend.getDriverAttributes
// Renew the driver log URLs before publishing them: any {{ATTRIBUTE}} placeholders
// embedded in the URLs are substituted with the driver attributes. A handler built
// with `None` applies the URLs' own embedded patterns rather than an external one.
val replacedLogUrls = logUrls.map {
new ExecutorLogUrlHandler(None).applyPattern(_, attributes.getOrElse(Map.empty).toMap)
}
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId, replacedLogUrls, attributes))
_driverLogger.foreach(_.startSync(_hadoopConfiguration))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,29 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
logUrls: Map[String, String],
attributes: Map[String, String]): Map[String, String] = {
logUrlPattern match {
case Some(pattern) => doApplyPattern(logUrls, attributes, pattern)
case None => logUrls
case None => applySelfPattern(logUrls, attributes)
case Some(pattern) => applyExternalPattern(logUrls, attributes, pattern)
}
}

/**
 * Renew each log URL by substituting the {{ATTRIBUTE}} placeholders embedded in the
 * URL itself with the corresponding attribute values.
 *
 * If a URL references an attribute that is not present in `attributes`, that URL is
 * left untouched and a warning is emitted via `logFailToRenewLogUrls`.
 *
 * @param logUrls map from log file name to its (possibly templated) URL
 * @param attributes attribute name -> value pairs used for substitution
 * @return map with the same keys, where every resolvable URL has been renewed
 */
private def applySelfPattern(
logUrls: Map[String, String],
attributes: Map[String, String]): Map[String, String] = {
val allAttributeKeys = attributes.keySet
logUrls.map { case (logFile, logUrl) =>
val allPatterns = extractPatterns(logUrl)
if (allPatterns.diff(allAttributeKeys).nonEmpty) {
// Some placeholder has no backing attribute: keep the original URL as-is.
logFailToRenewLogUrls("some of required attributes are missing.",
allPatterns, allAttributeKeys)
(logFile, logUrl)
} else {
val updatedUrl = replacePatterns(logUrl, allPatterns, attributes)
(logFile, updatedUrl)
}
}
}

private def applyExternalPattern(
logUrls: Map[String, String],
attributes: Map[String, String],
urlPattern: String): Map[String, String] = {
Expand All @@ -47,25 +64,21 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
// files, which are encouraged to be same as types of log files provided in original log URLs.
// Once we get the list of log files, we need to expose them to end users as a pattern
// so that end users can compose custom log URL(s) including log file name(s).
val allPatterns = CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet
val allPatterns = extractPatterns(urlPattern)
val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME")
val allAttributeKeys = attributes.keySet
val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != "LOG_FILES")

if (allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) {
logFailToRenewLogUrls("some of required attributes are missing in app's event log.",
logFailToRenewLogUrls("some of required attributes are missing.",
allPatternsExceptFileName, allAttributeKeys)
logUrls
} else if (allPatterns.contains("FILE_NAME") && !allAttributeKeys.contains("LOG_FILES")) {
logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file information is " +
"missing in app's event log.", allPatternsExceptFileName, allAttributeKeys)
"missing.", allPatternsExceptFileName, allAttributeKeys)
logUrls
} else {
val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case (orig, patt) =>
// we already checked the existence of attribute when comparing keys
orig.replace(s"{{$patt}}", attributes(patt))
}

val updatedUrl = replacePatterns(urlPattern, allPatternsExceptFileName, attributes)
if (allPatterns.contains("FILE_NAME")) {
// allAttributeKeys should contain "LOG_FILES"
attributes("LOG_FILES").split(",").map { file =>
Expand All @@ -77,6 +90,20 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
}
}

/** Collect the distinct {{NAME}} placeholder names that appear in the given URL pattern. */
private def extractPatterns(urlPattern: String): Set[String] = {
val names = for (m <- CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern)) yield m.group(1)
names.toSet
}

/**
 * Substitute every `{{pattern}}` placeholder in `logUrl` with its attribute value.
 * Callers must have verified that each name in `patterns` exists in `attributes`.
 */
private def replacePatterns(
logUrl: String,
patterns: Set[String],
attributes: Map[String, String]): String = {
var renewed = logUrl
patterns.foreach { name =>
// Presence of `name` in `attributes` was validated by the caller.
renewed = renewed.replace(s"{{$name}}", attributes(name))
}
renewed
}

private def logFailToRenewLogUrls(
reason: String,
allPatterns: Set[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.Locale

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

Expand Down Expand Up @@ -74,14 +76,24 @@ private[spark] trait SchedulerBackend {
* Executors tab for the driver.
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = {
val prefix = "SPARK_DRIVER_LOG_URL_"
// Env vars like SPARK_DRIVER_LOG_URL_STDOUT=<url> become ("stdout" -> <url>);
// keys are lower-cased, mirroring CoarseGrainedExecutorBackend#extractLogUrls.
val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
}

/**
* Get the attributes on driver. These attributes are used to replace log URLs when
* custom log url pattern is specified.
* @return Map containing attributes on driver.
*/
def getDriverAttributes: Option[Map[String, String]] = {
val prefix = "SPARK_DRIVER_ATTRIBUTE_"
// Env vars like SPARK_DRIVER_ATTRIBUTE_FOO=<v> become ("FOO" -> <v>); keys are
// upper-cased (unlike the lower-cased log names in getDriverLogUrls), mirroring
// CoarseGrainedExecutorBackend#extractAttributes.
val driverAttributes = sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap
if (driverAttributes.nonEmpty) Some(driverAttributes) else None
}

/**
* Get the max number of tasks that can be concurrent launched based on the ResourceProfile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.scheduler.cluster

import java.util.Locale
import java.util.concurrent.{Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

Expand Down Expand Up @@ -251,13 +250,6 @@ private[spark] class StandaloneSchedulerBackend(
}
}

/** Expose driver log URLs published through SPARK_DRIVER_LOG_URL_* environment variables. */
override def getDriverLogUrls: Option[Map[String, String]] = {
val urlPrefix = "SPARK_DRIVER_LOG_URL_"
val urls = sys.env.collect {
case (name, url) if name.startsWith(urlPrefix) =>
name.stripPrefix(urlPrefix).toLowerCase(Locale.ROOT) -> url
}
if (urls.isEmpty) None else Some(urls)
}

private def waitForRegistration() = {
registrationBarrier.acquire()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private[spark] object Constants {
val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
val ENV_DRIVER_POD_NAME = "SPARK_DRIVER_POD_NAME"
Copy link
Member

@dongjoon-hyun dongjoon-hyun Dec 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had better put this line before ENV_DRIVER_BIND_ADDRESS.

BTW, Could you spin off KUBERNETES_POD_NAME and SPARK_DRIVER_POD_NAME addition to a new PR, @pan3793 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, opened #39160 for this.

val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
val ENV_SPARK_USER = "SPARK_USER"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_POD_NAME)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "metadata.name")
.build())
.endEnv()
.editOrNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
conf.getOption("spark.app.id").getOrElse(appId)
}

/**
 * Driver attributes used for renewing custom log URLs: the inherited environment-based
 * attributes, augmented with Kubernetes-specific ones (which win on key collision).
 */
override def getDriverAttributes: Option[Map[String, String]] = {
val inherited = super.getDriverAttributes.getOrElse(Map.empty)
val k8sSpecific = Map(
"APP_ID" -> System.getenv(ENV_APPLICATION_ID),
"KUBERNETES_NAMESPACE" -> conf.get(KUBERNETES_NAMESPACE),
"KUBERNETES_POD_NAME" -> System.getenv(ENV_DRIVER_POD_NAME))
Some(inherited ++ k8sSpecific)
}

override def start(): Unit = {
super.start()
// Must be called before setting the executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
import org.apache.spark.deploy.k8s.Constants.{ENV_APPLICATION_ID, ENV_EXECUTOR_POD_NAME}
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -51,7 +53,14 @@ private[spark] object KubernetesExecutorBackend extends Logging {
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
arguments.bindAddress, arguments.hostname, arguments.cores,
env, arguments.resourcesFileOpt, resourceProfile)
env, arguments.resourcesFileOpt, resourceProfile) {
override def extractAttributes: Map[String, String] =
super.extractAttributes ++ Map(
"APP_ID" -> System.getenv(ENV_APPLICATION_ID),
"KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE),
"KUBERNETES_POD_NAME" -> System.getenv(ENV_EXECUTOR_POD_NAME)
)
}
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))

assert(configuredPod.container.getEnv.asScala.exists(envVar =>
envVar.getName.equals(ENV_DRIVER_POD_NAME) &&
envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
envVar.getValueFrom.getFieldRef.getFieldPath.equals("metadata.name")))

val resourceRequirements = configuredPod.container.getResources
val requests = resourceRequirements.getRequests.asScala
assert(amountAndFormat(requests("cpu")) === "2")
Expand Down