Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
*/
private Consumer<com.rabbitmq.client.ConnectionFactory> amqpConnectionFactoryPostProcessor = new NoOpSerializableConsumer<>();

/**
* For post-processing the created {@link com.rabbitmq.client.Connection}
*
* @since 2.10.0
*/
private Consumer<com.rabbitmq.client.Connection> amqpConnectionPostProcessor = new NoOpSerializableConsumer<>();

/**
* Callback before sending a message.
*
Expand Down Expand Up @@ -313,6 +320,10 @@ protected Connection createConnection(String username, String password, Connecti

com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator);

if (this.amqpConnectionPostProcessor != null) {
this.amqpConnectionPostProcessor.accept(rabbitConnection);
}

ReceivingContextConsumer rcc;
if (this.declareReplyToDestination) {
rcc = this.receivingContextConsumer;
Expand Down Expand Up @@ -1007,7 +1018,7 @@ public void setHostnameVerification(boolean hostnameVerification) {
public void setMetricsCollector(MetricsCollector metricsCollector) {
this.metricsCollector = metricsCollector;
}

public List<String> getUris() {
return this.uris.stream().map(uri -> uri.toString()).collect(Collectors.toList());
}
Expand All @@ -1026,6 +1037,32 @@ public void setAmqpConnectionFactoryPostProcessor(Consumer<com.rabbitmq.client.C
this.amqpConnectionFactoryPostProcessor = amqpConnectionFactoryPostProcessor;
}


/**
* Sets a post-processor for the created {@link com.rabbitmq.client.Connection}.
* <p>
* The post-processor is called after the {@link com.rabbitmq.client.Connection} is created and established. This callback
* can be used to customize the {@link com.rabbitmq.client.Connection} e.g. adding a {@link com.rabbitmq.client.BlockedListener}
* that emits a log message when this connection gets blocked:
* <pre>
* {@code
* RMQConnectionFactory factory = new RMQConnectionFactory();
* // ...
* factory.setAmqpConnectionPostProcessor(connection ->
* connection.addBlockedListener(
* reason -> log.warn("Connection blocked: {}", reason),
* () -> log.info("Connection unblocked"))
* );
* }
* </pre>
*
* @param amqpConnectionPostProcessor callback that processes the AMQP connections after they are established
* @since 2.10.0
*/
public void setAmqpConnectionPostProcessor(Consumer<com.rabbitmq.client.Connection> amqpConnectionPostProcessor) {
this.amqpConnectionPostProcessor = amqpConnectionPostProcessor;
}

/**
* Set callback called before sending a message.
* Can be used to customize the message or the destination
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception {
assertEquals(10000, passedInAddressResolver.getAddresses().get(1).getPort());
}

@Test public void amqpConnectionFactoryIsCalled() throws Exception {
@Test
public void amqpConnectionFactoryIsCalled() throws Exception {
AtomicInteger callCount = new AtomicInteger(0);
rmqCf.setAmqpConnectionFactoryPostProcessor(cf -> callCount.incrementAndGet());
rmqCf.createConnection();
Expand All @@ -328,6 +329,16 @@ public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception {
assertEquals(2, callCount.get());
}

@Test
public void shouldApplyConnectionCustomizer() throws Exception {
AtomicInteger callCount = new AtomicInteger(0);
rmqCf.setAmqpConnectionPostProcessor(connection -> callCount.incrementAndGet());
rmqCf.createConnection();
assertEquals(1, callCount.get(), "Connection customizer calls");
rmqCf.createConnection();
assertEquals(2, callCount.get(), "Connection customizer calls");
}

@Test
public void saslConfigIsSet() throws Exception {
AtomicReference<SaslConfig> saslConfigRef = new AtomicReference<>();
Expand Down