Skip to content

Commit 6935508

Browse files
Merge pull request #498 from jjank/backport-connection-customizer
2 parents 378e56e + fcbb35f commit 6935508

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
165165
*/
166166
private Consumer<com.rabbitmq.client.ConnectionFactory> amqpConnectionFactoryPostProcessor = new NoOpSerializableConsumer<>();
167167

168+
/**
169+
* For post-processing the created {@link com.rabbitmq.client.Connection}
170+
*
171+
* @since 2.10.0
172+
*/
173+
private Consumer<com.rabbitmq.client.Connection> amqpConnectionPostProcessor = new NoOpSerializableConsumer<>();
174+
168175
/**
169176
* Callback before sending a message.
170177
*
@@ -337,6 +344,10 @@ protected Connection createConnection(String username, String password, Connecti
337344

338345
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator);
339346

347+
if (this.amqpConnectionPostProcessor != null) {
348+
this.amqpConnectionPostProcessor.accept(rabbitConnection);
349+
}
350+
340351
ReceivingContextConsumer rcc;
341352
if (this.declareReplyToDestination) {
342353
rcc = this.receivingContextConsumer;
@@ -1033,7 +1044,7 @@ public void setHostnameVerification(boolean hostnameVerification) {
10331044
public void setMetricsCollector(MetricsCollector metricsCollector) {
10341045
this.metricsCollector = metricsCollector;
10351046
}
1036-
1047+
10371048
public List<String> getUris() {
10381049
return this.uris.stream().map(uri -> uri.toString()).collect(Collectors.toList());
10391050
}
@@ -1052,6 +1063,32 @@ public void setAmqpConnectionFactoryPostProcessor(Consumer<com.rabbitmq.client.C
10521063
this.amqpConnectionFactoryPostProcessor = amqpConnectionFactoryPostProcessor;
10531064
}
10541065

1066+
1067+
/**
1068+
* Sets a post-processor for the created {@link com.rabbitmq.client.Connection}.
1069+
* <p>
1070+
* The post-processor is called after the {@link com.rabbitmq.client.Connection} is created and established. This callback
1071+
* can be used to customize the {@link com.rabbitmq.client.Connection} e.g. adding a {@link com.rabbitmq.client.BlockedListener}
1072+
* that emits a log message when this connection gets blocked:
1073+
* <pre>
1074+
* {@code
1075+
* RMQConnectionFactory factory = new RMQConnectionFactory();
1076+
* // ...
1077+
* factory.setAmqpConnectionPostProcessor(connection ->
1078+
* connection.addBlockedListener(
1079+
* reason -> log.warn("Connection blocked: {}", reason),
1080+
* () -> log.info("Connection unblocked"))
1081+
* );
1082+
* }
1083+
* </pre>
1084+
*
1085+
* @param amqpConnectionPostProcessor callback that processes the AMQP connections after they are established
1086+
* @since 2.10.0
1087+
*/
1088+
public void setAmqpConnectionPostProcessor(Consumer<com.rabbitmq.client.Connection> amqpConnectionPostProcessor) {
1089+
this.amqpConnectionPostProcessor = amqpConnectionPostProcessor;
1090+
}
1091+
10551092
/**
10561093
* Set callback called before sending a message.
10571094
* Can be used to customize the message or the destination

src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,8 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception {
319319
assertEquals(10000, passedInAddressResolver.getAddresses().get(1).getPort());
320320
}
321321

322-
@Test public void amqpConnectionFactoryIsCalled() throws Exception {
322+
@Test
323+
public void amqpConnectionFactoryIsCalled() throws Exception {
323324
AtomicInteger callCount = new AtomicInteger(0);
324325
rmqCf.setAmqpConnectionFactoryPostProcessor(cf -> callCount.incrementAndGet());
325326
rmqCf.createConnection();
@@ -328,6 +329,16 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception {
328329
assertEquals(2, callCount.get());
329330
}
330331

332+
@Test
333+
public void shouldApplyConnectionCustomizer() throws Exception {
334+
AtomicInteger callCount = new AtomicInteger(0);
335+
rmqCf.setAmqpConnectionPostProcessor(connection -> callCount.incrementAndGet());
336+
rmqCf.createConnection();
337+
assertEquals(1, callCount.get(), "Connection customizer calls");
338+
rmqCf.createConnection();
339+
assertEquals(2, callCount.get(), "Connection customizer calls");
340+
}
341+
331342
@Test
332343
public void saslConfigIsSet() throws Exception {
333344
AtomicReference<SaslConfig> saslConfigRef = new AtomicReference<>();

0 commit comments

Comments
 (0)