120 changes: 120 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DCOSSecretStoreUtils.scala
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy


import java.net.HttpURLConnection
import java.net.URL
import java.nio.charset.Charset
import java.security.MessageDigest
import java.util.UUID

import org.apache.commons.io.IOUtils

object DCOS_VERSION extends Enumeration {
type DCOS_VERSION = Value
val v1_10_X, v1_11_X = Value
}

private[spark] object DCOSSecretStoreUtils {

import org.apache.spark.deploy.DCOS_VERSION._

/**
* Writes a binary secret to the DC/OS secret store.
* Intended for use by SparkSubmit; reports progress through its print stream.
*
* @param hostName the hostname of the DC/OS master
* @param path {secret_store_name}/path/to/secret
* @param value the secret value in binary form
* @param token the authentication token to use for accessing the secrets HTTP API
* @param secretStoreName the name of the DC/OS secret store to write to
* @param dcosVersion the DC/OS version; the secrets HTTP API differs between versions
*/
def writeBinarySecret(
hostName: String,
path: String,
value: Array[Byte],
token: String,
secretStoreName: String,
dcosVersion: DCOS_VERSION = v1_10_X): Unit = {
val authUrl = new URL(s"https://$hostName/secrets/v1/secret/$secretStoreName$path")
val conn = authUrl.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestProperty("Authorization", s"token=$token")
conn.setRequestMethod("PUT")
conn.setDoOutput(true)

dcosVersion match {
case DCOS_VERSION.`v1_10_X` =>
conn.setRequestProperty("Content-Type", "application/json")

// DC/OS 1.10 expects the binary value base64-encoded inside a JSON envelope
val encoder = java.util.Base64.getEncoder()
val outputValue = encoder.encode(value)

val output =
s"""
|{"value":"${new String(outputValue, Charset.forName("UTF-8"))}"}
""".stripMargin
val out = conn.getOutputStream
out.write(output.getBytes(Charset.forName("UTF-8")))
out.flush()
out.close()
// drain the response body so the request completes
IOUtils.toString(conn.getInputStream())
// scalastyle:off println
SparkSubmit.printStream.println(s"Res:${conn.getResponseMessage}")
// scalastyle:on println
conn.disconnect()

case DCOS_VERSION.`v1_11_X` =>
conn.setRequestProperty("Content-Type", "application/octet-stream")
// DC/OS 1.11 accepts the raw bytes directly
val out = conn.getOutputStream
out.write(value)
out.flush()
out.close()
// drain the response body so the request completes
IOUtils.toString(conn.getInputStream())
// scalastyle:off println
SparkSubmit.printStream.println(s"Res:${conn.getResponseMessage}")
// scalastyle:on println
conn.disconnect()
}
}

def bytesToHex(bytes: Array[Byte]): String =
bytes.map("%02x".format(_)).mkString

def getSecretName(): String = {
// hash a random UUID to get a unique, fixed-length secret name
val md = MessageDigest.getInstance("SHA-256")
md.update(UUID.randomUUID().toString.getBytes("UTF-8"))
s"DTS-${bytesToHex(md.digest())}"
}

def formatPath(secretPath: String, secretName: String, version: DCOS_VERSION): String = {
val prefix = version match {
case DCOS_VERSION.`v1_10_X` => "__dcos_base64__" // 1.10 marks binary secrets via a name prefix
case DCOS_VERSION.`v1_11_X` => ""
}
secretPath + "/" + prefix + secretName
}
}
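Taken together, the helpers above compose like this. A hedged usage sketch (host, token, and byte values are placeholders; real callers live inside the org.apache.spark package, since the object is private[spark]):

```scala
import org.apache.spark.deploy.{DCOS_VERSION, DCOSSecretStoreUtils}

// Placeholder values: in this PR they come from spark.dcos.* properties
// and the SPARK_DCOS_AUTH_TOKEN environment variable.
val host = "master.dcos.example.com"
val authToken = sys.env("SPARK_DCOS_AUTH_TOKEN")
val tokenBytes = Array[Byte](1, 2, 3) // serialized Hadoop Credentials in practice

val name = DCOSSecretStoreUtils.getSecretName() // "DTS-<sha256 hex>"
// On 1.10 the "__dcos_base64__" name prefix marks the secret as binary,
// so this yields "/spark/__dcos_base64__DTS-<hex>"; on 1.11 there is no prefix.
val path = DCOSSecretStoreUtils.formatPath("/spark", name, DCOS_VERSION.v1_10_X)
DCOSSecretStoreUtils.writeBinarySecret(host, path, tokenBytes, authToken,
  secretStoreName = "default", dcosVersion = DCOS_VERSION.v1_10_X)
```

On 1.10 the value travels base64-encoded inside a JSON envelope; on 1.11 it is PUT directly as application/octet-stream.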
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -124,7 +124,12 @@ class SparkHadoopUtil extends Logging {
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
* cluster.
*/
-def addCredentials(conf: JobConf) {}
@skonto (Author) commented on May 9, 2018:
The problem here was due to the messed-up release. This method was empty and caused failures when HadoopRDD was used!

Reviewer:
Good catch

@skonto (Author):
thanx

+def addCredentials(conf: JobConf) {
+val jobCreds = conf.getCredentials()
+val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
+logInfo(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
+jobCreds.mergeAll(userCreds)
+}
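Context for the fix above: HadoopRDD calls this hook while materializing its JobConf, so the empty body meant no tokens reached Hadoop input formats on secure clusters. A minimal sketch of that call path (illustrative, not part of the diff):

```scala
import org.apache.hadoop.mapred.JobConf

// Roughly what happens when HadoopRDD builds its JobConf: the restored
// method merges the current UGI's tokens into it, letting record readers
// authenticate against secure HDFS.
val jobConf = new JobConf()
SparkHadoopUtil.get.addCredentials(jobConf)
```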

def isYarnMode(): Boolean = { false }

@@ -436,6 +441,10 @@ class SparkHadoopUtil extends Logging {
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}

def isProxyUser(ugi: UserGroupInformation): Boolean = {
ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
}
}

object SparkHadoopUtil {
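A quick sketch of how the new isProxyUser helper might be used at a call site (hypothetical, not part of this diff):

```scala
import org.apache.hadoop.security.UserGroupInformation

val ugi = UserGroupInformation.getCurrentUser()
if (SparkHadoopUtil.get.isProxyUser(ugi)) {
  // running under createProxyUser: rely on the real user's delegation
  // tokens rather than expecting a local Kerberos TGT
}
```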
164 changes: 136 additions & 28 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

-import java.io.{File, IOException}
+import java.io.{ByteArrayOutputStream, DataOutputStream, File, FileOutputStream, IOException}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.nio.file.Files
@@ -48,6 +48,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
@@ -179,7 +180,18 @@ object SparkSubmit extends CommandLineUtils {
}
}
} else {
-runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+if (sysProps.get("spark.mesos.cluster.mode.proxyUser").isDefined) {
+// scalastyle:off println
+printStream.println("Running as proxy user in mesos cluster mode...")
+// scalastyle:on println
+SparkHadoopUtil
+.get
+.runAsSparkUser(
+() => runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose))
+} else {
+runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+}
}
}
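For readers unfamiliar with runAsSparkUser: a hedged paraphrase of its behavior (the helper below is illustrative; see SparkHadoopUtil for the real implementation):

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Build a UGI for the effective user, transfer the caller's credentials
// onto it, and run the body under doAs so runMain sees that identity.
def runAs(user: String)(body: => Unit): Unit = {
  val ugi = UserGroupInformation.createRemoteUser(user)
  ugi.addCredentials(UserGroupInformation.getCurrentUser().getCredentials())
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = body
  })
}
```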

@@ -285,6 +297,7 @@ object SparkSubmit extends CommandLineUtils {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -319,24 +332,139 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
-if (clusterManager == YARN || clusterManager == LOCAL) {
+if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
if (args.principal != null) {
-require(args.keytab != null, "Keytab must be specified when principal is specified")
-if (!new File(args.keytab).exists()) {
-throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
-} else {
+if (args.keytab != null) {
+require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}

if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
val configValue = args.sparkProperties.get(key)
if (configValue.isDefined) {
// scalastyle:off println
printStream.println(s"Setting ${key} to config value: ${configValue.get}")
// scalastyle:on println
sysProps.put(key, configValue.get)
} else {
setRMPrincipal(sysProps)
}
}
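setRMPrincipal is defined elsewhere in SparkSubmit; as a hedged sketch of its effect (Hadoop's token-fetching path expects a ResourceManager principal even when YARN is absent, so one is faked):

```scala
import scala.collection.mutable
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Default yarn.resourcemanager.principal to the current user's short name
// so libraries that fetch tokens with an RM renewer do not fail on Mesos.
def setRMPrincipal(sysProps: mutable.Map[String, String]): Unit = {
  val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
  sysProps.put(s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}", shortUserName)
}
```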

// In Mesos (DC/OS) cluster mode with a proxy user
// a) use a local ticket cache to generate DTs and store them in the DC/OS secret store.
// b) pass a secret filename for the driver to retrieve the DTs
// c) reference the DTs with HADOOP_TOKEN_FILE_LOCATION
// Could be moved to the RestSubmissionClient class to avoid the doAs call here.
if (isMesosCluster && args.proxyUser != null && UserGroupInformation.isSecurityEnabled) {
Reviewer:
When does UserGroupInformation.isSecurityEnabled get set to True?

@skonto (Author):
When it is either PROXY or KERBEROS, not SIMPLE.
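For the record, security is considered enabled when hadoop.security.authentication is anything other than simple; a minimal sketch using the standard Hadoop API (not part of this PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

// UserGroupInformation caches the auth method from the static configuration;
// "kerberos" makes isSecurityEnabled return true, "simple" does not.
val conf = new Configuration()
conf.set("hadoop.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(conf)
assert(UserGroupInformation.isSecurityEnabled)
```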

val mesosSandbox = args
.sparkProperties
.getOrElse("spark.mesos.sandbox", "/mnt/mesos/sandbox")
val hostname = args.sparkProperties("spark.dcos.hostname")
val secretStoreName = args
.sparkProperties
.getOrElse("spark.dcos.secret.store.name", "default")
val dcosAuthToken = sys.env.get("SPARK_DCOS_AUTH_TOKEN")
val dtsRootPath = args
.sparkProperties
.getOrElse("spark.dcos.driver.dts.root.path", "/spark")
val dcosVersion = DCOS_VERSION
.withName(args.sparkProperties.getOrElse("spark.dcos.version", "v1_10_X"))

require(hostname != null, "spark.dcos.hostname must be set")
require(dcosAuthToken.isDefined, "SPARK_DCOS_AUTH_TOKEN must be set in the environment")
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
val currentUser = UserGroupInformation.getCurrentUser()
val creds = currentUser.getCredentials
Reviewer:
Does this get the TGT?

@skonto (Author):
Yes.

val sparkConf = new SparkConf()
args.sparkProperties.foreach(p => sparkConf.set(p._1, p._2))
val tokenManager =
new HadoopDelegationTokenManager(sparkConf, new HadoopConfiguration())
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// obtainDelegationTokens returns the next renewal time, which is not needed here
tokenManager.obtainDelegationTokens(hadoopConf, creds)
val baos = new ByteArrayOutputStream()
val w = new DataOutputStream(baos)
creds.writeTokenStorageToStream(w)
w.flush()

val secretBinaryData = baos.toByteArray
val secretName = DCOSSecretStoreUtils.getSecretName()
val secretPath = DCOSSecretStoreUtils.formatPath(dtsRootPath, secretName, dcosVersion)

DCOSSecretStoreUtils
.writeBinarySecret(hostname,
secretPath, secretBinaryData, dcosAuthToken.get, secretStoreName, dcosVersion)

// re-write the secret properties
val previousNames = args
.sparkProperties.get("spark.mesos.driver.secret.names")
val newNames = Seq(previousNames, Option(secretPath))
.flatten
.mkString(",")
args.sparkProperties.update("spark.mesos.driver.secret.names", newNames)

val previousFileNames = args
.sparkProperties.get("spark.mesos.driver.secret.filenames")
val newFileNames = Seq(previousFileNames, Option(secretName))
.flatten
.mkString(",")
args.sparkProperties.update("spark.mesos.driver.secret.filenames", newFileNames)

val fileEnvProperty = "spark.mesos.driverEnv.HADOOP_TOKEN_FILE_LOCATION"
sysProps.put(fileEnvProperty, s"$mesosSandbox/$secretName")

// scalastyle:off println
printStream.println(s"Stored tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
// scalastyle:on println

// re-write the RM principal key and use the proxyUser
// val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
// sysProps.put(key, args.proxyUser)

// we cannot run as the OS user used by the dispatcher: Hive fails
// because it would see the real user, e.g. root
sysProps.put("spark.mesos.cluster.mode.proxyUser", args.proxyUser)
} catch {
case e: Exception =>
// scalastyle:off println
printStream.println(s"Failed to fetch Hadoop delegation tokens $e")
// scalastyle:on println
throw e
}
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
}
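End to end: a submission sets spark.dcos.hostname (plus optionally spark.dcos.secret.store.name, spark.dcos.driver.dts.root.path and spark.dcos.version), exports SPARK_DCOS_AUTH_TOKEN, and passes --proxy-user in Mesos cluster mode; the driver then finds the token file in its sandbox via HADOOP_TOKEN_FILE_LOCATION. A hedged sketch of the read-back side, mirroring the readTokenStorageStream usage shown in SparkHadoopUtil above:

```scala
import java.io.{ByteArrayInputStream, DataInputStream}
import org.apache.hadoop.security.Credentials

// Inverse of the writeTokenStorageToStream call in the block above:
// rebuild the Credentials object from the bytes stored in the secret.
def readTokens(bytes: Array[Byte]): Credentials = {
  val creds = new Credentials()
  creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(bytes)))
  creds
}
```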

// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
@@ -603,26 +731,6 @@ object SparkSubmit extends CommandLineUtils {
}
}

-// assure a keytab is available from any place in a JVM
-if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
-if (args.principal != null) {
-if (args.keytab != null) {
-require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
-// Add keytab and principal configurations in sysProps to make them available
-// for later use; e.g. in spark sql, the isolated class loader used to talk
-// to HiveMetastore will use these settings. They will be set as Java system
-// properties and then loaded by SparkConf
-sysProps.put("spark.yarn.keytab", args.keytab)
-sysProps.put("spark.yarn.principal", args.principal)
-UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-}
-}
-}
-
-if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
-setRMPrincipal(sysProps)
-}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"