38 changes: 6 additions & 32 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,21 +17,19 @@

package org.apache.spark.deploy

import java.io.{ByteArrayInputStream, DataInputStream, IOException}
import java.io.IOException
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -40,7 +38,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
@@ -277,29 +274,6 @@ class SparkHadoopUtil extends Logging {
}
}

/**
* How much time is remaining (in millis) from now to (fraction * renewal time for the token that
* is valid the latest)?
* This will return -ve (or 0) value if the fraction of validity has already expired.
*/
def getTimeFromNowToRenewal(
sparkConf: SparkConf,
fraction: Double,
credentials: Credentials): Long = {
val now = System.currentTimeMillis()

val renewalInterval = sparkConf.get(TOKEN_RENEWAL_INTERVAL).get

credentials.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.map { t =>
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
(identifier.getIssueDate + fraction * renewalInterval).toLong - now
}.foldLeft(0L)(math.max)
}


private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
val fileName = credentialsPath.getName
fileName.substring(
Expand Down Expand Up @@ -337,15 +311,15 @@ class SparkHadoopUtil extends Logging {
}

/**
* Start a thread to periodically update the current user's credentials with new delegation
* tokens so that writes to HDFS do not fail.
* Start a thread to periodically update the current user's credentials with new credentials so
* that access to secured services does not fail.
*/
private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
private[spark] def startCredentialUpdater(conf: SparkConf) {}

/**
* Stop the thread that does the delegation token updates.
* Stop the thread that does the credential updates.
*/
private[spark] def stopExecutorDelegationTokenRenewer() {}
private[spark] def stopCredentialUpdater() {}

/**
* Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
@@ -203,7 +203,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

val env = SparkEnv.createExecutorEnv(
@@ -215,7 +215,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
SparkHadoopUtil.get.stopCredentialUpdater()
}
}

@@ -17,8 +17,6 @@

package org.apache.spark.internal

import java.util.concurrent.TimeUnit

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit

@@ -82,11 +80,6 @@ package object config {
.doc("Name of the Kerberos principal.")
.stringConf.createOptional

private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -100,3 +100,4 @@ spark-deps-.*
org.apache.spark.scheduler.ExternalClusterManager
.*\.sql
.Rbuildignore
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
22 changes: 14 additions & 8 deletions docs/running-on-yarn.md
@@ -461,15 +461,14 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
</tr>
<tr>
<td><code>spark.yarn.security.tokens.${service}.enabled</code></td>
<td><code>spark.yarn.security.credentials.${service}.enabled</code></td>
<td><code>true</code></td>
<td>
Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled.
By default, delegation tokens for all supported services are retrieved when those services are
Controls whether to obtain credentials for services when security is enabled.
By default, credentials for all supported services are retrieved when those services are
configured, but it's possible to disable that behavior if it somehow conflicts with the
application being run.
<p/>
Currently supported services are: <code>hive</code>, <code>hbase</code>
application being run. For further details please see
[Running in a Secure Cluster](running-on-yarn.html#running-in-a-secure-cluster)
</td>
</tr>
<tr>
@@ -525,11 +524,11 @@ token for the cluster's HDFS filesystem, and potentially for HBase and Hive.

An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
and `spark.yarn.security.tokens.hbase.enabled` is not set to `false`.
and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.

Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
includes a URI of the metadata store in `hive.metastore.uris`, and
`spark.yarn.security.tokens.hive.enabled` is not set to `false`.
`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
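
For example, to skip obtaining HBase and Hive credentials even when those services are configured,
both providers can be disabled explicitly (illustrative `spark-defaults.conf` entries built from the
property documented above):

```
spark.yarn.security.credentials.hbase.enabled  false
spark.yarn.security.credentials.hive.enabled   false
```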

If an application needs to interact with other secure HDFS clusters, then
the tokens needed to access these clusters must be explicitly requested at
@@ -539,6 +538,13 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
```

Spark supports integrating with other security-aware services through the Java Services mechanism (see
`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
should be made available to Spark by listing their names in the corresponding file in the jar's
`META-INF/services` directory. These plug-ins can be disabled by setting
`spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of the
credential provider.
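
As an illustration only (the provider class, its package, and the exact trait members below are
assumptions based on this change rather than documented API), a custom provider might look like this:

```
// Hypothetical example of a pluggable credential provider; all names here are illustrative.
package com.example.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Used to build the spark.yarn.security.credentials.<serviceName>.enabled key.
  override def serviceName: String = "myservice"

  // Whether credentials are needed at all; typically tied to whether security is enabled.
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  // Add tokens for the external service to `creds` and return the time (in milliseconds)
  // at which they should next be renewed, or None if they never need renewal.
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // ... fetch a delegation token from the external service and call creds.addToken(...)
    None
  }
}
```

It would then be registered by shipping a `META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
file in the jar whose contents are the single line `com.example.security.MyServiceCredentialProvider`.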

## Configuring the External Shuffle Service

To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -784,7 +784,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.jdbc"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.parquetFile"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.applySchema")
)
) ++ Seq(
// [SPARK-14743] Improve delegation token handling in secure cluster
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal")
)
}

def excludes(version: String) = version match {
@@ -0,0 +1,3 @@
org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
org.apache.spark.deploy.yarn.security.HiveCredentialProvider
@@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
@@ -112,7 +113,7 @@ private[spark] class ApplicationMaster(
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)

private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
private var credentialRenewer: AMCredentialRenewer = _

// Load the list of localized files set by the client. This is used when launching executors,
// and is loaded here so that these configs don't pollute the Web UI's environment page in
@@ -235,10 +236,11 @@ private[spark] class ApplicationMaster(
// If the credentials file config is present, we must periodically renew tokens. So create
// a new credential renewer (AMCredentialRenewer)
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
credentialRenewer =
new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
credentialRenewer.scheduleLoginFromKeytab()
}

if (isClusterMode) {
@@ -305,7 +307,10 @@ private[spark] class ApplicationMaster(
logDebug("shutting down user thread")
userClassThread.interrupt()
}
if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
if (!inShutdown && credentialRenewer != null) {
credentialRenewer.stop()
credentialRenewer = null
}
}
}
}
63 changes: 29 additions & 34 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,8 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
OutputStreamWriter}
import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
@@ -35,7 +34,6 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -122,6 +121,8 @@ private[spark] class Client(
private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())

private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)

def reportLauncherState(state: SparkAppHandle.State): Unit = {
launcherBackend.setState(state)
}
@@ -390,8 +391,31 @@ private[spark] class Client(
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
val fs = destDir.getFileSystem(hadoopConf)
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)

// Merge credentials obtained from registered providers
val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)

if (credentials != null) {
logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
}

// If we log in with a principal and keytab, the credentials can be renewed some time
// after the current time, so pass the next renewal and update times to the credential
// renewer and updater.
if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
nearestTimeOfNextRenewal != Long.MaxValue) {

// The valid renewal time is 75% of the next renewal time, and the valid update time is
// slightly later than the renewal time (80% of the next renewal time). This makes sure
// credentials are renewed and updated before they expire.
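// Illustrative numbers (not part of this change): if the next renewal is 8 hours away,
// renewal would be scheduled about 6 hours from now (0.75 * 8h) and the update about
// 6.4 hours from now (0.8 * 8h).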
val currTime = System.currentTimeMillis()
val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime

sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
}

// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
@@ -400,11 +424,6 @@ private[spark] class Client(
// same name but different path files are added multiple time, YARN will fail to launch
// containers for the app with an internal error.
val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
if (credentials != null) {
logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
}

val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
.getOrElse(fs.getDefaultReplication(destDir))
@@ -716,28 +735,6 @@ private[spark] class Client(
confArchive
}

/**
* Get the renewal interval for tokens.
*/
private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
// We cannot use the tokens generated above since those have renewer yarn. Trying to renew
// those will fail with an access control issue. So create new tokens with the logged in
// user as renewer.
val creds = new Credentials()
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
val t = creds.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.head
val newExpiration = t.renew(hadoopConf)
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal Interval set to $interval")
interval
}

/**
* Set up the environment for launching our ApplicationMaster container.
*/
Expand All @@ -754,8 +751,6 @@ private[spark] class Client(
val credentialsFile = "credentials-" + UUID.randomUUID().toString
sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
val renewalInterval = getTokenRenewalInterval(stagingDirPath)
sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
}

// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*