Skip to content

Commit faa01c7

Browse files
committed
Merge pull request #23766 from garyrussell
* pr/23766: Polish "Add configuration options for RabbitMQ's batch listener config" Add configuration options for RabbitMQ's batch listener config Closes gh-23766
2 parents fb25104 + 17e12ea commit faa01c7

File tree

5 files changed

+63
-3
lines changed

5 files changed

+63
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -116,6 +116,7 @@ protected void configure(T factory, ConnectionFactory connectionFactory,
116116
factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
117117
}
118118
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
119+
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
119120
ListenerRetry retryConfig = configuration.getRetry();
120121
if (retryConfig.isEnabled()) {
121122
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,12 @@ public abstract static class AmqpContainer {
662662
*/
663663
private Duration idleEventInterval;
664664

665+
/**
666+
* Whether the container should present batched messages as discrete messages or
667+
* call the listener with the batch.
668+
*/
669+
private boolean deBatchingEnabled = true;
670+
665671
/**
666672
* Optional properties for a retry interceptor.
667673
*/
@@ -709,6 +715,14 @@ public void setIdleEventInterval(Duration idleEventInterval) {
709715

710716
public abstract boolean isMissingQueuesFatal();
711717

718+
public boolean isDeBatchingEnabled() {
719+
return this.deBatchingEnabled;
720+
}
721+
722+
public void setDeBatchingEnabled(boolean deBatchingEnabled) {
723+
this.deBatchingEnabled = deBatchingEnabled;
724+
}
725+
712726
public ListenerRetry getRetry() {
713727
return this.retry;
714728
}
@@ -743,6 +757,14 @@ public static class SimpleContainer extends AmqpContainer {
743757
*/
744758
private boolean missingQueuesFatal = true;
745759

760+
/**
761+
* Whether the container creates a batch of messages based on the
762+
* 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to
763+
* include the contents of a producer created batch in the batch as discrete
764+
* records.
765+
*/
766+
private boolean consumerBatchEnabled;
767+
746768
public Integer getConcurrency() {
747769
return this.concurrency;
748770
}
@@ -776,6 +798,14 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
776798
this.missingQueuesFatal = missingQueuesFatal;
777799
}
778800

801+
public boolean isConsumerBatchEnabled() {
802+
return this.consumerBatchEnabled;
803+
}
804+
805+
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
806+
this.consumerBatchEnabled = consumerBatchEnabled;
807+
}
808+
779809
}
780810

781811
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,10 @@ public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFa
3939
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
4040
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
4141
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
42+
map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
43+
if (config.isConsumerBatchEnabled()) {
44+
factory.setDeBatchingEnabled(true);
45+
}
4246
}
4347

4448
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
3434
import org.aopalliance.aop.Advice;
3535
import org.junit.jupiter.api.Test;
36+
import org.mockito.InOrder;
37+
import org.mockito.Mockito;
3638

3739
import org.springframework.amqp.core.AcknowledgeMode;
3840
import org.springframework.amqp.core.AmqpAdmin;
@@ -550,19 +552,39 @@ void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() {
550552
});
551553
}
552554

555+
@Test
556+
void testSimpleRabbitListenerContainerFactoryConfigurerEnableDeBatchingWithConsumerBatchEnabled() {
557+
this.contextRunner.withUserConfiguration(TestConfiguration.class)
558+
.withPropertyValues("spring.rabbitmq.listener.type:direct",
559+
"spring.rabbitmq.listener.simple.consumer-batch-enabled:true",
560+
"spring.rabbitmq.listener.simple.de-batching-enabled:false")
561+
.run((context) -> {
562+
SimpleRabbitListenerContainerFactoryConfigurer configurer = context
563+
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
564+
SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class);
565+
configurer.configure(factory, mock(ConnectionFactory.class));
566+
InOrder inOrder = Mockito.inOrder(factory);
567+
verify(factory).setConsumerBatchEnabled(true);
568+
inOrder.verify(factory).setDeBatchingEnabled(false);
569+
inOrder.verify(factory).setDeBatchingEnabled(true);
570+
});
571+
}
572+
553573
@Test
554574
void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() {
555575
this.contextRunner.withUserConfiguration(TestConfiguration.class)
556576
.withPropertyValues("spring.rabbitmq.listener.type:simple",
557577
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
558-
"spring.rabbitmq.listener.direct.prefetch:40")
578+
"spring.rabbitmq.listener.direct.prefetch:40",
579+
"spring.rabbitmq.listener.direct.de-batching-enabled:false")
559580
.run((context) -> {
560581
DirectRabbitListenerContainerFactoryConfigurer configurer = context
561582
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
562583
DirectRabbitListenerContainerFactory factory = mock(DirectRabbitListenerContainerFactory.class);
563584
configurer.configure(factory, mock(ConnectionFactory.class));
564585
verify(factory).setConsumersPerQueue(5);
565586
verify(factory).setPrefetchCount(40);
587+
verify(factory).setDeBatchingEnabled(false);
566588
});
567589
}
568590

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ void simpleContainerUseConsistentDefaultValues() {
303303
RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple();
304304
assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup());
305305
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal());
306+
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled());
307+
assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled());
306308
}
307309

308310
@Test
@@ -312,6 +314,7 @@ void directContainerUseConsistentDefaultValues() {
312314
RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect();
313315
assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup());
314316
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal());
317+
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled());
315318
}
316319

317320
}

0 commit comments

Comments
 (0)