diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8353e64a619cf..d92345ed4a343 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -123,7 +123,9 @@ class SparkHadoopUtil extends Logging {
    */
   def addCredentials(conf: JobConf): Unit = {
     val jobCreds = conf.getCredentials()
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+    val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
+    logInfo(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
+    jobCreds.mergeAll(userCreds)
   }
 
   def addCurrentUserCredentials(creds: Credentials): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3965f17f4b56e..3383410240c04 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -611,7 +611,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
       // remote jars, so adding a new option to only specify local jars for spark-shell internally.
-      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
+      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars"),
+      OptionAssigner(args.proxyUser, MESOS, CLUSTER, confKey = "spark.mesos.proxyUser")
     )
 
     // In client mode, launch the application main class directly
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2480559a41b7a..fbcf8a633d5d4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,6 +197,7 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
+    logInfo(s"HadoopRDD credentials: ${SparkHadoopUtil.get.dumpTokens(jobConf.getCredentials)}")
     val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
     val inputSplits = if (ignoreEmptySplits) {
       allInputSplits.filter(_.getLength > 0)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d224a7325820a..12fb5355e72c2 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -506,6 +506,10 @@ private[spark] class MesosClusterScheduler(
       options ++= Seq("--class", desc.command.mainClass)
     }
 
+    desc.conf.getOption("spark.mesos.proxyUser").foreach { v =>
+      options ++= Seq("--proxy-user", v)
+    }
+
     desc.conf.getOption("spark.executor.memory").foreach { v =>
       options ++= Seq("--executor-memory", v)
     }
@@ -521,6 +525,7 @@ private[spark] class MesosClusterScheduler(
 
     // --conf
     val replicatedOptionsBlacklist = Set(
+      "spark.mesos.proxyUser",
      "spark.jars", // Avoids duplicate classes in classpath
      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
      "spark.master" // this contains the address of the dispatcher, not master
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 53f5f61cca486..c67690b00d042 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
@@ -62,6 +63,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
     new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
 
+  private val isProxyUser = SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser())
+
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
 
@@ -194,8 +197,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     super.start()
 
     if (sc.deployMode == "client") {
+      if (isProxyUser) {
+        fetchHadoopDelegationTokens()
+      }
       launcherBackend.connect()
     }
+
     val startedBefore = IdHelper.startedBefore.getAndSet(true)
 
     val suffix = if (startedBefore) {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
index a1bf4f0c048fe..d7c8d1a19ff8b 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
@@ -60,9 +60,13 @@ private[spark] class MesosHadoopDelegationTokenManager(
 
   private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
     try {
-      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val currentUser = UserGroupInformation.getCurrentUser()
+      val creds = currentUser.getCredentials
       val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
       val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
+        currentUser.addCredentials(creds)
+      }
       logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
       (SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
     } catch {