diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1ece7bdc979c..04e2da8db0f5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark._
@@ -53,36 +54,27 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
+private[spark] class ApplicationMaster(
+    args: ApplicationMasterArguments,
+    sparkConf: SparkConf,
+    yarnConf: YarnConfiguration) extends Logging {
 
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
-  private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
-  private val isClusterMode = args.userClass != null
-
-  private val sparkConf = new SparkConf()
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sparkConf.set(k, v)
+  private val appAttemptId =
+    if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
+      YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
+    } else {
+      null
     }
-  }
+
+  private val isClusterMode = args.userClass != null
 
   private val securityMgr = new SecurityManager(sparkConf)
 
   private var metricsSystem: Option[MetricsSystem] = None
 
-  // Set system properties for each config entry. This covers two use cases:
-  // - The default configuration stored by the SparkHadoopUtil class
-  // - The user application creating a new SparkConf in cluster mode
-  //
-  // Both cases create a new SparkConf object which reads these configs from system properties.
-  sparkConf.getAll.foreach { case (k, v) =>
-    sys.props(k) = v
-  }
-
-  private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
-
   private val userClassLoader = {
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>
@@ -152,8 +144,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   // Next wait interval before allocator poll.
   private var nextAllocationInterval = initialAllocationInterval
 
-  private var rpcEnv: RpcEnv = null
-
   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
 
@@ -161,15 +151,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
    * Load the list of localized files set by the client, used when launching executors. This should
    * be called in a context where the needed credentials to access HDFS are available.
    */
-  private def prepareLocalResources(): Map[String, LocalResource] = {
+  private def prepareLocalResources(distCacheConf: SparkConf): Map[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val distCacheConf = new SparkConf(false)
-    if (args.distCacheConf != null) {
-      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
-        distCacheConf.set(k, v)
-      }
-    }
-
     val resources = HashMap[String, LocalResource]()
 
     def setupDistributedCache(
@@ -266,7 +249,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       // we only want to unregister if we don't want the RM to retry
       if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
         unregister(finalStatus, finalMsg)
-        cleanupStagingDir()
+        cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
       }
     }
   }
@@ -298,6 +281,60 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     exitCode
   }
 
+  def runUnmanaged(
+      clientRpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      stagingDir: Path,
+      cachedResourcesConf: SparkConf): Unit = {
+    try {
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+      val driverRef = clientRpcEnv.setupEndpointRef(
+        RpcAddress(sparkConf.get("spark.driver.host"),
+          sparkConf.get("spark.driver.port").toInt),
+        YarnSchedulerBackend.ENDPOINT_NAME)
+      // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+      registerAM(Utils.localHostName, -1, sparkConf,
+        sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
+      addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
+      createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
+      reporterThread.join()
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + StringUtils.stringifyException(e))
+        if (!unregistered) {
+          unregister(finalStatus, finalMsg)
+          cleanupStagingDir(stagingDir)
+        }
+    } finally {
+      try {
+        metricsSystem.foreach { ms =>
+          ms.report()
+          ms.stop()
+        }
+      } catch {
+        case e: Exception =>
+          logWarning("Exception during stopping of the metric system: ", e)
+      }
+    }
+  }
+
+  def stopUnmanaged(stagingDir: Path): Unit = {
+    if (!finished) {
+      finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+    }
+    if (!unregistered) {
+      unregister(finalStatus, finalMsg)
+      cleanupStagingDir(stagingDir)
+    }
+  }
+
   /**
    * Set the default final application status for client mode to UNDEFINED to handle
    * if YARN HA restarts the application so that it properly retries. Set the final
@@ -375,9 +412,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       host: String,
       port: Int,
       _sparkConf: SparkConf,
-      uiAddress: Option[String]): Unit = {
-    val appId = appAttemptId.getApplicationId().toString()
-    val attemptId = appAttemptId.getAttemptId().toString()
+      uiAddress: Option[String],
+      appAttempt: ApplicationAttemptId): Unit = {
+    val appId = appAttempt.getApplicationId().toString()
+    val attemptId = appAttempt.getAttemptId().toString()
     val historyAddress = ApplicationMaster
       .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
 
@@ -385,7 +423,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     registered = true
   }
 
-  private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
+  private def createAllocator(
+      driverRef: RpcEndpointRef,
+      _sparkConf: SparkConf,
+      rpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      distCacheConf: SparkConf): Unit = {
     // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
     // always contact the driver to get the current set of valid tokens, so that local resources can
     // be initialized below.
@@ -399,7 +442,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val appId = appAttemptId.getApplicationId().toString()
     val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-    val localResources = prepareLocalResources()
+    val localResources = prepareLocalResources(distCacheConf)
 
     // Before we initialize the allocator, let's log the information about how executors will
     // be run up front, to avoid printing this out for every single executor being launched.
@@ -437,7 +480,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def runDriver(): Unit = {
-    addAmIpFilter(None)
+    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
     userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -448,17 +491,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
         Duration(totalWaitTime, TimeUnit.MILLISECONDS))
       if (sc != null) {
-        rpcEnv = sc.env.rpcEnv
+        val rpcEnv = sc.env.rpcEnv
 
         val userConf = sc.getConf
         val host = userConf.get(DRIVER_HOST_ADDRESS)
         val port = userConf.get(DRIVER_PORT)
-        registerAM(host, port, userConf, sc.ui.map(_.webUrl))
+        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
 
         val driverRef = rpcEnv.setupEndpointRef(
           RpcAddress(host, port),
           YarnSchedulerBackend.ENDPOINT_NAME)
-        createAllocator(driverRef, userConf)
+        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
         // if the user app did not create a SparkContext.
@@ -482,11 +525,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   private def runExecutorLauncher(): Unit = {
     val hostname = Utils.localHostName
     val amCores = sparkConf.get(AM_CORES)
-    rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
+    val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
       amCores, true)
 
     // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
-    registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
+    registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS), appAttemptId)
 
     // The driver should be up and listening, so unlike cluster mode, just try to connect to it
     // with no waiting or retrying.
@@ -494,8 +537,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val driverRef = rpcEnv.setupEndpointRef(
       RpcAddress(driverHost, driverPort),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    addAmIpFilter(Some(driverRef))
-    createAllocator(driverRef, sparkConf)
+    addAmIpFilter(Some(driverRef),
+      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
+    createAllocator(driverRef, sparkConf, rpcEnv, appAttemptId, distCacheConf)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
@@ -591,15 +635,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     t
   }
 
+  private def distCacheConf(): SparkConf = {
+    val distCacheConf = new SparkConf(false)
+    if (args.distCacheConf != null) {
+      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
+        distCacheConf.set(k, v)
+      }
+    }
+    distCacheConf
+  }
+
   /**
    * Clean up the staging directory.
   */
