Skip to content

Commit 3aa247f

Browse files
garyrussellsnicoll
authored andcommitted
Add configuration options for RabbitMQ's batch listener config
See gh-23766
1 parent fb25104 commit 3aa247f

File tree

5 files changed

+50
-6
lines changed

5 files changed

+50
-6
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -116,6 +116,7 @@ protected void configure(T factory, ConnectionFactory connectionFactory,
116116
factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
117117
}
118118
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
119+
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
119120
ListenerRetry retryConfig = configuration.getRetry();
120121
if (retryConfig.isEnabled()) {
121122
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,11 @@ public abstract static class AmqpContainer {
662662
*/
663663
private Duration idleEventInterval;
664664

665+
/**
666+
* Whether to present batched messages (created by a BatchingRabbitTemplate) as discrete messages.
667+
*/
668+
private boolean deBatchingEnabled = true;
669+
665670
/**
666671
* Optional properties for a retry interceptor.
667672
*/
@@ -709,6 +714,14 @@ public void setIdleEventInterval(Duration idleEventInterval) {
709714

710715
public abstract boolean isMissingQueuesFatal();
711716

717+
public boolean isDeBatchingEnabled() {
718+
return deBatchingEnabled;
719+
}
720+
721+
public void setDeBatchingEnabled(boolean deBatchingEnabled) {
722+
this.deBatchingEnabled = deBatchingEnabled;
723+
}
724+
712725
public ListenerRetry getRetry() {
713726
return this.retry;
714727
}
@@ -743,6 +756,13 @@ public static class SimpleContainer extends AmqpContainer {
743756
*/
744757
private boolean missingQueuesFatal = true;
745758

759+
/**
760+
* When true, the container will create a batch of messages based on the 'receiveTimeout' and 'batchSize'.
761+
* Coerces 'deBatchingEnabled' to true to include the contents of a producer created batch in the batch as
762+
* discrete records.
763+
*/
764+
private boolean consumerBatchEnabled;
765+
746766
public Integer getConcurrency() {
747767
return this.concurrency;
748768
}
@@ -776,6 +796,14 @@ public void setMissingQueuesFatal(boolean missingQueuesFatal) {
776796
this.missingQueuesFatal = missingQueuesFatal;
777797
}
778798

799+
public boolean isConsumerBatchEnabled() {
800+
return consumerBatchEnabled;
801+
}
802+
803+
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
804+
this.consumerBatchEnabled = consumerBatchEnabled;
805+
}
806+
779807
}
780808

781809
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2019 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,10 @@ public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFa
3939
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
4040
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
4141
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
42+
map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
43+
if (config.isConsumerBatchEnabled()) {
44+
factory.setDeBatchingEnabled(true);
45+
}
4246
}
4347

4448
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.aopalliance.aop.Advice;
3535
import org.junit.jupiter.api.Test;
3636

37+
import org.mockito.InOrder;
3738
import org.springframework.amqp.core.AcknowledgeMode;
3839
import org.springframework.amqp.core.AmqpAdmin;
3940
import org.springframework.amqp.core.Message;
@@ -75,8 +76,7 @@
7576
import static org.mockito.ArgumentMatchers.eq;
7677
import static org.mockito.ArgumentMatchers.isNull;
7778
import static org.mockito.BDDMockito.given;
78-
import static org.mockito.Mockito.mock;
79-
import static org.mockito.Mockito.verify;
79+
import static org.mockito.Mockito.*;
8080

8181
/**
8282
* Tests for {@link RabbitAutoConfiguration}.
@@ -538,15 +538,21 @@ void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() {
538538
.withPropertyValues("spring.rabbitmq.listener.type:direct",
539539
"spring.rabbitmq.listener.simple.concurrency:5",
540540
"spring.rabbitmq.listener.simple.maxConcurrency:10",
541-
"spring.rabbitmq.listener.simple.prefetch:40")
541+
"spring.rabbitmq.listener.simple.prefetch:40",
542+
"spring.rabbitmq.listener.simple.consumer-batch-enabled:true",
543+
"spring.rabbitmq.listener.simple.de-batching-enabled:false")
542544
.run((context) -> {
543545
SimpleRabbitListenerContainerFactoryConfigurer configurer = context
544546
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
545547
SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class);
546548
configurer.configure(factory, mock(ConnectionFactory.class));
549+
InOrder inOrder = inOrder(factory);
547550
verify(factory).setConcurrentConsumers(5);
548551
verify(factory).setMaxConcurrentConsumers(10);
549552
verify(factory).setPrefetchCount(40);
553+
verify(factory).setConsumerBatchEnabled(true);
554+
inOrder.verify(factory).setDeBatchingEnabled(false);
555+
inOrder.verify(factory).setDeBatchingEnabled(true);
550556
});
551557
}
552558

@@ -555,14 +561,16 @@ void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() {
555561
this.contextRunner.withUserConfiguration(TestConfiguration.class)
556562
.withPropertyValues("spring.rabbitmq.listener.type:simple",
557563
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
558-
"spring.rabbitmq.listener.direct.prefetch:40")
564+
"spring.rabbitmq.listener.direct.prefetch:40",
565+
"spring.rabbitmq.listener.direct.de-batching-enabled:false")
559566
.run((context) -> {
560567
DirectRabbitListenerContainerFactoryConfigurer configurer = context
561568
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
562569
DirectRabbitListenerContainerFactory factory = mock(DirectRabbitListenerContainerFactory.class);
563570
configurer.configure(factory, mock(ConnectionFactory.class));
564571
verify(factory).setConsumersPerQueue(5);
565572
verify(factory).setPrefetchCount(40);
573+
verify(factory).setDeBatchingEnabled(false);
566574
});
567575
}
568576

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ void simpleContainerUseConsistentDefaultValues() {
303303
RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple();
304304
assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup());
305305
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal());
306+
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled());
307+
assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled());
306308
}
307309

308310
@Test
@@ -312,6 +314,7 @@ void directContainerUseConsistentDefaultValues() {
312314
RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect();
313315
assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup());
314316
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal());
317+
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled());
315318
}
316319

317320
}

0 commit comments

Comments
 (0)