Skip to content

Commit d135246

Browse files
Add BackOff support into ConnectionFactory
Make a `connection.createChannel()` as retryable operation based on the provided `BackOff`
1 parent 8bc8b5a commit d135246

File tree

5 files changed

+89
-7
lines changed

5 files changed

+89
-7
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,6 +53,7 @@
5353
import org.springframework.util.Assert;
5454
import org.springframework.util.ObjectUtils;
5555
import org.springframework.util.StringUtils;
56+
import org.springframework.util.backoff.BackOff;
5657

5758
import com.rabbitmq.client.Address;
5859
import com.rabbitmq.client.AddressResolver;
@@ -71,6 +72,7 @@
7172
* @author Artem Bilan
7273
* @author Will Droste
7374
* @author Christian Tzolov
75+
* @author Salk Lee
7476
*
7577
*/
7678
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
@@ -162,6 +164,8 @@ public void handleRecovery(Recoverable recoverable) {
162164

163165
private volatile boolean contextStopped;
164166

167+
@Nullable
168+
private BackOff connectionCreatingBackOff;
165169
/**
166170
* Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection
167171
* factory.
@@ -556,6 +560,18 @@ public boolean hasPublisherConnectionFactory() {
556560
return this.publisherConnectionFactory != null;
557561
}
558562

563+
/**
564+
* Set the backoff strategy for creating connections. This enhancement supports custom
565+
* retry policies within the connection module, particularly useful when the maximum
566+
* channel limit is reached. The {@link SimpleConnection#createChannel(boolean)} method
567+
* utilizes this backoff strategy to gracefully handle such limit exceptions.
568+
* @param backOff the backoff strategy to be applied during connection creation
569+
* @since 3.1.3
570+
*/
571+
public void setConnectionCreatingBackOff(@Nullable BackOff backOff) {
572+
this.connectionCreatingBackOff = backOff;
573+
}
574+
559575
@Override
560576
public ConnectionFactory getPublisherConnectionFactory() {
561577
return this.publisherConnectionFactory;
@@ -566,8 +582,8 @@ protected final Connection createBareConnection() {
566582
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);
567583

568584
com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);
569-
570-
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
585+
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
586+
this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start());
571587
if (rabbitConnection instanceof AutorecoveringConnection auto) {
572588
auto.addRecoveryListener(new RecoveryListener() {
573589

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,9 +19,13 @@
1919
import java.io.IOException;
2020
import java.net.InetAddress;
2121

22+
import javax.annotation.Nullable;
23+
2224
import org.springframework.amqp.AmqpResourceNotAvailableException;
25+
import org.springframework.amqp.AmqpTimeoutException;
2326
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
2427
import org.springframework.util.ObjectUtils;
28+
import org.springframework.util.backoff.BackOffExecution;
2529

2630
import com.rabbitmq.client.AlreadyClosedException;
2731
import com.rabbitmq.client.BlockedListener;
@@ -35,6 +39,7 @@
3539
* @author Dave Syer
3640
* @author Gary Russell
3741
* @author Artem Bilan
42+
* @author Salk Lee
3843
*
3944
* @since 1.0
4045
*/
@@ -46,16 +51,39 @@ public class SimpleConnection implements Connection, NetworkConnection {
4651

4752
private volatile boolean explicitlyClosed;
4853

49-
public SimpleConnection(com.rabbitmq.client.Connection delegate,
50-
int closeTimeout) {
54+
@Nullable
55+
private final BackOffExecution backOffExecution;
56+
57+
public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout) {
58+
this(delegate, closeTimeout, null);
59+
}
60+
61+
/**
62+
* Construct an instance with the {@link org.springframework.util.backoff.BackOffExecution} arguments.
63+
* @param delegate delegate connection
64+
* @param closeTimeout the time of physical close time out
65+
* @param backOffExecution backOffExecution is nullable
66+
* @since 3.1.3
67+
*/
68+
public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout,
69+
@Nullable BackOffExecution backOffExecution) {
5170
this.delegate = delegate;
5271
this.closeTimeout = closeTimeout;
72+
this.backOffExecution = backOffExecution;
5373
}
5474

5575
@Override
5676
public Channel createChannel(boolean transactional) {
5777
try {
5878
Channel channel = this.delegate.createChannel();
79+
while (channel == null && this.backOffExecution != null) {
80+
long interval = this.backOffExecution.nextBackOff();
81+
if (interval == BackOffExecution.STOP) {
82+
break;
83+
}
84+
Thread.sleep(interval);
85+
channel = this.delegate.createChannel();
86+
}
5987
if (channel == null) {
6088
throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later.");
6189
}
@@ -65,6 +93,10 @@ public Channel createChannel(boolean transactional) {
6593
}
6694
return channel;
6795
}
96+
catch (InterruptedException e) {
97+
Thread.currentThread().interrupt();
98+
throw new AmqpTimeoutException("Interrupted while creating a new channel", e);
99+
}
68100
catch (IOException e) {
69101
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
70102
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2023 the original author or authors.
2+
* Copyright 2010-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.anyInt;
2223
import static org.mockito.ArgumentMatchers.anyString;
@@ -39,10 +40,13 @@
3940
import org.junit.jupiter.api.Test;
4041
import org.mockito.ArgumentCaptor;
4142

43+
import org.springframework.amqp.AmqpResourceNotAvailableException;
4244
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode;
4345
import org.springframework.amqp.utils.test.TestUtils;
4446
import org.springframework.beans.DirectFieldAccessor;
4547
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
48+
import org.springframework.util.StopWatch;
49+
import org.springframework.util.backoff.FixedBackOff;
4650

4751
import com.rabbitmq.client.Channel;
4852
import com.rabbitmq.client.ConnectionFactory;
@@ -52,6 +56,7 @@
5256
* @author Gary Russell
5357
* @author Dmitry Dbrazhnikov
5458
* @author Artem Bilan
59+
* @author Salk Lee
5560
*/
5661
public abstract class AbstractConnectionFactoryTests {
5762

@@ -212,4 +217,22 @@ public void testCreatesConnectionWithGivenFactory() {
212217
assertThat(mockConnectionFactory.getThreadFactory()).isEqualTo(connectionThreadFactory);
213218
}
214219

220+
@Test
221+
public void testConnectionCreatingBackOff() throws Exception {
222+
int maxAttempts = 2;
223+
long interval = 100L;
224+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
225+
given(mockConnection.createChannel()).willReturn(null);
226+
SimpleConnection simpleConnection = new SimpleConnection(mockConnection, 5,
227+
new FixedBackOff(interval, maxAttempts).start());
228+
StopWatch stopWatch = new StopWatch();
229+
stopWatch.start();
230+
assertThatExceptionOfType(AmqpResourceNotAvailableException.class).isThrownBy(() -> {
231+
simpleConnection.createChannel(false);
232+
});
233+
stopWatch.stop();
234+
assertThat(stopWatch.getTotalTimeMillis()).isGreaterThanOrEqualTo(maxAttempts * interval);
235+
verify(mockConnection, times(maxAttempts + 1)).createChannel();
236+
}
237+
215238
}

src/reference/antora/modules/ROOT/pages/amqp/connections.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ Simple publisher confirmations are supported by all three factories.
2828
When configuring a `RabbitTemplate` to use a xref:amqp/template.adoc#separate-connection[separate connection], you can now, starting with version 2.3.2, configure the publishing connection factory to be a different type.
2929
By default, the publishing factory is the same type and any properties set on the main factory are also propagated to the publishing factory.
3030

31+
Starting with version 3.1, the `AbstractConnectionFactory` includes the `connectionCreatingBackOff` property, which supports a backoff policy in the connection module.
32+
Currently, there is support in the behavior of `createChannel()` to handle exceptions that occur when the `channelMax` limit is reached, implementing a backoff strategy based on attempts and intervals.
33+
3134
[[pooledchannelconnectionfactory]]
3235
=== `PooledChannelConnectionFactory`
3336

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,11 @@ It remains possible to configure your own logging behavior by setting the `exclu
1818
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
1919
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
2020
See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and <<channel-close-logging>> for more information.
21+
22+
[[x31-conn-backoff]]
23+
=== Connections Enhancement
24+
25+
Connection Factory supported backoff policy when creating connection channel.
26+
See xref:amqp/connections.adoc[Choosing a Connection Factory] for more information.
27+
28+

0 commit comments

Comments
 (0)