@@ -165,6 +165,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
165165 */
166166 private Consumer <com .rabbitmq .client .ConnectionFactory > amqpConnectionFactoryPostProcessor = new NoOpSerializableConsumer <>();
167167
168+ /**
169+ * For post-processing the created {@link com.rabbitmq.client.Connection}
170+ *
171+ * @since 2.10.0
172+ */
173+ private Consumer <com .rabbitmq .client .Connection > amqpConnectionPostProcessor = new NoOpSerializableConsumer <>();
174+
168175 /**
169176 * Callback before sending a message.
170177 *
@@ -337,6 +344,10 @@ protected Connection createConnection(String username, String password, Connecti
337344
338345 com .rabbitmq .client .Connection rabbitConnection = instantiateNodeConnection (cf , connectionCreator );
339346
347+ if (this .amqpConnectionPostProcessor != null ) {
348+ this .amqpConnectionPostProcessor .accept (rabbitConnection );
349+ }
350+
340351 ReceivingContextConsumer rcc ;
341352 if (this .declareReplyToDestination ) {
342353 rcc = this .receivingContextConsumer ;
@@ -1033,7 +1044,7 @@ public void setHostnameVerification(boolean hostnameVerification) {
10331044 public void setMetricsCollector (MetricsCollector metricsCollector ) {
10341045 this .metricsCollector = metricsCollector ;
10351046 }
1036-
1047+
10371048 public List <String > getUris () {
10381049 return this .uris .stream ().map (uri -> uri .toString ()).collect (Collectors .toList ());
10391050 }
@@ -1052,6 +1063,32 @@ public void setAmqpConnectionFactoryPostProcessor(Consumer<com.rabbitmq.client.C
10521063 this .amqpConnectionFactoryPostProcessor = amqpConnectionFactoryPostProcessor ;
10531064 }
10541065
1066+
1067+ /**
1068+ * Sets a post-processor for the created {@link com.rabbitmq.client.Connection}.
1069+ * <p>
1070+ * The post-processor is called after the {@link com.rabbitmq.client.Connection} is created and established. This callback
1071+ * can be used to customize the {@link com.rabbitmq.client.Connection} e.g. adding a {@link com.rabbitmq.client.BlockedListener}
1072+ * that emits a log message when this connection gets blocked:
1073+ * <pre>
1074+ * {@code
1075+ * RMQConnectionFactory factory = new RMQConnectionFactory();
1076+ * // ...
1077+ * factory.setAmqpConnectionPostProcessor(connection ->
1078+ * connection.addBlockedListener(
1079+ * reason -> log.warn("Connection blocked: {}", reason),
1080+ * () -> log.info("Connection unblocked"))
1081+ * );
1082+ * }
1083+ * </pre>
1084+ *
1085+ * @param amqpConnectionPostProcessor callback that processes the AMQP connections after they are established
1086+ * @since 2.10.0
1087+ */
1088+ public void setAmqpConnectionPostProcessor (Consumer <com .rabbitmq .client .Connection > amqpConnectionPostProcessor ) {
1089+ this .amqpConnectionPostProcessor = amqpConnectionPostProcessor ;
1090+ }
1091+
10551092 /**
10561093 * Set callback called before sending a message.
10571094 * Can be used to customize the message or the destination
0 commit comments