Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
creds: Credentials): Option[Long] = {

val fsToGetTokens = fileSystems(hadoopConf)
val newCreds = fetchDelegationTokens(
val fetchCreds = fetchDelegationTokens(
getTokenRenewer(hadoopConf),
fsToGetTokens)
fsToGetTokens,
creds)

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens, creds)
}

// Get the time of next renewal.
val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
val nextRenewalDates = newCreds.getAllTokens.asScala
val nextRenewalDates = fetchCreds.getAllTokens.asScala
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { token =>
val identifier = token
Expand All @@ -66,7 +67,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}

creds.addAll(newCreds)
nextRenewalDate
}

Expand All @@ -89,9 +89,9 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

private def fetchDelegationTokens(
renewer: String,
filesystems: Set[FileSystem]): Credentials = {
filesystems: Set[FileSystem],
creds :Credentials): Credentials = {

val creds = new Credentials()

filesystems.foreach { fs =>
logInfo("getting token for: " + fs)
Expand All @@ -103,15 +103,17 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

private def getTokenRenewalInterval(
hadoopConf: Configuration,
filesystems: Set[FileSystem]): Option[Long] = {
filesystems: Set[FileSystem],
creds:Credentials): Option[Long] = {
// We cannot use the tokens generated with renewer yarn. Trying to renew
// those will fail with an access control issue. So create new tokens with the logged in
// user as renewer.
val creds = fetchDelegationTokens(
val fetchCreds = fetchDelegationTokens(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here the diff in spark2.2 and master
=> is missing PRINCPAL(aka spark.yarn.principal) config. Not sure if we need to do this now. Let me know your opinion @vanzin @tgravescs

sparkConf.get(PRINCIPAL).flatMap { renewer =>
val creds = new Credentials()
hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
dstFs.addDelegationTokens(renewer, creds)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code was in getTokenRenewalInterval; that call is only needed when principal and keytab are provided, so adding the code back should be ok. It shouldn't cause any issues if it's not there, though, aside from a wasted round trip to the NNs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to not call it if we don't need to so as long as adding the config back doesn't mess with the mesos side of things (since this is now common code) I think that would be good. the PRINCIPAL config is yarn specific config, but looking at SparkSubmit it appears to be using for mesos as well.

@vanzin do you happen to know if mesos is using that as well, I haven't kept up with mesos kerberos support. so not sure if more is going to happen there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure Mesos is not currently hooked up to the principal / keytab stuff. It just picks up the initial delegation token set, and when those expire, things stop working.

Adding the check back here is the right thing; it shouldn't affect Mesos when it adds support for principal / keytab (or if it does, it can be fixed at that time).

UserGroupInformation.getCurrentUser.getUserName,
filesystems)
filesystems,
creds)

val renewIntervals = creds.getAllTokens.asScala.filter {
val renewIntervals = fetchCreds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
}.flatMap { token =>
Try {
Expand Down