@@ -23,8 +23,11 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;

/**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
@@ -41,6 +44,12 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {

private KafkaTemplate<Object, Object> replyTemplate;

private KafkaAwareTransactionManager<Object, Object> transactionManager;

private ErrorHandler errorHandler;

private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@@ -65,6 +74,32 @@ void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
this.replyTemplate = replyTemplate;
}

/**
* Set the {@link KafkaAwareTransactionManager} to use.
* @param transactionManager the transaction manager
*/
public void setTransactionManager(
KafkaAwareTransactionManager<Object, Object> transactionManager) {
this.transactionManager = transactionManager;
}

/**
* Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor
*/
public void setAfterRollbackProcessor(
AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor;
}

/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@@ -89,6 +124,9 @@ private void configureListenerFactory(
map.from(this.replyTemplate).whenNonNull().to(factory::setReplyTemplate);
map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
.toCall(() -> factory.setBatchListener(true));
map.from(this.errorHandler).whenNonNull().to(factory::setErrorHandler);
map.from(this.afterRollbackProcessor).whenNonNull()
.to(factory::setAfterRollbackProcessor);
}

private void configureContainer(ContainerProperties container) {
@@ -109,6 +147,8 @@ private void configureContainer(ContainerProperties container) {
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).whenNonNull()
.to(container::setLogContainerConfig);
map.from(this.transactionManager).whenNonNull()
.to(container::setTransactionManager);
}

}
@@ -26,7 +26,10 @@
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;

/**
* Configuration for Kafka annotation-driven support.
@@ -45,12 +48,24 @@ class KafkaAnnotationDrivenConfiguration {

private final KafkaTemplate<Object, Object> kafkaTemplate;

private final KafkaAwareTransactionManager<Object, Object> transactionManager;

private final ErrorHandler errorHandler;

private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate) {
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
}

@Bean
@@ -60,6 +75,9 @@ public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerF
configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setErrorHandler(this.errorHandler);
configurer.setTransactionManager(this.transactionManager);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
return configurer;
}

@@ -41,6 +41,7 @@
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -50,12 +51,16 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
@@ -155,6 +160,10 @@ public void consumerProperties() {
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("errorHandler"));
});
}

@@ -485,6 +494,7 @@ public void listenerProperties() {
@Test
public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
KafkaTemplate<?, ?> kafkaTemplate = context
.getBean(KafkaTemplate.class);
@@ -496,13 +506,19 @@ public void testKafkaTemplateRecordMessageConverters() {
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(
kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("messageConverter"))
.isSameAs(context.getBean("myMessageConverter"));
assertThat(kafkaListenerContainerFactory.getContainerProperties()
.getTransactionManager()).isSameAs(
context.getBean("chainedTransactionManager"));
assertThat(dfa.getPropertyValue("afterRollbackProcessor"))
.isSameAs(context.getBean("arp"));
});
}

@@ -521,6 +537,11 @@ public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
@Configuration
protected static class TestConfiguration {

@Bean
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}

}

@Configuration
@@ -531,6 +552,22 @@ public RecordMessageConverter myMessageConverter() {
return mock(RecordMessageConverter.class);
}

@Bean
@Primary
public PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager<String, String> kafkaTransactionManager) {

return new ChainedKafkaTransactionManager<String, String>(
kafkaTransactionManager);
}

@Bean
public AfterRollbackProcessor<Object, Object> arp() {
return (records, consumer, ex, recoverable) -> {
// no-op
};
}

}

@Configuration
2 changes: 1 addition & 1 deletion spring-boot-project/spring-boot-dependencies/pom.xml
@@ -161,7 +161,7 @@
<spring-data-releasetrain.version>Lovelace-RC2</spring-data-releasetrain.version>
<spring-hateoas.version>0.25.0.RELEASE</spring-hateoas.version>
<spring-integration.version>5.1.0.M2</spring-integration.version>
<spring-kafka.version>2.2.0.M2</spring-kafka.version>
<spring-kafka.version>2.2.0.BUILD-SNAPSHOT</spring-kafka.version>
<spring-ldap.version>2.3.2.RELEASE</spring-ldap.version>
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
<spring-restdocs.version>2.0.2.RELEASE</spring-restdocs.version>
@@ -5622,8 +5622,19 @@ the auto-configured `KafkaTemplate`.
When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has
been defined, a default one is automatically configured with keys defined in
`spring.kafka.listener.*`. Also, if a `RecordMessageConverter` bean is defined, it is
automatically associated to the default factory.
`spring.kafka.listener.*`.
If the property `spring.kafka.producer.transaction-id-prefix` is defined, a
`KafkaTransactionManager` will be auto-configured with the name `kafkaTransactionManager` and

Member commented:
Where is the behaviour implemented? All I can see here is that a TransactionManager is associated if it exists. Is @EnableKafka doing that for us? Is that a PlatformTransactionManager (in which case we need to be careful about conflicts).

@garyrussell (Contributor, Author) commented on Aug 28, 2018:
A TM is already autoconfigured in KafkaAutoConfiguration, but I guess it was never documented (looks like a community contribution).

	@Bean
	@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
	@ConditionalOnMissingBean
	public KafkaTransactionManager<?, ?> kafkaTransactionManager(
			ProducerFactory<?, ?> producerFactory) {
		return new KafkaTransactionManager<>(producerFactory);
	}

(I wasn't aware of it either - it threw me a bit when I got weird results).

will be wired into the container factory.
Also, if `RecordMessageConverter`, `ErrorHandler`, and/or `AfterRollbackProcessor` beans
are defined, they are automatically associated with the default factory.
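
For illustration, here is a minimal sketch of user-defined beans that the
auto-configuration would associate with the default factory. The configuration class
name is an assumption; the bean types and the `AfterRollbackProcessor` lambda mirror the
test configuration in this pull request:

	import org.springframework.context.annotation.Bean;
	import org.springframework.context.annotation.Configuration;
	import org.springframework.kafka.listener.AfterRollbackProcessor;
	import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

	@Configuration
	public class KafkaErrorHandlingConfiguration {

		// Wired into the auto-configured listener container factory because it
		// is the only ErrorHandler bean in the context.
		@Bean
		public SeekToCurrentErrorHandler errorHandler() {
			return new SeekToCurrentErrorHandler();
		}

		// Called with the unprocessed records after a transactional listener
		// rolls back.
		@Bean
		public AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
			return (records, consumer, exception, recoverable) -> {
				// application-specific recovery, for example logging the records
			};
		}

	}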

IMPORTANT: The auto-configuration associates these beans only if there is a single
instance of each.
A `ChainedKafkaTransactionManager` usually references the auto-configured
`KafkaTransactionManager` bean, so the chained manager must be marked `@Primary` if you
want it to be wired into the container factory, as shown in the sketch below.

Member commented:
I didn't understand that note (probably related to the fact I am confused per my other comment).
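
To illustrate the note above, here is a sketch of a chained transaction manager marked
`@Primary`, following the `chainedTransactionManager` bean in this PR's test
configuration; the class name and the String key/value generics are assumptions:

	import org.springframework.context.annotation.Bean;
	import org.springframework.context.annotation.Configuration;
	import org.springframework.context.annotation.Primary;
	import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
	import org.springframework.kafka.transaction.KafkaTransactionManager;
	import org.springframework.transaction.PlatformTransactionManager;

	@Configuration
	public class ChainedTransactionConfiguration {

		// @Primary makes this chained manager, rather than the auto-configured
		// kafkaTransactionManager bean it wraps, the one that is wired into the
		// listener container factory.
		@Bean
		@Primary
		public PlatformTransactionManager chainedTransactionManager(
				KafkaTransactionManager<String, String> kafkaTransactionManager) {
			return new ChainedKafkaTransactionManager<String, String>(
					kafkaTransactionManager);
		}

	}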


The following component creates a listener endpoint on the `someTopic` topic:

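
The component itself is collapsed in this diff view. A minimal sketch of such a
listener; the class name, method name, and String payload are assumptions:

	import org.springframework.kafka.annotation.KafkaListener;
	import org.springframework.stereotype.Component;

	@Component
	public class MyKafkaListener {

		// Creates a listener endpoint on the someTopic topic; the payload is
		// assumed to be a String for this sketch.
		@KafkaListener(topics = "someTopic")
		public void processMessage(String content) {
			// handle the record
		}

	}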