-  private def cleanupStagingDir(): Unit = {
-    var stagingDirPath: Path = null
+  private def cleanupStagingDir(stagingDirPath: Path): Unit = {
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         logInfo("Deleting staging directory " + stagingDirPath)
         val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
@@ -611,8 +663,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+  private def addAmIpFilter(driver: Option[RpcEndpointRef], proxyBase: String) = {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val params = client.getAmIpFilterParams(yarnConf, proxyBase)
     driver match {
@@ -742,9 +793,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      // In cluster mode, do not rely on the disassociated event to exit
+      // In cluster mode or unmanaged AM case, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
-      if (!isClusterMode) {
+      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       }
@@ -770,12 +821,28 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
-    master = new ApplicationMaster(amArgs)
+    val sparkConf = new SparkConf()
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sparkConf.set(k, v)
+      }
+    }
+
+    // Set system properties for each config entry. This covers two use cases:
+    // - The default configuration stored by the SparkHadoopUtil class
+    // - The user application creating a new SparkConf in cluster mode
+    //
+    // Both cases create a new SparkConf object which reads these configs from system properties.
+    sparkConf.getAll.foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+
+    val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
+    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
 
-    val ugi = master.sparkConf.get(PRINCIPAL) match {
+    val ugi = sparkConf.get(PRINCIPAL) match {
       case Some(principal) =>
         val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
-        SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
         val newUGI = UserGroupInformation.getCurrentUser()
         // Transfer the original user's tokens to the new user, since it may contain needed tokens
         // (such as those user to connect to YARN).
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 84921800a471..91d65c1c9548 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -34,7 +34,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.io.{DataOutputBuffer, Text}
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -55,11 +56,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
+import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{CallerContext, Utils}
 
 private[spark] class Client(
     val args: ClientArguments,
-    val sparkConf: SparkConf)
+    val sparkConf: SparkConf,
+    val rpcEnv: RpcEnv)
   extends Logging {
 
   import Client._
@@ -70,6 +73,10 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
 
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var appMaster: ApplicationMaster = _
+  private var stagingDirPath: Path = _
+
   // AM related configurations
   private val amMemory = if (isClusterMode) {
     sparkConf.get(DRIVER_MEMORY).toInt
@@ -133,16 +140,14 @@ private[spark] class Client(
 
   private var appId: ApplicationId = null
 
-  // The app staging dir based on the STAGING_DIR configuration if configured
-  // otherwise based on the users home directory.
-  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
-    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
-
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
 
   def stop(): Unit = {
+    if (appMaster != null) {
+      appMaster.stopUnmanaged(stagingDirPath)
+    }
     launcherBackend.close()
     yarnClient.stop()
   }
@@ -171,6 +176,12 @@ private[spark] class Client(
     val newAppResponse = newApp.getNewApplicationResponse()
     appId = newAppResponse.getApplicationId()
 
+    // The app staging dir based on the STAGING_DIR configuration if configured
+    // otherwise based on the users home directory.
+    val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+
     new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
       Option(appId.toString)).setCurrentContext()
 
@@ -190,8 +201,8 @@ private[spark] class Client(
       appId
     } catch {
       case e: Throwable =>
-        if (appId != null) {
-          cleanupStagingDir(appId)
+        if (stagingDirPath != null) {
+          cleanupStagingDir()
         }
         throw e
     }
@@ -200,13 +211,12 @@ private[spark] class Client(
   /**
    * Cleanup application staging directory.
   */
-  private def cleanupStagingDir(appId: ApplicationId): Unit = {
+  private def cleanupStagingDir(): Unit = {
     if (sparkConf.get(PRESERVE_STAGING_FILES)) {
       return
     }
 
     def cleanupStagingDirInternal(): Unit = {
-      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
       try {
         val fs = stagingDirPath.getFileSystem(hadoopConf)
         if (fs.delete(stagingDirPath, true)) {
@@ -289,7 +299,7 @@ private[spark] class Client(
           "does not support it", e)
       }
     }
-
+    appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)
     appContext
   }
 
@@ -850,7 +860,6 @@ private[spark] class Client(
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
-    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
       } else {
         Nil
       }
 
-    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
-    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
+    val launchEnv = setupLaunchEnv(stagingDirPath, pySparkArchives)
+    val localResources = prepareLocalResources(stagingDirPath, pySparkArchives)
 
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources.asJava)
@@ -1041,7 +1050,7 @@ private[spark] class Client(
     } catch {
       case e: ApplicationNotFoundException =>
         logError(s"Application $appId not found.")
-        cleanupStagingDir(appId)
+        cleanupStagingDir()
         return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
       case NonFatal(e) =>
         val msg = s"Failed to contact YARN for application $appId."
@@ -1088,14 +1097,17 @@ private[spark] class Client(
       if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
-        cleanupStagingDir(appId)
+        cleanupStagingDir()
         return createAppReport(report)
       }
 
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled &&
+        appMaster == null && report.getAMRMToken != null) {
+        appMaster = startApplicationMasterService(report)
+      }
       lastState = state
     }
 
@@ -1103,6 +1115,30 @@ private[spark] class Client(
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport): ApplicationMaster = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+        token.getIdentifier().array(), token.getPassword().array,
+        new Text(token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    // Start the Application Master service in a separate thread and continue with application monitoring
+    val appMaster = new ApplicationMaster(
+      new ApplicationMasterArguments(Array.empty), sparkConf, hadoopConf)
+    val amService = new Thread("Unmanaged Application Master Service") {
+      override def run(): Unit = {
+        appMaster.runUnmanaged(rpcEnv, report.getCurrentApplicationAttemptId,
+          stagingDirPath, cachedResourcesConf)
+      }
+    }
+    amService.setDaemon(true)
+    amService.start()
+    appMaster
+  }
+
   private def formatReportDetails(report: ApplicationReport): String = {
     val details = Seq[(String, String)](
       ("client token", getClientToken(report)),
@@ -1535,7 +1571,7 @@ private[spark] class YarnClusterApplication extends SparkApplication {
     conf.remove("spark.jars")
     conf.remove("spark.files")
 
-    new Client(new ClientArguments(args), conf).run()
+    new Client(new ClientArguments(args), conf, null).run()
   }
 
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 49a0b93aa5c4..77ce2f65245a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -243,7 +243,7 @@ private[yarn] class ExecutorRunnable(
 
     // Add log urls
     container.foreach { c =>
-      sys.env.get("SPARK_USER").foreach { user =>
+      sys.env.filterKeys(_.endsWith("USER")).foreach { case (_, user) =>
         val containerId = ConverterUtils.toString(c.getId)
         val address = c.getNodeHttpAddress
         val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 6091cd496c03..16adaec04802 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -236,6 +236,14 @@ package object config {
     .stringConf
     .createOptional
 
+  /* Unmanaged AM configuration. */
+
+  private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
+    .doc("In client mode, whether to launch the Application Master service as part of the client " +
+      "using unmanaged am.")
+    .booleanConf
+    .createWithDefault(false)
+
   /* Security configuration. */
 
   private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 934fba3e6ff3..7b77f8c5a7e7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -54,7 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray)
     totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
-    client = new Client(args, conf)
+    client = new Client(args, conf, sc.env.rpcEnv)
     bindToYarn(client.submitApplication(), None)
 
     // SPARK-8687: Ensure all necessary properties have already been set before
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9acd99546c03..adf76004048c 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -184,7 +184,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
     val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val client = new Client(args, sparkConf)
+    val client = new Client(args, sparkConf, null)
     client.createApplicationSubmissionContext(
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
@@ -377,7 +377,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
     val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val client = new Client(new ClientArguments(Array()), conf)
+    val client = new Client(new ClientArguments(Array()), conf, null)
     client.createApplicationSubmissionContext(
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
@@ -455,7 +455,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       sparkConf: SparkConf,
       args: Array[String] = Array()): Client = {
     val clientArgs = new ClientArguments(args)
-    spy(new Client(clientArgs, sparkConf))
+    spy(new Client(clientArgs, sparkConf, null))
   }
 
   private def classpath(client: Client): Array[String] = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 506b27c677f5..95ea0b19bfc0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -87,6 +87,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }
 
+  test("run Spark in yarn-client mode with unmanaged am") {
+    testBasicYarnApp(true, Map(YARN_UNMANAGED_AM.key -> "true"))
+  }
+
   test("run Spark in yarn-client mode with different configurations, ensuring redaction") {
     testBasicYarnApp(true, Map(