Skip to content

Commit 1d113de

Browse files
committed
Polish "Add Kafka Streams auto-configuration"
Closes gh-14021
1 parent e942fde commit 1d113de

File tree

9 files changed

+267
-113
lines changed

9 files changed

+267
-113
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20-
import java.util.Map;
2120

22-
import org.apache.kafka.streams.StreamsBuilder;
23-
24-
import org.springframework.beans.factory.InitializingBean;
2521
import org.springframework.beans.factory.ObjectProvider;
2622
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2723
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -32,17 +28,12 @@
3228
import org.springframework.context.annotation.Bean;
3329
import org.springframework.context.annotation.Configuration;
3430
import org.springframework.context.annotation.Import;
35-
import org.springframework.core.env.Environment;
36-
import org.springframework.kafka.annotation.EnableKafkaStreams;
37-
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
38-
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3931
import org.springframework.kafka.core.ConsumerFactory;
4032
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4133
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4234
import org.springframework.kafka.core.KafkaAdmin;
4335
import org.springframework.kafka.core.KafkaTemplate;
4436
import org.springframework.kafka.core.ProducerFactory;
45-
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
4637
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
4738
import org.springframework.kafka.support.LoggingProducerListener;
4839
import org.springframework.kafka.support.ProducerListener;
@@ -61,7 +52,8 @@
6152
@Configuration
6253
@ConditionalOnClass(KafkaTemplate.class)
6354
@EnableConfigurationProperties(KafkaProperties.class)
64-
@Import(KafkaAnnotationDrivenConfiguration.class)
55+
@Import({ KafkaAnnotationDrivenConfiguration.class,
56+
KafkaStreamsAnnotationDrivenConfiguration.class })
6557
public class KafkaAutoConfiguration {
6658

6759
private final KafkaProperties properties;
@@ -147,57 +139,4 @@ public KafkaAdmin kafkaAdmin() {
147139
return kafkaAdmin;
148140
}
149141

150-
@Configuration
151-
@ConditionalOnClass(StreamsBuilder.class)
152-
public static class KafkaStreamsAutoConfiguration {
153-
154-
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
155-
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(
156-
KafkaProperties properties, Environment environment) {
157-
158-
Map<String, Object> streamsProperties = properties.buildStreamsProperties();
159-
if (properties.getStreams().getApplicationId() == null) {
160-
if (environment.getProperty("spring.application.id") != null) {
161-
streamsProperties.put("application.id",
162-
environment.getProperty("spring.application.name"));
163-
}
164-
}
165-
return new KafkaStreamsConfiguration(streamsProperties);
166-
}
167-
168-
@Bean
169-
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
170-
StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) {
171-
172-
return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties);
173-
}
174-
175-
@Configuration
176-
@EnableKafkaStreams
177-
public static class EnableKafkaStreamsAutoConfiguration {
178-
179-
}
180-
181-
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
182-
183-
private final StreamsBuilderFactoryBean factoryBean;
184-
185-
private final KafkaProperties properties;
186-
187-
KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean,
188-
KafkaProperties properties) {
189-
this.factoryBean = factoryBean;
190-
this.properties = properties;
191-
}
192-
193-
@Override
194-
public void afterPropertiesSet() throws Exception {
195-
this.factoryBean
196-
.setAutoStartup(this.properties.getStreams().isAutoStartup());
197-
}
198-
199-
}
200-
201-
}
202-
203142
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ public static class Streams {
665665
/**
666666
* Whether or not to auto-start the streams factory bean.
667667
*/
668-
private boolean autoStartup;
668+
private boolean autoStartup = true;
669669

670670
/**
671671
* Comma-delimited list of host:port pairs to use for establishing the initial
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.streams.StreamsBuilder;
22+
import org.apache.kafka.streams.StreamsConfig;
23+
24+
import org.springframework.beans.factory.InitializingBean;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
26+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
28+
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.env.Environment;
32+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
33+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
34+
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
35+
36+
/**
37+
* Configuration for Kafka Streams annotation-driven support.
38+
*
39+
* @author Gary Russell
40+
* @author Stephane Nicoll
41+
*/
42+
@Configuration
43+
@ConditionalOnClass(StreamsBuilder.class)
44+
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
45+
class KafkaStreamsAnnotationDrivenConfiguration {
46+
47+
private final KafkaProperties properties;
48+
49+
KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) {
50+
this.properties = properties;
51+
}
52+
53+
@ConditionalOnMissingBean
54+
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
55+
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) {
56+
Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
57+
if (this.properties.getStreams().getApplicationId() == null) {
58+
String applicationName = environment.getProperty("spring.application.name");
59+
if (applicationName != null) {
60+
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
61+
applicationName);
62+
}
63+
else {
64+
throw new InvalidConfigurationPropertyValueException(
65+
"spring.kafka.streams.application-id", null,
66+
"This property is mandatory and fallback 'spring.application.name' is not set either.");
67+
}
68+
}
69+
return new KafkaStreamsConfiguration(streamsProperties);
70+
}
71+
72+
@Bean
73+
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
74+
StreamsBuilderFactoryBean factoryBean) {
75+
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
76+
}
77+
78+
// Separate class required to avoid BeanCurrentlyInCreationException
79+
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
80+
81+
private final KafkaProperties properties;
82+
83+
private final StreamsBuilderFactoryBean factoryBean;
84+
85+
KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties,
86+
StreamsBuilderFactoryBean factoryBean) {
87+
this.properties = properties;
88+
this.factoryBean = factoryBean;
89+
}
90+
91+
@Override
92+
public void afterPropertiesSet() {
93+
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
94+
}
95+
96+
}
97+
98+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
import org.springframework.boot.test.util.TestPropertyValues;
2929
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3030
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.annotation.EnableKafkaStreams;
3133
import org.springframework.kafka.annotation.KafkaListener;
3234
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3335
import org.springframework.kafka.core.KafkaTemplate;
36+
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
3437
import org.springframework.kafka.support.KafkaHeaders;
3538
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
3639
import org.springframework.messaging.handler.annotation.Header;
@@ -41,6 +44,7 @@
4144
* Integration tests for {@link KafkaAutoConfiguration}.
4245
*
4346
* @author Gary Russell
47+
* @author Stephane Nicoll
4448
*/
4549
public class KafkaAutoConfigurationIntegrationTests {
4650

@@ -83,6 +87,14 @@ public void testEndToEnd() throws Exception {
8387
producer.close();
8488
}
8589

90+
@Test
91+
public void testStreams() {
92+
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
93+
"spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString());
94+
assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup())
95+
.isTrue();
96+
}
97+
8698
private void load(Class<?> config, String... environment) {
8799
this.context = doLoad(new Class<?>[] { config }, environment);
88100
}
@@ -101,7 +113,8 @@ private String getEmbeddedKafkaBrokersAsString() {
101113
return embeddedKafka.getEmbeddedKafka().getBrokersAsString();
102114
}
103115

