diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index bbff82259e56f..c8954a96ccd92 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -819,6 +819,11 @@ Delegation tokens can be obtained from multiple clusters and ${cluster}
 
+#### Kafka Specific Configurations
+
+Kafka's own configurations can be set with the `kafka.` prefix, e.g., `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
+For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
+
 #### Caveats
 
 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
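For illustration only, the same `kafka.`-prefixed settings can also be placed on a `SparkConf` programmatically instead of via `--conf`. The cluster identifier `cluster1` and the parameter values below are made-up examples, not part of this patch.

```scala
import org.apache.spark.SparkConf

// Hypothetical cluster identifier "cluster1"; any AdminClient setting may follow
// the "kafka." segment of the key and is forwarded with the prefix stripped.
val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka-auth:9092")
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")
  .set("spark.kafka.clusters.cluster1.kafka.request.timeout.ms", "30000")
```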
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
index 84d58d8c419ad..e1f3c800a51f8 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] case class KafkaTokenClusterConf(
     identifier: String,
@@ -35,7 +36,8 @@ private[spark] case class KafkaTokenClusterConf(
     keyStoreLocation: Option[String],
     keyStorePassword: Option[String],
     keyPassword: Option[String],
-    tokenMechanism: String) {
+    tokenMechanism: String,
+    specifiedKafkaParams: Map[String, String]) {
   override def toString: String = s"KafkaTokenClusterConf{" +
     s"identifier=$identifier, " +
     s"authBootstrapServers=$authBootstrapServers, " +
@@ -43,11 +45,12 @@ private[spark] case class KafkaTokenClusterConf(
     s"securityProtocol=$securityProtocol, " +
     s"kerberosServiceName=$kerberosServiceName, " +
     s"trustStoreLocation=$trustStoreLocation, " +
-    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
     s"keyStoreLocation=$keyStoreLocation, " +
-    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
-    s"tokenMechanism=$tokenMechanism}"
+    s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"tokenMechanism=$tokenMechanism, " +
+    s"specifiedKafkaParams=${KafkaRedactionUtil.redactParams(specifiedKafkaParams.toSeq)}}"
 }
 
 private [kafka010] object KafkaTokenSparkConf extends Logging {
@@ -59,6 +62,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
   def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
     val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
     val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
+    val configKafkaPrefix = s"${configPrefix}kafka."
+    val sparkClusterKafkaConf = sparkConf.getAllWithPrefix(configKafkaPrefix).toMap
     val result = KafkaTokenClusterConf(
       identifier,
       sparkClusterConf
@@ -76,7 +81,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
       sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
       sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
       sparkClusterConf.getOrElse("sasl.token.mechanism",
-        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM),
+      sparkClusterKafkaConf
     )
     logDebug(s"getClusterConfig($identifier): $result")
     result
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index da21d2e2413da..950df867e1e8a 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -134,6 +134,16 @@ private[spark] object KafkaTokenUtil extends Logging {
       }
     }
 
+    logDebug("AdminClient params before specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
+    clusterConf.specifiedKafkaParams.foreach { param =>
+      adminClientProperties.setProperty(param._1, param._2)
+    }
+
+    logDebug("AdminClient params after specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
     adminClientProperties
   }
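The two moving parts above are small: `getClusterConfig` now collects every `kafka.`-scoped key for a cluster, and `createAdminClientProperties` overlays those entries onto the AdminClient properties after the provider's own defaults, so the user-specified values take precedence. The self-contained sketch below mirrors that flow with plain collections; the helper names `collectKafkaParams` and `overlay` are illustrative and do not exist in the patch.

```scala
import java.util.Properties

// Roughly what sparkConf.getAllWithPrefix(s"spark.kafka.clusters.$id.kafka.") yields:
// the matching entries with the prefix stripped from the key.
def collectKafkaParams(all: Map[String, String], identifier: String): Map[String, String] = {
  val prefix = s"spark.kafka.clusters.$identifier.kafka."
  all.collect { case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v }
}

// Applied last, so the specified entries win over whatever the provider computed.
def overlay(base: Properties, specified: Map[String, String]): Properties = {
  specified.foreach { case (k, v) => base.setProperty(k, v) }
  base
}

val specified = collectKafkaParams(
  Map("spark.kafka.clusters.cluster1.kafka.retries" -> "1"), "cluster1")
val props = overlay(new Properties(), specified)
assert(props.getProperty("retries") == "1")
```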
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index 74f1cdcf73462..eebbf96afa470 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -107,7 +107,8 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
 
   protected def createClusterConf(
       identifier: String,
-      securityProtocol: String): KafkaTokenClusterConf = {
+      securityProtocol: String,
+      specifiedKafkaParams: Map[String, String] = Map.empty): KafkaTokenClusterConf = {
     KafkaTokenClusterConf(
       identifier,
       bootStrapServers,
@@ -119,6 +120,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
       Some(keyStoreLocation),
       Some(keyStorePassword),
       Some(keyPassword),
-      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM,
+      specifiedKafkaParams)
   }
 }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
index 60bb8a2bc6c31..61184a6fac33d 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
@@ -96,6 +96,16 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(clusterConfig.tokenMechanism === tokenMechanism)
   }
 
+  test("getClusterConfig should return specified kafka params") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.kafka.customKey", "customValue")
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.specifiedKafkaParams === Map("customKey" -> "customValue"))
+  }
+
   test("getAllClusterConfigs should return empty list when nothing configured") {
     assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
   }
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index bcca920eed4e4..5496195b41490 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -155,6 +155,15 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(saslJaasConfig.contains("useTicketCache=true"))
   }
 
+  test("createAdminClientProperties with specified params should include it") {
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name,
+      Map("customKey" -> "customValue"))
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
+
+    assert(adminClientProperties.get("customKey") === "customValue")
+  }
+
   test("isGlobalJaasConfigurationProvided without global config should return false") {
     assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }
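The suites above exercise the two new seams separately: a `kafka.`-scoped entry on the `SparkConf` surfaces in `KafkaTokenClusterConf.specifiedKafkaParams`, and from there in the AdminClient properties. The sketch below simply chains the two calls the tests make, to show the end-to-end data flow; both objects are private to Spark, so this is not a public API, and `cluster1`, `customKey`, and the bootstrap address are the same kind of made-up values the suites use.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenSparkConf, KafkaTokenUtil}

val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka-auth:9092")
  .set("spark.kafka.clusters.cluster1.kafka.customKey", "customValue")

// getClusterConfig picks up the kafka.-prefixed entries for the cluster...
val clusterConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "cluster1")
// ...and createAdminClientProperties applies them last, as the new tests assert.
val adminProps = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
assert(adminProps.get("customKey") == "customValue")
```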