diff --git a/src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java b/src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java index e705c848..da2a0141 100644 --- a/src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java +++ b/src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java @@ -44,7 +44,6 @@ class MessageListenerConsumer implements Consumer, Abortable { private final long terminationTimeout; private volatile boolean rejecting; private final boolean requeueOnMessageListenerException; - private final boolean requeueOnTimeout; /** * True when AMQP auto-ack is true as well. Happens @@ -78,7 +77,6 @@ class MessageListenerConsumer implements Consumer, Abortable { this.requeueOnMessageListenerException = requeueOnMessageListenerException; this.skipAck = messageConsumer.amqpAutoAck(); this.receivingContextConsumer = receivingContextConsumer; - this.requeueOnTimeout = requeueOnTimeout; } private String getConsTag() { @@ -141,7 +139,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie // see section 4.5.2 of JMS 1.1 specification RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response, this.receivingContextConsumer); - this.messageConsumer.getSession().addUncommittedTag(dtag); + this.maybeEnqueueUnackedMessageTag(dtag); boolean alreadyNacked = false; try { this.messageConsumer.getSession().deliverMessage(msg, this.messageListener); @@ -190,6 +188,12 @@ private void nack(long dtag) { } } + private void maybeEnqueueUnackedMessageTag(long dtag) { + if (!skipAck && !this.messageConsumer.getSession().isAutoAck()) { + this.messageConsumer.getSession().unackedMessageReceived(dtag); + } + } + private void dealWithAcknowledgments(long dtag) { if (!skipAck) { this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag); diff --git a/src/main/java/com/rabbitmq/jms/client/RMQSession.java b/src/main/java/com/rabbitmq/jms/client/RMQSession.java index 90afa582..b25600f9 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQSession.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQSession.java @@ -1247,7 +1247,7 @@ void removeProducer(RMQMessageProducer producer) { } boolean isAutoAck() { - return (getAcknowledgeModeNoException()!=Session.CLIENT_ACKNOWLEDGE); // only case when auto ack not required + return getAcknowledgeModeNoException() != Session.CLIENT_ACKNOWLEDGE; // only case when auto ack not required } /** diff --git a/src/test/java/com/rabbitmq/integration/tests/RequeueMessageOnListenerExceptionIT.java b/src/test/java/com/rabbitmq/integration/tests/RequeueMessageOnListenerExceptionIT.java index 258970a3..b79892ea 100644 --- a/src/test/java/com/rabbitmq/integration/tests/RequeueMessageOnListenerExceptionIT.java +++ b/src/test/java/com/rabbitmq/integration/tests/RequeueMessageOnListenerExceptionIT.java @@ -15,6 +15,7 @@ import jakarta.jms.*; import java.util.concurrent.CountDownLatch; +import static com.rabbitmq.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; /** @@ -35,8 +36,6 @@ public class RequeueMessageOnListenerExceptionIT extends AbstractITQueue { connection.close(); } } - - } @Test @@ -44,17 +43,17 @@ public void requeueParameterTrueRuntimeExceptionInListenerMessageShouldBeNacked( sendMessage(); QueueConnection connection = null; try { - connection = connection(RMQConnection.NO_CHANNEL_QOS); + connection = connection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(QUEUE_NAME); QueueReceiver queueReceiver = queueSession.createReceiver(queue); - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); queueReceiver.setMessageListener(message -> { - if (true) { - latch.countDown(); - throw new RuntimeException("runtime exception in message listener"); - } + latch.countDown(); + throw new RuntimeException("runtime exception in message listener"); }); + assertThat(latch).completes(); + queueSession.close(); // another consumer can consume the message queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); @@ -74,12 +73,47 @@ public void requeueParameterTrueNoExceptionInListenerQueueShouldBeEmpty() throws sendMessage(); QueueConnection connection = null; try { - connection = connection(RMQConnection.NO_CHANNEL_QOS); + connection = connection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(QUEUE_NAME); QueueReceiver queueReceiver = queueSession.createReceiver(queue); final CountDownLatch latch = new CountDownLatch(1); queueReceiver.setMessageListener(message -> latch.countDown()); + assertThat(latch).completes(); + queueSession.close(); + + // the message has been consumed, no longer in the queue + queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + queueReceiver = queueSession.createReceiver(queue); + Message message = queueReceiver.receive(1000L); + assertNull(message); + } finally { + if(connection != null) { + connection.close(); + } + } + } + + @Test + public void messageShouldBeAckedWhenRequeueOnMessageListenerExceptionIsTrue() throws Exception { + sendMessage(); + QueueConnection connection = null; + try { + connection = connection(); + QueueSession queueSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = queueSession.createQueue(QUEUE_NAME); + QueueReceiver queueReceiver = queueSession.createReceiver(queue); + final CountDownLatch latch = new CountDownLatch(1); + queueReceiver.setMessageListener(message -> { + try { + message.acknowledge(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + latch.countDown(); + }); + assertThat(latch).completes(); + queueSession.close(); // the message has been consumed, no longer in the queue queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); @@ -107,15 +141,13 @@ private void sendMessage() throws Exception { } } - private QueueConnection connection(int qos) throws Exception { + private QueueConnection connection() throws Exception { RMQConnectionFactory connectionFactory = (RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory().getConnectionFactory(); - connectionFactory.setChannelsQos(qos); + connectionFactory.setChannelsQos(RMQConnection.NO_CHANNEL_QOS); connectionFactory.setRequeueOnMessageListenerException(true); QueueConnection queueConnection = connectionFactory.createQueueConnection(); queueConnection.start(); return queueConnection; } - - }