From fcbb35fe6402e6505aeaffab54b6058b01f2f3da Mon Sep 17 00:00:00 2001 From: Johannes Jank Date: Thu, 12 Sep 2024 09:42:27 +0200 Subject: [PATCH] Add customizer callback for AMQP connections This gives users of the JMS-client the opportunity to customize the established AMQP connection and e.g. add a `BlockedListener` to the connection. See https://github.com/rabbitmq/rabbitmq-java-client/discussions/1415 Backport of https://github.com/rabbitmq/rabbitmq-jms-client/pull/496 --- .../jms/admin/RMQConnectionFactory.java | 39 ++++++++++++++++++- .../jms/admin/RMQConnectionFactoryTest.java | 13 ++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java index 9cdd0891..37215e4c 100644 --- a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java @@ -165,6 +165,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S */ private Consumer amqpConnectionFactoryPostProcessor = new NoOpSerializableConsumer<>(); + /** + * For post-processing the created {@link com.rabbitmq.client.Connection} + * + * @since 2.10.0 + */ + private Consumer amqpConnectionPostProcessor = new NoOpSerializableConsumer<>(); + /** * Callback before sending a message. * @@ -337,6 +344,10 @@ protected Connection createConnection(String username, String password, Connecti com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator); + if (this.amqpConnectionPostProcessor != null) { + this.amqpConnectionPostProcessor.accept(rabbitConnection); + } + ReceivingContextConsumer rcc; if (this.declareReplyToDestination) { rcc = this.receivingContextConsumer; @@ -1033,7 +1044,7 @@ public void setHostnameVerification(boolean hostnameVerification) { public void setMetricsCollector(MetricsCollector metricsCollector) { this.metricsCollector = metricsCollector; } - + public List getUris() { return this.uris.stream().map(uri -> uri.toString()).collect(Collectors.toList()); } @@ -1052,6 +1063,32 @@ public void setAmqpConnectionFactoryPostProcessor(Consumer + * The post-processor is called after the {@link com.rabbitmq.client.Connection} is created and established. This callback + * can be used to customize the {@link com.rabbitmq.client.Connection} e.g. adding a {@link com.rabbitmq.client.BlockedListener} + * that emits a log message when this connection gets blocked: + *
+     * {@code
+     * RMQConnectionFactory factory = new RMQConnectionFactory();
+     * // ...
+     * factory.setAmqpConnectionPostProcessor(connection ->
+     *                 connection.addBlockedListener(
+     *                         reason -> log.warn("Connection blocked: {}", reason),
+     *                         () -> log.info("Connection unblocked"))
+     *         );
+     * }
+     * 
+ * + * @param amqpConnectionPostProcessor callback that processes the AMQP connections after they are established + * @since 2.10.0 + */ + public void setAmqpConnectionPostProcessor(Consumer amqpConnectionPostProcessor) { + this.amqpConnectionPostProcessor = amqpConnectionPostProcessor; + } + /** * Set callback called before sending a message. * Can be used to customize the message or the destination diff --git a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java index 01b96328..bc7884fe 100644 --- a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java +++ b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java @@ -319,7 +319,8 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception { assertEquals(10000, passedInAddressResolver.getAddresses().get(1).getPort()); } - @Test public void amqpConnectionFactoryIsCalled() throws Exception { + @Test + public void amqpConnectionFactoryIsCalled() throws Exception { AtomicInteger callCount = new AtomicInteger(0); rmqCf.setAmqpConnectionFactoryPostProcessor(cf -> callCount.incrementAndGet()); rmqCf.createConnection(); @@ -328,6 +329,16 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception { assertEquals(2, callCount.get()); } + @Test + public void shouldApplyConnectionCustomizer() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + rmqCf.setAmqpConnectionPostProcessor(connection -> callCount.incrementAndGet()); + rmqCf.createConnection(); + assertEquals(1, callCount.get(), "Connection customizer calls"); + rmqCf.createConnection(); + assertEquals(2, callCount.get(), "Connection customizer calls"); + } + @Test public void saslConfigIsSet() throws Exception { AtomicReference saslConfigRef = new AtomicReference<>();