diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b23accbbb941..7a1d5ce3dc08 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -221,6 +221,11 @@ class SparkContext(
   }
   executorEnvs("SPARK_USER") = sparkUser
 
+  // Perform user authentication when Hadoop security is turned on
+  if (SparkHadoopUtil.get.isSecurityEnabled()) {
+    SparkHadoopUtil.get.doUserAuthentication(this)
+  }
+
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   taskScheduler.start()
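// ---------------------------------------------------------------------------
// For reference: a driver opting into keytab-based login under this patch
// would set the new spark.hadoop.security.* properties before constructing
// the SparkContext. This is a minimal sketch, not part of the patch: the
// principal, keytab path, input path, and app name are all illustrative.
// Note that SparkHadoopUtil builds its own SparkConf, which picks values up
// from spark.* system properties, so they are set here with
// System.setProperty rather than on the driver's SparkConf alone.
// ---------------------------------------------------------------------------
import org.apache.spark.SparkContext

object SecureAppSketch {
  def main(args: Array[String]) {
    System.setProperty("spark.hadoop.security.authentication", "keytab")
    System.setProperty("spark.hadoop.security.kerberos.principal", "alice")
    System.setProperty("spark.hadoop.security.kerberos.keytab", "/home/alice/alice.keytab")

    // Constructing the SparkContext now triggers doUserAuthentication(),
    // which schedules the Kerberos renew task and ships the token file.
    val sc = new SparkContext("local", "SecureAppSketch")
    sc.textFile("hdfs:///user/alice/input").count()
    sc.stop()
  }
}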
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9bdbfb33bf54..84468a0d6538 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,17 +24,25 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark._
 
 import scala.collection.JavaConversions._
+import java.util.{Collection, TimerTask, Timer}
+import java.io.{File, IOException}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import java.net.URI
+import org.apache.hadoop.security.token.{TokenIdentifier, Token}
+import org.apache.hadoop.fs.permission.FsPermission
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
+  val sparkConf = new SparkConf()
+
   def runAsUser(user: String)(func: () => Unit) {
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
@@ -75,6 +83,165 @@ class SparkHadoopUtil {
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
+
+  /**
+   * Return whether Hadoop security is enabled or not.
+   *
+   * @return Whether Hadoop security is enabled or not
+   */
+  def isSecurityEnabled(): Boolean = {
+    UserGroupInformation.isSecurityEnabled
+  }
+
+  /**
+   * Perform user authentication when Hadoop security is turned on. Used by the driver.
+   *
+   * @param sc Spark context
+   */
+  def doUserAuthentication(sc: SparkContext) {
+    getAuthenticationType match {
+      case "keytab" => {
+        // Authentication through a Kerberos keytab file. Necessary for
+        // long-running services like Shark/Spark Streaming.
+        scheduleKerberosRenewTask(sc)
+      }
+      case _ => {
+        // No extra login needed. Assume authentication was already done
+        // before Spark was launched, e.g., the user has authenticated with
+        // Kerberos through kinit already.
+        // Obtain a Hadoop delegation token, store it in a file, and add the
+        // token file so it gets downloaded to every slave node.
+        sc.addFile(initDelegationToken().toString)
+      }
+    }
+  }
+
+  /**
+   * Get a UGI for the user a task belongs to, with the user's Hadoop
+   * delegation tokens added.
+   *
+   * @param userName Name of the user the task belongs to
+   * @return UGI of the user the task belongs to
+   */
+  def getTaskUser(userName: String): UserGroupInformation = {
+    val ugi = UserGroupInformation.createRemoteUser(userName)
+    // Change the authentication method to Kerberos
+    ugi.setAuthenticationMethod(
+      UserGroupInformation.AuthenticationMethod.KERBEROS)
+    // Get and add Hadoop delegation tokens for the user
+    val iter = getDelegationTokens().iterator()
+    while (iter.hasNext) {
+      ugi.addToken(iter.next())
+    }
+
+    ugi
+  }
+
+  /**
+   * Get the type of Hadoop security authentication, or an empty string if
+   * none is configured.
+   *
+   * @return Type of Hadoop security authentication
+   */
+  private def getAuthenticationType: String = {
+    sparkConf.get("spark.hadoop.security.authentication", "")
+  }
+
+  /**
+   * Schedule a timer task that periodically renews the Kerberos credential.
+   *
+   * @param sc Spark context
+   */
+  private def scheduleKerberosRenewTask(sc: SparkContext) {
+    val kerberosRenewTimer = new Timer()
+    val kerberosRenewTimerTask = new TimerTask {
+      def run() {
+        try {
+          kerberosLoginFromKeytab()
+          // Obtain a fresh Hadoop delegation token, store it in a file, and
+          // add the token file so it gets downloaded to every slave node.
+          sc.addFile(initDelegationToken().toString)
+        } catch {
+          case ioe: IOException => {
+            logError("Failed to log in from Kerberos keytab", ioe)
+          }
+        }
+      }
+    }
+
+    val interval = sparkConf.getLong(
+      "spark.hadoop.security.kerberos.renewInterval", 21600000)
+    kerberosRenewTimer.schedule(kerberosRenewTimerTask, 0, interval)
+    logInfo("Scheduled timer task for renewing Kerberos credential")
+  }
+
+  /**
+   * Log a user in from a keytab file. Loads the user's credentials from a
+   * keytab file and logs the user in.
+   */
+  private def kerberosLoginFromKeytab() {
+    val defaultKeytab = System.getProperty("user.home") + Path.SEPARATOR +
+      System.getProperty("user.name") + ".keytab"
+    val keytab = sparkConf.get(
+      "spark.hadoop.security.kerberos.keytab", defaultKeytab)
+    val principal = sparkConf.get(
+      "spark.hadoop.security.kerberos.principal", System.getProperty("user.name"))
+
+    // Keytab file not found
+    if (!new File(keytab).exists()) {
+      throw new IOException("Keytab file %s not found".format(keytab))
+    }
+
+    UserGroupInformation.loginUserFromKeytab(principal, keytab)
+  }
+
+  /**
+   * Obtain a new Hadoop delegation token, store it in a file under the
+   * user's home directory, and return the file's URI so the caller can
+   * register it with SparkContext.addFile() for executors to pick up.
+   *
+   * @return URI of the token file
+   */
+  private def initDelegationToken(): URI = {
+    val localFS = FileSystem.getLocal(conf)
+    // Store the token file under the user's home directory
+    val tokenFile = new Path(localFS.getHomeDirectory, sparkConf.get(
+      "spark.hadoop.security.token.name", "spark.token"))
+    if (localFS.exists(tokenFile)) {
+      localFS.delete(tokenFile, false)
+    }
+
+    // Get a new token and write it to the token file
+    val currentUser = UserGroupInformation.getCurrentUser
+    val fs = FileSystem.get(conf)
+    val token: Token[_ <: TokenIdentifier] =
+      fs.getDelegationToken(currentUser.getShortUserName)
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+    val cred = new Credentials()
+    cred.addToken(token.getService, token)
+    cred.writeTokenStorageFile(tokenFile, conf)
+    // Make the token file readable only by its owner
+    localFS.setPermission(tokenFile, FsPermission.createImmutable(0400))
+
+    logInfo("Stored Hadoop delegation token for user %s to file %s".format(
+      currentUser.getShortUserName, tokenFile.toUri.toString))
+    tokenFile.toUri
+  }
+
+  /**
+   * Get delegation tokens from the token file added through SparkContext.addFile().
+   *
+   * @return Collection of delegation tokens
+   */
+  private def getDelegationTokens(): Collection[Token[_ <: TokenIdentifier]] = {
+    // Locate the token file added through SparkContext.addFile()
+    val source = new File(SparkFiles.get(sparkConf.get(
+      "spark.hadoop.security.token.name", "spark.token")))
+    if (source.exists()) {
+      val sourcePath = new Path("file://" + source.getAbsolutePath)
+      // Read credentials from the token file
+      Credentials.readTokenStorageFile(sourcePath, conf).getAllTokens
+    } else {
+      throw new IOException(
+        "Token file %s does not exist".format(source.getAbsolutePath))
+    }
+  }
 }
 
 object SparkHadoopUtil {
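// ---------------------------------------------------------------------------
// For reference: initDelegationToken()/getDelegationTokens() above are built
// on Hadoop's Credentials file format, used as a hand-off between driver and
// executors. This is a self-contained sketch of that round trip, not part of
// the patch; it assumes a Kerberos-enabled cluster with an already-logged-in
// user, and the renewer name and file path are illustrative.
// ---------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object TokenRoundTripSketch {
  def main(args: Array[String]) {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)

    // "Driver" side, as in initDelegationToken(): obtain a token, persist it
    val token = fs.getDelegationToken("alice")
      .asInstanceOf[Token[_ <: TokenIdentifier]]
    val creds = new Credentials()
    creds.addToken(token.getService, token)
    val tokenFile = new Path("file:///tmp/spark.token")
    creds.writeTokenStorageFile(tokenFile, conf)

    // "Executor" side, as in getDelegationTokens(): read the tokens back
    val restored = Credentials.readTokenStorageFile(tokenFile, conf)
    val iter = restored.getAllTokens.iterator()
    while (iter.hasNext) {
      println("Restored token for service: " + iter.next().getService)
    }
    // The 0400 permission set in initDelegationToken() matters here: the
    // token grants HDFS access to anyone who can read this file.
  }
}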
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index aecb069e4202..83e6dc3e4d3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util.{AkkaUtils, Utils}
+import java.security.PrivilegedExceptionAction
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -173,7 +174,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
@@ -188,7 +189,8 @@ private[spark] class Executor(
       try {
         SparkEnv.set(env)
         Accumulators.clear()
-        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
+        val (userName, taskFiles, taskJars, taskBytes) =
+          Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
 
@@ -208,7 +210,19 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = task.run(taskId.toInt)
+        var value: Any = None
+        if (SparkHadoopUtil.get.isSecurityEnabled()) {
+          // Get the user the task belongs to
+          val ugi = SparkHadoopUtil.get.getTaskUser(userName)
+          // Run the task as that user
+          ugi.doAs(new PrivilegedExceptionAction[Unit] {
+            def run(): Unit = {
+              value = task.run(taskId.toInt)
+            }
+          })
+        } else {
+          value = task.run(taskId.toInt)
+        }
         val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.
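// ---------------------------------------------------------------------------
// For reference: the doAs wrapping above is standard UserGroupInformation
// usage. A minimal, self-contained sketch (not part of the patch; the user
// name is illustrative, and createRemoteUser needs no Kerberos setup):
// ---------------------------------------------------------------------------
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  def main(args: Array[String]) {
    val ugi = UserGroupInformation.createRemoteUser("alice")
    val who = ugi.doAs(new PrivilegedExceptionAction[String] {
      def run(): String = {
        // Code running here sees "alice" as the current Hadoop user
        UserGroupInformation.getCurrentUser.getShortUserName
      }
    })
    println("Ran as: " + who)
    // Since doAs returns the action's result, the executor change above could
    // equally capture task.run's return value from doAs directly instead of
    // assigning to the mutable `value` variable.
  }
}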
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b85b4a50cd93..fbab1773bd92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -98,6 +98,7 @@ private[spark] object Task {
    * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
    */
   def serializeWithDependencies(
+      userName: String,
       task: Task[_],
       currentFiles: HashMap[String, Long],
       currentJars: HashMap[String, Long],
@@ -107,6 +108,9 @@ private[spark] object Task {
     val out = new FastByteArrayOutputStream(4096)
     val dataOut = new DataOutputStream(out)
 
+    // Write the name of the user launching the task
+    dataOut.writeUTF(userName)
+
     // Write currentFiles
     dataOut.writeInt(currentFiles.size)
     for ((name, timestamp) <- currentFiles) {
@@ -134,14 +138,17 @@ private[spark] object Task {
    * and return the task itself as a serialized ByteBuffer. The caller can then update its
    * ClassLoaders and deserialize the task.
    *
-   * @return (taskFiles, taskJars, taskBytes)
+   * @return (userName, taskFiles, taskJars, taskBytes)
    */
   def deserializeWithDependencies(serializedTask: ByteBuffer)
-    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
+    : (String, HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
 
     val in = new ByteBufferInputStream(serializedTask)
     val dataIn = new DataInputStream(in)
 
+    // Read the name of the user launching the task
+    val userName = dataIn.readUTF()
+
     // Read task's files
     val taskFiles = new HashMap[String, Long]()
     val numFiles = dataIn.readInt()
@@ -158,6 +165,6 @@ private[spark] object Task {
 
     // Create a sub-buffer for the rest of the data, which is the serialized Task object
     val subBuffer = serializedTask.slice()  // ByteBufferInputStream will have read just up to task
-    (taskFiles, taskJars, subBuffer)
+    (userName, taskFiles, taskJars, subBuffer)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 86d2050a03f1..6ba2207bd808 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -410,10 +410,11 @@ private[spark] class TaskSetManager(
         lastLaunchTime = curTime
         // Serialize and return the task
         val startTime = clock.getTime()
+        val userName = System.getProperty("user.name")
         // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
         // we assume the task can be serialized without exceptions.
         val serializedTask = Task.serializeWithDependencies(
-          task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          userName, task, sched.sc.addedFiles, sched.sc.addedJars, ser)
         val timeTaken = clock.getTime() - startTime
         addRunningTask(taskId)
         logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
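// ---------------------------------------------------------------------------
// For reference: the wire-format change above prepends a length-prefixed UTF
// field, so readUTF must be the very first read on the other side. This is a
// self-contained sketch of that framing (not part of the patch; values are
// illustrative).
// ---------------------------------------------------------------------------
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object UserNameFramingSketch {
  def main(args: Array[String]) {
    val out = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(out)
    dataOut.writeUTF("alice")  // user name goes first, as in serializeWithDependencies
    dataOut.writeInt(0)        // then the file map (empty here)
    dataOut.flush()

    val dataIn = new DataInputStream(new ByteArrayInputStream(out.toByteArray))
    val userName = dataIn.readUTF()  // consumed first, as in deserializeWithDependencies
    val numFiles = dataIn.readInt()
    println("userName=" + userName + ", numFiles=" + numFiles)
    // Mixing old and new versions would break here: an old reader would
    // misinterpret the UTF length prefix as the file count.
  }
}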
diff --git a/docs/configuration.md b/docs/configuration.md
index 1ff015056725..b6f30d8d6dc5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -586,6 +586,41 @@ Apart from these, the following properties are also available, and may be useful
     Number of cores to allocate for each task.
   </td>
 </tr>
+<tr>
+  <td>spark.hadoop.security.authentication</td>
+  <td>(none)</td>
+  <td>
+    Method used to authenticate the user when Hadoop security is turned on. A Hadoop delegation token can be obtained only after the user has been authenticated. Set this to <code>keytab</code> to authenticate through a Kerberos keytab file; otherwise Spark assumes the user has already authenticated, e.g., with Kerberos through <code>kinit</code>.
+  </td>
+</tr>
+<tr>
+  <td>spark.hadoop.security.kerberos.renewInterval</td>
+  <td>21600000</td>
+  <td>
+    Interval in milliseconds at which the Kerberos credential is automatically renewed when Hadoop security is turned on and Kerberos is the method of user authentication. The default of 21600000 is six hours.
+  </td>
+</tr>
+<tr>
+  <td>spark.hadoop.security.kerberos.keytab</td>
+  <td>{Current login user name}.keytab under the home directory of the current login user</td>
+  <td>
+    Local path of the Kerberos keytab file, which is usually located on the gateway host to the Spark cluster.
+  </td>
+</tr>
+<tr>
+  <td>spark.hadoop.security.kerberos.principal</td>
+  <td>Current login user name</td>
+  <td>
+    Principal used for Kerberos login.
+  </td>
+</tr>
+<tr>
+  <td>spark.hadoop.security.token.name</td>
+  <td>spark.token</td>
+  <td>
+    Name of the file in which the driver stores the Hadoop delegation token it obtains. Executors read the token back from this file after it is distributed through <code>SparkContext.addFile()</code>.
+  </td>
+</tr>
 </table>
 
 ## Viewing Spark Properties
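// ---------------------------------------------------------------------------
// For reference: a sketch (not part of the patch) relating the documented
// defaults above to the lookups SparkHadoopUtil performs; the keytab path is
// illustrative.
// ---------------------------------------------------------------------------
import org.apache.spark.SparkConf

object SecurityConfSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .set("spark.hadoop.security.authentication", "keytab")
      .set("spark.hadoop.security.kerberos.keytab", "/home/alice/alice.keytab")

    // Fallbacks mirror the defaults documented in the table above
    val interval = conf.getLong("spark.hadoop.security.kerberos.renewInterval", 21600000)
    val tokenName = conf.get("spark.hadoop.security.token.name", "spark.token")
    println("Renew every " + (interval / 3600000) + " hours; token file name: " + tokenName)
  }
}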