Skip to content

Commit 30d330d

Browse files
committed
Polishing - PR Comments
1 parent 009afa4 commit 30d330d

File tree

5 files changed

+65
-39
lines changed

5 files changed

+65
-39
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20-
import java.util.List;
21-
import java.util.Properties;
20+
import java.util.Map;
2221

2322
import org.apache.kafka.streams.StreamsBuilder;
2423

24+
import org.springframework.beans.factory.InitializingBean;
2525
import org.springframework.beans.factory.ObjectProvider;
2626
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2727
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -32,8 +32,10 @@
3232
import org.springframework.context.annotation.Bean;
3333
import org.springframework.context.annotation.Configuration;
3434
import org.springframework.context.annotation.Import;
35+
import org.springframework.core.env.Environment;
3536
import org.springframework.kafka.annotation.EnableKafkaStreams;
3637
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
38+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3739
import org.springframework.kafka.core.ConsumerFactory;
3840
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3941
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -147,33 +149,52 @@ public KafkaAdmin kafkaAdmin() {
147149

148150
@Configuration
149151
@ConditionalOnClass(StreamsBuilder.class)
150-
public static class KafkaStreamsConfiguration {
152+
public static class KafkaStreamsAutoConfiguration {
151153

152154
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
153-
public Properties defaultKafkaStreamsConfig(KafkaProperties properties) {
154-
Properties streamsConfig = new Properties();
155-
properties.buildStreamsProperties().forEach((k, v) -> {
156-
String value = v.toString();
157-
if (v instanceof List && value.length() > 1) {
158-
// trim [...] - revert to comma-delimited list
159-
value = value.substring(1, value.length() - 1);
155+
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(
156+
KafkaProperties properties, Environment environment) {
157+
158+
Map<String, Object> streamsProperties = properties.buildStreamsProperties();
159+
if (properties.getStreams().getApplicationId() == null) {
160+
if (environment.getProperty("spring.application.id") != null) {
161+
streamsProperties.put("application.id",
162+
environment.getProperty("spring.application.name"));
160163
}
161-
streamsConfig.put(k, value);
162-
});
163-
return streamsConfig;
164+
}
165+
return new KafkaStreamsConfiguration(streamsProperties);
164166
}
165167

166168
@Bean
167-
public Object kafkaStreamsFactoryBeanConfigurer(
169+
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
168170
StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) {
169171

170-
factoryBean.setAutoStartup(properties.getStreams().isAutoStartup());
171-
return null;
172+
return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties);
172173
}
173174

174175
@Configuration
175176
@EnableKafkaStreams
176-
public static class EnableKafkaStreamsConfiguration {
177+
public static class EnableKafkaStreamsAutoConfiguration {
178+
179+
}
180+
181+
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
182+
183+
private final StreamsBuilderFactoryBean factoryBean;
184+
185+
private final KafkaProperties properties;
186+
187+
KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean,
188+
KafkaProperties properties) {
189+
this.factoryBean = factoryBean;
190+
this.properties = properties;
191+
}
192+
193+
@Override
194+
public void afterPropertiesSet() throws Exception {
195+
this.factoryBean
196+
.setAutoStartup(this.properties.getStreams().isAutoStartup());
197+
}
177198

178199
}
179200

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.common.config.SslConfigs;
3232
import org.apache.kafka.common.serialization.StringDeserializer;
3333
import org.apache.kafka.common.serialization.StringSerializer;
34-
import org.apache.kafka.streams.StreamsConfig;
3534

3635
import org.springframework.boot.context.properties.ConfigurationProperties;
3736
import org.springframework.boot.context.properties.PropertyMapper;
@@ -58,7 +57,7 @@ public class KafkaProperties {
5857

5958
/**
6059
* Comma-delimited list of host:port pairs to use for establishing the initial
61-
* connection to the Kafka cluster.
60+
* connection to the Kafka cluster. Applies to all components unless overridden.
6261
*/
6362
private List<String> bootstrapServers = new ArrayList<>(
6463
Collections.singletonList("localhost:9092"));
@@ -231,7 +230,7 @@ public static class Consumer {
231230

232231
/**
233232
* Comma-delimited list of host:port pairs to use for establishing the initial
234-
* connection to the Kafka cluster.
233+
* connection to the Kafka cluster. Overrides the global property, for consumers.
235234
*/
236235
private List<String> bootstrapServers;
237236

@@ -441,7 +440,7 @@ public static class Producer {
441440

442441
/**
443442
* Comma-delimited list of host:port pairs to use for establishing the initial
444-
* connection to the Kafka cluster.
443+
* connection to the Kafka cluster. Overrides the global property, for producers.
445444
*/
446445
private List<String> bootstrapServers;
447446

@@ -661,16 +660,16 @@ public static class Streams {
661660
/**
662661
* Kafka streams application.id property; default spring.application.name.
663662
*/
664-
private String applicationId = "${spring.application.name}";
663+
private String applicationId;
665664

666665
/**
667-
* Whether or not to auto-start the streams factory bean; default false.
666+
* Whether or not to auto-start the streams factory bean.
668667
*/
669668
private boolean autoStartup;
670669

671670
/**
672671
* Comma-delimited list of host:port pairs to use for establishing the initial
673-
* connection to the Kafka cluster.
672+
* connection to the Kafka cluster. Overrides the global property, for streams.
674673
*/
675674
private List<String> bootstrapServers;
676675

@@ -767,16 +766,15 @@ public Map<String, String> getProperties() {
767766
public Map<String, Object> buildProperties() {
768767
Properties properties = new Properties();
769768
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
770-
map.from(this::getApplicationId)
771-
.to(properties.in(StreamsConfig.APPLICATION_ID_CONFIG));
769+
map.from(this::getApplicationId).to(properties.in("application.id"));
772770
map.from(this::getBootstrapServers)
773-
.to(properties.in(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
771+
.to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
774772
map.from(this::getCacheMaxBytesBuffering)
775-
.to(properties.in(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
776-
map.from(this::getClientId).to(properties.in(StreamsConfig.CLIENT_ID_CONFIG));
777-
map.from(this::getReplicationFactor)
778-
.to(properties.in(StreamsConfig.REPLICATION_FACTOR_CONFIG));
779-
map.from(this::getStateDir).to(properties.in(StreamsConfig.STATE_DIR_CONFIG));
773+
.to(properties.in("cache.max.bytes.buffering"));
774+
map.from(this::getClientId)
775+
.to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
776+
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
777+
map.from(this::getStateDir).to(properties.in("state.dir"));
780778
return properties.with(this.ssl, this.properties);
781779
}
782780

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
4343
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4444
import org.springframework.kafka.config.KafkaListenerContainerFactory;
45+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
4546
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4647
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4748
import org.springframework.kafka.core.KafkaAdmin;
@@ -297,7 +298,7 @@ public void streamsProperties() {
297298
"spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> {
298299
Properties configs = context.getBean(
299300
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
300-
Properties.class);
301+
KafkaStreamsConfiguration.class).asProperties();
301302
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
302303
.isEqualTo("localhost:9092, localhost:9093");
303304
assertThat(

spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,11 +1048,11 @@ content into your application. Rather, pick only the properties that you need.
10481048
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
10491049
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
10501050
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
1051-
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
1051+
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Applies to all components unless overridden.
10521052
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
10531053
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
10541054
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
1055-
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
1055+
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for consumers.
10561056
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
10571057
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
10581058
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
@@ -1088,7 +1088,7 @@ content into your application. Rather, pick only the properties that you need.
10881088
spring.kafka.listener.type=single # Listener type.
10891089
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
10901090
spring.kafka.producer.batch-size= # Default batch size in bytes.
1091-
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
1091+
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for producers.
10921092
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
10931093
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
10941094
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
@@ -1114,7 +1114,8 @@ content into your application. Rather, pick only the properties that you need.
11141114
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
11151115
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
11161116
spring.kafka.ssl.trust-store-type= # Type of the trust store.
1117-
spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean; default false.
1117+
spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean.
1118+
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams.
11181119
spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads.
11191120
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
11201121
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.

spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5632,10 +5632,11 @@ The following component creates a listener endpoint on the `someTopic` topic:
56325632
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and
56335633
manage the lifecycle of its streams; the factory bean is created when
56345634
`@EnableKafkaStreams` is present on a `@Configuration` class.
5635-
The factory bean requires a `Properties` object for streams configuration.
5635+
The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration.
56365636

56375637
If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure
5638-
the `Properties` bean from the `KafkaProperties` object.
5638+
the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling
5639+
the creation of the factory bean by spring-kafka.
56395640

56405641
There are two required Kafka properties for streams (`bootstrap.servers` and
56415642
`application.id`); by default, the `application.id` is set to the `spring.application.name`
@@ -5660,6 +5661,10 @@ public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
56605661
----
56615662
====
56625663

5664+
By default, the factory bean `autoStartup` property is false; to automatically start the
5665+
streams managed by the `StreamsBuilder` object it creates, set property
5666+
`spting.kafka.streams.auto-startup=true`.
5667+
56635668

56645669
[[boot-features-kafka-extra-props]]
56655670
==== Additional Kafka Properties

0 commit comments

Comments
 (0)