Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions spring-boot-project/spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@
<artifactId>jest</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package org.springframework.boot.autoconfigure.kafka;

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.streams.StreamsBuilder;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
Expand All @@ -28,12 +32,17 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
Expand Down Expand Up @@ -138,4 +147,57 @@ public KafkaAdmin kafkaAdmin() {
return kafkaAdmin;
}

@Configuration
@ConditionalOnClass(StreamsBuilder.class)
public static class KafkaStreamsAutoConfiguration {

@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(
KafkaProperties properties, Environment environment) {

Map<String, Object> streamsProperties = properties.buildStreamsProperties();
if (properties.getStreams().getApplicationId() == null) {
if (environment.getProperty("spring.application.id") != null) {
streamsProperties.put("application.id",
environment.getProperty("spring.application.name"));
}
}
return new KafkaStreamsConfiguration(streamsProperties);
}

@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) {

return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties);
}

@Configuration
@EnableKafkaStreams
public static class EnableKafkaStreamsAutoConfiguration {

}

static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {

private final StreamsBuilderFactoryBean factoryBean;

private final KafkaProperties properties;

KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean,
KafkaProperties properties) {
this.factoryBean = factoryBean;
this.properties = properties;
}

@Override
public void afterPropertiesSet() throws Exception {
this.factoryBean
.setAutoStartup(this.properties.getStreams().isAutoStartup());
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class KafkaProperties {

/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Applies to all components unless overridden.
*/
private List<String> bootstrapServers = new ArrayList<>(
Collections.singletonList("localhost:9092"));
Expand All @@ -79,6 +79,8 @@ public class KafkaProperties {

private final Admin admin = new Admin();

private final Streams streams = new Streams();

private final Listener listener = new Listener();

private final Ssl ssl = new Ssl();
Expand Down Expand Up @@ -123,6 +125,10 @@ public Admin getAdmin() {
return this.admin;
}

public Streams getStreams() {
return this.streams;
}

public Ssl getSsl() {
return this.ssl;
}
Expand Down Expand Up @@ -193,6 +199,19 @@ public Map<String, Object> buildAdminProperties() {
return properties;
}

/**
* Create an initial map of streams properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary.
* @return the streams properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildStreamsProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.streams.buildProperties());
return properties;
}

public static class Consumer {

private final Ssl ssl = new Ssl();
Expand All @@ -211,7 +230,7 @@ public static class Consumer {

/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Overrides the global property, for consumers.
*/
private List<String> bootstrapServers;

Expand Down Expand Up @@ -421,7 +440,7 @@ public static class Producer {

/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Overrides the global property, for producers.
*/
private List<String> bootstrapServers;

Expand Down Expand Up @@ -631,6 +650,136 @@ public Map<String, Object> buildProperties() {

}

/**
* High (and some medium) priority Streams properties and a general properties bucket.
*/
public static class Streams {

private final Ssl ssl = new Ssl();

/**
* Kafka streams application.id property; default spring.application.name.
*/
private String applicationId;

/**
* Whether or not to auto-start the streams factory bean.
*/
private boolean autoStartup;

/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster. Overrides the global property, for streams.
*/
private List<String> bootstrapServers;

/**
* Maximum number of memory bytes to be used for buffering across all threads.
*/
private Integer cacheMaxBytesBuffering;

/**
* ID to pass to the server when making requests. Used for server-side logging.
*/
private String clientId;

/**
* The replication factor for change log topics and repartition topics created by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key descriptions do not start with "The", "A", etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK; I simply copied the Kafka descriptions 😄

* the stream processing application.
*/
private Integer replicationFactor;

/**
* Directory location for the state store.
*/
private String stateDir;

/**
* Additional Kafka properties used to configure the streams.
*/
private final Map<String, String> properties = new HashMap<>();

public Ssl getSsl() {
return this.ssl;
}

public String getApplicationId() {
return this.applicationId;
}

public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

public boolean isAutoStartup() {
return this.autoStartup;
}

public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

public List<String> getBootstrapServers() {
return this.bootstrapServers;
}

public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public Integer getCacheMaxBytesBuffering() {
return this.cacheMaxBytesBuffering;
}

public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) {
this.cacheMaxBytesBuffering = cacheMaxBytesBuffering;
}

public String getClientId() {
return this.clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public Integer getReplicationFactor() {
return this.replicationFactor;
}

public void setReplicationFactor(Integer replicationFactor) {
this.replicationFactor = replicationFactor;
}

public String getStateDir() {
return this.stateDir;
}

public void setStateDir(String stateDir) {
this.stateDir = stateDir;
}

public Map<String, String> getProperties() {
return this.properties;
}

public Map<String, Object> buildProperties() {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getApplicationId).to(properties.in("application.id"));
map.from(this::getBootstrapServers)
.to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
map.from(this::getCacheMaxBytesBuffering)
.to(properties.in("cache.max.bytes.buffering"));
map.from(this::getClientId)
.to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
map.from(this::getStateDir).to(properties.in("state.dir"));
return properties.with(this.ssl, this.properties);
}

}

public static class Template {

/**
Expand Down Expand Up @@ -1011,6 +1160,7 @@ public void setOptions(Map<String, String> options) {

}

@SuppressWarnings("serial")
private static class Properties extends HashMap<String, Object> {

public <V> java.util.function.Consumer<V> in(String key) {
Expand Down
Loading