104-
public static class KafkaConfig {
116+
@Configuration
117+
static class KafkaConfig {
105118

106119
@Bean
107120
public Listener listener() {
@@ -115,6 +128,12 @@ public NewTopic adminCreated() {
115128

116129
}
117130

131+
@Configuration
132+
@EnableKafkaStreams
133+
static class KafkaStreamsConfig {
134+
135+
}
136+
118137
public static class Listener {
119138

120139
private final CountDownLatch latch = new CountDownLatch(1);

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.common.serialization.IntegerSerializer;
3232
import org.apache.kafka.common.serialization.LongDeserializer;
3333
import org.apache.kafka.common.serialization.LongSerializer;
34+
import org.apache.kafka.streams.StreamsBuilder;
3435
import org.apache.kafka.streams.StreamsConfig;
3536
import org.junit.Test;
3637

@@ -39,6 +40,7 @@
3940
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
4041
import org.springframework.context.annotation.Bean;
4142
import org.springframework.context.annotation.Configuration;
43+
import org.springframework.kafka.annotation.EnableKafkaStreams;
4244
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
4345
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4446
import org.springframework.kafka.config.KafkaListenerContainerFactory;
@@ -279,23 +281,26 @@ public void adminProperties() {
279281

280282
@Test
281283
public void streamsProperties() {
282-
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
283-
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
284-
"spring.application.name=appName",
285-
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
286-
"spring.kafka.streams.cache-max-bytes-buffering=42",
287-
"spring.kafka.streams.client-id=override",
288-
"spring.kafka.streams.properties.fiz.buz=fix.fox",
289-
"spring.kafka.streams.replication-factor=2",
290-
"spring.kafka.streams.state-dir=/tmp/state",
291-
"spring.kafka.streams.ssl.key-password=p7",
292-
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
293-
"spring.kafka.streams.ssl.key-store-password=p8",
294-
"spring.kafka.streams.ssl.key-store-type=PKCS12",
295-
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
296-
"spring.kafka.streams.ssl.trust-store-password=p9",
297-
"spring.kafka.streams.ssl.trust-store-type=PKCS12",
298-
"spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> {
284+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
285+
.withPropertyValues("spring.kafka.client-id=cid",
286+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
287+
"spring.application.name=appName",
288+
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
289+
"spring.kafka.streams.auto-startup=false",
290+
"spring.kafka.streams.cache-max-bytes-buffering=42",
291+
"spring.kafka.streams.client-id=override",
292+
"spring.kafka.streams.properties.fiz.buz=fix.fox",
293+
"spring.kafka.streams.replication-factor=2",
294+
"spring.kafka.streams.state-dir=/tmp/state",
295+
"spring.kafka.streams.ssl.key-password=p7",
296+
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
297+
"spring.kafka.streams.ssl.key-store-password=p8",
298+
"spring.kafka.streams.ssl.key-store-type=PKCS12",
299+
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
300+
"spring.kafka.streams.ssl.trust-store-password=p9",
301+
"spring.kafka.streams.ssl.trust-store-type=PKCS12",
302+
"spring.kafka.streams.ssl.protocol=TLSv1.2")
303+
.run((context) -> {
299304
Properties configs = context.getBean(
300305
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
301306
KafkaStreamsConfiguration.class).asProperties();
@@ -339,6 +344,44 @@ public void streamsProperties() {
339344
});
340345
}
341346

347+
@Test
348+
public void streamsApplicationIdUsesMainApplicationNameByDefault() {
349+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
350+
.withPropertyValues("spring.application.name=my-test-app",
351+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
352+
"spring.kafka.streams.auto-startup=false")
353+
.run((context) -> {
354+
Properties configs = context.getBean(
355+
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
356+
KafkaStreamsConfiguration.class).asProperties();
357+
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
358+
.isEqualTo("localhost:9092, localhost:9093");
359+
assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
360+
.isEqualTo("my-test-app");
361+
});
362+
}
363+
364+
@Test
365+
public void streamsApplicationIdIsMandatory() {
366+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
367+
.run((context) -> {
368+
assertThat(context).hasFailed();
369+
assertThat(context).getFailure()
370+
.hasMessageContaining("spring.kafka.streams.application-id")
371+
.hasMessageContaining(
372+
"This property is mandatory and fallback 'spring.application.name' is not set either.");
373+
374+
});
375+
}
376+
377+
@Test
378+
public void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
379+
this.contextRunner.run((context) -> {
380+
assertThat(context).hasNotFailed();
381+
assertThat(context).doesNotHaveBean(StreamsBuilder.class);
382+
});
383+
}
384+
342385
@SuppressWarnings("unchecked")
343386
@Test
344387
public void listenerProperties() {
@@ -470,4 +513,10 @@ public RecordMessageConverter myMessageConverter() {
470513

471514
}
472515

516+
@Configuration
517+
@EnableKafkaStreams
518+
protected static class EnableKafkaStreamsConfiguration {
519+
520+
}
521+
473522
}

spring-boot-project/spring-boot-docs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@
397397
<artifactId>commons-dbcp2</artifactId>
398398
<optional>true</optional>
399399
</dependency>
400+
<dependency>
401+
<groupId>org.apache.kafka</groupId>
402+
<artifactId>kafka-streams</artifactId>
403+
<optional>true</optional>
404+
</dependency>
400405
<dependency>
401406
<groupId>org.apache.logging.log4j</groupId>
402407
<artifactId>log4j-api</artifactId>

0 commit comments

Comments
 (0)