diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 5336695478c14..bf25d46f2e7e0 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -1004,6 +1004,14 @@ Delegation tokens can be obtained from multiple clusters and ${cluster} 3.0.0 + + spark.kafka.clusters.${cluster}.ssl.truststore.type + None + + The file format of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token. + + 3.2.0 + spark.kafka.clusters.${cluster}.ssl.truststore.location None @@ -1021,6 +1029,15 @@ Delegation tokens can be obtained from multiple clusters and ${cluster} 3.0.0 + + spark.kafka.clusters.${cluster}.ssl.keystore.type + None + + The file format of the key store file. This is optional for client. + For further details please see Kafka documentation. Only used to obtain delegation token. + + 3.2.0 + spark.kafka.clusters.${cluster}.ssl.keystore.location None diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala index ed4a6f1e34c55..21ba7b21ed9d6 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala @@ -31,8 +31,10 @@ private[spark] case class KafkaTokenClusterConf( targetServersRegex: String, securityProtocol: String, kerberosServiceName: String, + trustStoreType: Option[String], trustStoreLocation: Option[String], trustStorePassword: Option[String], + keyStoreType: Option[String], keyStoreLocation: Option[String], keyStorePassword: Option[String], keyPassword: Option[String], @@ -44,8 +46,10 @@ private[spark] case class KafkaTokenClusterConf( s"targetServersRegex=$targetServersRegex, " + s"securityProtocol=$securityProtocol, " + s"kerberosServiceName=$kerberosServiceName, " + + s"trustStoreType=$trustStoreType, " + s"trustStoreLocation=$trustStoreLocation, " + s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + + s"keyStoreType=$keyStoreType, " + s"keyStoreLocation=$keyStoreLocation, " + s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + @@ -77,8 +81,10 @@ private [kafka010] object KafkaTokenSparkConf extends Logging { DEFAULT_SECURITY_PROTOCOL_CONFIG), sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME), + sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index f3f6b4de6f79c..a182d3c30858e 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -162,6 +162,9 @@ private[spark] object KafkaTokenUtil extends Logging { private def setTrustStoreProperties( clusterConf: KafkaTokenClusterConf, properties: ju.Properties): Unit = { + clusterConf.trustStoreType.foreach { truststoreType => + properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreType) + } clusterConf.trustStoreLocation.foreach { truststoreLocation => properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) } @@ -173,6 +176,9 @@ private[spark] object KafkaTokenUtil extends Logging { private def setKeyStoreProperties( clusterConf: KafkaTokenClusterConf, properties: ju.Properties): Unit = { + clusterConf.keyStoreType.foreach { keystoreType => + properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keystoreType) + } clusterConf.keyStoreLocation.foreach { keystoreLocation => properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation) } diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index 19335f4221e40..8271acdc7dfb6 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -51,8 +51,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { protected val bootStrapServers = "127.0.0.1:0" protected val matchingTargetServersRegex = "127.0.0.*:0" protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0" + protected val trustStoreType = "customTrustStoreType" protected val trustStoreLocation = "/path/to/trustStore" protected val trustStorePassword = "trustStoreSecret" + protected val keyStoreType = "customKeyStoreType" protected val keyStoreLocation = "/path/to/keyStore" protected val keyStorePassword = "keyStoreSecret" protected val keyPassword = "keySecret" @@ -124,8 +126,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX, securityProtocol, KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME, + Some(trustStoreType), Some(trustStoreLocation), Some(trustStorePassword), + Some(keyStoreType), Some(keyStoreLocation), Some(keyStorePassword), Some(keyPassword), diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala index 61184a6fac33d..17caf96818e47 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala @@ -29,8 +29,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { private val targetServersRegex = "127.0.0.*:0" private val securityProtocol = SSL.name private val kerberosServiceName = "kafka1" + private val trustStoreType = "customTrustStoreType" private val trustStoreLocation = "/path/to/trustStore" private val trustStorePassword = "trustStoreSecret" + private val keyStoreType = "customKeyStoreType" private val keyStoreLocation = "/path/to/keyStore" private val keyStorePassword = "keyStoreSecret" private val keyPassword = "keySecret" @@ -60,8 +62,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.securityProtocol === SASL_SSL.name) assert(clusterConfig.kerberosServiceName === KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreType === None) assert(clusterConfig.trustStoreLocation === None) assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreType === None) assert(clusterConfig.keyStoreLocation === None) assert(clusterConfig.keyStorePassword === None) assert(clusterConfig.keyPassword === None) @@ -75,8 +79,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(s"spark.kafka.clusters.$identifier1.security.protocol", securityProtocol) sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.kerberos.service.name", kerberosServiceName) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.type", trustStoreType) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.location", trustStoreLocation) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.password", trustStorePassword) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.type", keyStoreType) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.location", keyStoreLocation) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.password", keyStorePassword) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.key.password", keyPassword) @@ -88,8 +94,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.targetServersRegex === targetServersRegex) assert(clusterConfig.securityProtocol === securityProtocol) assert(clusterConfig.kerberosServiceName === kerberosServiceName) + assert(clusterConfig.trustStoreType === Some(trustStoreType)) assert(clusterConfig.trustStoreLocation === Some(trustStoreLocation)) assert(clusterConfig.trustStorePassword === Some(trustStorePassword)) + assert(clusterConfig.keyStoreType === Some(keyStoreType)) assert(clusterConfig.keyStoreLocation === Some(keyStoreLocation)) assert(clusterConfig.keyStorePassword === Some(keyStorePassword)) assert(clusterConfig.keyPassword === Some(keyPassword)) @@ -127,8 +135,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.securityProtocol === SASL_SSL.name) assert(clusterConfig.kerberosServiceName === KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreType === None) assert(clusterConfig.trustStoreLocation === None) assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreType === None) assert(clusterConfig.keyStoreLocation === None) assert(clusterConfig.keyStorePassword === None) assert(clusterConfig.keyPassword === None) diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 94f7853003bd9..ca34e14f2c261 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -64,8 +64,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_PLAINTEXT.name) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) @@ -80,10 +82,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_SSL.name) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) === trustStoreLocation) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === trustStorePassword) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) @@ -99,10 +103,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SSL.name) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) === trustStoreLocation) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === trustStorePassword) + assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG) === keyStoreType) assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation) assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword) assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)