diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala index 35621daf9c0d..78b0e6b2cbf3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -34,6 +35,7 @@ private[security] class HBaseDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { try { val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index c317c4fe3d82..c134b7ebe38f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -116,7 +116,7 @@ private[spark] class HadoopDelegationTokenManager( creds: Credentials): Long = { delegationTokenProviders.values.flatMap { provider => if (provider.delegationTokensRequired(hadoopConf)) { - provider.obtainDelegationTokens(hadoopConf, creds) + provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) } else { logDebug(s"Service ${provider.serviceName} does not require a token." + s" Check your configuration to see if security is disabled or not.") diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala index f162e7e58c53..1ba245e84af4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala @@ -20,6 +20,8 @@ package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials +import org.apache.spark.SparkConf + /** * Hadoop delegation token provider. */ @@ -46,5 +48,6 @@ private[spark] trait HadoopDelegationTokenProvider { */ def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index f0ac7f501ceb..300773c58b18 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -26,8 +26,9 @@ import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { @@ -41,21 +42,20 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { val fsToGetTokens = fileSystems(hadoopConf) - val newCreds = fetchDelegationTokens( - getTokenRenewer(hadoopConf), - fsToGetTokens) + val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens) } // Get the time of next renewal. val nextRenewalDate = tokenRenewalInterval.flatMap { interval => - val nextRenewalDates = newCreds.getAllTokens.asScala + val nextRenewalDates = fetchCreds.getAllTokens.asScala .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]) .map { token => val identifier = token @@ -66,7 +66,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min) } - creds.addAll(newCreds) nextRenewalDate } @@ -89,9 +88,8 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration private def fetchDelegationTokens( renewer: String, - filesystems: Set[FileSystem]): Credentials = { - - val creds = new Credentials() + filesystems: Set[FileSystem], + creds: Credentials): Credentials = { filesystems.foreach { fs => logInfo("getting token for: " + fs) @@ -103,25 +101,27 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration private def getTokenRenewalInterval( hadoopConf: Configuration, + sparkConf: SparkConf, filesystems: Set[FileSystem]): Option[Long] = { // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. - val creds = fetchDelegationTokens( - UserGroupInformation.getCurrentUser.getUserName, - filesystems) - - val renewIntervals = creds.getAllTokens.asScala.filter { - _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] - }.flatMap { token => - Try { - val newExpiration = token.renew(hadoopConf) - val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") - interval - }.toOption + sparkConf.get(PRINCIPAL).flatMap { renewer => + val creds = new Credentials() + fetchDelegationTokens(renewer, filesystems, creds) + + val renewIntervals = creds.getAllTokens.asScala.filter { + _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] + }.flatMap { token => + Try { + val newExpiration = token.renew(hadoopConf) + val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") + interval + }.toOption + } + if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } - if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index 53b9f898c6e7..b31cc595ed83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -61,6 +62,7 @@ private[security] class HiveDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { try { val conf = hiveConf(hadoopConf) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 5b05521e48f8..eeffc36070b4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -94,7 +94,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { val hiveCredentialProvider = new HiveDelegationTokenProvider() val credentials = new Credentials() - hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials) + hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials) credentials.getAllTokens.size() should be (0) } @@ -105,7 +105,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { val hbaseTokenProvider = new HBaseDelegationTokenProvider() val creds = new Credentials() - hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds) + hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds) creds.getAllTokens.size should be (0) }