Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,7 @@ private Map<String, Object> buildCommonProperties() {
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
properties.putAll(this.ssl.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
}
Expand Down Expand Up @@ -438,26 +420,6 @@ public Map<String, Object> buildProperties() {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
this.keyDeserializer);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.valueDeserializer != null) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
Expand All @@ -466,6 +428,7 @@ public Map<String, Object> buildProperties() {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.maxPollRecords);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
Expand Down Expand Up @@ -651,30 +614,11 @@ public Map<String, Object> buildProperties() {
if (this.retries != null) {
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.valueSerializer != null) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
Expand Down Expand Up @@ -729,26 +673,7 @@ public Map<String, Object> buildProperties() {
if (this.clientId != null) {
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
Expand Down Expand Up @@ -954,6 +879,11 @@ public static class Ssl {
*/
private String keystorePassword;

/**
* Type of the key store.
*/
private String keyStoreType;

/**
* Location of the trust store file.
*/
Expand All @@ -964,6 +894,16 @@ public static class Ssl {
*/
private String truststorePassword;

/**
* Type of the trust store.
*/
private String trustStoreType;

/**
* SSL protocol to use.
*/
private String protocol;

public String getKeyPassword() {
return this.keyPassword;
}
Expand All @@ -988,6 +928,14 @@ public void setKeystorePassword(String keystorePassword) {
this.keystorePassword = keystorePassword;
}

public String getKeyStoreType() {
return this.keyStoreType;
}

public void setKeyStoreType(String keyStoreType) {
this.keyStoreType = keyStoreType;
}

public Resource getTruststoreLocation() {
return this.truststoreLocation;
}
Expand All @@ -1004,6 +952,56 @@ public void setTruststorePassword(String truststorePassword) {
this.truststorePassword = truststorePassword;
}

public String getTrustStoreType() {
return this.trustStoreType;
}

public void setTrustStoreType(String trustStoreType) {
this.trustStoreType = trustStoreType;
}

public String getProtocol() {
return this.protocol;
}

public void setProtocol(String protocol) {
this.protocol = protocol;
}

public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword());
}
if (this.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.getKeystoreLocation()));
}
if (this.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.getKeystorePassword());
}
if (this.getKeyStoreType() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
this.getKeyStoreType());
}
if (this.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.getTruststoreLocation()));
}
if (this.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.getTruststorePassword());
}
if (this.getTrustStoreType() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
this.getTrustStoreType());
}
if (this.getProtocol() != null) {
properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol());
}
return properties;
}
}

public static class Jaas {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ public void consumerProperties() {
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.keystore-type=PKCS12",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
"spring.kafka.ssl.truststore-type=PKCS12",
"spring.kafka.ssl.protocol=TLSv1.2",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
Expand Down Expand Up @@ -106,11 +109,17 @@ public void consumerProperties() {
.endsWith(File.separator + "ksLoc");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p2");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p3");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
// consumer
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG))
.isEqualTo("ccid"); // override
Expand Down Expand Up @@ -159,8 +168,11 @@ public void producerProperties() {
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.producer.ssl.keystore-password=p5",
"spring.kafka.producer.ssl.keystore-type=PKCS12",
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.producer.ssl.truststore-password=p6",
"spring.kafka.producer.ssl.truststore-type=PKCS12",
"spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
.run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context
Expand Down Expand Up @@ -189,11 +201,17 @@ public void producerProperties() {
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p5");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class);
Expand All @@ -209,17 +227,18 @@ public void producerProperties() {

@Test
public void adminProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6")
.run((context) -> {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.keystore-type=PKCS12",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6",
"spring.kafka.admin.ssl.truststore-type=PKCS12",
"spring.kafka.admin.ssl.protocol=TLSv1.2").run((context) -> {
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfig();
// common
Expand All @@ -233,11 +252,17 @@ public void adminProperties() {
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p5");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,11 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.keystore-location= # Location of the key store file.
spring.kafka.admin.ssl.keystore-password= # Store password for the key store file.
spring.kafka.admin.ssl.keystore-type= # Type of the key store.
spring.kafka.admin.ssl.truststore-location= # Location of the trust store file.
spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.admin.ssl.truststore-type= # Type of the trust store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
Expand All @@ -1016,8 +1019,11 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.consumer.ssl.keystore-type= # Type of the key store.
spring.kafka.consumer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.truststore-type= # Type of the trust store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
Expand Down Expand Up @@ -1046,16 +1052,22 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.keystore-type= # Type of the key store.
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.ssl.truststore-type= # Type of the trust store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.
spring.kafka.ssl.keystore-type= # Type of the key store.
spring.kafka.ssl.truststore-location= # Location of the trust store file.
spring.kafka.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.ssl.truststore-type= # Type of the trust store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.template.default-topic= # Default topic to which messages are sent.

# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
Expand Down