Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark._
Expand All @@ -51,32 +52,27 @@ import org.apache.spark.util._
/**
* Common application master functionality for Spark on Yarn.
*/
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private[spark] class ApplicationMaster(
val args: ApplicationMasterArguments,
val sparkConf: SparkConf,
val yarnConf: YarnConfiguration)
extends Logging {

def this(sparkConf: SparkConf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above constructor for multi-line args style.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this constructor as part of below comment refactor

yarnConf: YarnConfiguration,
clientRpcEnv: RpcEnv) {
this(new ApplicationMasterArguments(Array.empty), sparkConf, yarnConf)
this.clientRpcEnv = clientRpcEnv
}

private var clientRpcEnv: RpcEnv = null
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.

private val isClusterMode = args.userClass != null

private val sparkConf = new SparkConf()
if (args.propertiesFile != null) {
Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}

private val securityMgr = new SecurityManager(sparkConf)

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
Expand Down Expand Up @@ -232,8 +228,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
resources.toMap
}

def getAttemptId(): ApplicationAttemptId = {
client.getAttemptId()
def getAttemptId(sparkConf: SparkConf): ApplicationAttemptId = {
client.getAttemptId(sparkConf)
}

final def run(): Int = {
Expand All @@ -245,7 +241,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

private def runImpl(): Unit = {
try {
val appAttemptId = client.getAttemptId()
val appAttemptId = client.getAttemptId(sparkConf)

var attemptID: Option[String] = None

Expand Down Expand Up @@ -275,7 +271,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
val isLastAttempt = client.getAttemptId(sparkConf).getAttemptId() >= maxAppAttempts

if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
Expand Down Expand Up @@ -393,8 +389,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String]): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val appAttempt = client.getAttemptId(_sparkConf)
val appId = appAttempt.getApplicationId().toString()
val attemptId = appAttempt.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

Expand All @@ -403,7 +400,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val appId = client.getAttemptId(_sparkConf).getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

Expand Down Expand Up @@ -481,20 +478,29 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

private def runExecutorLauncher(): Unit = {
val hostname = Utils.localHostName
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)

// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))

// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
var driverRef : RpcEndpointRef = null
if (sparkConf.get(YARN_UNMANAGED_AM)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of this change. Feels like you should have a different method here called runUnmanaged that is called instead of run(), and takes an RpcEnv.

That way you don't need to keep clientRpcEnv at all since it would be local to that method, since nothing else here needs it. In fact even rpcEnv could go away and become a parameter to createAllocator...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to a method runUnmanaged, please let me know if anything can be done better.

rpcEnv = clientRpcEnv
driverRef = rpcEnv.setupEndpointRef(
RpcAddress(sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port").toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
} else {
val hostname = Utils.localHostName
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)

// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))

// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
}
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)

Expand Down Expand Up @@ -600,7 +606,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val stagingDir = sys.props.get("...").getOrElse { ... }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change to pass the stagingDir from Client.scala

if (stagingDir == null) {
val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks similar to the logic in Client.scala. Maybe the value calculated there should be plumbed through, instead of adding this code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change to pass the stagingDir from Client.scala

.getOrElse(FileSystem.get(yarnConf).getHomeDirectory())
stagingDir = appStagingBaseDir.toString + Path.SEPARATOR +
getAttemptId(sparkConf).getApplicationId.toString
}
stagingDirPath = new Path(stagingDir)
logInfo("Deleting staging directory " + stagingDirPath)
val fs = stagingDirPath.getFileSystem(yarnConf)
fs.delete(stagingDirPath, true)
Expand All @@ -613,7 +626,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val proxyBase = getProxyBase
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
driver match {
Expand All @@ -626,6 +639,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}

private def getProxyBase: String = {
var proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
if (proxyBase == null) {
proxyBase = ProxyUriUtils.getPath(getAttemptId(sparkConf).getApplicationId)
}
proxyBase
}

/**
* Start the user class, which contains the spark driver, in a separate Thread.
* If the main routine exits cleanly or exits with System.exit(N) for any N
Expand Down Expand Up @@ -774,16 +795,32 @@ object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
val sparkConf = new SparkConf()
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}
// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
System.exit(master.run())
}

private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
}

private[spark] def getAttemptId(): ApplicationAttemptId = {
master.getAttemptId
private[spark] def getAttemptId(sparkConf: SparkConf): ApplicationAttemptId = {
master.getAttemptId(sparkConf)
}

private[spark] def getHistoryServerAddress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.io.{DataOutputBuffer, Text}
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.util.StringUtils
Expand All @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
Expand All @@ -54,11 +55,13 @@ import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{CallerContext, Utils}

private[spark] class Client(
val args: ClientArguments,
val sparkConf: SparkConf)
val sparkConf: SparkConf,
val rpcEnv: RpcEnv)
extends Logging {

import Client._
Expand All @@ -69,6 +72,9 @@ private[spark] class Client(

private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
private var amServiceStarted = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this extra flag? Could you just check if appMaster != null?


// AM related configurations
private val amMemory = if (isClusterMode) {
sparkConf.get(DRIVER_MEMORY).toInt
Expand Down Expand Up @@ -286,7 +292,10 @@ private[spark] class Client(
"does not support it", e)
}
}

if (isClientUnmanagedAMEnabled) {
// Set Unmanaged AM to true in Application Submission Context
appContext.setUnmanagedAM(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

Which also makes the comment unnecessary.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

}
appContext
}

Expand Down Expand Up @@ -648,7 +657,9 @@ private[spark] class Client(
// Clear the cache-related entries from the configuration to avoid them polluting the
// UI's environment page. This works for client mode; for cluster mode, this is handled
// by the AM.
CACHE_CONFIGS.foreach(sparkConf.remove)
if (!isClientUnmanagedAMEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed in the new mode?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is clearing the classpath entries and leading to this error in Executors.

Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is happening because you're starting the AM after these are removed from the conf. Should probably juggle things around or change how these are provided to the AM, since these configs are super noisy and shouldn't really show up in the UI.

CACHE_CONFIGS.foreach(sparkConf.remove)
}

localResources
}
Expand Down Expand Up @@ -1084,14 +1095,38 @@ private[spark] class Client(
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return createAppReport(report)
}

if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
&& !amServiceStarted && report.getAMRMToken != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent one more level

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

amServiceStarted = true
startApplicationMasterService(report)
}
lastState = state
}

// Never reached, but keeps compiler happy
throw new SparkException("While loop is depleted! This should never happen...")
}

private def startApplicationMasterService(report: ApplicationReport) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: Unit =

But given you should be explicitly stopping the AM, this should probably return the AM itself.

// Add AMRMToken to establish connection between RM and AM
val token = report.getAMRMToken
val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to make this copy? Isn't the Token above enough?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

report.getAMRMToken gives org.apache.hadoop.yarn.api.records.Token type instance, but currentUGI.addToken expects org.apache.hadoop.security.token.Token type instance.

new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep related calls in the same like (e.g. token.getIdentifier(), new Text(blah))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made the change, please let me know if anything better can be done here

.getIdentifier().array(), token.getPassword().array, new Text(
token.getKind()), new Text(token.getService()))
val currentUGI = UserGroupInformation.getCurrentUser
currentUGI.addToken(amRMToken)

sparkConf.set("spark.yarn.containerId",
ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this name be the same as the first executor created by the app?

I'd rather special-case getContainerId to return some baked-in string when the env variable is not set.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made the change to pass the appAttemptId from Client.scala

// Start Application Service in a separate thread and continue with application monitoring
val amService = new Thread() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread name?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added thread name

override def run(): Unit = new ApplicationMaster(sparkConf, hadoopConf, rpcEnv).run()
}
amService.setDaemon(true)
amService.start()
}

private def formatReportDetails(report: ApplicationReport): String = {
val details = Seq[(String, String)](
("client token", getClientToken(report)),
Expand Down Expand Up @@ -1513,7 +1548,7 @@ private[spark] class YarnClusterApplication extends SparkApplication {
conf.remove("spark.jars")
conf.remove("spark.files")

new Client(new ClientArguments(args), conf).run()
new Client(new ClientArguments(args), conf, null).run()
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ private[spark] class YarnRMClient extends Logging {
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]): YarnAllocator = {
require(registered, "Must register AM before creating allocator.")
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(sparkConf),
securityMgr, localResources, new SparkRackResolver())
}

/**
Expand All @@ -100,8 +100,8 @@ private[spark] class YarnRMClient extends Logging {
}

/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId = {
YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
def getAttemptId(sparkConf: SparkConf): ApplicationAttemptId = {
YarnSparkHadoopUtil.getContainerId(sparkConf).getApplicationAttemptId()
}

/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,13 @@ object YarnSparkHadoopUtil {
)
}

def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
def getContainerId(sparkConf: SparkConf): ContainerId = {
val containerIdString =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrected the indentation

if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use sparkConf.getenv.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to use sparkConf.getenv

System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
} else {
sparkConf.get("spark.yarn.containerId")
}
ConverterUtils.toContainerId(containerIdString)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ package object config {
.stringConf
.createOptional

/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add .enabled to the config key.

.doc("In client mode, whether to launch the Application Master service as part of the client " +
"using unmanaged am.")
.booleanConf
.createWithDefault(false)

/* Security configuration. */

private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
val args = new ClientArguments(argsArrayBuf.toArray)
totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
client = new Client(args, conf)
client = new Client(args, conf, sc.env.rpcEnv)
bindToYarn(client.submitApplication(), None)

// SPARK-8687: Ensure all necessary properties have already been set before
Expand Down
Loading