docs/structured-streaming-kafka-integration.md (5 additions, 0 deletions)
@@ -819,6 +819,11 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code>
 </tr>
 </table>
 
+#### Kafka Specific Configurations
+
+Kafka's own configurations can be set with the `kafka.` prefix, e.g., `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
+For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
+
 #### Caveats
 
 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
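To make the new setting concrete, here is a minimal sketch of wiring several per-cluster Kafka overrides into a `SparkConf` programmatically; the cluster identifier `cluster1`, the broker address, and the chosen Kafka keys are illustrative, not part of this change:

```scala
import org.apache.spark.SparkConf

// Anything under spark.kafka.clusters.${cluster}.kafka.* has the "kafka."
// prefix stripped and is handed to the AdminClient used for token handling.
val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "broker1:9092")
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")
  .set("spark.kafka.clusters.cluster1.kafka.request.timeout.ms", "30000")
```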
@@ -23,6 +23,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] case class KafkaTokenClusterConf(
     identifier: String,
@@ -35,19 +36,21 @@ private[spark] case class KafkaTokenClusterConf(
     keyStoreLocation: Option[String],
     keyStorePassword: Option[String],
     keyPassword: Option[String],
-    tokenMechanism: String) {
+    tokenMechanism: String,
+    specifiedKafkaParams: Map[String, String]) {
   override def toString: String = s"KafkaTokenClusterConf{" +
     s"identifier=$identifier, " +
     s"authBootstrapServers=$authBootstrapServers, " +
     s"targetServersRegex=$targetServersRegex, " +
     s"securityProtocol=$securityProtocol, " +
     s"kerberosServiceName=$kerberosServiceName, " +
     s"trustStoreLocation=$trustStoreLocation, " +
-    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
     s"keyStoreLocation=$keyStoreLocation, " +
-    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
-    s"tokenMechanism=$tokenMechanism}"
+    s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"tokenMechanism=$tokenMechanism, " +
+    s"specifiedKafkaParams=${KafkaRedactionUtil.redactParams(specifiedKafkaParams.toSeq)}}"
 }
 
 private [kafka010] object KafkaTokenSparkConf extends Logging {
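The `toString` change above replaces the hard-coded `"xxx"` masking with Spark's shared redaction constant and runs the free-form `specifiedKafkaParams` through `KafkaRedactionUtil.redactParams`, since user-supplied `kafka.` entries may themselves carry passwords. As a rough sketch of that behavior (not Spark's actual implementation, which consults the `spark.redaction.regex` setting, default `(?i)secret|password`):

```scala
// Illustrative approximation of key-based redaction; Spark's real
// KafkaRedactionUtil reads spark.redaction.regex instead of a literal.
val sensitive = "(?i)secret|password".r

def redactParams(params: Seq[(String, String)]): Seq[(String, String)] =
  params.map { case (key, value) =>
    if (sensitive.findFirstIn(key).isDefined) {
      (key, "*********(redacted)") // mirrors Utils.REDACTION_REPLACEMENT_TEXT
    } else {
      (key, value)
    }
  }

// redactParams(Seq("ssl.truststore.password" -> "hunter2", "retries" -> "1"))
//   == Seq("ssl.truststore.password" -> "*********(redacted)", "retries" -> "1")
```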
@@ -59,6 +62,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
   def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
     val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
     val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
+    val configKafkaPrefix = s"${configPrefix}kafka."
+    val sparkClusterKafkaConf = sparkConf.getAllWithPrefix(configKafkaPrefix).toMap
     val result = KafkaTokenClusterConf(
       identifier,
       sparkClusterConf
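The prefix handling above leans on `SparkConf.getAllWithPrefix`, which returns `(suffix, value)` pairs with the given prefix stripped. A small sketch (cluster name and keys illustrative); note the `kafka.`-prefixed entries also remain in `sparkClusterConf` under their prefixed names, which is harmless because lookups there use exact keys:

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "broker1:9092")
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")

// The prefix is stripped from the returned keys.
val clusterConf = sparkConf.getAllWithPrefix("spark.kafka.clusters.cluster1.").toMap
// Map("auth.bootstrap.servers" -> "broker1:9092", "kafka.retries" -> "1")

val kafkaConf = sparkConf.getAllWithPrefix("spark.kafka.clusters.cluster1.kafka.").toMap
// Map("retries" -> "1")
```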
@@ -76,7 +81,8 @@
       sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
       sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
       sparkClusterConf.getOrElse("sasl.token.mechanism",
-        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM),
+      sparkClusterKafkaConf
     )
     logDebug(s"getClusterConfig($identifier): $result")
     result
@@ -134,6 +134,16 @@ private[spark] object KafkaTokenUtil extends Logging {
       }
     }
 
+    logDebug("AdminClient params before specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
+    clusterConf.specifiedKafkaParams.foreach { param =>
+      adminClientProperties.setProperty(param._1, param._2)
+    }
+
+    logDebug("AdminClient params after specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
     adminClientProperties
   }
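The ordering here is deliberate: Spark assembles its own AdminClient properties first and applies `specifiedKafkaParams` last, so a user-supplied key wins over the value Spark computed, with the two `logDebug` lines bracketing the overwrite (redacted) for troubleshooting. A minimal sketch of that precedence, with illustrative keys and values:

```scala
import java.util.Properties

val adminClientProperties = new Properties()
adminClientProperties.setProperty("retries", "3")  // value Spark set earlier (illustrative)
val specifiedKafkaParams = Map("retries" -> "1")   // user-supplied kafka.* override

// Applied last, so the user value replaces the earlier one.
specifiedKafkaParams.foreach { case (k, v) => adminClientProperties.setProperty(k, v) }
assert(adminClientProperties.getProperty("retries") == "1")
```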

@@ -107,7 +107,8 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
 
   protected def createClusterConf(
       identifier: String,
-      securityProtocol: String): KafkaTokenClusterConf = {
+      securityProtocol: String,
+      specifiedKafkaParams: Map[String, String] = Map.empty): KafkaTokenClusterConf = {
     KafkaTokenClusterConf(
       identifier,
       bootStrapServers,
@@ -119,6 +120,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
       Some(keyStoreLocation),
       Some(keyStorePassword),
       Some(keyPassword),
-      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM,
+      specifiedKafkaParams)
   }
 }
@@ -96,6 +96,16 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(clusterConfig.tokenMechanism === tokenMechanism)
   }
 
+  test("getClusterConfig should return specified kafka params") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.kafka.customKey", "customValue")
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.specifiedKafkaParams === Map("customKey" -> "customValue"))
+  }
+
   test("getAllClusterConfigs should return empty list when nothing configured") {
     assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
   }
@@ -155,6 +155,15 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(saslJaasConfig.contains("useTicketCache=true"))
   }
 
+  test("createAdminClientProperties with specified params should include it") {
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name,
+      Map("customKey" -> "customValue"))
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
+
+    assert(adminClientProperties.get("customKey") === "customValue")
+  }
+
   test("isGlobalJaasConfigurationProvided without global config should return false") {
     assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }