diff --git a/core/src/main/scala/org/apache/spark/deploy/DCOSSecretStoreUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DCOSSecretStoreUtils.scala
deleted file mode 100644
index 75a07e4955aa1..0000000000000
--- a/core/src/main/scala/org/apache/spark/deploy/DCOSSecretStoreUtils.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy
-
-
-import java.io.{ByteArrayInputStream, InputStreamReader}
-import java.net.HttpURLConnection
-import java.net.URL
-import java.nio.charset.Charset
-import java.security.MessageDigest
-import java.util.UUID
-
-import collection.JavaConverters._
-import org.apache.commons.io.{Charsets, IOUtils}
-
-object DCOS_VERSION extends Enumeration {
-  type DCOS_VERSION = Value
-  val v1_10_X, v1_11_X = Value
-}
-
-private[spark] object DCOSSecretStoreUtils {
-
-  import org.apache.spark.deploy.DCOS_VERSION._
-
-  /**
-   * Writes a binary secret to the DC/OS secret store.
-   * To be used by Spark Submit, integrates with its printstream
-   *
-   * @param hostName the hostname of the DC/OS master
-   * @param path {secret_store_name}/path/to/secret
-   * @param value secret value in binary format
-   * @param token the authentication token to use for accessing the secrets HTTP API
-   * @param dcosVersion DC/OS version. HTTP API depends on that.
-   */
-  def writeBinarySecret(
-      hostName: String,
-      path: String,
-      value: Array[Byte],
-      token: String,
-      secretStoreName: String,
-      dcosVersion: DCOS_VERSION = v1_10_X): Unit = {
-    val authurl = new URL(s"https://$hostName/secrets/v1/secret/$secretStoreName" + path)
-    val conn = authurl.openConnection().asInstanceOf[HttpURLConnection]
-    conn.setRequestProperty("Authorization", s"token=$token")
-    conn.setRequestMethod("PUT")
-    conn.setDoOutput(true)
-
-    dcosVersion match {
-      case DCOS_VERSION.`v1_10_X` =>
-        conn.setRequestProperty("Content-Type", "application/json")
-
-        val encoder = java.util.Base64.getEncoder()
-        val outputValue = encoder.encode(value)
-
-        val outPut =
-          s"""
-             |{"value":"${new String(outputValue)}"}
-          """.stripMargin
-        val byteArrayOutput = outPut.getBytes(Charset.forName("UTF-8"))
-        conn.getOutputStream.write(byteArrayOutput)
-        conn.getOutputStream.flush()
-        conn.getOutputStream.close()
-        val res = IOUtils.toString(conn.getInputStream())
-        // scalastyle:off println
-        SparkSubmit.printStream.println(s"Res:${conn.getResponseMessage}")
-        // scalastyle:on println
-        conn.disconnect()
-
-      case DCOS_VERSION.`v1_11_X` =>
-        conn.setRequestProperty("Content-Type", "application/octet-stream")
-        conn.getOutputStream.write(value)
-        conn.getOutputStream.flush()
-        conn.getOutputStream.close()
-        val res = IOUtils.toString(conn.getInputStream())
-        // scalastyle:off println
-        SparkSubmit.printStream.println(s"Res:${conn.getResponseMessage}")
-        // scalastyle:on println
-        conn.disconnect()
-    }
-  }
-
-  def bytesToHex(bytes: Array[Byte]): String = {
-    val builder = StringBuilder.newBuilder
-    for (b <- bytes) {
-      builder.append("%02x".format(b))
-    }
-    builder.toString
-  }
-
-  def getSecretName(): String = {
-    val salt = MessageDigest.getInstance("SHA-256")
-    salt.update(UUID.randomUUID().toString().getBytes("UTF-8"))
-    val digest = bytesToHex(salt.digest())
-    s"DTS-$digest"
-  }
-
-  def formatPath(secretPath: String, secretName: String, version: DCOS_VERSION): String = {
-    val prefix = {
-      version match {
-        case DCOS_VERSION.`v1_10_X` => "__dcos_base64__"
-        case DCOS_VERSION.`v1_11_X` => ""
-      }
-    }
-    secretPath + "/" + prefix + secretName
-  }
-}
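(For context on the file deleted above: the DC/OS-specific SparkSubmit code removed later in this diff drove the helper roughly as in the sketch below. `dcosHost`, `authToken`, and `tokenBlob` are placeholders for `spark.dcos.hostname`, the `SPARK_DCOS_AUTH_TOKEN` environment variable, and the serialized Hadoop credentials; the "/spark" root path and "default" store name mirror the defaults the removed code used.)

    import org.apache.spark.deploy.{DCOS_VERSION, DCOSSecretStoreUtils}

    // Illustrative driver for the deleted helper: store a credentials blob as a
    // DC/OS secret and return the generated name so the driver can locate it.
    def storeTokenBlob(dcosHost: String, authToken: String, tokenBlob: Array[Byte]): String = {
      val version = DCOS_VERSION.v1_10_X
      val secretName = DCOSSecretStoreUtils.getSecretName()   // "DTS-<sha256 hex>"
      val secretPath = DCOSSecretStoreUtils.formatPath("/spark", secretName, version)
      DCOSSecretStoreUtils.writeBinarySecret(
        dcosHost, secretPath, tokenBlob, authToken, "default", version)
      secretName
    }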
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fe48d713dec1e..093c913c0bb59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -124,12 +124,7 @@ class SparkHadoopUtil extends Logging {
    * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
    * cluster.
    */
-  def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials()
-    val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
-    logInfo(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
-    jobCreds.mergeAll(userCreds)
-  }
+  def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
 
@@ -441,10 +436,6 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
-
-  def isProxyUser(ugi: UserGroupInformation): Boolean = {
-    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
-  }
 }
 
 object SparkHadoopUtil {
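(The body stripped out of `addCredentials` above did one thing: merge the launching user's Hadoop credentials into the `JobConf`. A rough standalone equivalent using plain Hadoop APIs is sketched below; `mergeUserCredentials` is an illustrative name, not something this patch introduces.)

    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.security.UserGroupInformation

    // Rough equivalent of the removed method body: copy whatever tokens the
    // current user already holds into the job configuration.
    def mergeUserCredentials(conf: JobConf): Unit = {
      val jobCreds = conf.getCredentials()
      val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
      jobCreds.mergeAll(userCreds)
    }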
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f3a4fb6456f43..7cb2a24abfc2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayOutputStream, DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
 import java.nio.file.Files
@@ -48,7 +48,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
@@ -180,18 +179,7 @@ object SparkSubmit extends CommandLineUtils {
         }
       }
     } else {
-      if (sysProps.get("spark.mesos.cluster.mode.proxyUser").isDefined) {
-        // scalastyle:off println
-        printStream.println("Running as proxy user in mesos cluster mode...")
-        // scalastyle:on println
-        SparkHadoopUtil
-          .get
-          .runAsSparkUser(
-            () => runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose))
-      }
-      else {
-        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
-      }
+      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
     }
   }
 
@@ -297,7 +285,6 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
-    val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
 
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
@@ -332,139 +319,24 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
+    if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
-        if (args.keytab != null) {
-          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+        require(args.keytab != null, "Keytab must be specified when principal is specified")
+        if (!new File(args.keytab).exists()) {
+          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+        } else {
           // Add keytab and principal configurations in sysProps to make them available
           // for later use; e.g. in spark sql, the isolated class loader used to talk
           // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
           sysProps.put("spark.yarn.keytab", args.keytab)
           sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
     }
 
-    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
-      val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
-      val configValue = args.sparkProperties.get(key)
-      if (configValue.isDefined) {
-        // scalastyle:off println
-        printStream.println(s"Setting ${key} to config value: ${configValue.get}")
-        // scalastyle:on println
-        sysProps.put(key, configValue.get)
-      } else {
-        setRMPrincipal(sysProps)
-      }
-    }
-
-    // In Mesos (DC/OS) cluster mode with a proxy user
-    // a) use a local ticket cache to generate DTs and store them in the DC/OS secret store.
-    // b) pass a secret filename for the driver to retrieve the DTs
-    // c) reference the DTs with HADOOP_TOKEN_FILE_LOCATION
-    // Could be moved to the RestSubmissionClient class to avoid the doAs call here.
-    if(isMesosCluster && args.proxyUser != null && UserGroupInformation.isSecurityEnabled) {
-      val mesosSandbox = args
-        .sparkProperties
-        .getOrElse("spark.mesos.sandbox", "/mnt/mesos/sandbox")
-      val hostname = args.sparkProperties("spark.dcos.hostname")
-      val secretStoreName = args
-        .sparkProperties
-        .getOrElse("spark.dcos.secret.store.name", "default")
-      val dcosAuthToken = sys.env.get("SPARK_DCOS_AUTH_TOKEN")
-      val dtsRootPath = args
-        .sparkProperties
-        .getOrElse("spark.dcos.driver.dts.root.path", "/spark")
-      val dcosVersion = DCOS_VERSION.
-        withName(args.sparkProperties.getOrElse("spark.dcos.version", "v1_10_X"))
-
-      require(hostname != null)
-      require(dcosAuthToken.isDefined)
-      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
-        UserGroupInformation.getCurrentUser())
-      try {
-        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-          override def run(): Unit = {
-            try {
-              val currentUser = UserGroupInformation.getCurrentUser()
-              val creds = currentUser.getCredentials
-
-              val sparkConf = new SparkConf()
-              args.sparkProperties.foreach(p => sparkConf.set(p._1, p._2))
-              val tokenManager =
-                new HadoopDelegationTokenManager(sparkConf, new HadoopConfiguration())
-              val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-              val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
-              val baos = new ByteArrayOutputStream()
-              val w = new DataOutputStream(baos)
-              creds.writeTokenStorageToStream(w)
-              w.flush()
-
-              val secretBinaryData = baos.toByteArray
-              val secretName = DCOSSecretStoreUtils.getSecretName()
-              val secretPath = DCOSSecretStoreUtils.formatPath(dtsRootPath, secretName, dcosVersion)
-
-              DCOSSecretStoreUtils
-                .writeBinarySecret(hostname,
-                  secretPath, secretBinaryData, dcosAuthToken.get, secretStoreName, dcosVersion)
-
-              // re-write the secret properties
-              val previousNames = args
-                .sparkProperties.get("spark.mesos.driver.secret.names")
-              val newNames = Seq(previousNames, Option(secretPath))
-                .flatten
-                .mkString(",")
-              args.sparkProperties.update("spark.mesos.driver.secret.names", newNames)
-
-              val previousFileNames = args
-                .sparkProperties.get("spark.mesos.driver.secret.filenames")
-              val newFileNames = Seq(previousFileNames, Option(secretName))
-                .flatten
-                .mkString(",")
-              args.sparkProperties.update("spark.mesos.driver.secret.filenames", newFileNames)
-
-              val fileEnvProperty = "spark.mesos.driverEnv.HADOOP_TOKEN_FILE_LOCATION"
-              sysProps.put(fileEnvProperty, s"$mesosSandbox/$secretName")
-
-              // scalastyle:off println
-              printStream.println(s"Stored tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
-              // scalastyle:on println
-
-              // re-write the RM principal key and use the proxyUser
-              // val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
-              // sysProps.put(key, args.proxyUser)
-
-              // we cannot run as the OS user used by the dispatcher, hive fails
-              // and will see the real user eg. root
-              sysProps.put("spark.mesos.cluster.mode.proxyUser", args.proxyUser)
-            }
-            catch {
-              case e: Exception =>
-                // scalastyle:off println
-                printStream.println(s"Failed to fetch Hadoop delegation tokens $e")
-                // scalastyle:on println
-                throw e
-            }
-          }
-        })
-      } catch {
-        case e: Exception =>
-          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
-          // makes the message printed to the output by the JVM not very helpful. Instead,
-          // detect exceptions with empty stack traces here, and treat them differently.
-          if (e.getStackTrace().length == 0) {
-            // scalastyle:off println
-            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-            // scalastyle:on println
-            exitFn(1)
-          } else {
-            throw e
-          }
-      }
-    }
-
     // In client mode, download remote files.
     var localPrimaryResource: String = null
     var localJars: String = null
@@ -731,6 +603,26 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // assure a keytab is available from any place in a JVM
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
+      if (args.principal != null) {
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
+      }
+    }
+
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      setRMPrincipal(sysProps)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
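(The keytab handling added above reduces to the sketch below: validate the keytab, record principal/keytab for later use, and log the JVM in eagerly. `loginFromKeytab` is an illustrative helper, not part of the patch, and the real code records the values in SparkSubmit's `sysProps` map rather than writing system properties directly.)

    import java.io.File

    import org.apache.hadoop.security.UserGroupInformation

    // Sketch of the keytab flow added above; principal and keytab are caller-supplied.
    def loginFromKeytab(principal: String, keytab: String): Unit = {
      require(keytab != null, "Keytab must be specified when principal is specified")
      require(new File(keytab).exists(), s"Keytab file: $keytab does not exist")
      // Surface the settings so that later SparkConf instances can pick them up.
      sys.props("spark.yarn.keytab") = keytab
      sys.props("spark.yarn.principal") = principal
      // Log the JVM in from the keytab so subsequent Hadoop/Hive calls run as `principal`.
      UserGroupInformation.loginUserFromKeytab(principal, keytab)
    }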
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 2bfcec1d1e884..21cb94142b15b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -191,10 +191,6 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
     logDebug(s"Sending GET request to server at $url.")
     val conn = url.openConnection().asInstanceOf[HttpURLConnection]
     conn.setRequestMethod("GET")
-    sys
-      .env
-      .get("SPARK_DCOS_AUTH_TOKEN")
-      .foreach(token => conn.setRequestProperty("Authorization", s"token=$token"))
     readResponse(conn)
   }
 
@@ -203,10 +199,6 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
     logDebug(s"Sending POST request to server at $url.")
     val conn = url.openConnection().asInstanceOf[HttpURLConnection]
     conn.setRequestMethod("POST")
-    sys
-      .env
-      .get("SPARK_DCOS_AUTH_TOKEN")
-      .foreach(token => conn.setRequestProperty("Authorization", s"token=$token"))
     readResponse(conn)
   }
 
@@ -218,10 +210,6 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
     conn.setRequestProperty("Content-Type", "application/json")
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
-    sys
-      .env
-      .get("SPARK_DCOS_AUTH_TOKEN")
-      .foreach(token => conn.setRequestProperty("Authorization", s"token=$token"))
     try {
       val out = new DataOutputStream(conn.getOutputStream)
       Utils.tryWithSafeFinally {
@@ -312,11 +300,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
       }
     }
     masterUrl = masterUrl.stripSuffix("/")
-    if (sys.env.get("SPARK_MESOS_SSL").isDefined) {
-      s"https://$masterUrl/$PROTOCOL_VERSION/submissions"
-    } else {
-      s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
-    }
+    s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
   }
 
   /** Throw an exception if this is not standalone mode. */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5f26af57a88f4..23b344230e490 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -195,7 +195,6 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    logInfo(s"HadoopRDD tokens: ${SparkHadoopUtil.get.dumpTokens(jobConf.getCredentials)}")
     val inputFormat = getInputFormat(jobConf)
     val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
     val array = new Array[Partition](inputSplits.size)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 53f5319ba6f82..e811b82efa36d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -500,10 +500,6 @@ private[spark] class MesosClusterScheduler(
   private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
     val builder = CommandInfo.newBuilder()
 
-    // Pick the user defined OS user for the driver container
-    val driverUser = desc.conf.getOption("spark.mesos.cluster.mode.driverUser")
-    driverUser.foreach(dUser => builder.setUser(dUser))
-
     builder.setValue(getDriverCommandValue(desc))
     builder.setEnvironment(getDriverEnvironment(desc))
     builder.addAllUris(getDriverUris(desc).asJava)
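(The lines dropped from `buildDriverCommand` above were the only place the driver's OS user was overridden. In isolation the Mesos protobuf call they relied on looks roughly like this; `driverCommandFor` and the example value are illustrative only.)

    import org.apache.mesos.Protos.CommandInfo

    // Minimal sketch of the removed override: set the OS user the Mesos agent
    // runs the driver command as; when unset, the agent's default user applies.
    def driverCommandFor(osUser: Option[String]): CommandInfo.Builder = {
      val builder = CommandInfo.newBuilder()
      osUser.foreach(builder.setUser)   // e.g. Some("nobody")
      builder
    }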
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 6bb1241ddaac3..085857299b472 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -31,7 +31,6 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -61,8 +60,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
     new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
 
-  private val isProxyUser = SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser())
-
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
 
@@ -182,17 +179,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
     sc.env.metricsSystem.registerSource(metricsSource)
 
-    // fetch DTs early enough to avoid metastore connection issue
-    // proxy user in cluster mode
-    if (sc.conf.getOption("spark.mesos.cluster.mode.proxyUser").isDefined) {
-      fetchHadoopDelegationTokens()
-    }
-
-    // proxy user in client mode
-    if (isProxyUser && sc.deployMode == "client" ) {
-      fetchHadoopDelegationTokens()
-    }
-
     val startedBefore = IdHelper.startedBefore.getAndSet(true)
 
     val suffix = if (startedBefore) {
@@ -747,16 +733,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
     if (UserGroupInformation.isSecurityEnabled) {
-      val token_file = sys.env.get("HADOOP_TOKEN_FILE_LOCATION")
-
-      if (token_file.isDefined) {
-        val user = UserGroupInformation.getCurrentUser
-        val creds = user.getCredentials
-        logInfo(s"Fetched tokens fron token file: ${SparkHadoopUtil.get.dumpTokens(creds)}")
-        Some(SparkHadoopUtil.get.serialize(creds))
-      } else {
-        Some(hadoopDelegationTokenManager.getTokens())
-      }
+      Some(hadoopDelegationTokenManager.getTokens())
     } else {
       None
     }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
index 58e4b3c7090e5..7165bfae18a5e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
@@ -59,13 +59,9 @@ private[spark] class MesosHadoopDelegationTokenManager(
 
   private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
     try {
-      val currentUser = UserGroupInformation.getCurrentUser()
-      val creds = currentUser.getCredentials
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
       val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
       val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
-      if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
-        currentUser.addCredentials(creds)
-      }
       logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
       (SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.getDateOfNextUpdate(rt, 0.75))
     } catch {