Skip to content

Commit 748f002

Browse files
sryza authored and tgravescs committed
SPARK-1051. On YARN, executors don't doAs submitting user
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo. Author: Sandy Ryza <[email protected]>. Closes #29 from sryza/sandy-spark-1051 and squashes the following commits: 708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
1 parent 1e36690 commit 748f002

File tree

6 files changed

+26
-10
lines changed

6 files changed

+26
-10
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
2525

2626
import org.apache.spark.{SparkContext, SparkException}
2727

28+
import scala.collection.JavaConversions._
29+
2830
/**
2931
* Contains util methods to interact with Hadoop from Spark.
3032
*/
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
3335
UserGroupInformation.setConfiguration(conf)
3436

3537
def runAsUser(user: String)(func: () => Unit) {
36-
// if we are already running as the user intended there is no reason to do the doAs. It
37-
// will actually break secure HDFS access as it doesn't fill in the credentials. Also if
38-
// the user is UNKNOWN then we shouldn't be creating a remote unknown user
39-
// (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
40-
// in SparkContext.
41-
val currentUser = Option(System.getProperty("user.name")).
42-
getOrElse(SparkContext.SPARK_UNKNOWN_USER)
43-
if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
38+
if (user != SparkContext.SPARK_UNKNOWN_USER) {
4439
val ugi = UserGroupInformation.createRemoteUser(user)
40+
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
4541
ugi.doAs(new PrivilegedExceptionAction[Unit] {
4642
def run: Unit = func()
4743
})
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
5046
}
5147
}
5248

49+
def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
50+
for (token <- source.getTokens()) {
51+
dest.addToken(token)
52+
}
53+
}
54+
5355
/**
5456
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
5557
* subsystems.

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
3737
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
3838

3939
import org.apache.spark.{SparkConf, SparkContext, Logging}
40+
import org.apache.spark.deploy.SparkHadoopUtil
4041
import org.apache.spark.util.Utils
4142

4243
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -65,6 +66,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
6566
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
6667
math.max(args.numWorkers * 2, 3))
6768

69+
private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
70+
SparkContext.SPARK_UNKNOWN_USER)
71+
6872
def run() {
6973
// Setup the directories so things go to yarn approved directories rather
7074
// then user specified and /tmp.
@@ -173,7 +177,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
173177
false /* initialize */ ,
174178
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
175179
val t = new Thread {
176-
override def run() {
180+
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
177181
var successed = false
178182
try {
179183
// Copy

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
314314
Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
315315
env("SPARK_YARN_MODE") = "true"
316316
env("SPARK_YARN_STAGING_DIR") = stagingDir
317+
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
317318

318319
// Set the environment variables to be passed on to the Workers.
319320
distCacheMgr.setDistFilesEnv(env)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
2828
*/
2929
class YarnSparkHadoopUtil extends SparkHadoopUtil {
3030

31+
override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
32+
dest.addCredentials(source.getCredentials())
33+
}
34+
3135
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
3236
override def isYarnMode(): Boolean = { true }
3337

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
3939
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
4040

4141
import org.apache.spark.{SparkConf, SparkContext, Logging}
42+
import org.apache.spark.deploy.SparkHadoopUtil
4243
import org.apache.spark.util.Utils
4344

4445

@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
6768
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
6869
math.max(args.numWorkers * 2, 3))
6970

71+
private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
72+
SparkContext.SPARK_UNKNOWN_USER)
73+
7074
def run() {
7175
// Setup the directories so things go to YARN approved directories rather
7276
// than user specified and /tmp.
@@ -145,7 +149,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
145149
false /* initialize */ ,
146150
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
147151
val t = new Thread {
148-
override def run() {
152+
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
149153
var successed = false
150154
try {
151155
// Copy

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
333333
Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
334334
env("SPARK_YARN_MODE") = "true"
335335
env("SPARK_YARN_STAGING_DIR") = stagingDir
336+
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
336337

337338
// Set the environment variables to be passed on to the Workers.
338339
distCacheMgr.setDistFilesEnv(env)

0 commit comments

Comments
 (0)