Commit fb27f46

Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started.
Also schedule re-logins in CoarseGrainedSchedulerBackend#start()
Parent: 41efde0

7 files changed (+28, -20 lines)


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 2 additions & 8 deletions
@@ -17,20 +17,14 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, DataInputStream, DataOutputStream, ByteArrayOutputStream}
 import java.lang.reflect.Method
-import java.net.URI
-import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{TimeUnit, ThreadFactory, Executors}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileUtil, FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 0 deletions
@@ -108,7 +108,9 @@ private[spark] class CoarseGrainedExecutorBackend(
       context.stop(self)
       context.system.shutdown()
 
+    // Add new credentials received from the driver to the current user.
     case UpdateCredentials(newCredentials) =>
+      logInfo("New credentials received from driver, adding the credentials to the current user")
       val credentials = new Credentials()
      credentials.readTokenStorageStream(
        new DataInputStream(new ByteArrayInputStream(newCredentials.value.array())))

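The executor-side handler above assumes the driver serialized its Hadoop Credentials into the SerializableBuffer carried by UpdateCredentials. The driver-side serialization is not part of this diff; the following is a minimal sketch of the intended round trip using only standard Hadoop security APIs (the object name CredentialsRoundTrip and both method names are ours, for illustration):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object CredentialsRoundTrip {
  // Driver side: serialize the current Hadoop credentials so they can be wrapped
  // in a SerializableBuffer and sent in an UpdateCredentials message.
  def serialize(creds: Credentials): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val out = new DataOutputStream(baos)
    creds.writeTokenStorageToStream(out)
    out.flush()
    baos.toByteArray
  }

  // Executor side: rebuild the Credentials and merge the new tokens into the user
  // running the executor, mirroring the UpdateCredentials handler above (the
  // handler's remaining lines fall outside this hunk).
  def applyToCurrentUser(bytes: Array[Byte]): Unit = {
    val creds = new Credentials()
    creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(bytes)))
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }
}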
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 1 deletion
@@ -51,7 +51,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
     data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
-  // Driver to all executors.
+  // When the delegation tokens are about to expire, the driver creates new tokens and sends
+  // them to the executors via this message.
   case class UpdateCredentials(newCredentials: SerializableBuffer)
     extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 12 additions & 5 deletions
@@ -76,10 +76,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
     override protected def log = CoarseGrainedSchedulerBackend.this.log
     private val addressToExecutorId = new HashMap[Address, String]
 
-    // If a principal and keytab have been set, use that to create new credentials for executors
-    // periodically
-    SparkHadoopUtil.get.scheduleLoginFromKeytab(sendNewCredentialsToExecutors _)
-
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -90,6 +86,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
 
+    /**
+     * Send new credentials to the executors. This is called when a scheduled re-login from the
+     * keytab completes, so the refreshed credentials can be propagated to the executors.
+     * @param credentials the serialized credentials to send to the executors
+     */
     def sendNewCredentialsToExecutors(credentials: SerializableBuffer): Unit = {
       executorDataMap.values.foreach{ x =>
         x.executorActor ! UpdateCredentials(credentials)
@@ -245,9 +246,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
         properties += ((key, value))
       }
     }
+
+    val driver = new DriverActor(properties)
     // TODO (prashant) send conf instead of properties
     driverActor = actorSystem.actorOf(
-      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+      Props(driver), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    // If a principal and keytab have been set, use that to create new credentials for executors
+    // periodically
+    SparkHadoopUtil.get.scheduleLoginFromKeytab(driver.sendNewCredentialsToExecutors _)
   }
 
   def stopExecutors() {

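scheduleLoginFromKeytab itself is not shown in this diff. As a rough illustration of what the backend is wiring up here, below is a minimal sketch under stated assumptions: a single-threaded scheduler, a caller-supplied fixed renewal interval, and Spark's internal SerializableBuffer wrapper; the object name, method signature, and parameters are hypothetical except for the callback shape used above.

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.util.SerializableBuffer

object KeytabReloginSketch {
  // Periodically re-login from the keytab and hand the refreshed, serialized
  // credentials to the callback (here: driver.sendNewCredentialsToExecutors _).
  def scheduleLoginFromKeytab(
      principal: String,
      keytab: String,
      intervalSecs: Long,
      callback: SerializableBuffer => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val relogin = new Runnable {
      override def run(): Unit = {
        // Obtain a fresh TGT for the principal, then serialize its credentials
        // into a byte buffer that can travel inside an UpdateCredentials message.
        val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
        val baos = new ByteArrayOutputStream()
        ugi.getCredentials.writeTokenStorageToStream(new DataOutputStream(baos))
        callback(new SerializableBuffer(ByteBuffer.wrap(baos.toByteArray)))
      }
    }
    scheduler.scheduleAtFixedRate(relogin, intervalSecs, intervalSecs, TimeUnit.SECONDS)
  }
}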
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 6 additions & 5 deletions
@@ -256,6 +256,12 @@ private[spark] class ApplicationMaster(
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
+
+    // This must be done before the SparkContext is initialized, since the CoarseGrainedSchedulerBackend
+    // is started at that time, and that is what schedules the re-logins. The re-login is scheduled
+    // only if the principal is actually set up, so make sure it is available first.
+    SparkHadoopUtil.get.setPrincipalAndKeytabForLogin(
+      System.getenv("SPARK_PRINCIPAL"), System.getenv("SPARK_KEYTAB"))
     userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -576,11 +582,6 @@ object ApplicationMaster extends Logging {
       master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
       System.exit(master.run())
     }
-    // At this point, we have tokens that will expire only after a while, so we now schedule a
-    // login for some time before the tokens expire. Since the SparkContext has already started,
-    // we can now get access to the driver actor as well.
-    SparkHadoopUtil.get.setPrincipalAndKeytabForLogin(
-      System.getenv("SPARK_PRINCIPAL"), System.getenv("SPARK_KEYTAB"))
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext) = {

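The crux of the move above is ordering. A comment-only sketch of the intended startup sequence in runDriver, using only names from this diff (the sequence itself is our reading of the commit message):

// 1. Register principal/keytab before any SparkContext exists.
SparkHadoopUtil.get.setPrincipalAndKeytabForLogin(
  System.getenv("SPARK_PRINCIPAL"), System.getenv("SPARK_KEYTAB"))

// 2. startUserApplication() runs user code, which eventually constructs the SparkContext ...
// 3. ... which starts CoarseGrainedSchedulerBackend, whose start() calls
//    SparkHadoopUtil.get.scheduleLoginFromKeytab(driver.sendNewCredentialsToExecutors _).
//    If the principal were not set by then, no re-login would be scheduled, which is
//    the ordering problem this commit fixes.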
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 3 additions & 0 deletions
@@ -568,11 +568,14 @@ private[spark] class Client(
       case Some(keytabPath) =>
         // Generate a file name that can be used for the keytab file, that does not conflict
         // with any user file.
+        logInfo("Attempting to log in to Kerberos" +
+          s" using principal: $principal and keytab: $keytabPath")
         val f = new File(keytabPath)
         keytabFileName = f.getName + "-" + System.currentTimeMillis()
         val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath)
         credentials = ugi.getCredentials
         loginFromKeytab = true
+        logInfo("Successfully logged into Kerberos.")
       case None =>
         throw new SparkException("Keytab must be specified when principal is specified.")
     }

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     if (credentials != null) credentials.getSecretKey(new Text(key)) else null
   }
 
-  override def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit ={
+  override def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit = {
     loginPrincipal = Option(principal)
     keytabFile = Option(keytab)
   }
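This override stores the principal and keytab in loginPrincipal and keytabFile. How scheduleLoginFromKeytab consults them is not shown in this diff; a plausible guard, consistent with the commit message that re-logins are scheduled only when both values are present, might look like the sketch below (the class name and field visibility are assumptions):

import org.apache.spark.util.SerializableBuffer

// Sketch only: the real SparkHadoopUtil is not reproduced in this commit.
class SparkHadoopUtilSketch {
  protected var loginPrincipal: Option[String] = None
  protected var keytabFile: Option[String] = None

  def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit = {
    loginPrincipal = Option(principal)
    keytabFile = Option(keytab)
  }

  def scheduleLoginFromKeytab(callback: SerializableBuffer => Unit): Unit = {
    // Schedule periodic re-logins only when both values were registered earlier;
    // this is why ApplicationMaster must call setPrincipalAndKeytabForLogin before
    // CoarseGrainedSchedulerBackend is started.
    for (principal <- loginPrincipal; keytab <- keytabFile) {
      // ... schedule re-login as `principal` using `keytab`, then invoke `callback`
      // with the refreshed, serialized credentials (see the earlier sketch).
    }
  }
}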
