Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class MessageListenerConsumer implements Consumer, Abortable {
private final long terminationTimeout;
private volatile boolean rejecting;
private final boolean requeueOnMessageListenerException;
private final boolean requeueOnTimeout;

/**
* True when AMQP auto-ack is true as well. Happens
Expand Down Expand Up @@ -78,7 +77,6 @@ class MessageListenerConsumer implements Consumer, Abortable {
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
this.skipAck = messageConsumer.amqpAutoAck();
this.receivingContextConsumer = receivingContextConsumer;
this.requeueOnTimeout = requeueOnTimeout;
}

private String getConsTag() {
Expand Down Expand Up @@ -141,7 +139,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
// see section 4.5.2 of JMS 1.1 specification
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(),
response, this.receivingContextConsumer);
this.messageConsumer.getSession().addUncommittedTag(dtag);
this.maybeEnqueueUnackedMessageTag(dtag);
boolean alreadyNacked = false;
try {
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
Expand Down Expand Up @@ -190,6 +188,12 @@ private void nack(long dtag) {
}
}

private void maybeEnqueueUnackedMessageTag(long dtag) {
if (!skipAck && !this.messageConsumer.getSession().isAutoAck()) {
this.messageConsumer.getSession().unackedMessageReceived(dtag);
}
}

private void dealWithAcknowledgments(long dtag) {
if (!skipAck) {
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/jms/client/RMQSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ void removeProducer(RMQMessageProducer producer) {
}

boolean isAutoAck() {
return (getAcknowledgeModeNoException()!=Session.CLIENT_ACKNOWLEDGE); // only case when auto ack not required
return getAcknowledgeModeNoException() != Session.CLIENT_ACKNOWLEDGE; // only case when auto ack not required
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import jakarta.jms.*;
import java.util.concurrent.CountDownLatch;

import static com.rabbitmq.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

/**
Expand All @@ -35,26 +36,24 @@ public class RequeueMessageOnListenerExceptionIT extends AbstractITQueue {
connection.close();
}
}


}

@Test
public void requeueParameterTrueRuntimeExceptionInListenerMessageShouldBeNacked() throws Exception {
sendMessage();
QueueConnection connection = null;
try {
connection = connection(RMQConnection.NO_CHANNEL_QOS);
connection = connection();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
queueReceiver.setMessageListener(message -> {
if (true) {
latch.countDown();
throw new RuntimeException("runtime exception in message listener");
}
latch.countDown();
throw new RuntimeException("runtime exception in message listener");
});
assertThat(latch).completes();
queueSession.close();

// another consumer can consume the message
queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Expand All @@ -74,12 +73,47 @@ public void requeueParameterTrueNoExceptionInListenerQueueShouldBeEmpty() throws
sendMessage();
QueueConnection connection = null;
try {
connection = connection(RMQConnection.NO_CHANNEL_QOS);
connection = connection();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
final CountDownLatch latch = new CountDownLatch(1);
queueReceiver.setMessageListener(message -> latch.countDown());
assertThat(latch).completes();
queueSession.close();

// the message has been consumed, no longer in the queue
queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
Message message = queueReceiver.receive(1000L);
assertNull(message);
} finally {
if(connection != null) {
connection.close();
}
}
}

@Test
public void messageShouldBeAckedWhenRequeueOnMessageListenerExceptionIsTrue() throws Exception {
sendMessage();
QueueConnection connection = null;
try {
connection = connection();
QueueSession queueSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
final CountDownLatch latch = new CountDownLatch(1);
queueReceiver.setMessageListener(message -> {
try {
message.acknowledge();
} catch (JMSException e) {
throw new RuntimeException(e);
}
latch.countDown();
});
assertThat(latch).completes();
queueSession.close();

// the message has been consumed, no longer in the queue
queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down Expand Up @@ -107,15 +141,13 @@ private void sendMessage() throws Exception {
}
}

private QueueConnection connection(int qos) throws Exception {
private QueueConnection connection() throws Exception {
RMQConnectionFactory connectionFactory = (RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory().getConnectionFactory();
connectionFactory.setChannelsQos(qos);
connectionFactory.setChannelsQos(RMQConnection.NO_CHANNEL_QOS);
connectionFactory.setRequeueOnMessageListenerException(true);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
return queueConnection;
}



}