diff --git a/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java b/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java index ae4f4a0e..61da93e8 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java @@ -8,7 +8,6 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.jms.admin.RMQDestination; -import com.rabbitmq.jms.client.message.RMQBytesMessage; import com.rabbitmq.jms.client.message.RMQTextMessage; import com.rabbitmq.jms.util.RMQJMSException; import java.util.Map; @@ -373,6 +372,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message bob.contentType("application/octet-stream"); bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode)); bob.priority(priority); + bob.correlationId(msg.getJMSCorrelationID()); bob.expiration(rmqExpiration(timeToLive)); Map messageHeaders = msg.toAmqpHeaders(); if (this.keepTextMessageType && msg instanceof RMQTextMessage) { @@ -382,7 +382,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message String targetAmqpExchangeName = session.delayMessage(destination, messageHeaders, deliveryDelay); bob.headers(messageHeaders); - maybeSetReplyToPropertyToDirectReplyTo(bob, msg); + setReplyToProperty(bob, msg); bob = amqpPropertiesCustomiser.apply(bob, msg); @@ -409,12 +409,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag bob.contentType("application/octet-stream"); bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode)); bob.priority(priority); + bob.correlationId(msg.getJMSCorrelationID()); bob.expiration(rmqExpiration(timeToLive)); Map headers = msg.toHeaders(); String targetAmqpExchangeName = session.delayMessage(destination, headers, deliveryDelay); bob.headers(headers); - maybeSetReplyToPropertyToDirectReplyTo(bob, msg); + setReplyToProperty(bob, msg); byte[] data = msg.toByteArray(); @@ -427,11 +428,16 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag } /** - * Set AMQP reply-to property to direct-reply-to if necessary. + * Set AMQP reply-to property to reply-to if necessary. + *

*

* Set the reply-to property to amq.rabbitmq.reply-to * if the JMSReplyTo header is set to a destination with that - * name. + * name and does not have a routing key (which indicates this this is a forwarded + * reply to destination). + *

+ * Set the reply-to property to the amq routing key, if the routing key + * if not null. *

* For outbound RPC request. * @@ -440,11 +446,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag * @throws JMSException * @since 1.11.0 */ - private static void maybeSetReplyToPropertyToDirectReplyTo(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException { + private static void setReplyToProperty(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException { if (msg.getJMSReplyTo() != null && msg.getJMSReplyTo() instanceof RMQDestination) { RMQDestination replyTo = (RMQDestination) msg.getJMSReplyTo(); - if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName())) { + if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName()) && replyTo.getAmqpRoutingKey() == null) { builder.replyTo(DIRECT_REPLY_TO); + } else { + builder.replyTo(replyTo.getAmqpRoutingKey()); } } } diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java index ec01b754..6c0aeae0 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java @@ -2,18 +2,19 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. // -// Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2017-2023 VMware, Inc. or its affiliates. All rights reserved. package com.rabbitmq.jms.client; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.jms.admin.RMQDestination; import com.rabbitmq.jms.client.message.RMQBytesMessage; import com.rabbitmq.jms.client.message.RMQTextMessage; import javax.jms.CompletionListener; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import javax.jms.DeliveryMode; @@ -21,6 +22,7 @@ import javax.jms.Message; import java.io.IOException; +import java.util.Map; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -32,10 +34,12 @@ public class RMQMessageProducerTest { RMQSession session; RMQDestination destination; + Channel channel; @BeforeEach public void init() { session = mock(RMQSession.class); destination = mock(RMQDestination.class); + channel = mock(Channel.class); } @Test public void preferProducerPropertyNoMessagePropertySpecified() throws Exception { @@ -151,6 +155,236 @@ public class RMQMessageProducerTest { verify(channel).basicPublish(eq("x"), anyString(), any(AMQP.BasicProperties.class), any(byte[].class)); } + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message has the correlation id on the rabbit message") + public void sendAMQPMessageWithCorrelationIdCopy() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("TESTID", propCapture.getValue().getCorrelationId()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message has the correlation id on the rabbit message") + public void sendJMSMessageWithCorrelationIdCopy() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("TESTID", propCapture.getValue().getCorrelationId()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a direct reply to is handled correctly") + public void sendAMQPMessageWithDirectReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a forwarded direct reply to is handled correctly") + public void sendAMQPMessageWithDirectReplyToForwardedId() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-replyToID", "amq.rabbitmq.reply-to")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to-replyToID", propCapture.getValue().getReplyTo()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a non-direct reply to is handled correctly") + public void sendAMQPMessageWithGenericReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("other-replyto", "exch", "other-replyto", "other-replyto")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("other-replyto", propCapture.getValue().getReplyTo()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a direct reply to is handled correctly") + public void sendJMSMessageWithDirectReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a forwarded direct reply to is handled correctly") + public void sendJMSMessageWithDirectReplyToWithForwardedId() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-forwarded-id", "amq.rabbitmq.reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to-forwarded-id", propCapture.getValue().getReplyTo()); + } + } + + @SuppressWarnings("unchecked") + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a non-direct reply to is handled correctly") + public void sendJMSMessageWithGenericReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("other-reply-to", "", "other-reply-to", "other-reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("other-reply-to", propCapture.getValue().getReplyTo()); + } + } + + static class StubRMQMessageProducer extends RMQMessageProducer { RMQMessage message; diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java index ab182e99..f58a49d8 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java @@ -5,7 +5,17 @@ // Copyright (c) 2014-2022 VMware, Inc. or its affiliates. All rights reserved. package com.rabbitmq.jms.client; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.jms.admin.RMQDestination; import com.rabbitmq.jms.client.message.RMQTextMessage; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -15,9 +25,23 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; class RMQMessageTest { + RMQSession session; + GetResponse getResponse; + ReceivingContextConsumer consumer; + + @BeforeEach public void init() { + session = mock(RMQSession.class); + getResponse = mock(GetResponse.class); + consumer = mock(ReceivingContextConsumer.class); + } + + @Test @DisplayName("RMQMessage::toAmqpHeaders should convert all properties to amqp headers") void convertsAllTypesToAmqpHeaders() throws JMSException, IOException { @@ -47,4 +71,92 @@ void convertsAllTypesToAmqpHeaders() throws JMSException, IOException { ); } -} \ No newline at end of file + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is null with no replyto") + void convertAMQPMessageWithNoReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNull(result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to direct reply to") + void convertAMQPMessageWithDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("amq.rabbitmq.reply-to"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNotNull(result.getJMSReplyTo()); + + RMQDestination expected = new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to"); + assertEquals(expected, result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to the forwarded direct reply to") + void convertAMQPMessageWithForwardedDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("amq.rabbitmq.reply-to-forwarded-id"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNotNull(result.getJMSReplyTo()); + + RMQDestination expected = new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-forwarded-id", "amq.rabbitmq.reply-to-forwarded-id"); + assertEquals(expected, result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to the non direct reply to") + void convertAMQPMessageWithNonDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("non-direct-replyto"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNull(result.getJMSReplyTo()); + } +}