ApplicationMaster.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark._
@@ -51,33 +52,16 @@ import org.apache.spark.util._
/**
* Common application master functionality for Spark on Yarn.
*/
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf,
Contributor

This doesn't follow Spark's convention for multi-line arguments.

This also looks a little odd now, because there are conflicting arguments. ApplicationMasterArguments is now only used in cluster mode, and everything else is expected to be provided in the other parameters. So while this is the simpler change, it's also a little ugly.

I don't really have a good suggestion right now, but it's something to think about.

Author

I changed the default constructor and added another constructor. Please take a look and let me know if anything can be done better.
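For reference, Spark's usual style for multi-line declarations would look roughly like this (just a sketch of the convention, not necessarily the final signature):

private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    sparkConf: SparkConf,
    yarnConf: YarnConfiguration)
  extends Logging {
  // ... body unchanged
}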

yarnConf: YarnConfiguration) extends Logging {

// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.

private val isClusterMode = args.userClass != null

private val sparkConf = new SparkConf()
if (args.propertiesFile != null) {
Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}

private val securityMgr = new SecurityManager(sparkConf)

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

@@ -619,7 +603,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
stagingDirPath = new Path(System.getProperty("SPARK_YARN_STAGING_DIR",
System.getenv("SPARK_YARN_STAGING_DIR")))
logInfo("Deleting staging directory " + stagingDirPath)
val fs = stagingDirPath.getFileSystem(yarnConf)
fs.delete(stagingDirPath, true)
@@ -666,7 +651,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val proxyBase = getProxyBase
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
driver match {
Expand All @@ -679,6 +664,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}

private def getProxyBase: String = {
var proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
if (proxyBase == null) {
proxyBase = ProxyUriUtils.getPath(getAttemptId().getApplicationId)
}
proxyBase
}

/**
* Start the user class, which contains the spark driver, in a separate Thread.
* If the main routine exits cleanly or exits with System.exit(N) for any N
@@ -822,7 +815,23 @@ object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
val sparkConf = new SparkConf()
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}
// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
System.exit(master.run())
}

Client.scala
@@ -34,7 +34,7 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.io.{DataOutputBuffer, Text}
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.util.StringUtils
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -69,6 +70,10 @@ private[spark] class Client(

private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

private val isClientUnmanagedAMEnabled =
sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode
Contributor

This should be a config constant. Also unmanagedAM is more in line with other config names.

Author

Updated the config name and also added config constants.
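Roughly along these lines, assuming the constant follows the usual ConfigBuilder pattern in the YARN config object (the key name and doc text below are placeholders, not the merged code):

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
  .doc("In client mode, whether to launch the Application Master service as part of the " +
    "client process instead of in a YARN container.")
  .booleanConf
  .createWithDefault(false)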

private var amServiceStarted = false
Contributor

Do you need this extra flag? Could you just check if appMaster != null?


// AM related configurations
private val amMemory = if (isClusterMode) {
sparkConf.get(DRIVER_MEMORY).toInt
@@ -282,7 +287,10 @@ private[spark] class Client(
"does not support it", e)
}
}

if (isClientUnmanagedAMEnabled) {
// Set Unmanaged AM to true in Application Submission Context
appContext.setUnmanagedAM(true)
Contributor

appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

Which also makes the comment unnecessary.

Author

updated

}
appContext
}

@@ -656,7 +664,9 @@ private[spark] class Client(
// Clear the cache-related entries from the configuration to avoid them polluting the
// UI's environment page. This works for client mode; for cluster mode, this is handled
// by the AM.
CACHE_CONFIGS.foreach(sparkConf.remove)
if (!isClientUnmanagedAMEnabled) {
Contributor

Why is this needed in the new mode?

Author

It clears the classpath entries and leads to this error in the executors:

Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend

Contributor

I think this is happening because you're starting the AM after these are removed from the conf. Should probably juggle things around or change how these are provided to the AM, since these configs are super noisy and shouldn't really show up in the UI.

CACHE_CONFIGS.foreach(sparkConf.remove)
}

localResources
}
@@ -784,6 +794,9 @@ private[spark] class Client(
val env = new HashMap[String, String]()
populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
if (isClientUnmanagedAMEnabled) {
System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString)
Contributor

Can this be propagated some other way? Using system properties is kinda hacky, and makes it dangerous to run another Spark app later in the same JVM.

Author

I changed it to derive the staging directory from the Spark conf and the application id.
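A hypothetical sketch of what that could look like on the AM side, assuming the existing STAGING_DIR (spark.yarn.stagingDir) entry, the .sparkStaging/<appId> layout used by Client, and an appId taken from the application attempt:

// Rebuild the staging path from configuration and the application id
// instead of reading a system property.
val stagingBaseDir = sparkConf.get(STAGING_DIR)
  .map(new Path(_))
  .getOrElse(FileSystem.get(yarnConf).getHomeDirectory())
val stagingDirPath = new Path(stagingBaseDir, ".sparkStaging/" + appId.toString)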

}
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
if (loginFromKeytab) {
val credentialsFile = "credentials-" + UUID.randomUUID().toString
@@ -1104,14 +1117,39 @@ private[spark] class Client(
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return (state, report.getFinalApplicationStatus)
}

if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
&& !amServiceStarted && report.getAMRMToken != null) {
Contributor

indent one more level

Author

updated

amServiceStarted = true
startApplicationMasterService(report)
}
lastState = state
}

// Never reached, but keeps compiler happy
throw new SparkException("While loop is depleted! This should never happen...")
}

private def startApplicationMasterService(report: ApplicationReport) = {
Contributor

: Unit =

But given you should be explicitly stopping the AM, this should probably return the AM itself.

// Add AMRMToken to establish connection between RM and AM
val token = report.getAMRMToken
val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
Contributor

Why do you need to make this copy? Isn't the Token above enough?

Author

report.getAMRMToken returns an org.apache.hadoop.yarn.api.records.Token instance, but currentUGI.addToken expects an org.apache.hadoop.security.token.Token instance.

new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
Contributor

Keep related calls on the same line (e.g. token.getIdentifier(), new Text(blah)).

Author

Made the change; please let me know if anything can be done better here.
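Something along these lines would keep the related calls together (purely a formatting sketch of the same conversion):

val token = report.getAMRMToken
// Convert the records.Token from the report into a security.token.Token that UGI accepts.
val amRMToken = new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
  token.getIdentifier().array(), token.getPassword().array(),
  new Text(token.getKind()), new Text(token.getService()))
UserGroupInformation.getCurrentUser().addToken(amRMToken)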

.getIdentifier().array(), token.getPassword().array, new Text(
token.getKind()), new Text(token.getService()))
val currentUGI = UserGroupInformation.getCurrentUser
currentUGI.addToken(amRMToken)

System.setProperty(
Contributor

Same question about using system properties.

Author

I changed it to set this in sparkConf and use the same value in ApplicationMaster when getting the containerId.

ApplicationConstants.Environment.CONTAINER_ID.name(),
ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
Contributor

Won't this name be the same as the first executor created by the app?

I'd rather special-case getContainerId to return some baked-in string when the env variable is not set.

Author

Made the change to pass the appAttemptId from Client.scala.

val amArgs = new ApplicationMasterArguments(Array("--arg",
Contributor

This is pretty weird; I'd make this an explicit constructor argument for the AM instead. But if I understand correctly, this is the address the AM will use to connect back to the driver, right?

It seems like there's an opportunity for better code here, since now they'd both be running in the same process. Like in the cluster mode case, where the AM uses the same RpcEnv instance as the driver (see runDriver()).

Author

I added another constructor that omits ApplicationMasterArguments and takes an RpcEnv, so the AM uses the same instance.

sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
// Start Application Service in a separate thread and continue with application monitoring
new Thread() {
Contributor

Don't you want to keep a reference to this thread and join it at some point, to make sure it really goes away? Should it be a daemon thread instead?

Author

Changed it to a daemon thread.
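Roughly like this (a sketch of the daemon-thread variant described; the thread name is a placeholder, and the PR may also keep a reference so the thread can be joined or stopped):

// Run the unmanaged AM on a daemon thread so it does not keep the client JVM alive.
val amThread = new Thread("unmanaged-am") {
  override def run(): Unit = new ApplicationMaster(amArgs, sparkConf, hadoopConf).run()
}
amThread.setDaemon(true)
amThread.start()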

override def run(): Unit = new ApplicationMaster(amArgs, sparkConf, hadoopConf).run()
}.start()
}

private def formatReportDetails(report: ApplicationReport): String = {
val details = Seq[(String, String)](
("client token", getClientToken(report)),
YarnSparkHadoopUtil.scala
@@ -187,7 +187,13 @@ object YarnSparkHadoopUtil {
}

def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
val containerIdString =
Contributor

indentation

Author

corrected the indentation

if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
Contributor

better to use sparkConf.getenv.

Author

updated to use sparkConf.getenv
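Presumably something like the following; passing the SparkConf in and falling back to a system property set by the client are assumptions about the author's change, not the merged code:

def getContainerId(conf: SparkConf): ContainerId = {
  // Prefer the env variable set by YARN; fall back to the value the client set
  // when running an unmanaged AM in the same JVM.
  val containerIdString =
    Option(conf.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()))
      .getOrElse(System.getProperty(ApplicationConstants.Environment.CONTAINER_ID.name()))
  ConverterUtils.toContainerId(containerIdString)
}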

System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
} else {
System.getProperty(
ApplicationConstants.Environment.CONTAINER_ID.name())
}
ConverterUtils.toContainerId(containerIdString)
}
