diff --git a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java index fb840ae1..4ce9d6b0 100644 --- a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java @@ -163,6 +163,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S */ private Consumer amqpConnectionFactoryPostProcessor = new NoOpSerializableConsumer<>(); + /** + * For post-processing the created {@link com.rabbitmq.client.Connection} + * + * @since 2.10.0 + */ + private Consumer amqpConnectionPostProcessor = new NoOpSerializableConsumer<>(); + /** * Callback before sending a message. * @@ -313,6 +320,10 @@ protected Connection createConnection(String username, String password, Connecti com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator); + if (this.amqpConnectionPostProcessor != null) { + this.amqpConnectionPostProcessor.accept(rabbitConnection); + } + ReceivingContextConsumer rcc; if (this.declareReplyToDestination) { rcc = this.receivingContextConsumer; @@ -1007,7 +1018,7 @@ public void setHostnameVerification(boolean hostnameVerification) { public void setMetricsCollector(MetricsCollector metricsCollector) { this.metricsCollector = metricsCollector; } - + public List getUris() { return this.uris.stream().map(uri -> uri.toString()).collect(Collectors.toList()); } @@ -1026,6 +1037,32 @@ public void setAmqpConnectionFactoryPostProcessor(Consumer + * The post-processor is called after the {@link com.rabbitmq.client.Connection} is created and established. This callback + * can be used to customize the {@link com.rabbitmq.client.Connection} e.g. adding a {@link com.rabbitmq.client.BlockedListener} + * that emits a log message when this connection gets blocked: + *
+     * {@code
+     * RMQConnectionFactory factory = new RMQConnectionFactory();
+     * // ...
+     * factory.setAmqpConnectionPostProcessor(connection ->
+     *                 connection.addBlockedListener(
+     *                         reason -> log.warn("Connection blocked: {}", reason),
+     *                         () -> log.info("Connection unblocked"))
+     *         );
+     * }
+     * 
+ * + * @param amqpConnectionPostProcessor callback that processes the AMQP connections after they are established + * @since 2.10.0 + */ + public void setAmqpConnectionPostProcessor(Consumer amqpConnectionPostProcessor) { + this.amqpConnectionPostProcessor = amqpConnectionPostProcessor; + } + /** * Set callback called before sending a message. * Can be used to customize the message or the destination diff --git a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java index 01b96328..bc7884fe 100644 --- a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java +++ b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java @@ -319,7 +319,8 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception { assertEquals(10000, passedInAddressResolver.getAddresses().get(1).getPort()); } - @Test public void amqpConnectionFactoryIsCalled() throws Exception { + @Test + public void amqpConnectionFactoryIsCalled() throws Exception { AtomicInteger callCount = new AtomicInteger(0); rmqCf.setAmqpConnectionFactoryPostProcessor(cf -> callCount.incrementAndGet()); rmqCf.createConnection(); @@ -328,6 +329,16 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception { assertEquals(2, callCount.get()); } + @Test + public void shouldApplyConnectionCustomizer() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + rmqCf.setAmqpConnectionPostProcessor(connection -> callCount.incrementAndGet()); + rmqCf.createConnection(); + assertEquals(1, callCount.get(), "Connection customizer calls"); + rmqCf.createConnection(); + assertEquals(2, callCount.get(), "Connection customizer calls"); + } + @Test public void saslConfigIsSet() throws Exception { AtomicReference saslConfigRef = new AtomicReference<>();