This repository was archived by the owner on Oct 23, 2024. It is now read-only.
Merged
core/src/main/scala/org/apache/spark/deploy/DCOSSecretStoreUtils.scala (120 changes: 0 additions & 120 deletions)

This file was deleted.

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

@@ -124,12 +124,7 @@ class SparkHadoopUtil extends Logging {
    * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
    * cluster.
    */
-  def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials()
-    val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
-    logInfo(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
-    jobCreds.mergeAll(userCreds)
-  }
+  def addCredentials(conf: JobConf) {}

   def isYarnMode(): Boolean = { false }
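Upstream Spark keeps the base-class addCredentials empty and performs the credential merge in its YARN subclass; this change restores that split, so job confs built on other cluster managers no longer pick up the submitter's tokens here. For reference, a minimal sketch of the merge the deleted body performed, using only public Hadoop APIs (the helper name is ours, not the fork's):

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

// Merge the current user's cached credentials (delegation tokens and secrets)
// into a job conf, which is what the deleted body did on every cluster manager.
def mergeUserCredentials(conf: JobConf): Unit = {
  val jobCreds = conf.getCredentials()
  val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
  // mergeAll keeps entries already present on the job when keys collide
  jobCreds.mergeAll(userCreds)
}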
@@ -441,10 +436,6 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
-
-  def isProxyUser(ugi: UserGroupInformation): Boolean = {
-    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
-  }
 }

 object SparkHadoopUtil {
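The deleted isProxyUser helper loses its only caller with the SparkSubmit changes below. Should the check ever be needed again, an equivalent sketch against stock Hadoop UGI; the getRealUser clause is our addition, since a UGI created through createProxyUser always carries a real user underneath:

import org.apache.hadoop.security.UserGroupInformation

// True when the UGI represents doAs-style impersonation.
def isProxyUser(ugi: UserGroupInformation): Boolean =
  ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY ||
    ugi.getRealUser() != null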
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (164 changes: 28 additions & 136 deletions)
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-import java.io.{ByteArrayOutputStream, DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
 import java.nio.file.Files
@@ -48,7 +48,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
@@ -180,18 +179,7 @@ object SparkSubmit extends CommandLineUtils {
         }
       }
     } else {
-      if (sysProps.get("spark.mesos.cluster.mode.proxyUser").isDefined) {
-        // scalastyle:off println
-        printStream.println("Running as proxy user in mesos cluster mode...")
-        // scalastyle:on println
-        SparkHadoopUtil
-          .get
-          .runAsSparkUser(
-            () => runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose))
-      }
-      else {
-        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
-      }
+      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
     }
   }

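For context on the branch removed above: SparkHadoopUtil.runAsSparkUser executes a thunk inside a UGI doAs after copying the caller's credentials onto the new identity. A rough sketch of that pattern, written from the Spark 2.x-era behavior rather than quoted from the class:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Run func under a remote-user UGI that inherits the caller's tokens.
def runAs(user: String)(func: () => Unit): Unit = {
  val ugi = UserGroupInformation.createRemoteUser(user)
  ugi.addCredentials(UserGroupInformation.getCurrentUser().getCredentials())
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = func()
  })
}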
@@ -297,7 +285,6 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
-    val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
@@ -332,139 +319,24 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
+    if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
-        if (args.keytab != null) {
-          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+        require(args.keytab != null, "Keytab must be specified when principal is specified")
+        if (!new File(args.keytab).exists()) {
+          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+        } else {
           // Add keytab and principal configurations in sysProps to make them available
           // for later use; e.g. in spark sql, the isolated class loader used to talk
           // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
           sysProps.put("spark.yarn.keytab", args.keytab)
           sysProps.put("spark.yarn.principal", args.principal)

           UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
     }

-    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
-      val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
-      val configValue = args.sparkProperties.get(key)
-      if (configValue.isDefined) {
-        // scalastyle:off println
-        printStream.println(s"Setting ${key} to config value: ${configValue.get}")
-        // scalastyle:on println
-        sysProps.put(key, configValue.get)
-      } else {
-        setRMPrincipal(sysProps)
-      }
-    }
-
-    // In Mesos (DC/OS) cluster mode with a proxy user
-    // a) use a local ticket cache to generate DTs and store them in the DC/OS secret store.
-    // b) pass a secret filename for the driver to retrieve the DTs
-    // c) reference the DTs with HADOOP_TOKEN_FILE_LOCATION
-    // Could be moved to the RestSubmissionClient class to avoid the doAs call here.
-    if (isMesosCluster && args.proxyUser != null && UserGroupInformation.isSecurityEnabled) {
-      val mesosSandbox = args
-        .sparkProperties
-        .getOrElse("spark.mesos.sandbox", "/mnt/mesos/sandbox")
-      val hostname = args.sparkProperties("spark.dcos.hostname")
-      val secretStoreName = args
-        .sparkProperties
-        .getOrElse("spark.dcos.secret.store.name", "default")
-      val dcosAuthToken = sys.env.get("SPARK_DCOS_AUTH_TOKEN")
-      val dtsRootPath = args
-        .sparkProperties
-        .getOrElse("spark.dcos.driver.dts.root.path", "/spark")
-      val dcosVersion = DCOS_VERSION.
-        withName(args.sparkProperties.getOrElse("spark.dcos.version", "v1_10_X"))
-
-      require(hostname != null)
-      require(dcosAuthToken.isDefined)
-      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
-        UserGroupInformation.getCurrentUser())
-      try {
-        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-          override def run(): Unit = {
-            try {
-              val currentUser = UserGroupInformation.getCurrentUser()
-              val creds = currentUser.getCredentials
-
-              val sparkConf = new SparkConf()
-              args.sparkProperties.foreach(p => sparkConf.set(p._1, p._2))
-              val tokenManager =
-                new HadoopDelegationTokenManager(sparkConf, new HadoopConfiguration())
-              val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-              val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
-              val baos = new ByteArrayOutputStream()
-              val w = new DataOutputStream(baos)
-              creds.writeTokenStorageToStream(w)
-              w.flush()
-
-              val secretBinaryData = baos.toByteArray
-              val secretName = DCOSSecretStoreUtils.getSecretName()
-              val secretPath = DCOSSecretStoreUtils.formatPath(dtsRootPath, secretName, dcosVersion)
-
-              DCOSSecretStoreUtils
-                .writeBinarySecret(hostname,
-                  secretPath, secretBinaryData, dcosAuthToken.get, secretStoreName, dcosVersion)
-
-              // re-write the secret properties
-              val previousNames = args
-                .sparkProperties.get("spark.mesos.driver.secret.names")
-              val newNames = Seq(previousNames, Option(secretPath))
-                .flatten
-                .mkString(",")
-              args.sparkProperties.update("spark.mesos.driver.secret.names", newNames)
-
-              val previousFileNames = args
-                .sparkProperties.get("spark.mesos.driver.secret.filenames")
-              val newFileNames = Seq(previousFileNames, Option(secretName))
-                .flatten
-                .mkString(",")
-              args.sparkProperties.update("spark.mesos.driver.secret.filenames", newFileNames)
-
-              val fileEnvProperty = "spark.mesos.driverEnv.HADOOP_TOKEN_FILE_LOCATION"
-              sysProps.put(fileEnvProperty, s"$mesosSandbox/$secretName")
-
-              // scalastyle:off println
-              printStream.println(s"Stored tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
-              // scalastyle:on println
-
-              // re-write the RM principal key and use the proxyUser
-              // val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
-              // sysProps.put(key, args.proxyUser)
-
-              // we cannot run as the OS user used by the dispatcher, hive fails
-              // and will see the real user eg. root
-              sysProps.put("spark.mesos.cluster.mode.proxyUser", args.proxyUser)
-            }
-            catch {
-              case e: Exception =>
-                // scalastyle:off println
-                printStream.println(s"Failed to fetch Hadoop delegation tokens $e")
-                // scalastyle:on println
-                throw e
-            }
-          }
-        })
-      } catch {
-        case e: Exception =>
-          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
-          // makes the message printed to the output by the JVM not very helpful. Instead,
-          // detect exceptions with empty stack traces here, and treat them differently.
-          if (e.getStackTrace().length == 0) {
-            // scalastyle:off println
-            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-            // scalastyle:on println
-            exitFn(1)
-          } else {
-            throw e
-          }
-      }
-    }

     // In client mode, download remote files.
     var localPrimaryResource: String = null
     var localJars: String = null
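Stripped of the DC/OS plumbing, the block deleted above follows one generic pattern: impersonate the proxy user, obtain fresh delegation tokens, serialize the Credentials in Hadoop's token-storage format, and hand the bytes to an external store the driver can read through HADOOP_TOKEN_FILE_LOCATION. A condensed sketch of that pattern; storeSecret is a hypothetical stand-in for the deleted DCOSSecretStoreUtils calls, and the two-argument HadoopDelegationTokenManager constructor is the Spark 2.2-era form the deleted code used:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

// Obtain delegation tokens while impersonating proxyUser, then serialize them
// so storeSecret can place them where the driver's HADOOP_TOKEN_FILE_LOCATION
// will point.
def fetchTokensAs(proxyUser: String, sparkConf: SparkConf)
                 (storeSecret: Array[Byte] => Unit): Unit = {
  val ugi = UserGroupInformation.createProxyUser(
    proxyUser, UserGroupInformation.getCurrentUser())
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val creds = UserGroupInformation.getCurrentUser().getCredentials()
      val tokenManager = new HadoopDelegationTokenManager(sparkConf, new Configuration())
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
      tokenManager.obtainDelegationTokens(hadoopConf, creds) // fills creds with fresh tokens
      val buffer = new ByteArrayOutputStream()
      val out = new DataOutputStream(buffer)
      creds.writeTokenStorageToStream(out) // Hadoop's writable token format
      out.flush()
      storeSecret(buffer.toByteArray)
    }
  })
}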
@@ -731,6 +603,26 @@
       }
     }

+    // assure a keytab is available from any place in a JVM
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
+      if (args.principal != null) {
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
+      }
+    }
+
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      setRMPrincipal(sysProps)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
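The setRMPrincipal call retained above exists because Hadoop's delegation-token logic checks yarn.resourcemanager.principal even when YARN is not the cluster manager. Roughly, and hedged as the upstream Spark 2.2-era shape rather than this fork's exact body, the helper does:

import scala.collection.mutable

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Point spark.hadoop.yarn.resourcemanager.principal at the current short user
// name so token fetching has a principal to validate on Mesos.
def setRMPrincipal(sysProps: mutable.HashMap[String, String]): Unit = {
  val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
  val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
  sysProps.put(key, shortUserName)
}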
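With the keytab block now also covering MESOS, a cluster-mode submission can log in from a keytab the same way a YARN one does. A toy sketch of what the added block amounts to at runtime; the principal and keytab path are placeholders, not values from this PR:

import java.io.File
import org.apache.hadoop.security.UserGroupInformation

val principal = "spark@EXAMPLE.COM"               // placeholder
val keytab = "/etc/security/keytabs/spark.keytab" // placeholder
require(new File(keytab).exists(), s"Keytab file: $keytab does not exist")
// Mirror the keytab into system properties so SparkConf (and Hive's isolated
// class loader) can see it later.
sys.props("spark.yarn.keytab") = keytab
sys.props("spark.yarn.principal") = principal
UserGroupInformation.loginUserFromKeytab(principal, keytab)
assert(UserGroupInformation.isLoginKeytabBased)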