diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 6e97c37af7df..47ae7be85ce0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit import javax.security.auth.login.Configuration import scala.collection.JavaConverters._ +import scala.io.Source import scala.util.Random import com.google.common.io.Files @@ -136,9 +137,44 @@ class KafkaTestUtils( kdcConf.setProperty(MiniKdc.DEBUG, "true") kdc = new MiniKdc(kdcConf, kdcDir) kdc.start() + // TODO https://issues.apache.org/jira/browse/SPARK-30037 + // Need to build spark's own MiniKDC and customize krb5.conf like Kafka + rewriteKrb5Conf() kdcReady = true } + /** + * In this method we rewrite krb5.conf to make kdc and client use the same enctypes + */ + private def rewriteKrb5Conf(): Unit = { + val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines() + var rewritten = false + val addedConfig = + addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + + addedKrb5Config("default_tgs_enctypes", "aes128-cts-hmac-sha1-96") + val rewriteKrb5Conf = krb5Conf.map(s => + if (s.contains("libdefaults")) { + rewritten = true + s + addedConfig + } else { + s + }).filter(!_.trim.startsWith("#")).mkString(System.lineSeparator()) + + val krb5confStr = if (!rewritten) { + "[libdefaults]" + addedConfig + System.lineSeparator() + + System.lineSeparator() + rewriteKrb5Conf + } else { + rewriteKrb5Conf + } + + kdc.getKrb5conf.delete() + Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8) + } + + private def addedKrb5Config(key: String, value: String): String = { + System.lineSeparator() + s" $key=$value" + } + private def createKeytabsAndJaasConfigFile(): String = { assert(kdcReady, "KDC should be set up beforehand") val baseDir = Utils.createTempDir() @@ -171,6 +207,7 @@ class KafkaTestUtils( | useKeyTab=true | storeKey=true | useTicketCache=false + | refreshKrb5Config=true | keyTab="${zkServerKeytabFile.getAbsolutePath()}" | principal="$zkServerUser@$realm"; |}; @@ -180,6 +217,7 @@ class KafkaTestUtils( | useKeyTab=true | storeKey=true | useTicketCache=false + | refreshKrb5Config=true | keyTab="${zkClientKeytabFile.getAbsolutePath()}" | principal="$zkClientUser@$realm"; |};