From 59253b3bc731338df87301ed3f2a2a8432af8a8b Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Tue, 20 Oct 2015 18:23:24 -0700
Subject: [PATCH 1/8] SPARK-10181 UserGroupInformation class needs to be shared for login credentials

---
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 38c195bc7db0..89e1ad06c0d6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -697,7 +697,7 @@ private[hive] object HiveContext {
     doc = "TODO")
 
   val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
-    defaultValue = Some(jdbcPrefixes),
+    defaultValue = Some(jdbcPrefixes ++ securityPrefixes),
     doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
       "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
       "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
@@ -707,6 +707,11 @@ private[hive] object HiveContext {
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
 
+  private def securityPrefixes = Seq(
+    "org.apache.hadoop.security.UserGroupInformation",
+    "org.apache.hadoop.security.token.Token",
+    "org.apache.hadoop.io.Text")
+
   val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
     defaultValue = Some(Seq()),
     doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +

From 9efa819b8ef1b7d0072de1119da2a0329484c16b Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Fri, 23 Oct 2015 19:17:19 -0700
Subject: [PATCH 2/8] Revert "SPARK-10181 UserGroupInformation class needs to be shared for login credentials"

This reverts commit 59253b3bc731338df87301ed3f2a2a8432af8a8b.
---
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b699c7a12ea8..c328734df316 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -702,7 +702,7 @@ private[hive] object HiveContext {
     doc = "TODO")
 
   val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
-    defaultValue = Some(jdbcPrefixes ++ securityPrefixes),
+    defaultValue = Some(jdbcPrefixes),
     doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
       "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
       "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
@@ -712,11 +712,6 @@ private[hive] object HiveContext {
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
 
-  private def securityPrefixes = Seq(
-    "org.apache.hadoop.security.UserGroupInformation",
-    "org.apache.hadoop.security.token.Token",
-    "org.apache.hadoop.io.Text")
-
   val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
     defaultValue = Some(Seq()),
     doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
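Patches 1 and 2 frame the problem the rest of the series solves. UserGroupInformation holds the Kerberos login user in static state, so the copy of the class loaded by Spark SQL's isolated Hive classloader cannot see credentials held by the copy in Spark's own classloader; adding it to spark.sql.hive.metastore.sharedPrefixes forces both sides onto one copy. A minimal sketch of how such a shared-prefix list typically drives the delegation decision (illustrative names only, using plain JDK classloading, not Spark's actual IsolatedClientLoader code):

    import java.net.{URL, URLClassLoader}

    // Classes whose names match a shared prefix are delegated to the shared
    // loader, so both "worlds" get the same Class object and therefore the
    // same static state; everything else is loaded in isolation.
    class PrefixIsolatingLoader(
        sharedPrefixes: Seq[String],
        shared: ClassLoader,
        isolatedJars: Array[URL])
      extends URLClassLoader(isolatedJars, null) {

      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        if (sharedPrefixes.exists(name.startsWith)) {
          shared.loadClass(name)
        } else {
          super.loadClass(name, resolve)
        }
      }
    }

The revert keeps this mechanism out of the defaults; the remainder of the series instead logs in again inside the isolated classloader.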
Other " + @@ -712,11 +712,6 @@ private[hive] object HiveContext { private def jdbcPrefixes = Seq( "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc") - private def securityPrefixes = Seq( - "org.apache.hadoop.security.UserGroupInformation", - "org.apache.hadoop.security.token.Token", - "org.apache.hadoop.io.Text") - val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes", defaultValue = Some(Seq()), doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " + From 30a7d0529ded31c29b5c60e62cb80f7648606e0c Mon Sep 17 00:00:00 2001 From: Yu Gao Date: Sun, 25 Oct 2015 12:54:01 -0700 Subject: [PATCH 3/8] SPARK-10181 Do kerberos login for credentials in hive client initialization --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 ++ .../org/apache/spark/sql/hive/client/ClientWrapper.scala | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 640cc325281a..4f7803c7f4e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -520,6 +520,8 @@ object SparkSubmit { } if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when the keytab is specified") + sysProps.put("spark.yarn.keytab", args.keytab) + sysProps.put("spark.yarn.principal", args.principal) UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 3dce86c48074..02a2fecfdcb7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -21,6 +21,8 @@ import java.io.{File, PrintStream} import java.util.{Map => JMap} import javax.annotation.concurrent.GuardedBy +import org.apache.hadoop.security.UserGroupInformation + import scala.collection.JavaConverters._ import scala.language.reflectiveCalls @@ -150,6 +152,13 @@ private[hive] class ClientWrapper( val original = Thread.currentThread().getContextClassLoader // Switch to the initClassLoader. 
From 1fe7593023e22194b8691ad8c59fdb5d89144236 Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Tue, 3 Nov 2015 00:26:05 -0800
Subject: [PATCH 4/8] SPARK-10181 Do kerberos login for credentials during hive client initialization

---
 .../scala/org/apache/spark/deploy/SparkSubmit.scala  |  2 +-
 .../apache/spark/sql/hive/client/ClientWrapper.scala | 11 ++++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c0248078520..7bc70d60f351 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -521,7 +521,7 @@ object SparkSubmit {
       sysProps.put("spark.yarn.isPython", "true")
     }
     if (args.principal != null) {
-      require(args.keytab != null, "Keytab must be specified when the keytab is specified")
+      require(args.keytab != null, "Keytab must be specified when principal is specified")
       sysProps.put("spark.yarn.keytab", args.keytab)
       sysProps.put("spark.yarn.principal", args.principal)
       UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 02a2fecfdcb7..0fcb0e58db66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.{Driver, metadata}
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -153,10 +153,11 @@ private[hive] class ClientWrapper(
     // Switch to the initClassLoader.
     Thread.currentThread().setContextClassLoader(initClassLoader)
 
-    val keytab = System.getProperty("spark.yarn.keytab")
-    val principal = System.getProperty("spark.yarn.principal")
-    if (keytab != null && principal != null) {
-      UserGroupInformation.loginUserFromKeytab(principal, keytab)
+    val sparkConf = new SparkConf
+    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+      UserGroupInformation.loginUserFromKeytab(
+        sparkConf.get("spark.yarn.principal"),
+        sparkConf.get("spark.yarn.keytab"))
     }
 
     val ret = try {
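Patch 4's move from System.getProperty to a fresh SparkConf relies on the fact that a SparkConf constructed with default arguments (loadDefaults = true) copies in every JVM system property whose name starts with "spark.". That is the whole propagation path: SparkSubmit's sysProps become system properties, and any later new SparkConf sees them. A standalone illustration (the property values here are placeholders):

    import org.apache.spark.SparkConf

    object SysPropPropagation extends App {
      // Simulate what SparkSubmit has already done by the time user code runs.
      System.setProperty("spark.yarn.principal", "user@EXAMPLE.COM")
      System.setProperty("spark.yarn.keytab", "/tmp/user.keytab")

      // loadDefaults is true by default, so all "spark.*" system properties
      // are folded into the new conf.
      val conf = new SparkConf()
      assert(conf.contains("spark.yarn.principal"))
      assert(conf.get("spark.yarn.keytab") == "/tmp/user.keytab")
    }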
From 906b3cb623461d2b255d9feeafdfdcae09731050 Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Wed, 4 Nov 2015 20:18:15 -0800
Subject: [PATCH 5/8] SPARK-10181 Do kerberos login for credentials during hive client initialization V3

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 12 ++++++++----
 .../spark/sql/hive/client/ClientWrapper.scala | 17 ++++++++++++-----
 2 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7bc70d60f351..49a09edb3488 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 
-import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
+import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -522,9 +522,13 @@ object SparkSubmit {
     }
     if (args.principal != null) {
       require(args.keytab != null, "Keytab must be specified when principal is specified")
-      sysProps.put("spark.yarn.keytab", args.keytab)
-      sysProps.put("spark.yarn.principal", args.principal)
-      UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+      if (!new File(args.keytab).exists()) {
+        throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+      } else {
+        sysProps.put("spark.yarn.keytab", args.keytab)
+        sysProps.put("spark.yarn.principal", args.principal)
+        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+      }
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 0fcb0e58db66..22ace2e871b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.{Driver, metadata}
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -154,10 +154,17 @@ private[hive] class ClientWrapper(
     Thread.currentThread().setContextClassLoader(initClassLoader)
 
     val sparkConf = new SparkConf
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      UserGroupInformation.loginUserFromKeytab(
-        sparkConf.get("spark.yarn.principal"),
-        sparkConf.get("spark.yarn.keytab"))
+    val principalName = sparkConf.get("spark.yarn.principal")
+    val keytabFileName = sparkConf.get("spark.yarn.keytab")
+    if (principalName != null && keytabFileName != null) {
+      if (!new File(keytabFileName).exists()) {
+        throw new SparkException(s"Keytab file: ${keytabFileName}" +
+          " specified in spark.yarn.keytab does not exist")
+      } else {
+        logInfo("Attempting to login to Kerberos" +
+          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
+      }
     }
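One subtlety in the V3 ClientWrapper change above: SparkConf.get(key) does not return null when the key is missing, it throws NoSuchElementException, so the null checks can never take the quiet path on a non-kerberized cluster. The V4 patch below fixes this by guarding with contains() before calling get(). In brief (REPL-style):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
    // conf.get("spark.yarn.principal")  // would throw NoSuchElementException
    val principal = if (conf.contains("spark.yarn.principal")) {
      Some(conf.get("spark.yarn.principal"))  // safe: key is known to exist
    } else {
      None  // the non-kerberized path that V4 restores
    }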
From 7d09f5df98a2b2d9941e416a6495add2b8620867 Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Thu, 5 Nov 2015 02:03:12 -0800
Subject: [PATCH 6/8] SPARK-10181 Do kerberos login for credentials during hive client initialization V4

---
 .../org/apache/spark/sql/hive/client/ClientWrapper.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 22ace2e871b3..009c562fe491 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -154,9 +154,9 @@ private[hive] class ClientWrapper(
     Thread.currentThread().setContextClassLoader(initClassLoader)
 
     val sparkConf = new SparkConf
-    val principalName = sparkConf.get("spark.yarn.principal")
-    val keytabFileName = sparkConf.get("spark.yarn.keytab")
-    if (principalName != null && keytabFileName != null) {
+    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+      val principalName = sparkConf.get("spark.yarn.principal")
+      val keytabFileName = sparkConf.get("spark.yarn.keytab")
       if (!new File(keytabFileName).exists()) {
         throw new SparkException(s"Keytab file: ${keytabFileName}" +
           " specified in spark.yarn.keytab does not exist")

From 1fbc3724d3ba7b90a1c412ac3d85d7a7eec9304c Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Sat, 14 Nov 2015 12:36:27 -0800
Subject: [PATCH 7/8] SPARK-10181 Add comments explaining the keytab principal confs propagation

---
 .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++++
 .../org/apache/spark/sql/hive/client/ClientWrapper.scala     | 5 +++++
 2 files changed, 9 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 49a09edb3488..99054a685e1b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -525,8 +525,12 @@ object SparkSubmit {
       if (!new File(args.keytab).exists()) {
         throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
       } else {
+        // Add keytab and principal configurations in sysProps to make them available
+        // for later use (e.g. by spark sql). These Configurations will be set as
+        // Java system properties and then loaded by SparkConf
         sysProps.put("spark.yarn.keytab", args.keytab)
         sysProps.put("spark.yarn.principal", args.principal)
+
         UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 2bd81335f371..26e4487d70d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -152,6 +152,11 @@ private[hive] class ClientWrapper(
     // Switch to the initClassLoader.
     Thread.currentThread().setContextClassLoader(initClassLoader)
 
+    // Set up kerberos credentials for UserGroupInformation.loginUser within
+    // current class loader
+    // Instead of using the spark conf of the current spark context, a new instance of
+    // SparkConf is needed for the original value of spark.yarn.keytab specified by user,
+    // as yarn.Client resets it for the link name in distributed cache
     val sparkConf = new SparkConf
     if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
       val principalName = sparkConf.get("spark.yarn.principal")
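The second comment added above is worth unpacking: in YARN mode, yarn.Client rewrites spark.yarn.keytab in the application's configuration to the distributed-cache link name it creates for the keytab, while the JVM system property written by SparkSubmit still carries the user's original local path. Constructing a fresh SparkConf re-reads the system properties and therefore recovers a path that actually exists on the local filesystem. A hypothetical probe of the two views (sc is assumed to be a live SparkContext; this is not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    def keytabViews(sc: SparkContext): (String, String) = {
      // May be the distributed-cache link name substituted by yarn.Client.
      val fromContext = sc.getConf.get("spark.yarn.keytab")
      // Re-reads the system property set by SparkSubmit: the original local path.
      val fromFreshConf = new SparkConf().get("spark.yarn.keytab")
      (fromContext, fromFreshConf)
    }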
From caf51a723591b0820d9c48a0e07fec50f511296b Mon Sep 17 00:00:00 2001
From: Yu Gao
Date: Sat, 14 Nov 2015 15:28:54 -0800
Subject: [PATCH 8/8] SPARK-10181 Add more comments for clarification

---
 .../scala/org/apache/spark/deploy/SparkSubmit.scala  |  5 +++--
 .../apache/spark/sql/hive/client/ClientWrapper.scala | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 99054a685e1b..09d2ec90c933 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -526,8 +526,9 @@ object SparkSubmit {
         throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
       } else {
         // Add keytab and principal configurations in sysProps to make them available
-        // for later use (e.g. by spark sql). These Configurations will be set as
-        // Java system properties and then loaded by SparkConf
+        // for later use; e.g. in spark sql, the isolated class loader used to talk
+        // to HiveMetastore will use these settings. They will be set as Java system
+        // properties and then loaded by SparkConf
         sysProps.put("spark.yarn.keytab", args.keytab)
         sysProps.put("spark.yarn.principal", args.principal)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 26e4487d70d8..598ccdeee4ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.{File, PrintStream}
 import java.util.{Map => JMap}
 
-import org.apache.hadoop.security.UserGroupInformation
-
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
@@ -34,6 +32,7 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.{Driver, metadata}
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkException, Logging}
@@ -154,9 +153,10 @@ private[hive] class ClientWrapper(
 
     // Set up kerberos credentials for UserGroupInformation.loginUser within
     // current class loader
-    // Instead of using the spark conf of the current spark context, a new instance of
-    // SparkConf is needed for the original value of spark.yarn.keytab specified by user,
-    // as yarn.Client resets it for the link name in distributed cache
+    // Instead of using the spark conf of the current spark context, a new
+    // instance of SparkConf is needed for the original value of spark.yarn.keytab
+    // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
+    // keytab configuration for the link name in distributed cache
     val sparkConf = new SparkConf
     if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
       val principalName = sparkConf.get("spark.yarn.principal")
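Taken together, the series converges on the login sequence below. This standalone sketch (KerberosLoginSketch and kerberosLogin are illustrative names, not code added by the patches) mirrors the final check-validate-login order in ClientWrapper:

    import java.io.File

    import org.apache.hadoop.security.UserGroupInformation

    import org.apache.spark.{SparkConf, SparkException}

    object KerberosLoginSketch {
      // Re-login from the keytab if SparkSubmit propagated the principal and
      // keytab configurations; intended to run before the isolated Hive client
      // first talks to a secured metastore.
      def kerberosLogin(sparkConf: SparkConf): Unit = {
        if (sparkConf.contains("spark.yarn.principal") &&
            sparkConf.contains("spark.yarn.keytab")) {
          val principal = sparkConf.get("spark.yarn.principal")
          val keytab = sparkConf.get("spark.yarn.keytab")
          // Fail fast with a clear message rather than an opaque Kerberos error.
          if (!new File(keytab).exists()) {
            throw new SparkException(
              s"Keytab file: ${keytab} specified in spark.yarn.keytab does not exist")
          }
          // Sets UserGroupInformation.loginUser from the keytab credentials.
          UserGroupInformation.loginUserFromKeytab(principal, keytab)
        }
      }
    }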